Oracle® Database JDBC Developer's Guide and Reference, 11g Release 1 (11.1) Part Number B31224-01
Oracle Advanced Queuing (AQ) provides database-integrated message queuing functionality. It is built on top of Oracle Streams and leverages the functions of Oracle Database so that messages can be stored persistently, propagated between queues on different computers and databases, and transmitted using Oracle Net Services, HTTP, and HTTPS. Because Oracle AQ is implemented in database tables, all operational benefits of high availability, scalability, and reliability are also applicable to queue data. This chapter provides information about the Java interface to Oracle AQ.
This chapter covers the following topics:
Oracle Database 11g Release 1 (11.1) provides a fast Java interface to Oracle AQ by introducing a new Java package, oracle.jdbc.aq. This package contains the following:
Classes
AQDequeueOptions: Specifies the options available for the dequeue operation
AQEnqueueOptions: Specifies the options available for the enqueue operation
AQFactory: Is a factory class for AQ
AQNotificationEvent: Is created whenever a new message is enqueued in a queue for which you have registered your interest
Interfaces
AQAgent: Used to represent and identify a user of the queue or a producer or consumer of the message
AQMessage: Represents a message that is enqueued or dequeued
AQMessageProperties: Contains message properties such as Correlation, Sender, Delay and Expiration, Recipients, and Priority and Ordering
AQNotificationListener: Is a listener interface for receiving AQ notification events
AQNotificationRegistration: Represents your interest in being notified when a new message is enqueued in a particular queue
These classes and interfaces enable you to access an existing queue, create messages, and enqueue and dequeue messages.
Note:
Oracle JDBC drivers do not provide any API to create a queue. Queues must be created through the DBMS_AQADM PL/SQL package.
See:
For more information about the APIs, refer to the JavaDoc.
A JDBC application can:
Register interest in notifications in the AQ namespace and receive notifications when an enqueue occurs
Register interest in subscriptions to database events and receive notifications when the events are triggered
Registered clients are notified asynchronously when events are triggered or when a new message is explicitly enqueued in a queue for which they have registered interest. Clients do not need to be connected to a database.
Example
Example 25-1 illustrates the use of the JDBC AQ interface, which is a new feature of the 11.1 JDBC Thin driver. It shows how to enqueue and dequeue from a RAW type single-consumer queue and a RAW type multiconsumer queue. It also shows how AQ asynchronous notification works. In this example, the SCOTT user is connecting to the database. Therefore, in the database, you need to grant the following privileges to the user:
GRANT EXECUTE ON DBMS_AQ TO SCOTT;
GRANT EXECUTE ON DBMS_AQADM TO SCOTT;
GRANT AQ_ADMINISTRATOR_ROLE TO SCOTT;
GRANT ADMINISTER DATABASE TRIGGER TO SCOTT;
Example 25-1 AQ asynchronous event notification example
import java.sql.*;
import java.util.Properties;
import oracle.jdbc.*;
import oracle.jdbc.aq.*;

public class DemoAQRawQueue
{
  static final String USERNAME= "scott";
  static final String PASSWORD= "tiger";
  static final String URL =
    "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS=(PROTOCOL=tcp)(HOST=oracleserver.mydomain.com)"+
    "(PORT=1521))"+
    "(CONNECT_DATA=(SERVICE_NAME=mydatabaseinstance)))";

  public static final void main(String[] argv)
  {
    DemoAQRawQueue demo = new DemoAQRawQueue();
    try
    {
      demo.run();
    }catch(SQLException ex)
    {
      ex.printStackTrace();
    }
  }

  void run() throws SQLException
  {
    OracleConnection connection = connect();
    connection.setAutoCommit(false);
    cleanup(connection);
    setup(connection);
    // run the demo for single consumer queue:
    demoSingleConsumerQueue(connection);
    // run the demo for multi consumer queue:
    demoMultipleConsumerQueue(connection);
    connection.close();
  }

  /**
   * Single consumer queue demo:
   * This method enqueues a dummy message into the RAW_SINGLE_QUEUE queue and dequeues it.
   * Then it registers for AQ notification on this same queue and enqueues a
   * message again. The AQ listener will be notified that a new message has arrived
   * in the queue and it will dequeue it.
   */
  void demoSingleConsumerQueue(OracleConnection connection) throws SQLException
  {
    System.out.println("\n ============= Start single consumer queue demo ============= \n");
    String queueType = "RAW";
    String queueName = USERNAME+".RAW_SINGLE_QUEUE";
    enqueueDummyMessage(connection,queueName,null);
    dequeue(connection,queueName,queueType,null);
    AQNotificationRegistration reg = registerForAQEvents(connection,queueName);
    DemoAQRawQueueListener single_li = new DemoAQRawQueueListener(queueName,queueType);
    reg.addListener(single_li);
    enqueueDummyMessage(connection,queueName,null);
    connection.commit();
    while(single_li.getEventsCount() < 1)
    {
      try
      { Thread.sleep(1000); }
      catch (InterruptedException e)
      { }
    }
    single_li.closeConnection();
    connection.unregisterAQNotification(reg);
  }

  /**
   * Multi consumer queue demo:
   * This method first registers for AQ notification upon the agent "BLUE". It
   * then enqueues a message for "RED" and "BLUE"
   */
  void demoMultipleConsumerQueue(OracleConnection connection) throws SQLException
  {
    System.out.println("\n ============= Start multi consumer queue demo ============= \n");
    String queueType = "RAW";
    String queueName = USERNAME+".RAW_MULTIPLE_QUEUE";
    AQNotificationRegistration reg = registerForAQEvents(connection,queueName+":BLUE");
    DemoAQRawQueueListener multi_li = new DemoAQRawQueueListener(queueName,queueType);
    reg.addListener(multi_li);
    AQAgent[] recipients = new AQAgent[2];
    recipients[0] = AQFactory.createAQAgent();
    recipients[0].setName("BLUE");
    recipients[1] = AQFactory.createAQAgent();
    recipients[1].setName("RED");
    enqueueDummyMessage(connection,queueName,recipients);
    connection.commit();
    while(multi_li.getEventsCount() < 1)
    {
      try
      { Thread.sleep(1000); }
      catch (InterruptedException e)
      { }
    }
    dequeue(connection,queueName,queueType,"RED");
    multi_li.closeConnection();
    connection.unregisterAQNotification(reg);
  }

  /**
   * This method enqueues a dummy message in the queue specified by its name.
   */
  public void enqueueDummyMessage(OracleConnection conn,
                                  String queueName,
                                  AQAgent[] recipients) throws SQLException
  {
    System.out.println("----------- Enqueue start ------------");
    // First create the message properties:
    AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
    msgprop.setCorrelation("mycorrelation");
    msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
    // Specify an agent as the sender:
    AQAgent ag = AQFactory.createAQAgent();
    ag.setName("MY_SENDER_AGENT_NAME");
    ag.setAddress("MY_SENDER_AGENT_ADDRESS");
    msgprop.setSender(ag);
    // handle multi consumer case:
    if(recipients != null)
      msgprop.setRecipientList(recipients);
    System.out.println(msgprop.toString());
    // Create the actual AQMessage instance:
    AQMessage mesg = AQFactory.createAQMessage(msgprop);
    // and add a payload:
    byte[] rawPayload = new byte[500];
    for(int i=0;i<rawPayload.length;i++)
      rawPayload[i] = 'b';
    mesg.setPayload(new oracle.sql.RAW(rawPayload));
    // We want to retrieve the message id after enqueue:
    AQEnqueueOptions opt = new AQEnqueueOptions();
    opt.setRetrieveMessageId(true);
    // execute the actual enqueue operation:
    conn.enqueue(queueName,opt,mesg);
    byte[] mesgId = mesg.getMessageId();
    if(mesgId != null)
    {
      String mesgIdStr = byteBufferToHexString(mesgId,20);
      System.out.println("Message ID from enqueue call: "+mesgIdStr);
    }
    System.out.println("----------- Enqueue done ------------");
  }

  /**
   * This methods dequeues the next available message from the queue specified
   * by "queueName".
   */
  public void dequeue(OracleConnection conn,
                      String queueName,
                      String queueType,
                      String consumerName) throws SQLException
  {
    System.out.println("----------- Dequeue start ------------");
    AQDequeueOptions deqopt = new AQDequeueOptions();
    deqopt.setRetrieveMessageId(true);
    if(consumerName != null)
      deqopt.setConsumerName(consumerName);
    // dequeue operation:
    AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
    // print out the message that has been dequeued:
    byte[] payload = msg.getPayload();
    byte[] msgId = msg.getMessageId();
    if(msgId != null)
    {
      String mesgIdStr = byteBufferToHexString(msgId,20);
      System.out.println("ID of message dequeued = "+mesgIdStr);
    }
    AQMessageProperties msgProp = msg.getMessageProperties();
    System.out.println(msgProp.toString());
    String payloadStr = new String(payload,0,10);
    System.out.println("payload.length="+payload.length+", value="+payloadStr);
    System.out.println("----------- Dequeue done ------------");
  }

  public AQNotificationRegistration registerForAQEvents(
    OracleConnection conn,
    String queueName) throws SQLException
  {
    Properties globalOptions = new Properties();
    String[] queueNameArr = new String[1];
    queueNameArr[0] = queueName;
    Properties[] opt = new Properties[1];
    opt[0] = new Properties();
    opt[0].setProperty(OracleConnection.NTF_AQ_PAYLOAD,"true");
    AQNotificationRegistration[] regArr =
      conn.registerAQNotification(queueNameArr,opt,globalOptions);
    AQNotificationRegistration reg = regArr[0];
    return reg;
  }

  /**
   * Creates and starts the queues used by the demo.
   */
  private void setup(Connection conn) throws SQLException
  {
    // create a single consumer RAW queue:
    doUpdateDatabase(conn,
      "BEGIN "+
      "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
      " QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
      " QUEUE_PAYLOAD_TYPE => 'RAW', "+
      " COMPATIBLE => '10.0'); "+
      "END; ");
    doUpdateDatabase(conn,
      "BEGIN "+
      "DBMS_AQADM.CREATE_QUEUE( "+
      " QUEUE_NAME => '"+USERNAME+".RAW_SINGLE_QUEUE', "+
      " QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE'); "+
      "END; ");
    doUpdateDatabase(conn,
      "BEGIN "+
      " DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
      "END; ");
    // create a multi consumer RAW queue:
    doUpdateDatabase(conn,
      "BEGIN "+
      "DBMS_AQADM.CREATE_QUEUE_TABLE( "+
      " QUEUE_TABLE => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE', "+
      " QUEUE_PAYLOAD_TYPE => 'RAW', "+
      " MULTIPLE_CONSUMERS => TRUE, "+
      " COMPATIBLE => '10.0'); "+
      "END; ");
    doUpdateDatabase(conn,
      "BEGIN "+
      "DBMS_AQADM.CREATE_QUEUE( "+
      " QUEUE_NAME => '"+USERNAME+".RAW_MULTIPLE_QUEUE', "+
      " QUEUE_TABLE => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE'); "+
      "END; ");
    doUpdateDatabase(conn,
      "BEGIN "+
      " DBMS_AQADM.START_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
      "END; ");
  }

  void cleanup(Connection conn)
  {
    tryUpdateDatabase(conn,
      "BEGIN "+
      " DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_SINGLE_QUEUE'); "+
      "END; ");
    tryUpdateDatabase(conn,
      "BEGIN "+
      " DBMS_AQADM.DROP_QUEUE_TABLE( "+
      " QUEUE_TABLE => '"+USERNAME+".RAW_SINGLE_QUEUE_TABLE', "+
      " FORCE => TRUE); "+
      "END; ");
    tryUpdateDatabase(conn,
      "BEGIN "+
      " DBMS_AQADM.STOP_QUEUE('"+USERNAME+".RAW_MULTIPLE_QUEUE'); "+
      "END; ");
    tryUpdateDatabase(conn,
      "BEGIN "+
      " DBMS_AQADM.DROP_QUEUE_TABLE( "+
      " QUEUE_TABLE => '"+USERNAME+".RAW_MULTIPLE_QUEUE_TABLE', "+
      " FORCE => TRUE); "+
      "END; ");
  }

  /**
   * Creates a connection the database.
   */
  OracleConnection connect() throws SQLException
  {
    OracleDriver dr = new OracleDriver();
    Properties prop = new Properties();
    prop.setProperty("user",DemoAQRawQueue.USERNAME);
    prop.setProperty("password",DemoAQRawQueue.PASSWORD);
    return (OracleConnection)dr.connect(DemoAQRawQueue.URL,prop);
  }

  /**
   * Utility method: executes a DML query but doesn't throw any exception if
   * an error occurs.
   */
  private void tryUpdateDatabase(Connection conn,String sql)
  {
    Statement stmt = null;
    try
    {
      stmt = conn.createStatement();
      stmt.executeUpdate(sql);
    }catch(SQLException sqlex)
    {
      System.out.println("Exception ("+sqlex.getMessage()+") while trying to execute \""+sql+"\"");
    }
    finally
    {
      if(stmt != null)
      {
        try
        { stmt.close(); }
        catch(SQLException exx){exx.printStackTrace();}
      }
    }
  }

  /**
   * Utility method: executes a DML query and throws an exception if
   * an error occurs.
   */
  private void doUpdateDatabase(Connection conn,String sql) throws SQLException
  {
    Statement stmt = null;
    try
    {
      stmt = conn.createStatement();
      stmt.executeUpdate(sql);
    }
    finally
    {
      if(stmt != null)
      {
        try
        { stmt.close(); }
        catch(SQLException exx){}
      }
    }
  }

  static final String byteBufferToHexString(byte[] buffer, int maxNbOfBytes)
  {
    if(buffer == null)
      return null;
    int offset = 0;
    boolean isFirst = true;
    StringBuffer sb = new StringBuffer();
    while(offset < buffer.length && offset < maxNbOfBytes)
    {
      if(!isFirst)
        sb.append(' ');
      else
        isFirst = false;
      String hexrep = Integer.toHexString((int)buffer[offset]&0xFF);
      if(hexrep.length() == 1)
        hexrep = "0"+hexrep;
      sb.append(hexrep);
      offset++;
    }
    String ret= sb.toString();
    return ret;
  }
}

class DemoAQRawQueueListener implements AQNotificationListener
{
  OracleConnection conn;
  String queueName;
  String typeName;
  int eventsCount = 0;

  public DemoAQRawQueueListener(String _queueName, String _typeName) throws SQLException
  {
    queueName = _queueName;
    typeName = _typeName;
    conn = (OracleConnection)DriverManager.getConnection
      (DemoAQRawQueue.URL, DemoAQRawQueue.USERNAME, DemoAQRawQueue.PASSWORD);
  }

  public void onAQNotification(AQNotificationEvent e)
  {
    System.out.println("\n----------- DemoAQRawQueueListener: got an event ----------- ");
    System.out.println(e.toString());
    System.out.println("------------------------------------------------------");
    System.out.println("----------- DemoAQRawQueueListener: Dequeue start -----------");
    try
    {
      AQDequeueOptions deqopt = new AQDequeueOptions();
      deqopt.setRetrieveMessageId(true);
      if(e.getConsumerName() != null)
        deqopt.setConsumerName(e.getConsumerName());
      if((e.getMessageProperties()).getDeliveryMode()
          == AQMessageProperties.MESSAGE_BUFFERED)
      {
        deqopt.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED);
        deqopt.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE);
      }
      AQMessage msg = conn.dequeue(queueName,deqopt,typeName);
      byte[] msgId = msg.getMessageId();
      if(msgId != null)
      {
        String mesgIdStr = DemoAQRawQueue.byteBufferToHexString(msgId,20);
        System.out.println("ID of message dequeued = "+mesgIdStr);
      }
      System.out.println(msg.getMessageProperties().toString());
      byte[] payload = msg.getPayload();
      if(typeName.equals("RAW"))
      {
        String payloadStr = new String(payload,0,10);
        System.out.println("payload.length="+payload.length+", value="+payloadStr);
      }
      System.out.println("----------- DemoAQRawQueueListener: Dequeue done -----------");
    }
    catch(SQLException sqlex)
    {
      System.out.println(sqlex.getMessage());
    }
    eventsCount++;
  }

  public int getEventsCount()
  {
    return eventsCount;
  }

  public void closeConnection() throws SQLException
  {
    conn.close();
  }
}
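Example 25-1 waits for the notification by polling getEventsCount() once per second from the main thread while the listener updates the counter on another thread. As a hedged alternative, the following sketch shows the same wait expressed with java.util.concurrent.CountDownLatch. It uses only the JDK: LatchedListener, onEvent, and awaitEvents are hypothetical names, onEvent stands in for the body of onAQNotification, and a plain thread stands in for the thread that delivers notifications.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a listener such as DemoAQRawQueueListener. */
final class LatchedListener {
    private final CountDownLatch latch;

    LatchedListener(int expectedEvents) {
        this.latch = new CountDownLatch(expectedEvents);
    }

    /** In a real listener this would be called at the end of onAQNotification(...). */
    void onEvent() {
        latch.countDown();
    }

    /** Returns true if all expected events arrived before the timeout elapsed. */
    boolean awaitEvents(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and report that the wait did not complete.
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        LatchedListener listener = new LatchedListener(1);
        // A plain thread plays the role of the notification thread (assumption).
        new Thread(listener::onEvent).start();
        System.out.println("event received: " + listener.awaitEvents(5, TimeUnit.SECONDS));
    }
}
```

Compared with the polling loop, the latch gives the waiting thread a bounded timeout and a well-defined hand-off between the two threads.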
Before you enqueue a message, you need to create the message. An instance of a class implementing the AQMessage interface represents an AQ message. An AQ message contains properties (metadata) and a payload (data). To create an AQ message:
Create an instance of AQMessageProperties.
Set the property attributes.
Create the AQ message using the AQMessageProperties object.
Set the payload.
AQ Message Properties
The properties of the AQ message are represented by an instance of the AQMessageProperties interface. You can set or get the following message properties:
Dequeue Attempts Count: Specifies the number of attempts that have been made to dequeue the message. This property cannot be set.
Correlation: Is an identifier supplied by the producer of the message at the time of enqueuing the message.
Delay: Is the number of seconds for which the message is in the WAITING state. After the specified delay, the message is in the READY state and available for dequeuing. Dequeuing a message by using the message ID (msgid) overrides the delay specification.
Note:
Delay is not supported with buffered messaging.
Delivery Mode: Specifies whether the message is a buffered message or a persistent message. This property cannot be set.
Enqueue Time: Specifies the time at which the message was enqueued. This value is determined by the system and cannot be set by the user.
Exception Queue: Specifies the name of the queue into which the message is moved if it cannot be processed successfully. Messages are moved in two cases:
The number of unsuccessful dequeue attempts has exceeded max_retries.
The message has expired.
Expiration: Is the number of seconds during which the message is available for dequeuing, starting from when the message reaches the READY state. If the message is not dequeued before it expires, then it is moved to the exception queue in the EXPIRED state.
Message State: Specifies the state of the message at the time of dequeuing the message. This property cannot be set.
Previous Queue Message ID: Is the ID of the message in the last queue that generated the current message. When a message is propagated from one queue to another, this attribute identifies the ID of the queue from which it was last propagated. This property cannot be set.
Priority: Specifies the priority of the message. It can be any integer, including negative integers. The smaller the value, the higher the priority.
Recipient list: Is a list of AQAgent objects that represent the recipients. The default recipients are the queue subscribers. This parameter is valid only for multiconsumer queues.
Sender: Is an identifier specified by the producer at the time of enqueuing the message. It is an instance of AQAgent.
Transaction group: Specifies the transaction group of the message for transaction-grouped queues. It is set after a successful call to the dequeueArray method.
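The way Delay and Expiration move a message through the WAITING, READY, and EXPIRED states can be sketched in plain Java. This is an illustrative model only, not driver code: MessageStateSketch and stateAt are hypothetical names, the message is assumed never to be dequeued, Expiration is assumed to be a positive number of seconds, and the handling of the exact boundary instants is an assumption of the sketch.

```java
/** Illustrative model of the Delay and Expiration properties (not driver code). */
final class MessageStateSketch {

    enum State { WAITING, READY, EXPIRED }

    /**
     * @param enqueueTimeSec time at which the message was enqueued, in seconds
     * @param delaySec       Delay property: seconds spent in the WAITING state
     * @param expirationSec  Expiration property: seconds available once READY
     * @param nowSec         time at which the state is evaluated, in seconds
     */
    static State stateAt(long enqueueTimeSec, long delaySec, long expirationSec, long nowSec) {
        long readyAt = enqueueTimeSec + delaySec;
        if (nowSec < readyAt) {
            return State.WAITING;            // the delay has not elapsed yet
        }
        long expiresAt = readyAt + expirationSec; // expiration counts from READY
        return nowSec < expiresAt ? State.READY : State.EXPIRED;
    }

    public static void main(String[] args) {
        // Enqueued at t=0 with a 10 second delay and a 30 second expiration.
        System.out.println(stateAt(0, 10, 30, 5));   // WAITING
        System.out.println(stateAt(0, 10, 30, 10));  // READY
        System.out.println(stateAt(0, 10, 30, 40));  // EXPIRED
    }
}
```

Note that the expiration window starts when the message becomes READY, not when it is enqueued, so a delayed message in this model expires at enqueue time plus delay plus expiration.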
The following code snippet illustrates how to create an AQMessageProperties object and create an AQ message using it:
...
// Create the message properties object
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
// Set some properties (optional)
msgprop.setCorrelation("mycorrelation");
msgprop.setExceptionQueue("MY_EXCEPTION_QUEUE");
msgprop.setExpiration(0);
msgprop.setPriority(1);
AQAgent ag = AQFactory.createAQAgent();
ag.setName("MY_SENDER_AGENT_NAME");
ag.setAddress("MY_SENDER_AGENT_ADDRESS");
msgprop.setSender(ag);
msgprop.setRecipientList(recipients);
// Create the message
AQMessage mesg = AQFactory.createAQMessage(msgprop);
...
AQ Message Payload
Depending on the type of the queue, the payload of the AQ message can be specified using the setPayload method of the AQMessage interface. The following code snippet illustrates how to set the payload:
...
byte[] rawPayload = new byte[500];
for(int i=0;i<rawPayload.length;i++)
  rawPayload[i] = 'b';
mesg.setPayload(new oracle.sql.RAW(rawPayload));
...
You can retrieve the payload of an AQ message using the getPayload method or the appropriate getXXXPayload method. These methods are defined in the AQMessage interface.
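When a RAW payload carries text, the byte array has to be produced and read back with a known character set. The following sketch, which uses only the JDK, shows one way to do that. RawPayloadText, encode, and preview are hypothetical helper names, and UTF-8 is an assumption of the sketch; the resulting byte array would then be wrapped as shown in the snippet above, for example with new oracle.sql.RAW(bytes).

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helpers for text carried in a RAW payload (JDK only). */
final class RawPayloadText {

    /** Encodes text with an explicit character set instead of the platform default. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes at most maxBytes bytes from the start of the payload. */
    static String preview(byte[] payload, int maxBytes) {
        // Clamp the length so that short payloads do not cause an out-of-bounds error.
        int length = Math.min(payload.length, maxBytes);
        return new String(payload, 0, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello");
        System.out.println(preview(payload, 10)); // hello
    }
}
```

Clamping the length matters because new String(payload, 0, 10), as used in Example 25-1, throws an IndexOutOfBoundsException when the payload is shorter than 10 bytes. A preview cut in the middle of a multibyte character decodes that character as a replacement character, so this approach is best suited to ASCII-compatible previews.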
After you create a message and set the message properties and payload, you can enqueue the message using the enqueue method of the OracleConnection interface. Before you enqueue the message, you can specify some enqueue options. The AQEnqueueOptions class enables you to specify the following enqueue options:
Delivery mode: Specifies the delivery mode. Delivery mode can be set to either persistent (ENQUEUE_PERSISTENT) or buffered (ENQUEUE_BUFFERED).
Retrieve Message ID: Specifies whether the message ID has to be retrieved from the server when the message has been enqueued. By default, the message ID is not retrieved.
Transformation: Specifies a transformation that will be applied before enqueuing the message. The return type of the transformation function must match the type of the queue.
Note:
Transformations must be created in PL/SQL using DBMS_TRANSFORM.CREATE_TRANSFORMATION(...).
Visibility: Specifies the transactional behavior of the enqueue request. The default value for this option is ENQUEUE_ON_COMMIT. It indicates that the enqueue operation is part of the current transaction. ENQUEUE_IMMEDIATE indicates that the enqueue operation is an autonomous transaction, which commits at the end of the operation. For buffered messaging, you must use ENQUEUE_IMMEDIATE.
The following code snippet illustrates how to set the enqueue options and enqueue the message:
...
// Set the enqueue options
AQEnqueueOptions opt = new AQEnqueueOptions();
opt.setRetrieveMessageId(true);
// Enqueue the message
// conn is an instance of OracleConnection
// queueName is a String
// mesg is an instance of AQMessage
conn.enqueue(queueName, opt, mesg);
...
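The difference between the ENQUEUE_ON_COMMIT and ENQUEUE_IMMEDIATE visibility settings can be pictured with a small in-memory model. The sketch below is a toy illustration in plain Java, not a description of how the database implements transactions: EnqueueVisibilitySketch and all of its methods are hypothetical names, and messages are represented as strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory model of the two enqueue visibility settings (not driver code). */
final class EnqueueVisibilitySketch {
    private final List<String> visible = new ArrayList<>(); // what consumers can see
    private final List<String> pending = new ArrayList<>(); // part of the open transaction

    /** Models ENQUEUE_ON_COMMIT: the message belongs to the current transaction. */
    void enqueueOnCommit(String message) {
        pending.add(message);
    }

    /** Models ENQUEUE_IMMEDIATE: the message is committed on its own, at once. */
    void enqueueImmediate(String message) {
        visible.add(message);
    }

    /** Committing the transaction publishes its pending messages. */
    void commit() {
        visible.addAll(pending);
        pending.clear();
    }

    /** Rolling back discards pending messages; immediate ones are unaffected. */
    void rollback() {
        pending.clear();
    }

    List<String> visibleMessages() {
        return new ArrayList<>(visible);
    }

    public static void main(String[] args) {
        EnqueueVisibilitySketch queue = new EnqueueVisibilitySketch();
        queue.enqueueOnCommit("a");
        queue.enqueueImmediate("b");
        System.out.println(queue.visibleMessages()); // [b]
        queue.rollback();                            // "a" is discarded
        queue.enqueueOnCommit("c");
        queue.commit();
        System.out.println(queue.visibleMessages()); // [b, c]
    }
}
```

In Example 25-1 the connection runs with auto-commit disabled and the default visibility, which is why the demo calls connection.commit() after enqueuing before it waits for the notification.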
Enqueued messages can be dequeued using the dequeue method of the OracleConnection interface. Before you dequeue a message, you must set the dequeue options. The AQDequeueOptions class enables you to specify the following dequeue options:
Condition: Specifies a conditional expression based on the message properties, the message data properties, and PL/SQL functions. A dequeue condition is specified as a Boolean expression using syntax similar to the WHERE clause of a SQL query.
Consumer name: If specified, only the messages matching the consumer name are accessed.
Note:
If the queue is a single-consumer queue, do not set this option.
Correlation: Specifies a correlation criterion (or search criterion) for the dequeue operation.
Delivery Filter: Specifies the type of message to be dequeued. You can dequeue buffered messages only (DEQUEUE_BUFFERED), persistent messages only (DEQUEUE_PERSISTENT), which is the default, or both (DEQUEUE_PERSISTENT_OR_BUFFERED).
Dequeue Message ID: Specifies the message identifier of the message to be dequeued. This can be used to dequeue a unique message whose ID is known.
Dequeue mode: Specifies the locking behavior associated with the dequeue operation. It can take one of the following values:
DequeueMode.BROWSE: Message is dequeued without acquiring any lock.
DequeueMode.LOCKED: Message is dequeued with a write lock that lasts for the duration of the transaction.
DequeueMode.REMOVE: (default) Message is dequeued and deleted. The message can be retained in the queue based on the retention properties.
DequeueMode.REMOVE_NO_DATA: Message is marked as updated or deleted.
Maximum Buffer Length: Specifies the maximum number of bytes that will be allocated when dequeuing a message from a RAW queue. The default maximum is DEFAULT_MAX_PAYLOAD_LENGTH, but it can be changed to any other nonzero value. If the buffer is not large enough to contain the entire message, then the excess bytes are silently ignored.
Navigation: Specifies the position of the message that will be retrieved. It can take one of the following values:
NavigationOption.FIRST_MESSAGE: The first available message matching the search criteria is dequeued.
NavigationOption.NEXT_MESSAGE: (default) The next available message matching the search criteria is dequeued. If the previous message belongs to a message group, then the next available message matching the search criteria in the message group is dequeued.
NavigationOption.NEXT_TRANSACTION: Messages in the current transaction group are skipped, and the first message of the next transaction group is dequeued. This setting can be used only if message grouping is enabled for the queue.
Retrieve Message ID: Specifies whether the message identifier of the dequeued message needs to be retrieved. By default, it is not retrieved.
Transformation: Specifies a transformation that will be applied after dequeuing the message. The source type of the transformation must match the type of the queue.
Note:
Transformations must be created in PL/SQL using DBMS_TRANSFORM.CREATE_TRANSFORMATION(...).
Visibility: Specifies whether the message is dequeued as part of the current transaction. It can take one of the following values:
VisibilityOption.ON_COMMIT: (default) The dequeue operation is part of the current transaction.
VisibilityOption.IMMEDIATE: The dequeue operation is an autonomous transaction that commits at the end of the operation.
Note:
The Visibility option is ignored in the DequeueMode.BROWSE dequeue mode. If the delivery filter is DEQUEUE_BUFFERED or DEQUEUE_PERSISTENT_OR_BUFFERED, then this option must be set to VisibilityOption.IMMEDIATE.
Wait: Specifies the wait time for the dequeue operation, if none of the messages match the search criteria. The default value is DEQUEUE_WAIT_FOREVER, indicating that the operation waits forever. If set to DEQUEUE_NO_WAIT, then the operation does not wait. If a number is specified, then the dequeue operation waits for the specified number of seconds.
Note:
If you use DEQUEUE_WAIT_FOREVER, then the dequeue operation will not return until a message that matches the search criterion is available in the queue. However, you can interrupt the dequeue operation by calling the cancel method on the OracleConnection object.
The following code snippet illustrates how to set the dequeue options and dequeue the message:
...
// Set the dequeue options
AQDequeueOptions deqopt = new AQDequeueOptions();
deqopt.setRetrieveMessageId(true);
deqopt.setConsumerName(consumerName);
// Dequeue the message
// conn is an OracleConnection object
// queueName is a String identifying the queue
// queueType is a String specifying the type of the queue, such as RAW
AQMessage msg = conn.dequeue(queueName,deqopt,queueType);
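The three behaviors of the Wait option (wait forever, do not wait, wait for a number of seconds) have a close analogue in java.util.concurrent.BlockingQueue, which may help when reasoning about them. The sketch below is an analogy in plain Java, not driver code. DequeueWaitSketch, its dequeue method, and the WAIT_FOREVER and NO_WAIT sentinel values are hypothetical and belong to the sketch only, and returning null when nothing is available is a choice of the sketch that does not model what the driver does on a timeout.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory analogy for the Wait dequeue option (not driver code). */
final class DequeueWaitSketch {
    // Sentinel values used by this sketch only (assumption).
    static final int WAIT_FOREVER = -1;
    static final int NO_WAIT = 0;

    /** Returns the next message, or null if none arrived within the wait time. */
    static String dequeue(BlockingQueue<String> queue, int waitSeconds) {
        try {
            if (waitSeconds == WAIT_FOREVER) {
                return queue.take();          // blocks until a message is available
            }
            if (waitSeconds == NO_WAIT) {
                return queue.poll();          // returns immediately
            }
            return queue.poll(waitSeconds, TimeUnit.SECONDS); // bounded wait
        } catch (InterruptedException e) {
            // Analogous to interrupting a blocked dequeue from another thread.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(dequeue(queue, NO_WAIT)); // null
        queue.add("hello");
        System.out.println(dequeue(queue, 2));       // hello
    }
}
```

Because waiting forever is the default, a dequeue call on an empty queue blocks the calling thread, so code that must stay responsive typically sets either no wait or a bounded wait time.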
This section provides a few examples that illustrate how to enqueue and dequeue messages.
Example 25-2 illustrates how to enqueue a message, and Example 25-3 illustrates how to dequeue a message.
Example 25-2 Enqueuing a Single Message
This example illustrates how to obtain access to a queue, create a message, and enqueue it.
// Identify the queue; it must already exist
// conn is an instance of oracle.jdbc.OracleConnection
String queueName = "SCOTT.MY_QUEUE";
// Create the message properties object
AQMessageProperties msgprop = AQFactory.createAQMessageProperties();
// Set some properties (optional)
msgprop.setPriority(1);
msgprop.setExceptionQueue("EXCEPTION_QUEUE");
msgprop.setExpiration(0);
AQAgent agent = AQFactory.createAQAgent();
agent.setName("AGENTNAME");
agent.setAddress("AGENTADDRESS");
msgprop.setSender(agent);
// Create the message
AQMessage mesg = AQFactory.createAQMessage(msgprop);
// Set the payload
// where buffer is a byte[] that contains the RAW payload:
mesg.setPayload(new oracle.sql.RAW(buffer));
// Set the enqueue options
AQEnqueueOptions options = new AQEnqueueOptions();
// Set the enqueue options using the setXXX methods of AQEnqueueOptions.
// Enqueue the message
conn.enqueue(queueName, options, mesg);
Example 25-3 Dequeuing a Single Message
This example illustrates how to obtain access to a queue, set the dequeue options, and dequeue the message.
// Identify the queue; it must already exist
// conn is an instance of oracle.jdbc.OracleConnection
String queueName = "SCOTT.MY_QUEUE";
// Set the dequeue options
AQDequeueOptions options = new AQDequeueOptions();
options.setDeliveryMode(AQDequeueOptions.DEQUEUE_BUFFERED);
// Buffered messages must be dequeued with immediate visibility
options.setVisibility(AQDequeueOptions.DEQUEUE_IMMEDIATE);
// Dequeue the message; "RAW" is the type of the queue
AQMessage mesg = conn.dequeue(queueName, options, "RAW");