Oracle® Streams Advanced Queuing User's Guide and Reference Release 10.1 Part Number B10785-01 |
|
|
View PDF |
This chapter illustrates a messaging environment that can be constructed using Oracle Streams.
This chapter contains these topics:
Enqueue and Dequeue Events Using JMS
See Also: Oracle Streams Concepts and Administration for more information about messaging andSYS.AnyData queues |
This example illustrates using a single SYS.AnyData
queue at a database called oedb.net
to create a Oracle Streams messaging environment in which events containing message payloads of different types are stored in the same queue. Specifically, this example illustrates the following messaging features of Oracle Streams:
Enqueuing messages containing order payload and customer payload as SYS.Anydata
events into the queue
Enqueuing messages containing row LCR payload as SYS.Anydata
events into the queue
Creating a rule set for applying the events
Creating an evaluation context used by the rule set
Creating a Oracle Streams apply process to dequeue and process the events based on rules
Creating a message handler and associating it with the apply process
Explicitly dequeuing and processing events based on rules without using the apply process
Figure 24-1 provides an overview of this environment.
Figure 24-1 Example Oracle Streams Messaging Environment
The following tasks must be completed before you begin the example in this section.
Set initialization parameter COMPATIBLE
to 9.2.0
or higher.
Configure your network and Oracle Net so that you can access the oedb.net
database from the client where you run these scripts.
This example creates a new user to function as the Oracle Streams administrator (strmadmin
) and prompts you for the tablespace you want to use for this user's data. Before you start this example, either create a new tablespace or identify an existing tablespace for the Oracle Streams administrator to use. The Oracle Streams administrator should not use the SYSTEM
tablespace.
Complete the following steps to set up users and create a SYS.AnyData
queue for a Oracle Streams messaging environment.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database. |
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_setup_message.out /*
Step 2 Set Up Users
Connect to oedb.net
as SYS
user.
*/ CONNECT SYS/CHANGE_ON_INSTALL@oedb.net AS SYSDBA /*
This example uses the oe
sample schema. For this example to work properly, the oe
user must have privileges to run the subprograms in the DBMS_AQ
package. The oe
user is specified as the queue user when the SYS.AnyData
queue is created in Step 3. The SET_UP_QUEUE
procedure grants the oe user enqueue and dequeue privileges on the queue, but the oe
user also needs EXECUTE
privilege on the DBMS_AQ
package to enqueue events into and dequeue events from the queue.
Also, most of the configuration and administration actions illustrated in this example are performed by the Oracle Streams administrator. In this step, create the Oracle Streams administrator named strmadmin
and grant this user the necessary privileges. These privileges enable the user to run subprograms in packages related to Oracle Streams, create rule sets, create rules, and monitor the Oracle Streams environment by querying data dictionary views. You can choose a different name for this user.
Note:
|
*/ GRANT EXECUTE ON DBMS_AQ TO oe; GRANT CONNECT, RESOURCE, SELECT_CATALOG_ROLE TO strmadmin IDENTIFIED BY strmadminpw; ACCEPT streams_tbs PROMPT 'Enter the tablespace for the Oracle Streams administrator: ' ALTER USER strmadmin DEFAULT TABLESPACE &streams_tbs QUOTA UNLIMITED ON &streams_tbs; GRANT EXECUTE ON DBMS_APPLY_ADM TO strmadmin; GRANT EXECUTE ON DBMS_AQ TO strmadmin; GRANT EXECUTE ON DBMS_AQADM TO strmadmin; GRANT EXECUTE ON DBMS_STREAMS_ADM TO strmadmin; BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / BEGIN DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( privilege => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, grantee => 'strmadmin', grant_option => FALSE); END; / /*
Step 3 Create the SYS.AnyData Queue
Connect as the Oracle Streams administrator.
*/ CONNECT strmadmin/strmadminpw@oedb.net /*
Run the SET_UP_QUEUE
procedure to create a queue named oe_queue
at oedb.net
. This queue functions as the SYS.AnyData
queue by holding events used in the messaging environment.
Running the SET_UP_QUEUE
procedure performs the following actions:
Creates a queue table named oe_queue_table
. This queue table is owned by the Oracle Streams administrator (strmadmin
) and uses the default storage of this user.
Creates a queue named oe_queue
owned by the Oracle Streams administrator (strmadmin
)
Starts the queue
*/ BEGIN DBMS_STREAMS_ADM.SET_UP_QUEUE( queue_table => 'oe_queue_table', queue_name => 'oe_queue'); END; / /*
Step 4 Grant the oe User Privileges on the Queue
*/
BEGIN SYS.DBMS_AQADM.GRANT_QUEUE_PRIVILEGE( privilege => 'ALL', queue_name => 'strmadmin.oe_queue', grantee => 'oe'); END; / /*
Step 5 Create an Agent for Explicit Enqueue
Create an agent that will be used to perform explicit enqueue operations on the oe_queue
queue.
*/ BEGIN SYS.DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'explicit_enq'); END; / /*
Step 6 Associate the oe User with the explicit_enq Agent
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue
queue is a secure queue because it was created using SET_UP_QUEUE
. This step enables the oe
user to perform enqueue operations on this queue.
*/ BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'explicit_enq', db_username => 'oe'); END; / /*
Step 7 Check the Spool Results
Check the streams_setup_message.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to create one PL/SQL procedure that enqueues non-LCR events into the SYS.AnyData
queue and one PL/SQL procedure that enqueues row LCR events into the SYS.AnyData
queue.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database. |
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_enqprocs_message.out /*
Step 2 Create a Type to Represent Orders
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Create a type to represent orders based on the columns in the oe.orders
table. The type attributes include the columns in the oe.orders
table, along with one extra attribute named action
. The value of the action
attribute for instances of this type is used to determine correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type is used for events that are enqueued into the SYS.AnyData
queue.
*/ CREATE OR REPLACE TYPE order_event_typ AS OBJECT ( order_id NUMBER(12), order_date TIMESTAMP(6) WITH LOCAL TIME ZONE, order_mode VARCHAR2(8), customer_id NUMBER(6), order_status NUMBER(2), order_total NUMBER(8,2), sales_rep_id NUMBER(6), promotion_id NUMBER(6), action VARCHAR(7)); / /*
Step 3 Create a Type to Represent Customers
Create a type to represent customers based on the columns in the oe.customers
table. The type attributes include the columns in the oe.customers
table, along with one extra attribute named action
. The value of the action
attribute for instances of this type is used to determine correct action to perform on the instance (either apply process dequeue or explicit dequeue). This type is used for events that are enqueued into the SYS.AnyData
queue.
*/ CREATE OR REPLACE TYPE customer_event_typ AS OBJECT ( customer_id NUMBER(6), cust_first_name VARCHAR2(20), cust_last_name VARCHAR2(20), cust_address CUST_ADDRESS_TYP, phone_numbers PHONE_LIST_TYP, nls_language VARCHAR2(3), nls_territory VARCHAR2(30), credit_limit NUMBER(9,2), cust_email VARCHAR2(30), account_mgr_id NUMBER(6), cust_geo_location MDSYS.SDO_GEOMETRY, action VARCHAR(7)); / /*
Step 4 Create the Procedure to Enqueue Non-LCR Events
Create a PL/SQL procedure called enq_proc
to enqueue events into the SYS.AnyData
queue.
Note: A single enqueued message can be dequeued by an apply process and by an explicit dequeue, but this example does not illustrate this capability. |
*/ CREATE OR REPLACE PROCEDURE oe.enq_proc (event IN SYS.Anydata) IS enqopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_eventid RAW(16); BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue', enqueue_options => enqopt, message_properties => mprop, payload => event, msgid => enq_eventid); END; / /*
Step 5 Create a Procedure to Construct and Enqueue Row LCR Events
Create a procedure called enq_row_lcr
that constructs a row LCR and then enqueues the row LCR into the queue.
*/ CREATE OR REPLACE PROCEDURE oe.enq_row_lcr( source_dbname VARCHAR2, cmd_type VARCHAR2, obj_owner VARCHAR2, obj_name VARCHAR2, old_vals SYS.LCR$_ROW_LIST, new_vals SYS.LCR$_ROW_LIST) AS eopt DBMS_AQ.ENQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; enq_msgid RAW(16); row_lcr SYS.LCR$_ROW_RECORD; BEGIN mprop.SENDER_ID := SYS.AQ$_AGENT('explicit_enq', NULL, NULL); -- Construct the LCR based on information passed to procedure row_lcr := SYS.LCR$_ROW_RECORD.CONSTRUCT( source_database_name => source_dbname, command_type => cmd_type, object_owner => obj_owner, object_name => obj_name, old_values => old_vals, new_values => new_vals); -- Enqueue the created row LCR DBMS_AQ.ENQUEUE( queue_name => 'strmadmin.oe_queue', enqueue_options => eopt, message_properties => mprop, payload => SYS.AnyData.ConvertObject(row_lcr), msgid => enq_msgid); END enq_row_lcr; / /*
Step 6 Check the Spool Results
Check the streams_enqprocs_message.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to configure an apply process to apply the user-enqueued events in the SYS.AnyData
queue.
Create a Function to Determine the Value of the action Attribute
Create a Rule that Evaluates to TRUE if the Event Action Is apply
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database. |
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_apply_message.out /*
Step 2 Create a Function to Determine the Value of the action Attribute
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Create a function called get_oe_action
to determine the value of the action
attribute in the events in the queue. This function is used in rules later in this example to determine the value of the action
attribute for an event. Then, the clients of the rules engine perform the appropriate action for the event (either dequeue by apply process or explicit dequeue). In this example, the clients of the rules engine are the apply process and the oe.explicit_dq
PL/SQL procedure.
*/ CREATE OR REPLACE FUNCTION oe.get_oe_action (event IN SYS.Anydata) RETURN VARCHAR2 IS ord oe.order_event_typ; cust oe.customer_event_typ; num NUMBER; type_name VARCHAR2(61); BEGIN type_name := event.GETTYPENAME; IF type_name = 'OE.ORDER_EVENT_TYP' THEN num := event.GETOBJECT(ord); RETURN ord.action; ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN num := event.GETOBJECT(cust); RETURN cust.action; ELSE RETURN NULL; END IF; END; / /*
Step 3 Create a Message Handler
Create a message handler called mes_handler
that will be used as a message handler by the apply process. This procedure takes the payload in a user-enqueued event of type oe.order_event_typ
or oe.customer_event_typ
and inserts it as a row in the oe.orders
table and oe.customers
table, respectively.
*/ CREATE OR REPLACE PROCEDURE oe.mes_handler (event SYS.AnyData) IS ord oe.order_event_typ; cust oe.customer_event_typ; num NUMBER; type_name VARCHAR2(61); BEGIN type_name := event.GETTYPENAME; IF type_name = 'OE.ORDER_EVENT_TYP' THEN num := event.GETOBJECT(ord); INSERT INTO oe.orders VALUES (ord.order_id, ord.order_date, ord.order_mode, ord.customer_id, ord.order_status, ord.order_total, ord.sales_rep_id, ord.promotion_id); ELSIF type_name = 'OE.CUSTOMER_EVENT_TYP' THEN num := event.GETOBJECT(cust); INSERT INTO oe.customers VALUES (cust.customer_id, cust.cust_first_name, cust.cust_last_name, cust.cust_address, cust.phone_numbers, cust.nls_language, cust.nls_territory, cust.credit_limit, cust.cust_email, cust.account_mgr_id, cust.cust_geo_location); END IF; END; / /*
Step 4 Grant strmadmin EXECUTE Privilege on the Procedures
*/
GRANT EXECUTE ON get_oe_action TO strmadmin; GRANT EXECUTE ON mes_handler TO strmadmin; /*
Step 5 Create the Evaluation Context for the Rule Set
Connect as the Oracle Streams administrator.
*/ CONNECT strmadmin/strmadminpw@oedb.net /*
Create the evaluation context for the rule set. The table alias is tab
in this example, but you can use a different table alias name if you wish.
*/ DECLARE table_alias SYS.RE$TABLE_ALIAS_LIST; BEGIN table_alias := SYS.RE$TABLE_ALIAS_LIST(SYS.RE$TABLE_ALIAS( 'tab', 'strmadmin.oe_queue_table')); DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT( evaluation_context_name => 'oe_eval_context', table_aliases => table_alias); END; / /*
Step 6 Create a Rule Set for the Apply Process
Create the rule set for the apply process.
*/ BEGIN DBMS_RULE_ADM.CREATE_RULE_SET( rule_set_name => 'apply_oe_rs', evaluation_context => 'strmadmin.oe_eval_context'); END; / /*
Step 7 Create a Rule that Evaluates to TRUE if the Event Action Is apply
Create a rule that evaluates to TRUE
if the action
value of an event
is apply
. Notice that tab.user_data
is passed to the oe.get_oe_action
function. The tab.user_data
column holds the event payload in a queue table. The table alias for the queue table was specified as tab
in Step 5.
*/ BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'strmadmin.apply_action', condition => ' oe.get_oe_action(tab.user_data) = ''APPLY'' '); END; / /*
Step 8 Create a Rule that Evaluates to TRUE for the Row LCR Events
Create a rule that evaluates to TRUE
if the event in the queue is a row LCR that changes either the oe.orders
table or the oe.customers
table. This rule enables the apply process to apply user-enqueued changes to the tables directly. For convenience, this rule uses the Oracle-supplied evaluation context SYS.STREAMS$_EVALUATION_CONTEXT
because the rule is used to evaluate LCRs. When this rule is added to the rule set, this evaluation context is used for the rule during evaluation instead of the rule set's evaluation context.
*/ BEGIN DBMS_RULE_ADM.CREATE_RULE( rule_name => 'apply_lcrs', condition => ':dml.GET_OBJECT_OWNER() = ''OE'' AND ' || ' (:dml.GET_OBJECT_NAME() = ''ORDERS'' OR ' || ':dml.GET_OBJECT_NAME() = ''CUSTOMERS'') ', evaluation_context => 'SYS.STREAMS$_EVALUATION_CONTEXT'); END; / /*
Step 9 Add the Rules to the Rule Set
Add the rules created in Step 7 and Step 8 to the rule set created in Step 6.
*/ BEGIN DBMS_RULE_ADM.ADD_RULE( rule_name => 'apply_action', rule_set_name => 'apply_oe_rs'); DBMS_RULE_ADM.ADD_RULE( rule_name => 'apply_lcrs', rule_set_name => 'apply_oe_rs'); END; / /*
Step 10 Create an Apply Process
Create an apply process that is associated with the oe_queue
, that uses the apply_oe_rs
rule set, and that uses the mes_handler
procedure as a message handler.
*/ BEGIN DBMS_APPLY_ADM.CREATE_APPLY( queue_name => 'strmadmin.oe_queue', apply_name => 'apply_oe', rule_set_name => 'strmadmin.apply_oe_rs', message_handler => 'oe.mes_handler', apply_user => 'oe', apply_captured => false); END; / /*
Step 11 Grant EXECUTE Privilege on the Rule Set To oe User
Grant EXECUTE
privilege on the strmadmin.apply_oe_rs
rule set. Because oe
was specified as the apply user when the apply process was created in Step 10, oe
needs EXECUTE
privilege on the rule set used by the apply process.
*/ BEGIN DBMS_RULE_ADM.GRANT_OBJECT_PRIVILEGE( privilege => DBMS_RULE_ADM.EXECUTE_ON_RULE_SET, object_name => 'strmadmin.apply_oe_rs', grantee => 'oe', grant_option => FALSE); END; / /*
Step 12 Start the Apply Process
Set the disable_on_error
parameter to n
so that the apply process is not disabled if it encounters an error, and start the apply process at oedb.net
.
*/ BEGIN DBMS_APPLY_ADM.SET_PARAMETER( apply_name => 'apply_oe', parameter => 'disable_on_error', value => 'n'); END; / BEGIN DBMS_APPLY_ADM.START_APPLY( apply_name => 'apply_oe'); END; / /*
Step 13 Check the Spool Results
Check the streams_apply_message.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to configure explicit dequeue of messages based on message contents.
Note: If you are viewing this document online, then you can copy the text from the "BEGINNING OF SCRIPT" line on this page to the next "END OF SCRIPT" line into a text editor and then edit the text to create a script for your environment. Run the script with SQL*Plus on a computer that can connect to the database. |
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_explicit_dq.out /*
Step 2 Create an Agent for Explicit Dequeue
Connect as the Oracle Streams administrator.
*/ CONNECT strmadmin/strmadminpw@oedb.net /*
Create an agent that will be used to perform explicit dequeue operations on the oe_queue
queue.
*/ BEGIN SYS.DBMS_AQADM.CREATE_AQ_AGENT( agent_name => 'explicit_dq'); END; / /*
Step 3 Associate the oe User with the explicit_dq Agent
For a user to perform queue operations, such as enqueue and dequeue, on a secure queue, the user must be configured as a secure queue user of the queue. The oe_queue
queue is a secure queue because it was created using SET_UP_QUEUE
. The oe
user is able to perform dequeue operations on this queue when the agent is used to create a subscriber to the queue in the next step.
*/ BEGIN DBMS_AQADM.ENABLE_DB_ACCESS( agent_name => 'explicit_dq', db_username => 'oe'); END; / /*
Step 4 Add a Subscriber to the oe_queue Queue
Add a subscriber to the oe_queue
queue. This subscriber will perform explicit dequeues of events. A subscriber rule is used to dequeue any events where the action
value is not apply
. If the action value is apply
for an event, then the event is ignored by the subscriber. Such events are dequeued and processed by the apply process.
*/ DECLARE subscriber SYS.AQ$_AGENT; BEGIN subscriber := SYS.AQ$_AGENT('explicit_dq', NULL, NULL); SYS.DBMS_AQADM.ADD_SUBSCRIBER( queue_name => 'strmadmin.oe_queue', subscriber => subscriber, rule => 'oe.get_oe_action(tab.user_data) != ''APPLY'''); END; / /*
Step 5 Create a Procedure to Dequeue Events Explicitly
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Create a PL/SQL procedure called explicit_dq
to dequeue events explicitly using the subscriber created in Step 4.
Note:
|
*/ CREATE OR REPLACE PROCEDURE oe.explicit_dq (consumer IN VARCHAR2) AS deqopt DBMS_AQ.DEQUEUE_OPTIONS_T; mprop DBMS_AQ.MESSAGE_PROPERTIES_T; msgid RAW(16); payload SYS.AnyData; new_messages BOOLEAN := TRUE; ord oe.order_event_typ; cust oe.customer_event_typ; tc pls_integer; next_trans EXCEPTION; no_messages EXCEPTION; pragma exception_init (next_trans, -25235); pragma exception_init (no_messages, -25228); BEGIN deqopt.consumer_name := consumer; deqopt.wait := 1; WHILE (new_messages) LOOP BEGIN DBMS_AQ.DEQUEUE( queue_name => 'strmadmin.oe_queue', dequeue_options => deqopt, message_properties => mprop, payload => payload, msgid => msgid); COMMIT; deqopt.navigation := DBMS_AQ.NEXT; DBMS_OUTPUT.PUT_LINE('Event Dequeued'); DBMS_OUTPUT.PUT_LINE('Type Name := ' || payload.GetTypeName); IF (payload.GetTypeName = 'OE.ORDER_EVENT_TYP') THEN tc := payload.GetObject(ord); DBMS_OUTPUT.PUT_LINE('order_id - ' || ord.order_id); DBMS_OUTPUT.PUT_LINE('order_date - ' || ord.order_date); DBMS_OUTPUT.PUT_LINE('order_mode - ' || ord.order_mode); DBMS_OUTPUT.PUT_LINE('customer_id - ' || ord.customer_id); DBMS_OUTPUT.PUT_LINE('order_status - ' || ord.order_status); DBMS_OUTPUT.PUT_LINE('order_total - ' || ord.order_total); DBMS_OUTPUT.PUT_LINE('sales_rep_id - ' || ord.sales_rep_id); DBMS_OUTPUT.PUT_LINE('promotion_id - ' || ord.promotion_id); END IF; IF (payload.GetTypeName = 'OE.CUSTOMER_EVENT_TYP') THEN tc := payload.GetObject(cust); DBMS_OUTPUT.PUT_LINE('customer_id - ' || cust.customer_id); DBMS_OUTPUT.PUT_LINE('cust_first_name - ' || cust.cust_first_name); DBMS_OUTPUT.PUT_LINE('cust_last_name - ' || cust.cust_last_name); DBMS_OUTPUT.PUT_LINE('street_address - ' || cust.cust_address.street_address); DBMS_OUTPUT.PUT_LINE('postal_code - ' || cust.cust_address.postal_code); DBMS_OUTPUT.PUT_LINE('city - ' || cust.cust_address.city); DBMS_OUTPUT.PUT_LINE('state_province - ' || cust.cust_address.state_province); DBMS_OUTPUT.PUT_LINE('country_id - ' || cust.cust_address.country_id); DBMS_OUTPUT.PUT_LINE('phone_number1 - ' || cust.phone_numbers(1)); DBMS_OUTPUT.PUT_LINE('phone_number2 - ' || cust.phone_numbers(2)); DBMS_OUTPUT.PUT_LINE('phone_number3 - ' || cust.phone_numbers(3)); DBMS_OUTPUT.PUT_LINE('nls_language - ' || cust.nls_language); DBMS_OUTPUT.PUT_LINE('nls_territory - ' || cust.nls_territory); DBMS_OUTPUT.PUT_LINE('credit_limit - ' || cust.credit_limit); DBMS_OUTPUT.PUT_LINE('cust_email - ' || cust.cust_email); DBMS_OUTPUT.PUT_LINE('account_mgr_id - ' || cust.account_mgr_id); END IF; EXCEPTION WHEN next_trans THEN deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION; WHEN no_messages THEN new_messages := FALSE; DBMS_OUTPUT.PUT_LINE('No more events'); END; END LOOP; END; / /*
Step 6 Check Spool Results
Check the streams_explicit_dq.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to enqueue non-LCR events and row LCR events into the queue.
Note:
|
/************************* BEGINNING OF SCRIPT ******************************
Step 1 Show Output and Spool Results
Run SET
ECHO
ON
and specify the spool file for the script. Check the spool file for errors after you run this script.
*/ SET ECHO ON SPOOL streams_enq_deq.out /*
Step 2 Enqueue Non-LCR Events to be Dequeued by the Apply Process
Connect as oe
.
*/ CONNECT oe/oe@oedb.net /*
Enqueue events with apply
for the action
value. Based on the apply process rules, the apply process dequeues and processes these events with the oe.mes_handler
message handler procedure created in "Create a Message Handler". The COMMIT
after the enqueues makes these two enqueues part of the same transaction. An enqueued message is not visible until the session that enqueued it commits the enqueue.
*/ BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ( 2500,'05-MAY-01','online',117,3,44699,161,NULL,'APPLY'))); END; / BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ( 990,'Hester','Prynne',oe.cust_address_typ('555 Beacon Street','Boston', 'MA',02109,'US'),oe.phone_list_typ('+1 617 123 4104', '+1 617 083 4381', '+1 617 742 5813'),'i','AMERICA',5000,'a@scarlet_letter.com',145, NULL,'APPLY'))); END; / COMMIT; /*
Step 3 Enqueue Non-LCR Events to be Dequeued Explicitly
Enqueue events with dequeue
for the action
value. The oe.explicit_dq
procedure created in "Create a Procedure to Dequeue Events Explicitly" dequeues these events because the action
is not apply
. Based on the apply process rules, the apply process ignores these events. The COMMIT
after the enqueues makes these two enqueues part of the same transaction.
*/ BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.order_event_typ( 2501,'22-JAN-00','direct',117,3,22788,161,NULL,'DEQUEUE'))); END; / BEGIN oe.enq_proc(SYS.AnyData.convertobject(oe.customer_event_typ( 991,'Nick','Carraway',oe.cust_address_typ('10th Street', 11101,'Long Island','NY','US'),oe.phone_list_typ('+1 718 786 2287', '+1 718 511 9114', '+1 718 888 4832'),'i','AMERICA',3000, 'nick@great_gatsby.com',149,NULL,'DEQUEUE'))); END; / COMMIT; /*
Step 4 Enqueue Row LCR Events to be Dequeued by the Apply Process
Enqueue row LCR events. The apply process applies these events directly. Enqueued LCRs should commit at transaction boundaries. In this step, a COMMIT
statement is run after each enqueue, making each enqueue a separate transaction. However, you can perform multiple LCR enqueues before a commit if there is more than one LCR in a transaction.
Create a row LCR that inserts a row into the oe.orders
table.
*/ DECLARE newunit1 SYS.LCR$_ROW_UNIT; newunit2 SYS.LCR$_ROW_UNIT; newunit3 SYS.LCR$_ROW_UNIT; newunit4 SYS.LCR$_ROW_UNIT; newunit5 SYS.LCR$_ROW_UNIT; newunit6 SYS.LCR$_ROW_UNIT; newunit7 SYS.LCR$_ROW_UNIT; newunit8 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN newunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_ID', SYS.AnyData.ConvertNumber(2502), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit2 := SYS.LCR$_ROW_UNIT( 'ORDER_DATE', SYS.AnyData.ConvertTimestampLTZ('04-NOV-00'), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit3 := SYS.LCR$_ROW_UNIT( 'ORDER_MODE', SYS.AnyData.ConvertVarchar2('online'), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit4 := SYS.LCR$_ROW_UNIT( 'CUSTOMER_ID', SYS.AnyData.ConvertNumber(145), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit5 := SYS.LCR$_ROW_UNIT( 'ORDER_STATUS', SYS.AnyData.ConvertNumber(3), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit6 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL', SYS.AnyData.ConvertNumber(35199), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit7 := SYS.LCR$_ROW_UNIT( 'SALES_REP_ID', SYS.AnyData.ConvertNumber(160), DBMS_LCR.NOT_A_LOB, NULL, NULL); newunit8 := SYS.LCR$_ROW_UNIT( 'PROMOTION_ID', SYS.AnyData.ConvertNumber(1), DBMS_LCR.NOT_A_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1,newunit2,newunit3,newunit4, newunit5,newunit6,newunit7,newunit8); oe.enq_row_lcr( source_dbname => 'OEDB.NET', cmd_type => 'INSERT', obj_owner => 'OE', obj_name => 'ORDERS', old_vals => NULL, new_vals => newvals); END; / COMMIT; /*
Create a row LCR that updates the row inserted into the oe.orders
table previously.
*/ DECLARE oldunit1 SYS.LCR$_ROW_UNIT; oldunit2 SYS.LCR$_ROW_UNIT; oldvals SYS.LCR$_ROW_LIST; newunit1 SYS.LCR$_ROW_UNIT; newvals SYS.LCR$_ROW_LIST; BEGIN oldunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_ID', SYS.AnyData.ConvertNumber(2502), DBMS_LCR.NOT_A_LOB, NULL, NULL); oldunit2 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL', SYS.AnyData.ConvertNumber(35199), DBMS_LCR.NOT_A_LOB, NULL, NULL); oldvals := SYS.LCR$_ROW_LIST(oldunit1,oldunit2); newunit1 := SYS.LCR$_ROW_UNIT( 'ORDER_TOTAL', SYS.AnyData.ConvertNumber(5235), DBMS_LCR.NOT_A_LOB, NULL, NULL); newvals := SYS.LCR$_ROW_LIST(newunit1); oe.enq_row_lcr( source_dbname => 'OEDB.NET', cmd_type => 'UPDATE', obj_owner => 'OE', obj_name => 'ORDERS', old_vals => oldvals, new_vals => newvals); END; / COMMIT; /*
Step 5 Check Spool Results
Check the streams_enq_deq.out
spool file to ensure that all actions completed successfully after this script completes.
*/ SET ECHO OFF SPOOL OFF /*************************** END OF SCRIPT ******************************/
Complete the following steps to dequeue the events explicitly and query the events that were applied by the apply process. These events were enqueued in the "Enqueue Events".
Step 1 Run the Procedure to Dequeue Events Explicitly
Run the procedure you created in "Create a Procedure to Dequeue Events Explicitly" and specify the consumer of the events you want to dequeue. In this case, the consumer is the subscriber you added in "Add a Subscriber to the oe_queue Queue". In this example, events that are not dequeued explicitly by this procedure are dequeued by the apply process.
CONNECT oe/oe@oedb.net SET SERVEROUTPUT ON SIZE 100000 EXEC oe.explicit_dq('explicit_dq');
You should see the non-LCR events that were enqueued in "Enqueue Non-LCR Events to be Dequeued Explicitly".
Step 2 Query for Applied Events
Query the oe.orders
and oe.customers
table to see the rows corresponding to the events applied by the apply process:
SELECT * FROM oe.orders WHERE order_id = 2500; SELECT cust_first_name, cust_last_name, cust_email FROM oe.customers WHERE customer_id = 990; SELECT * FROM oe.orders WHERE order_id = 2502;
You should see the non-LCR event that was enqueued in "Enqueue Non-LCR Events to be Dequeued by the Apply Process" and the row LCR events that were enqueued in "Enqueue Row LCR Events to be Dequeued by the Apply Process".
This example enqueues non-LCR events and row LCR events into the queue using Java Message Service (JMS). Then, this example dequeues these events from the queue using JMS.
Note: Enqueue of JMS types and XML types does not work with Oracle StreamsSys.Anydata queues unless you call DBMS_AQADM.ENABLE_JMS_TYPES( queue_table_name ) after DBMS_STREAMS_ADM.SET_UP_QUEUE() . Enabling an Oracle Streams queue for these types may affect import/export of the queue table. |
Complete the following steps:
Step 1 Run the catxlcr.sql Script
For this example to complete successfully, the LCR schema must be loaded into the SYS
schema using the catxlcr.sql
script in Oracle home in the rdbms/admin/
directory. Run this script now if it has not been run already.
Note: To run catxlcr.sql, you must either have created the database using Database Configuration Assistant or separately installed Java Virtual Machine, XDB, and XML Schema. |
For example, if your Oracle home directory is /usr/oracle
, then enter the following to run the script:
CONNECT SYS/CHANGE_ON_INSTALL AS SYSDBA @/usr/oracle/rdbms/admin/catxlcr.sql
Step 2 Create the Types for User Events
CONNECT oe/oe
CREATE TYPE address AS OBJECT (street VARCHAR (30), num NUMBER) / CREATE TYPE person AS OBJECT (name VARCHAR (30), home ADDRESS) /
Step 3 Set the CLASSPATH
The following jar and zip files should be in the CLASSPATH
based on the release of JDK you are using.
Also, make sure LD_LIBRARY_PATH
(Solaris operating system) or PATH
(Windows) has $ORACLE_HOME/lib
set.
For JDK 1.4.x, the CLASSPATH
must contain:
$ORACLE_HOME/jdbc/lib/classes12.jar $ORACLE_HOME/jdbc/lib/ojdbc14.jar $ORACLE_HOME/jlib/jndi.jar $ORACLE_HOME/rdbms/jlib/aqapi14.jar $ORACLE_HOME/rdbms/jlib/jmscommon.jar $ORACLE_HOME/rdbms/jlib/xdb.jar $ORACLE_HOME/xdk/lib/xmlparserv2.jar
For JDK 1.3.x, the CLASSPATH
must contain:
$ORACLE_HOME/jdbc/lib/classes12.jar $ORACLE_HOME/jlib/jndi.jar $ORACLE_HOME/rdbms/jlib/aqapi13.jar $ORACLE_HOME/rdbms/jlib/jmscommon.jar $ORACLE_HOME/rdbms/jlib/xdb.jar $ORACLE_HOME/xdk/lib/xmlparserv2.jar
For JDK 1.2.x, the CLASSPATH
must contain:
$ORACLE_HOME/jdbc/lib/classes12.jar $ORACLE_HOME/jlib/jndi.jar $ORACLE_HOME/rdbms/jlib/aqapi12.jar $ORACLE_HOME/rdbms/jlib/jmscommon.jar $ORACLE_HOME/rdbms/jlib/xdb.jar $ORACLE_HOME/xdk/lib/xmlparserv2.jar
Step 4 Create Java Classes that Map to the Oracle Object Types
First, create a file input.typ
with the following lines:
SQL PERSON AS JPerson SQL ADDRESS AS JAddress
Then, run Jpublisher.
jpub -input=input.typ -user=OE/OE
Completing these actions generates two Java classes named JPerson
and JAddress
for the person
and address
types, respectively.
Step 5 Create a Java Code for Enqueuing Messages
This program uses the Oracle JMS API to publish messages into a Oracle Streams topic.
This program does the following:
Publishes a non-LCR based ADT message to the topic
Publishes a JMS text message to a topic
Publish an LCR based message to the topic
import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import oracle.xdb.*; public class StreamsEnq { public static void main (String args []) throws java.sql.SQLException, ClassNotFoundException, JMSException { TopicConnectionFactory tc_fact= null; TopicConnection t_conn = null; TopicSession t_sess = null; try { if (args.length < 3 ) System.out.println("Usage:java filename [SID] [HOST] [PORT]"); else { /* Create the TopicConnectionFactory * Only the JDBC OCI driver can be used to access Oracle Streams through JMS */ tc_fact = AQjmsFactory.getTopicConnectionFactory( args[1], args[0], Integer.parseInt(args[2]), "oci8"); t_conn = tc_fact.createTopicConnection( "OE","OE"); /* Create a TopicSession */ t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* Start the connection */ t_conn.start() ; /* Publish non-LCR based messages */ publishUserMessages(t_sess); /* Publish LCR based messages */ publishLcrMessages(t_sess); t_sess.close() ; t_conn.close() ; System.out.println("End of StreamsEnq Demo") ; } } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } /* * publishUserMessages - this method publishes an ADT message and a * JMS text message to a Oracle Streams topic */ public static void publishUserMessages(TopicSession t_sess) throws Exception { Topic topic = null; TopicPublisher t_pub = null; JPerson pers = null; JAddress addr = null; TextMessage t_msg = null; AdtMessage adt_msg = null; AQjmsAgent agent = null; AQjmsAgent[] recipList = null; try { /* Get the topic */ topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); /* Create a publisher */ t_pub = t_sess.createPublisher(topic); /* Agent to access oe_queue */ agent = new AQjmsAgent("explicit_enq", null); /* Create a PERSON adt message */ adt_msg = ((AQjmsSession)t_sess).createAdtMessage(); pers = new JPerson(); addr = new JAddress(); addr.setNum(new java.math.BigDecimal(500)); addr.setStreet("Oracle Pkwy"); pers.setName("Mark"); pers.setHome(addr); /* Set the payload in the message */ adt_msg.setAdtPayload(pers); ((AQjmsMessage)adt_msg).setSenderID(agent); System.out.println("Publish message 1 -type PERSON\n"); /* Create the recipient list */ recipList = new AQjmsAgent[1]; recipList[0] = new AQjmsAgent("explicit_dq", null); /* Publish the message */ ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList); t_sess.commit(); t_msg = t_sess.createTextMessage(); t_msg.setText("Test message"); t_msg.setStringProperty("color", "BLUE"); t_msg.setIntProperty("year", 1999); ((AQjmsMessage)t_msg).setSenderID(agent); System.out.println("Publish message 2 -type JMS TextMessage\n"); /* Publish the message */ ((AQjmsTopicPublisher)t_pub).publish(topic, t_msg, recipList); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); } } /* * publishLcrMessages - this method publishes an XML LCR message to a * Oracle Streams topic */ public static void publishLcrMessages(TopicSession t_sess) throws Exception { Topic topic = null; TopicPublisher t_pub = null; XMLType xml_lcr = null; AdtMessage adt_msg = null; AQjmsAgent agent = null; StringBuffer lcr_data = null; AQjmsAgent[] recipList = null; java.sql.Connection db_conn = null; try { /* Get the topic */ topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); /* Create a publisher */ t_pub = t_sess.createPublisher(topic); /* Get the JDBC connection */ db_conn = ((AQjmsSession)t_sess).getDBConnection(); /* Agent to access oe_queue */ agent = new AQjmsAgent("explicit_enq", null); /* Create a adt message */ adt_msg = ((AQjmsSession)t_sess).createAdtMessage(); /* Create the LCR representation in XML */ lcr_data = new StringBuffer(); lcr_data.append("<ROW_LCR "); lcr_data.append("xmlns='http://xmlns.oracle.com/streams/schemas/lcr' \n"); lcr_data.append("xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' \n"); lcr_data.append("xsi:schemaLocation='http://xmlns.oracle.com/streams/schemas/lcr http://xmlns.oracle.com/streams/schemas/lcr/streamslcr.xsd'"); lcr_data.append("> \n"); lcr_data.append("<source_database_name>source_dbname</source_database_name> \n"); lcr_data.append("<command_type>INSERT</command_type> \n"); lcr_data.append("<object_owner>Ram</object_owner> \n"); lcr_data.append("<object_name>Emp</object_name> \n"); lcr_data.append("<tag>0ABC</tag> \n"); lcr_data.append("<transaction_id>0.0.0</transaction_id> \n"); lcr_data.append("<scn>0</scn> \n"); lcr_data.append("<old_values> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C01</column_name> \n"); lcr_data.append("<data><varchar2>Clob old</varchar2></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C02</column_name> \n"); lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C03</column_name> \n"); lcr_data.append("<data> \n"); lcr_data.append("<date><value>1997-11-24</value><format>SYYYY-MM-DD</format></date> \n"); lcr_data.append("</data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C04</column_name> \n"); lcr_data.append("<data> \n"); lcr_data.append("<timestamp><value>1999-05-31T13:20:00.000</value> <format>SYYYY-MM-DD'T'HH24:MI:SS.FF</format></timestamp> \n"); lcr_data.append("</data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("<old_value> \n"); lcr_data.append("<column_name>C05</column_name> \n"); lcr_data.append("<data><raw>ABCDE</raw></data> \n"); lcr_data.append("</old_value> \n"); lcr_data.append("</old_values> \n"); lcr_data.append("<new_values> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C01</column_name> \n"); lcr_data.append("<data><varchar2>A123FF</varchar2></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C02</column_name> \n"); lcr_data.append("<data><number>35.23</number></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C03</column_name> \n"); lcr_data.append("<data><number>-100000</number></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C04</column_name> \n"); lcr_data.append("<data><varchar>Hel lo</varchar></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("<new_value> \n"); lcr_data.append("<column_name>C05</column_name> \n"); lcr_data.append("<data><char>wor ld</char></data> \n"); lcr_data.append("</new_value> \n"); lcr_data.append("</new_values> \n"); lcr_data.append("</ROW_LCR>"); /* Create the XMLType containing the LCR */ xml_lcr = oracle.xdb.XMLType.createXML(db_conn, lcr_data.toString()); /* Set the payload in the message */ adt_msg.setAdtPayload(xml_lcr); ((AQjmsMessage)adt_msg).setSenderID(agent); System.out.println("Publish message 3 - XMLType containing LCR ROW\n"); /* Create the recipient list */ recipList = new AQjmsAgent[1]; recipList[0] = new AQjmsAgent("explicit_dq", null); /* Publish the message */ ((AQjmsTopicPublisher)t_pub).publish(topic, adt_msg, recipList); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); } } }
Step 6 Create a Java Code for Dequeuing Messages
This program uses Oracle JMS API to receive messages from a Oracle Streams topic.
This program does the following:
Registers mappings for person
, address
and XMLType
in JMS typemap
Receives LCR messages from a Oracle Streams topic
Receives user ADT messages from a Oracle Streams topic
import oracle.AQ.*; import oracle.jms.*; import javax.jms.*; import java.lang.*; import oracle.xdb.*; import java.sql.SQLException; public class StreamsDeq { public static void main (String args []) throws java.sql.SQLException, ClassNotFoundException, JMSException { TopicConnectionFactory tc_fact= null; TopicConnection t_conn = null; TopicSession t_sess = null; try { if (args.length < 3 ) System.out.println("Usage:java filename [SID] [HOST] [PORT]"); else { /* Create the TopicConnectionFactory * Only the JDBC OCI driver can be used to access Oracle Streams through JMS */ tc_fact = AQjmsFactory.getTopicConnectionFactory( args[1], args[0], Integer.parseInt(args[2]), "oci8"); t_conn = tc_fact.createTopicConnection( "OE","OE"); /* Create a TopicSession */ t_sess = t_conn.createTopicSession(true, Session.CLIENT_ACKNOWLEDGE); /* Start the connection */ t_conn.start() ; receiveMessages(t_sess); t_sess.close() ; t_conn.close() ; System.out.println("\nEnd of StreamsDeq Demo") ; } } catch (Exception ex) { System.out.println("Exception-1: " + ex); ex.printStackTrace(); } } /* * receiveMessages -This method receives messages from the SYS.AnyData queue */ public static void receiveMessages(TopicSession t_sess) throws Exception { Topic topic = null; JPerson pers = null; JAddress addr = null; XMLType xtype = null; TextMessage t_msg = null; AdtMessage adt_msg = null; Message jms_msg = null; TopicReceiver t_recv = null; int i = 0; java.util.Map map= null; try { /* Get the topic */ topic = ((AQjmsSession)t_sess).getTopic("strmadmin", "oe_queue"); /* Create a TopicReceiver to receive messages for consumer "jms_recv */ t_recv = ((AQjmsSession)t_sess).createTopicReceiver(topic, "jms_recv", null); map = ((AQjmsSession)t_sess).getTypeMap(); /* Register mappings for ADDRESS and PERSON in the JMS typemap */ map.put("OE.PERSON", Class.forName("JPerson")); map.put("OE.ADDRESS", Class.forName("JAddress")); /* Register mapping for XMLType in the TypeMap - required for LCRs */ map.put("SYS.XMLTYPE", Class.forName("oracle.xdb.XMLTypeFactory")); System.out.println("Receive messages ...\n"); do { try { jms_msg = (t_recv.receive(10)); i++; /* Set navigation mode to NEXT_MESSAGE */ ((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_MESSAGE); } catch (JMSException jms_ex2) { if((jms_ex2.getLinkedException() != null) && (jms_ex2.getLinkedException() instanceof SQLException)) { SQLException sql_ex2 =(SQLException)(jms_ex2.getLinkedException()); /* End of current transaction group * Use NEXT_TRANSACTION navigation mode */ if(sql_ex2.getErrorCode() == 25235) { ((AQjmsTopicReceiver)t_recv).setNavigationMode(AQjmsConstants.NAVIGATION_NEXT_TRANSACTION); continue; } else throw jms_ex2; } else throw jms_ex2; } if(jms_msg == null) { System.out.println("\nNo more messages"); } else { if(jms_msg instanceof AdtMessage) { adt_msg = (AdtMessage)jms_msg; System.out.println("Retrieved message " + i + ": " + adt_msg.getAdtPayload()); if(adt_msg.getAdtPayload() instanceof JPerson) { pers =(JPerson)( adt_msg.getAdtPayload()); System.out.println("PERSON: Name: " + pers.getName()); } else if(adt_msg.getAdtPayload() instanceof JAddress) { addr =(JAddress)( adt_msg.getAdtPayload()); System.out.println("ADDRESS: Street" + addr.getStreet()); } else if(adt_msg.getAdtPayload() instanceof oracle.xdb.XMLType) { xtype = (XMLType)adt_msg.getAdtPayload(); System.out.println("XMLType: Data: \n" + xtype.getStringVal()); } System.out.println("Msg id: " + adt_msg.getJMSMessageID()); System.out.println(); } else if(jms_msg instanceof TextMessage) { t_msg = (TextMessage)jms_msg; System.out.println("Retrieved message " + i + ": " + t_msg.getText()); System.out.println("Msg id: " + t_msg.getJMSMessageID()); System.out.println(); } else System.out.println("Invalid message type"); } } while (jms_msg != null); t_sess.commit(); } catch (JMSException jms_ex) { System.out.println("JMS Exception: " + jms_ex); if(jms_ex.getLinkedException() != null) System.out.println("Linked Exception: " + jms_ex.getLinkedException()); t_sess.rollback(); } catch (java.sql.SQLException sql_ex) { System.out.println("SQL Exception: " + sql_ex); sql_ex.printStackTrace(); t_sess.rollback(); } } }
Step 7 Compile the Scripts
javac StreamsEnq.java StreamsDeq.java JPerson.java JAddress.java
Step 8 Run the Enqueue Program
java StreamsEnq ORACLE_SID HOST PORT
For example, if your Oracle SID is orc182
, your host is hq_server
, and your port is 1521
, then enter the following:
java StreamsEnq orcl82 hq_server 1521
Step 9 Run the Dequeue Program
java StreamsDeq ORACLE_SID HOST PORT
For example, if your Oracle SID is orc182
, your host is hq_server
, and your port is 1520
, then enter the following:
java StreamsDeq orcl82 hq_server 1521