Oracle® Streams Advanced Queuing User's Guide and Reference Release 10.1 Part Number B10785-01 |
|
|
View PDF |
This chapter describes the Oracle Streams Advanced Queuing (AQ) basic operational interface.
This chapter contains these topics:
See Also:
|
This section contains these topics:
Adds a message to the specified queue.
DBMS_AQ.ENQUEUE ( queue_name IN VARCHAR2, payload IN "type_name", msgid OUT RAW);
If a message is enqueued to a multiconsumer queue with no recipient and the queue has no subscribers (or rule-based subscribers that match this message), then Oracle error ORA 24033 is raised. This is a warning that the message will be discarded because there are no recipients or subscribers to whom it can be delivered.
Examples are provided in the following programmatic environments:
PL/SQL: Enqueue a Single Message and Specify the Queue Name and Payload
PL/SQL: Enqueue a Single Message and Specify a Transformation
Specifies options available for the enqueue operation.
DBMS_AQ.ENQUEUE ( queue_name IN VARCHAR2, enqueue_options IN enqueue_options_t, message_properties IN message_properties_t, payload IN "type_name", msgid OUT RAW);
Do not use the immediate
option when you want to use LOB locators. LOB locators are valid only for the duration of the transaction. Your locator will not be valid, because the immediate
option automatically commits the transaction.
The sequence
deviation
parameter in enqueue options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue options parameter relative msgid. The relationship is identified by the sequence
deviation
parameter.
Specifying sequence deviation
for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message must be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message must be greater than or equal to the priority of the message before which this message is to be enqueued.
The visibility option must be immediate for nonpersistent queues.
Only local recipients are supported for nonpersistent queues.
If a transformation is specified, then it is applied to the message before enqueuing it to the queue. The transformation must map the message into an object whose type is the Oracle object type of the queue.
For secure queues, you must specify the sender_id
in the messages_properties
parameter. See "MESSAGE_PROPERTIES_T Type" in PL/SQL Packages and Types Reference for more information about sender_id
.
When you use secure queues, the following are required:
You must have created a valid Oracle Streams AQ agent using DBMS_AQADM.CREATE_AQ_AGENT
.
You must map sender_id
to a database user with enqueue privileges on the secure queue. Use DBMS_AQADM.ENABLE_DB_ACCESS
to do this.
Specifies message properties for the enqueue operation.
DBMS_AQ.ENQUEUE ( queue_name IN VARCHAR2, message_properties IN message_properties_t, payload IN "type_name", msgid OUT RAW);
Oracle Streams AQ uses message properties to manage individual messages. They are set when a message is enqueued, and their values are returned when the message is dequeued. To view messages in a waiting or processed state, you can either dequeue or browse by message ID, or use SELECT
statements.
Message delay and expiration are enforced by the queue monitor (QMN
) background processes. You must start the QMN processes for the database if you intend to use the delay and expiration features of Oracle Streams AQ.
Identifies the producer of a message.
DBMS_AQ.ENQUEUE ( queue_name IN VARCHAR2, payload IN "type_name", msgid OUT RAW);
To store a payload of type RAW
, Oracle Streams AQ creates a queue table with LOB
column as the payload repository. The maximum size of the payload is determined by which programmatic environment you use to access Oracle Streams AQ. For PL/SQL, Java and precompilers the limit is 32K; for the OCI the limit is 4G.
You must set up the following data structures for certain examples to work:
CONNECT system/manager CREATE USER aq IDENTIFIED BY aq; GRANT Aq_administrator_role TO aq; ***** CREATE TYPE ****** EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( Queue_table => 'aq.objsgs_qtab', Queue_payload_type => 'aq.message_typ'); EXECUTE DBMS_AQADM.CREATE_QUEUE ( Queue_name => 'aq.msg_queue', Queue_table => 'aq.objmsgs_qtab'); EXECUTE DBMS_AQADM.START_QUEUE ( Queue_name => 'aq.msg_queue', Enqueue => TRUE); EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE ( Queue_table => 'aq.prioritymsgs_qtab', Sort_list => 'PRIORITY,ENQ_TIME', Queue_payload_type => 'aq.message_typ'); EXECUTE DBMS_AQADM.CREATE_QUEUE ( Queue_name => 'aq.priority_msg_queue', Queue_table => 'aq.prioritymsgs_qtab'); EXECUTE DBMS_AQADM.START_QUEUE ( Queue_name => 'aq.priority_msg_queue', Enqueue => TRUE);
Example 10-1 PL/SQL: Enqueue a Single Message and Specify the Queue Name and Payload
/* Enqueue to msg_queue: */ DECLARE Enqueue_options DBMS_AQ.enqueue_options_t; Message_properties DBMS_AQ.message_properties_t; Message_handle RAW(16); Message aq.message_typ; BEGIN Message := aq.message_typ('NORMAL MESSAGE', 'enqueued to msg_queue first.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', Enqueue_options => enqueue_options, Message_properties => message_properties, Payload => message, Msgid => message_handle); COMMIT; END;
Example 10-2 PL/SQL: Enqueue a Single Message and Specify the Priority
/* The queue name priority_msg_queue is defined as an object type queue table. The payload object type is message. The schema of the queue is aq. */ /* Enqueue a message with priority 30: */ DECLARE Enqueue_options dbms_aq.enqueue_options_t; Message_properties dbms_aq.message_properties_t; Message_handle RAW(16); Message aq.Message_typ; BEGIN Message := Message_typ('PRIORITY MESSAGE', 'enqued at priority 30.'); message_properties.priority := 30; DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', enqueue_options => enqueue_options, message_properties => message_properties, payload => message, msgid => message_handle); COMMIT; END;
Example 10-3 PL/SQL: Enqueue a Single Message and Specify a Transformation
/* Enqueue to msg_queue: */ DECLARE Enqueue_options DBMS_AQ.enqueue_options_t; Message_properties DBMS_AQ.message_properties_t; Message_handle RAW(16); Message aq.message_typ; BEGIN Message := aq.message_typ('NORMAL MESSAGE', 'enqueued to msg_queue first.'); DBMS_AQ.ENQUEUE(queue_name => 'msg_queue', Enqueue_options => enqueue_options, Message_properties => message_properties, transformation => 'AQ.MSG_MAP', Payload => message, Msgid => message_handle); COMMIT; END;
Where MSG_MAP
was created as follows:
BEGIN DBMS.TRANSFORM.CREATE_TRANSFORMATION ( schema => 'AQ', name => 'MSG_MAP', from_schema => 'AQ', from_type => 'PO_ORDER1', to_schema => 'AQ', to_type => 'PO_ORDER2', transformation => 'AQ.MAP_PO_ORDER (source.user_data)'), END;
Example 10-4 Java (JDBC): Enqueue a Message and Add Payload
/* Setup */ connect system/manager CREATE USER aq IDENTIFIED BY aq; grant aq_administrator_role to aq; public static void setup(AQSession aq_sess) throws AQException { AQQueueTableProperty qtable_prop; AQQueueProperty queue_prop; AQQueueTable q_table; AQQueue queue; AQAgent agent; qtable_prop = new AQQueueTableProperty("RAW"); q_table = aq_sess.createQueueTable ("aq", "rawmsgs_qtab", qtable_prop); queue_prop = new AQQueueProperty(); queue = aq_sess.createQueue (q_table, "msg_queue", queue_prop); queue.start(); qtable_prop = new AQQueueTableProperty("RAW"); qtable_prop.setMultiConsumer(true); qtable_prop.setSortOrder("priority,enq_time"); q_table = aq_sess.createQueueTable ("aq", "rawmsgs_qtab2", qtable_prop); queue_prop = new AQQueueProperty(); queue = aq_sess.createQueue (q_table, "priority_msg_queue", queue_prop); queue.start(); agent = new AQAgent("subscriber1", null); queue.addSubscriber(agent, null); } /* Enqueue a message */ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "new message"; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to the queue */ queue = aq_sess.getQueue ("aq", "msg_queue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); db_conn.commit(); } /* Enqueue a message with priority = 5 */ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQMessageProperty msg_prop; AQRawPayload raw_payload; AQEnqueueOption enq_option; String test_data = "priority message"; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); /* Get a handle to the queue */ queue = aq_sess.getQueue ("aq", "msg_queue"); /* Create a message to contain raw payload: */ message = queue.createMessage(); /* Get Message property */ msg_prop = message.getMessageProperty(); /* Set priority */ msg_prop.setPriority(5); /* Get handle to the AQRawPayload object and populate it with raw data: */ b_array = test_data.getBytes(); raw_payload = message.getRawPayload(); raw_payload.setStream(b_array, b_array.length); /* Create a AQEnqueueOption object with default options: */ enq_option = new AQEnqueueOption(); /* Enqueue the message: */ queue.enqueue(enq_option, message); db_conn.commit(); }
Example 10-5 Visual Basic (OO4O): Enqueue a message
Enqueuing messages of type objects
'Prepare the message. MESSAGE_TYPE is a user-defined type ' in the "AQ" schema Set OraMsg = Q.AQMsg(1, "MESSAGE_TYPE") Set OraObj = DB.CreateOraObject("MESSAGE_TYPE") OraObj("subject").Value = "Greetings from OO4O" OraObj("text").Value = "Text of a message originated from OO4O" Msgid = Q.Enqueue
Enqueuing messages of type RAW
'Create an OraAQ object for the queue "DBQ" Dim Q as object Dim Msg as object Dim OraSession as object Dim DB as object Set OraSession = CreateObject("OracleInProcServer.XOraSession") Set OraDatabase = OraSession.OpenDatabase(mydb, "scott/tiger" 0&) Set Q = DB.CreateAQ("DBQ") 'Get a reference to the AQMsg object Set Msg = Q.AQMsg Msg.Value = "Enqueue the first message to a RAW queue." 'Enqueue the message Q.Enqueue() 'Enqueue another message. Msg.Value = "Another message" Q.Enqueue() 'Enqueue a message with nondefault properties. Msg.Priority = ORAQMSG_HIGH_PRIORITY Msg.Delay = 5 Msg.Value = "Urgent message" Q.Enqueue() Msg.Value = "The visibility option used in the enqueue call is ORAAQ_ENQ_IMMEDIATE" Q.Visible = ORAAQ_ENQ_IMMEDIATE Msgid = Q.Enqueue 'Enqueue Ahead of message Msgid_1 Msg.Value = "First Message to test Relative Message id" Msg.Correlation = "RELATIVE_MESSAGE_ID" Msgid_1 = Q.Enqueue Msg.Value = "Second message to test RELATIVE_MESSAGE_ID is queued ahead of the First Message " OraAq.relmsgid = Msgid_1 Msgid = Q.Enqueue
Use the ENQUEUE_ARRAY
function to enqueue an array of payloads using a corresponding array of message properties. The output is an array of message IDs of the enqueued messages. The function returns the number of messages successfully enqueued.
DBMS_AQ.ENQUEUE_ARRAY ( queue_name IN VARCHAR2, enqueue_options IN enqueue_options_t, array_size IN pls_integer, message_properties_array IN message_properties_array_t, payload_array IN VARRAY, msid_array OUT msgid_array_t) RETURN pls_integer;
The payload structure can be a VARRAY or nested table. The message IDs are returned into an array of RAW(16) entries of type DBMS_AQ.msgid_array_t
.
As with array operations in the relational world, it is not possible to provide a single optimum array size that will be correct in all circumstances. Application developers must experiment with different array sizes to determine the optimal value for their particular applications.
Examples are provided in the following programmatic environments:
Example 10-6 PL/SQL: Array Enqueuing into a Queue of Type Message
CREATE OR REPLACE TYPE message as OBJECT ( data VARCHAR2(10)) ;/ CREATE OR REPLACE TYPE message_tbl AS TABLE OF message;/ .... DECLARE enqopt dbms_aq.enqueue_options_t; msgproparr dbms_aq.message_properties_array_t; msgprop dbms_aq.message_properties_t; payloadarr message_tbl; msgidarr dbms_aq.msgid_array_t; retval pls_integer; BEGIN payloadarr := message_tbl(message('Oracle') ,message('Corp')) ; msgproparr := dbms_aq.message_properties_array_t(msgprop, msgprop); retval := dbms_aq.enqueue_array( queue_name => 'AQUSER.MY_QUEUE', enqueue_options => enqopt , array_size => 2, message_properties_array => msgproparr, payload_array => payloadarr, msgid_array => msgidarr ) ; commit; END;/
Example 10-7 C(OCI): Array Enqueuing into a Queue of Type Message
struct message{ OCIString *data; }; typedef struct message message; struct null_message{ OCIInd null_adt; OCIInd null_data; }; typedef struct null_message null_message; int main( argc, argv) int argc ; char **argv ;{ OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; dvoid *tmp; OCIType *mesg_tdo = (OCIType *) 0; message mesg[NMESGS]; message *mesgp[NMESGS]; null_message nmesg[NMESGS]; null_message *nmesgp[NMESGS]; int i, j, k; OCIInd ind[NMESGS]; dvoid *indptr[NMESGS]; ub4 priority; OCIAQEnqOptions *enqopt = (OCIAQEnqOptions *)0; OCIAQMsgProperties *msgprop= (OCIAQMsgProperties *)0; ub4 wait = 1; ub4 navigation = OCI_DEQ_NEXT_MSG; ub4 iters = 2; text *qname ; text mesgdata[30]; ub4 payload_size = 5; text *payload = (text *)0; ub4 batch_size = 2; ub4 enq_size = 2; printf("session start\n"); /* establish a session */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); printf("server attach\n"); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* get descriptor for enqueue options */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&enqopt, OCI_DTYPE_AQENQ_OPTIONS, 0, (dvoid **)0)); printf("enq options set\n"); /* set enqueue options - for consumer name, wait and navigation */ /* construct null terminated payload string */ payload = (text *)malloc(payload_size+1); for (k=0 ; k < payload_size ; k++) payload[k] = 'a'; payload[payload_size] = '\0'; for (k=0 ; k < batch_size ; k++) { indptr[k] = &ind[k]; mesgp[k] = &mesg[k]; nmesgp[k] = &nmesg[k]; nmesg[k].null_adt = nmesg[k].null_data = OCI_IND_NOTNULL; mesg[k].data = (OCIString *)0; OCIStringAssignText(envhp, errhp, (const unsigned char *)payload, strlen((const char *)payload), &(mesg[k].data)); } printf("check message tdo\n"); checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQUSER", strlen("AQUSER"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); k=0; while (k < iters) { enq_size = batch_size; checkerr(errhp, OCIAQEnqArray(svchp, errhp, (dvoid *)"AQUSER.MY_QUEUE", (OCIAQEnqOptions *)0, &enq_size, 0, mesg_tdo, (dvoid **)&mesgp, (dvoid **)&nmesgp, 0, 0, 0, 0)); k+=batch_size; } checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT)); return 0;}
Specifies which queue or queues to monitor
DBMS_AQ.LISTEN ( agent_list IN aq$_agent_list_t, wait IN BINARY_INTEGER DEFAULT DBMS_AQ.FOREVER, agent OUT sys.aq$_agent); TYPE aq$_agent_list_t IS TABLE of aq$_agent INDEXED BY BINARY_INTEGER;
The call takes a list of agents as an argument. You specify the queue to be monitored in the address field of each agent listed. You also must specify the name of the agent when monitoring multiconsumer queues. For single-consumer queues, an agent name must not be specified. Only local queues are supported as addresses. Protocol is reserved for future use.
This is a blocking call that returns when there is a message ready for consumption for an agent in the list. If there are messages for more than one agent, then only the first agent listed is returned. If there are no messages found when the wait time expires, then an error is raised.
A successful return from the listen
call is only an indication that there is a message for one of the listed agents in one of the specified queues. The interested agent must still dequeue the relevant message.
Examples are provided in the following programmatic environments:
Example 10-8 PL/SQL: Listen to Single-Consumer Queue (Timeout of Zero)
/* The listen call monitors a list of queues for messages for specific agents. You must have dequeue privileges for all the queues you wish to monitor. */ DECLARE Agent_w_msg aq$_agent; My_agent_list dbms_aq.agent_list_t; BEGIN /* NOTE: MCQ1, MCQ2, MCQ3 are multiconsumer queues in SCOTT's schema * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ Qlist(1):= aq$_agent(NULL, 'scott.SCQ1', NULL); Qlist(2):= aq$_agent(NULL, 'SCQ2', NULL); Qlist(3):= aq$_agent(NULL, 'SCQ3', NULL); /* Listen with a timeout of zero: */ DBMS_AQ.LISTEN( Agent_list => My_agent_list, Wait => 0, Agent => agent_w_msg); DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' || agent_w_msg.address); DBMS_OUTPUT.PUT_LINE(''); END;
Example 10-9 Java (JDBC): Listen to Queues
public static void monitor_status_queue(Connection db_conn) { AQSession aq_sess; AQAgent[] agt_list = null; AQAgent ret_agt = null; try { /* Create an AQ Session: */ aq_sess = AQDriverManager.createAQSession(db_conn); /* Construct the waiters list: */ agt_list = new AQAgent[3]; agt_list[0] = new AQAgent(null, "scott.SCQ1",0); agt_list[1] = new AQAgent (null, "SCQ2",0); agt_list[2] = new AQAgent (null, "SCQ3",0); /* Wait for order status messages for 120 seconds: */ ret_agt = aq_sess.listen(agt_list, 120); System.out.println("Message available for agent: " + ret_agt.getName() + " " + ret_agt.getAddress()); } catch (AQException aqex) { System.out.println("Exception-1: " + aqex); } catch (Exception ex) { System.out.println("Exception-2: " + ex); } }
Example 10-10 C (OCI): Listening for Single-Consumer Queues with Zero Timeout
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } /* set agent into descriptor */ void SetAgent(agent, appname, queue,errhp) OCIAQAgent *agent; text *appname; text *queue; OCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? (char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) OCIAQAgent *agent; OCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent = (OCIAQAgent *)0; /* added next 2 121598 */ int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { agent_list[i] = (OCIAQAgent *)0; OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp); SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
Example 10-11 C (OCI): Listening for Single-Consumer Queues with Timeout of 120 Seconds
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static void checkerr(errhp, status) OCIError *errhp; sword status; { text errbuf[512]; ub4 buflen; sb4 errcode; switch (status) { case OCI_SUCCESS: break; case OCI_SUCCESS_WITH_INFO: printf("Error - OCI_SUCCESS_WITH_INFO\n"); break; case OCI_NEED_DATA: printf("Error - OCI_NEED_DATA\n"); break; case OCI_NO_DATA: printf("Error - OCI_NO_DATA\n"); break; case OCI_ERROR: OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode, errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR); printf("Error - %s\n", errbuf); break; case OCI_INVALID_HANDLE: printf("Error - OCI_INVALID_HANDLE\n"); break; case OCI_STILL_EXECUTING: printf("Error - OCI_STILL_EXECUTE\n"); break; case OCI_CONTINUE: printf("Error - OCI_CONTINUE\n"); break; default: break; } } /* set agent into descriptor */ /* void SetAgent(agent, appname, queue) */ void SetAgent(agent, appname, queue,errhp) OCIAQAgent *agent; text *appname; text *queue; OCIError *errhp; { OCIAttrSet(agent, OCI_DTYPE_AQAGENT, appname ? (dvoid *)appname : (dvoid *)"", appname ? strlen((const char *)appname) : 0, OCI_ATTR_AGENT_NAME, errhp); OCIAttrSet(agent, OCI_DTYPE_AQAGENT, queue ? (dvoid *)queue : (dvoid *)"", queue ? strlen((const char *)queue) : 0, OCI_ATTR_AGENT_ADDRESS, errhp); printf("Set agent name to %s\n", appname ? (char *)appname : "NULL"); printf("Set agent address to %s\n", queue ? (char *)queue : "NULL"); } /* get agent from descriptor */ void GetAgent(agent, errhp) OCIAQAgent *agent; OCIError *errhp; { text *appname; text *queue; ub4 appsz; ub4 queuesz; if (!agent ) { printf("agent was NULL \n"); return; } checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp)); checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp)); if (!appsz) printf("agent name: NULL\n"); else printf("agent name: %.*s\n", appsz, (char *)appname); if (!queuesz) printf("agent address: NULL\n"); else printf("agent address: %.*s\n", queuesz, (char *)queue); } int main() { OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; OCIAQAgent *agent_list[3]; OCIAQAgent *agent = (OCIAQAgent *)0; /* added next 2 121598 */ int i; /* Standard OCI Initialization */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 0, (dvoid **) 0); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 0, (dvoid **) 0); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 0, (dvoid **) 0); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* AQ LISTEN Initialization - allocate agent handles */ for (i = 0; i < 3; i++) { agent_list[i] = (OCIAQAgent *)0; OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], OCI_DTYPE_AQAGENT, 0, (dvoid **)0); } /* * SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema */ SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp); SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp); SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp); checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0)); printf("MESSAGE for :- \n"); GetAgent(agent, errhp); printf("\n"); }
This section contains these topics:
Dequeuing a Message from a Single-Consumer Queue and Specifying Options
Dequeuing a Message from a Multiconsumer Queue and Specifying Options
Dequeues a message from the specified queue.
DBMS_AQ.DEQUEUE ( queue_name IN VARCHAR2, dequeue_options IN dequeue_options_t, message_properties OUT message_properties_t, payload OUT "type_name", msgid OUT RAW);
The search criteria for messages to be dequeued is determined by the consumer name
, msgid
and correlation
parameters in the dequeue options. Parameter msgid
uniquely identifies the message to be dequeued. Only messages in the READY
state are dequeued unless msgid
is specified. Correlation identifiers are application-defined identifiers that are not interpreted by Oracle Streams AQ.
The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the message ID and correlation ID in dequeue options.
The database consistent read mechanism is applicable for queue operations. For example, a BROWSE
call may not see a message that is enqueued after the beginning of the browsing transaction.
The default NAVIGATION
parameter during dequeue is NEXT
MESSAGE
. This means that subsequent dequeues retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command is processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST_MESSAGE n
avigation option when the first message in the queue must be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already enqueued are being processed.
Note: It can also be more efficient to use theFIRST_MESSAGE navigation option when there are messages being concurrently enqueued. If the FIRST_MESSAGE option is not specified, then Oracle Streams AQ continually generates the snapshot as of the first dequeue command, leading to poor performance. If the FIRST_MESSAGE option is specified, then Oracle Streams AQ uses a new snapshot for every dequeue command. |
Messages enqueued in the same transaction into a queue that has been enabled for message grouping form a group. If only one message is enqueued in the transaction, then this effectively forms a group of one message. There is no upper limit to the number of messages that can be grouped in a single transaction.
In queues that have not been enabled for message grouping, a dequeue in LOCKED
or REMOVE
mode locks only a single message. By contrast, a dequeue operation that seeks to dequeue a message that is part of a group locks the entire group. This is useful when all the messages in a group must be processed as a unit.
When all the messages in a group have been dequeued, the dequeue returns an error indicating that all messages in the group have been processed. The application can then use the NEXT
TRANSACTION
to start dequeuing messages from the next available group. In the event that no groups are available, the dequeue times out after the specified WAIT
period.
Examples are provided in the following programmatic environments:
Specifies the options available for the dequeue operation.
Typically, you expect the consumer of messages to access messages using the dequeue interface. You can view processed messages or messages still to be processed by browsing by message ID or by using SELECT
commands.
The transformation, if specified, is applied before returning the message to the caller. The transformation should be defined to map the queue Oracle object type to the return type wanted by the caller.
Examples are provided in the following programmatic environments:
Example 10-12 PL/SQL: Dequeue of Object Type Messages
/* Dequeue from msg_queue: */ DECLARE dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); message aq.message_typ; BEGIN DBMS_AQ.DEQUEUE( queue_name => 'msg_queue', dequeue_options => dequeue_options, message_properties => message_properties, payload => message, msgid => message_handle); DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text ); COMMIT; END;
Example 10-13 Java (JDBC): Dequeue a message from a single-consumer queue (specify options)
/* Dequeue a message with correlation ID = 'RUSH' */ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue = aq_sess.getQueue ("aq", "msg_queue"); /* Create a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); deq_option.setCorrelation("RUSH"); /* Dequeue a message */ message = queue.dequeue(deq_option); System.out.println("Successful dequeue"); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); db_conn.commit(); }
Example 10-14 Visual Basic (OO4O): Dequeue a message
Dequeuing messages of RAW type
'Dequeue the first message available Q.Dequeue() Set Msg = Q.QMsg 'Display the message content MsgBox Msg.Value 'Dequeue the first message available without removing it ' from the queue Q.DequeueMode = ORAAQ_DEQ_BROWSE 'Dequeue the first message with the correlation identifier ' equal to "RELATIVE_MSG_ID" Q.Navigation = ORAAQ_DQ_FIRST_MSG Q.correlate = "RELATIVE_MESSAGE_ID" Q.Dequeue 'Dequeue the next message with the correlation identifier ' of "RELATIVE_MSG_ID" Q.Navigation = ORAAQ_DQ_NEXT_MSG Q.Dequeue() 'Dequeue the first high priority message Msg.Priority = ORAQMSG_HIGH_PRIORITY Q.Dequeue() 'Dequeue the message enqueued with message ID of Msgid_1 Q.DequeueMsgid = Msgid_1 Q.Dequeue() 'Dequeue the message meant for "ANDY" Q.consumer = "ANDY" Q.Dequeue() 'Return immediately if there is no message on the queue Q.wait = ORAAQ_DQ_NOWAIT Q.Dequeue()
Dequeuing messages of Oracle object type
Set OraObj = DB.CreateOraObject("MESSAGE_TYPE") Set QMsg = Q.AQMsg(1, "MESSAGE_TYPE") 'Dequeue the first message available without removing it Q.Dequeue() OraObj = QMsg.Value 'Display the subject and data MsgBox OraObj!subject & OraObj!Data
Specifies the options available for the dequeue operation.
See "Dequeuing a Message from a Single-Consumer Queue and Specifying Options ".
Examples are provided in the following programmatic environments:
Example 10-15 Java (JDBC): Dequeue a message from a multiconsumer queue (specify options)
/* Dequeue a message for subscriber1 in browse mode*/ public static void example(AQSession aq_sess) throws AQException, SQLException { AQQueue queue; AQMessage message; AQRawPayload raw_payload; AQDequeueOption deq_option; byte[] b_array; Connection db_conn; db_conn = ((AQOracleSession)aq_sess).getDBConnection(); queue = aq_sess.getQueue ("aq", "priority_msg_queue"); /* Create a AQDequeueOption object with default options: */ deq_option = new AQDequeueOption(); /* Set dequeue mode to BROWSE */ deq_option.setDequeueMode(AQDequeueOption.DEQUEUE_BROWSE); /* Dequeue messages for subscriber1 */ deq_option.setConsumerName("subscriber1"); /* Dequeue a message: */ message = queue.dequeue(deq_option); System.out.println("Successful dequeue"); /* Retrieve raw data from the message: */ raw_payload = message.getRawPayload(); b_array = raw_payload.getBytes(); db_conn.commit(); }
Use the DEQUEUE_ARRAY
function to dequeue an array of payloads and a corresponding array of message properties. The output is an array of payloads, message IDs, and message properties of the dequeued messages. The function returns the number of messages successfully dequeued.
DBMS_AQ.DEQUEUE_ARRAY ( queue_name IN VARCHAR2, dequeue_options IN dequeue_options_t, array_size IN pls_integer, message_properties_array OUT message_properties_array_t, payload_array OUT VARRAY, msgid_array OUT msgid_array_t) RETURN pls_integer;
A nonzero wait time, as specified in dequeue_options
, is recognized only when there are no messages in the queue. If the queue contains messages that are eligible for dequeue, then the DEQUEUE_ARRAY
function will dequeue up to array_size
messages and return immediately.
The payload structure can be a VARRAY or nested table. The message IDs are returned into an array of RAW(16) entries of type DBMS_AQ.msgid_array_t
. The message properties are returned into an array of type DBMS_AQ.message_properties_array_t
.
As with array operations in the relational world, it is not possible to provide a single optimum array size that will be correct in all circumstances. Application developers must experiment with different array sizes to determine the optimal value for their particular applications.
When dequeuing messages, you might want to dequeue all the messages for a transaction group with a single call. You might also want to dequeue messages that span multiple transaction groups. You can specify either of these methods by using one of the following navigation methods:
NEXT_MESSAGE_ONE_GROUP
FIRST_MESSAGE_ONE_GROUP
NEXT_MESSAGE_MULTI_GROUP
FIRST_MESSAGE_MULTI_GROUP
Navigation method NEXT_MESSAGE_ONE_GROUP
dequeues messages that match the search criteria from the next available transaction group into an array. navigation method FIRST_MESSAGE_ONE_GROUP
resets the position to the beginning of the queue and dequeues all the messages in a single transaction group that are available and match the search criteria.
The number of messages dequeued is determined by an array size limit. If the number of messages in the transaction group exceeds array_size
, then multiple calls to DEQUEUE_ARRAY
must be made to dequeue all the messages for the transaction group.
Navigation methods NEXT_MESSAGE_MULTI_GROUP
and FIRST_MESSAGE_MULTI_GROUP
work like their ONE_GROUP
counterparts, but they are not limited to a single transaction group. Each message that is dequeued into the array has an associated set of message properties. Message property transaction_group
determines which messages belong to the same transaction group.
Examples are provided in the following programmatic environments:
Example 10-16 PL/SQL: Array Dequeuing from a Queue of Type Message
CREATE OR REPLACE TYPE message as OBJECT (data VARCHAR2(10)); / CREATE OR REPLACE TYPE message_arr AS VARRAY(2000) OF message;/ .... DECLARE deqopt dbms_aq.dequeue_options_t ; msgproparr dbms_aq.message_properties_array_t := dbms_aq.message_properties_array_t(); payloadarr message_arr := message_arr() ; msgidarr dbms_aq.msgid_array_t ; retval pls_integer ; BEGIN payloadarr.extend(2); msgproparr.extend(2); deqopt.consumer_name := 'SUB1'; retval := dbms_aq.dequeue_array( queue_name => 'AQUSER.MY_QUEUE', dequeue_options => deqopt , array_size => payloadarr.count, message_properties_array => msgproparr, payload_array => payloadarr, msgid_array => msgidarr ) ; END;/
Example 10-17 C(OCI): Array Dequeuing from a Queue of Type Message
struct message{ OCIString *data; }; typedef struct message message; struct null_message{ OCIInd null_adt; OCIInd null_data; }; typedef struct null_message null_message; int main(argc, argv) int argc; char **argv;{ OCIEnv *envhp; OCIServer *srvhp; OCIError *errhp; OCISvcCtx *svchp; OCISession *usrhp; dvoid *tmp; message *mesgp[NMESGS]; int i, j, k; null_message *nmesgp[NMESGS]; ub4 priority = 0; OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0; ub4 iters = 2; OCIType *mesg_tdo = (OCIType *) 0; ub4 batch_size = 2; ub4 deq_size = batch_size; printf("session start\n"); /* establish a session */ OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)()) 0, (dvoid * (*)()) 0, (void (*)()) 0 ); OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV, 52, (dvoid **) &tmp); OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp ); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR, 52, (dvoid **) &tmp); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER, 52, (dvoid **) &tmp); printf("server attach\n"); OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT); OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX, 52, (dvoid **) &tmp); /* set attribute server context in the service context */ OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, (ub4) OCI_ATTR_SERVER, (OCIError *) errhp); /* allocate a user context handle */ OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_USERNAME, errhp); OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION, (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT)); OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX, (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp); /* get descriptor for dequeue options */ checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0)); printf("deq options set\n"); /* set dequeue options - for consumer name, wait and navigation */ checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"SUB1", (ub4)strlen("SUB1"), OCI_ATTR_CONSUMER_NAME, errhp)); for (k=0 ; k < NMESGS ; k++) { mesgp[k] = 0; nmesgp[k] = 0; } printf("check message tdo\n"); checkerr(errhp, OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQUSER", strlen("AQUSER"), (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo)); k=0; while (k < iters) { deq_size = batch_size; checkerr(errhp, OCIAQDeqArray(svchp, errhp, (text *)"AQUSER.MY_QUEUE", (OCIAQDeqOptions *)deqopt, &deq_size, 0, mesg_tdo, (dvoid **)mesgp, (dvoid **)nmesgp, 0, 0, 0, 0)); k+=batch_size; } checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT)); return 0;}
Registers a callback for message notification.
DBMS_AQ.REGISTER ( reg_list IN SYS.AQ$_REG_INFO_LIST, count IN NUMBER);
This call is invoked for registration to a subscription which identifies the subscription name of interest and the associated callback to be invoked. Interest in several subscriptions can be registered at one time.
This interface is only valid for the asynchronous mode of message delivery. In this mode, a subscriber applies a registration call which specifies a callback. When messages are received that match the subscription criteria, the callback is invoked. The callback can then apply an explicit message_receive
(dequeue) to retrieve the message.
The user must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_AQ
.
The subscription name is the string schema
.queue
if the registration is for a single-consumer queue and schema.queue:consumer_name
if the registration is for a multiconsumer queues.
Related Functions: OCIAQListen
(), OCISubscriptionDisable
(), OCISubscriptionEnable
(), OCISubscriptionUnRegister
()
Example 10-18 C (OCI): Register for Notifications For Single-Consumer and Multiconsumer Queries
/* OCIRegister can be used by the client to register to receive notifications when messages are enqueued into nonpersistent and usual queues. */ #include <stdio.h> #include <stdlib.h> #include <string.h> #include <oci.h> static OCIEnv *envhp; static OCIServer *srvhp; static OCIError *errhp; static OCISvcCtx *svchp; /* The callback that gets invoked on notification */ ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) dvoid *ctx; OCISubscription *subscrhp; /* subscription handle */ dvoid *pay; /* payload */ ub4 payl; /* payload length */ dvoid *desc; /* the AQ notification descriptor */ ub4 mode; { text *subname; ub4 size; ub4 *number = (ub4 *)ctx; text *queue; text *consumer; OCIRaw *msgid; OCIAQMsgProperties *msgprop; (*number)++; /* Get the subscription name */ OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION, (dvoid *)&subname, &size, OCI_ATTR_SUBSCR_NAME, errhp); printf("got notification number %d for %.*s %d \n", *number, size, subname, payl); /* Get the queue name from the AQ notify descriptor */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, OCI_ATTR_QUEUE_NAME, errhp); /* Get the consumer name for which this notification was received */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, OCI_ATTR_CONSUMER_NAME, errhp); /* Get the message ID of the message for which we were notified */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, OCI_ATTR_NFY_MSGID, errhp); /* Get the message properties of the message for which we were notified */ OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, OCI_ATTR_MSG_PROP, errhp); } int main(argc, argv) int argc; char *argv[]; { OCISession *authp = (OCISession *) 0; /* The subscription handles */ OCISubscription *subscrhp[5]; /* Registrations are for AQ namespace */ ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; /* The context fot the callback */ ub4 ctx[5] = {0,0,0,0,0}; printf("Initializing OCI Process\n"); /* The OCI Process Environment must be initialized with OCI_EVENTS */ /* OCI_OBJECT flag is set to enable us dequeue */ (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *, size_t)) 0, (dvoid * (*)(dvoid *, dvoid *, size_t))0, (void (*)(dvoid *, dvoid *)) 0 ); printf("Initialization successful\n"); /* The standard OCI setup */ printf("Initializing OCI Env\n"); (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 ); (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0); /* Server contexts */ (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0); (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0); printf("connecting to server\n"); (void) OCIServerAttach( srvhp, errhp, (text *)"", strlen(""), 0); printf("connect successful\n"); /* Set attribute server context in the service context */ (void) OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp); (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "scott", (ub4) strlen("scott"), (ub4) OCI_ATTR_USERNAME, errhp); (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, (dvoid *) "tiger", (ub4) strlen("tiger"), (ub4) OCI_ATTR_PASSWORD, errhp); checkerr(errhp, OCISessionBegin ( svchp, errhp, authp, OCI_CRED_RDBMS, (ub4) OCI_DEFAULT)); (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *) authp, (ub4) 0, (ub4) OCI_ATTR_SESSION, errhp); /* Setting the subscription handle for notification on a NORMAL single-consumer queue */ printf("allocating subscription handle\n"); subscrhp[0] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); printf("setting subscription name\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); printf("setting subscription callback\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); printf("setting subscription context \n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); printf("setting subscription namespace\n"); (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); /* Setting the subscription handle for notification on a NORMAL multiconsumer consumer queue */ subscrhp[1] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.MCQ1:APP1", (ub4) strlen("SCOTT.MCQ1:APP1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); /* Setting the subscription handle for notification on a nonpersistent single-consumer queue */ subscrhp[2] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.NP_SCQ1", (ub4) strlen("SCOTT.NP_SCQ1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[2], (ub4)sizeof(ctx[2]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); /* Setting the subscription handle for notification on a nonpersistent multi consumer queue */ /* Waiting on user specified recipient */ subscrhp[3] = (OCISubscription *)0; (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (size_t) 0, (dvoid **) 0); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) "SCOTT.NP_MCQ1", (ub4) strlen("SCOTT.NP_MCQ1"), (ub4) OCI_ATTR_SUBSCR_NAME, errhp); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) notifyCB, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *)&ctx[3], (ub4)sizeof(ctx[3]), (ub4) OCI_ATTR_SUBSCR_CTX, errhp); (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION, (dvoid *) &namespace, (ub4) 0, (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); printf("Registering for all the subscriptiosn \n"); checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 4, errhp, OCI_DEFAULT)); printf("Waiting for notifcations \n"); /* wait for minutes for notifications */ sleep(300); printf("Exiting\n"); }
Posts to a list of anonymous subscriptions so clients registered for the subscription get notifications.
DBMS_AQ.POST ( post_list IN SYS.AQ$_POST_INFO_LIST, count IN NUMBER);
Several subscriptions can be posted to at one time. Posting to a subscription involves identifying the subscription name and the payload, if wanted. It is possible for no payload to be associated with this call. This call provides a best-effort guarantee. A notification goes to registered clients at most once.
This call is primarily used for lightweight notification and is useful in the case of several system events. If an application needs more rigid guarantees, then it can use Oracle Streams AQ functionality by enqueuing to a queue.
When using OCI, you must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_ANONYMOUS
.
When using PL/SQL, the namespace attribute in aq$_post_info
must be set to DBMS_AQ.NAMESPACE_ANONYMOUS
.
Related functions: OCIAQListen(), OCISvcCtxToLda(), OCISubscriptionEnable(), OCISubscriptionRegister(), OCISubscriptionUnRegister(), dbms_aq.register, dbms_aq.unregister.
Example 10-19 PL/SQL: Post of Object-Type Messages
-- Register for notification DECLARE reginfo sys.aq$_reg_info; reginfolist sys.aq$_reg_info_list; BEGIN -- Register for anonymous subscription PUBSUB1.ANONSTR, consumer_name ADMIN -- The PL/SQL callback pubsub1.mycallbk is invoked -- when a notification is received reginfo := sys.aq$_reg_info('PUBSUB1.ANONSTR:ADMIN', DBMS_AQ.NAMESPACE_ANONYMOUS, 'plsql://PUBSUB1.mycallbk', HEXTORAW('FF')); reginfolist := sys.aq$_reg_info_list(reginfo); sys.dbms_aq.register(reginfolist, 1); commit; END; / -- Post to an anonymous subscription DECLARE postinfo sys.aq$_post_info; postinfolist sys.aq$_post_info_list; BEGIN -- Post to the anonymous subscription PUBSUB1.ANONSTR, consumer_name ADMIN postinfo := sys.aq$_post_info('PUBSUB1.ANONSTR:ADMIN',0,HEXTORAW('FF')); postinfolist := sys.aq$_post_info_list(postinfo); sys.dbms_aq.post(postinfolist, 1); commit; END; /
Adds an agent to the Lightweight Directory Access Protocol (LDAP) server.
DBMS_AQ.BIND_AGENT( agent IN SYS.AQ$_AGENT, certificate IN VARCHAR2 default NULL);
This call takes an agent and an optional certificate location as the arguments, and adds the agent entry to the LDAP server. The certificate location parameter is the distinguished name of the LDAP entry that contains the digital certificate which the agent uses. If the agent does not have a digital certificate, then this parameter is defaulted to null.