Oracle® Streams Advanced Queuing User's Guide and Reference Release 10.1 Part Number B10785-01 |
|
|
View PDF |
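This Appendix contains the following scripts:
tkaqdoca.sql: Script to Create Users, Objects, Queue Tables, Queues, and Subscribers
tkaqdocd.sql: Examples of Administrative and Operational Interfaces
tkaqdoce.sql: Example of Enqueuing a Backorder with a One-Day Delay
tkaqdocp.sql: Examples of Operational Interfaces (Real Application Clusters, Propagation Scheduling, Dequeue Modes, Message Grouping)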
Rem $Header: tkaqdoca.sql 26-jan-99.17:50:37 aquser1 Exp $
Rem
Rem tkaqdoca.sql
Rem
Rem Copyright (c) Oracle 1998, 1999. All Rights Reserved.
Rem
Rem NAME
Rem   tkaqdoca.sql - TKAQ DOCumentation Admin examples file
Rem
Rem This part of the script creates the queue administrator account
Rem (BOLADM) and one database account per BooksOnLine application.
Rem It is a SQL*Plus script and connects as SYSTEM.
Rem NOTE: demo credentials only - every password equals its user name.
Rem

connect system/manager
set serveroutput on
set echo on

Rem ------------------------------------------------------------------
Rem Common administrator account shared by all BooksOnLine applications.
Rem It receives the AQ administrator role plus the system-wide
Rem ENQUEUE_ANY / DEQUEUE_ANY privileges (without admin option).
Rem ------------------------------------------------------------------
CREATE USER BOLADM IDENTIFIED BY BOLADM;
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM;
GRANT EXECUTE ON DBMS_AQ TO BOLADM;
GRANT EXECUTE ON DBMS_AQADM TO BOLADM;
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(privilege => 'ENQUEUE_ANY', grantee => 'BOLADM', admin_option => FALSE);
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE(privilege => 'DEQUEUE_ANY', grantee => 'BOLADM', admin_option => FALSE);

Rem ------------------------------------------------------------------
Rem Application schemas. Each one gets CONNECT/RESOURCE and EXECUTE on
Rem the two AQ packages.
Rem ------------------------------------------------------------------

Rem Order Entry
CREATE USER OE IDENTIFIED BY OE;
GRANT CONNECT, RESOURCE TO OE;
GRANT EXECUTE ON DBMS_AQ TO OE;
GRANT EXECUTE ON DBMS_AQADM TO OE;

Rem West Shipping
CREATE USER WS IDENTIFIED BY WS;
GRANT CONNECT, RESOURCE TO WS;
GRANT EXECUTE ON DBMS_AQ TO WS;
GRANT EXECUTE ON DBMS_AQADM TO WS;

Rem East Shipping
CREATE USER ES IDENTIFIED BY ES;
GRANT CONNECT, RESOURCE TO ES;
GRANT EXECUTE ON DBMS_AQ TO ES;
GRANT EXECUTE ON DBMS_AQADM TO ES;

Rem Overseas Shipping
CREATE USER TS IDENTIFIED BY TS;
GRANT CONNECT, RESOURCE TO TS;
GRANT EXECUTE ON DBMS_AQ TO TS;
GRANT EXECUTE ON DBMS_AQADM TO TS;

Rem Customer Billing
Rem For security reasons Customer Billing is split in two: an admin
Rem schema that hosts all the queue tables and an application schema
Rem from which the application runs.
Rem Customer Billing admin schema (owns the billing queue tables)
CREATE USER CBADM IDENTIFIED BY CBADM;
GRANT CONNECT, RESOURCE TO CBADM;
GRANT EXECUTE ON DBMS_AQ TO CBADM;
GRANT EXECUTE ON DBMS_AQADM TO CBADM;

Rem Customer Billing application schema
CREATE USER CB IDENTIFIED BY CB;
GRANT CONNECT, RESOURCE TO CB;
GRANT EXECUTE ON DBMS_AQ TO CB;
GRANT EXECUTE ON DBMS_AQADM TO CB;

Rem Create an account for Customer Service
CREATE USER CS IDENTIFIED BY CS;
GRANT CONNECT, RESOURCE TO CS;
GRANT EXECUTE ON DBMS_AQ TO CS;
GRANT EXECUTE ON DBMS_AQADM TO CS;

Rem All object types are created in the administrator schema.
Rem All application schemas that host any propagation source
Rem queues are given the ENQUEUE_ANY system level privilege
Rem allowing the application schemas to enqueue to the destination
Rem queue.
Rem
connect BOLADM/BOLADM;

Rem Create objects
CREATE OR REPLACE TYPE customer_typ AS OBJECT (
  custno   NUMBER,
  name     VARCHAR2(100),
  street   VARCHAR2(100),
  city     VARCHAR2(30),
  state    VARCHAR2(2),
  zip      NUMBER,
  country  VARCHAR2(100));
/

CREATE OR REPLACE TYPE book_typ AS OBJECT (
  title    VARCHAR2(100),
  authors  VARCHAR2(100),
  ISBN     NUMBER,
  price    NUMBER);
/

CREATE OR REPLACE TYPE orderitem_typ AS OBJECT (
  quantity  NUMBER,
  item      book_typ,
  subtotal  NUMBER);
/

CREATE OR REPLACE TYPE orderitemlist_vartyp AS VARRAY (20) OF orderitem_typ;
/

CREATE OR REPLACE TYPE order_typ AS OBJECT (
  orderno        NUMBER,
  status         VARCHAR2(30),
  ordertype      VARCHAR2(30),
  orderregion    VARCHAR2(30),
  customer       customer_typ,
  paymentmethod  VARCHAR2(30),
  items          orderitemlist_vartyp,
  total          NUMBER);
/

Rem Every application schema needs EXECUTE on the whole payload type tree.
Rem Schemas that own a propagation source queue also get ENQUEUE_ANY.

GRANT EXECUTE ON ORDER_TYP TO OE;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO OE;
GRANT EXECUTE ON ORDERITEM_TYP TO OE;
GRANT EXECUTE ON BOOK_TYP TO OE;
GRANT EXECUTE ON CUSTOMER_TYP TO OE;
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','OE',FALSE);

GRANT EXECUTE ON ORDER_TYP TO WS;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO WS;
GRANT EXECUTE ON ORDERITEM_TYP TO WS;
GRANT EXECUTE ON BOOK_TYP TO WS;
GRANT EXECUTE ON CUSTOMER_TYP TO WS;
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','WS',FALSE);

GRANT EXECUTE ON ORDER_TYP TO ES;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO ES;
GRANT EXECUTE ON ORDERITEM_TYP TO ES;
GRANT EXECUTE ON BOOK_TYP TO ES;
GRANT EXECUTE ON CUSTOMER_TYP TO ES;
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','ES',FALSE);

GRANT EXECUTE ON ORDER_TYP TO TS;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO TS;
GRANT EXECUTE ON ORDERITEM_TYP TO TS;
GRANT EXECUTE ON BOOK_TYP TO TS;
GRANT EXECUTE ON CUSTOMER_TYP TO TS;
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','TS',FALSE);

GRANT EXECUTE ON ORDER_TYP TO CBADM;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO CBADM;
GRANT EXECUTE ON ORDERITEM_TYP TO CBADM;
GRANT EXECUTE ON BOOK_TYP TO CBADM;
GRANT EXECUTE ON CUSTOMER_TYP TO CBADM;
Rem CBADM owns CBADM_billedorders_que, which this script subscribes
Rem CS.CS_billedorders_que to, so CBADM is a propagation source owner
Rem and needs ENQUEUE_ANY like OE/WS/ES/TS (the original omitted it).
Rem NOTE(review): confirm against the target release's propagation
Rem security rules before relying on this in a non-demo setup.
execute DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY','CBADM',FALSE);

GRANT EXECUTE ON ORDER_TYP TO CB;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO CB;
GRANT EXECUTE ON ORDERITEM_TYP TO CB;
GRANT EXECUTE ON BOOK_TYP TO CB;
GRANT EXECUTE ON CUSTOMER_TYP TO CB;

GRANT EXECUTE ON ORDER_TYP TO CS;
GRANT EXECUTE ON ORDERITEMLIST_VARTYP TO CS;
GRANT EXECUTE ON ORDERITEM_TYP TO CS;
GRANT EXECUTE ON BOOK_TYP TO CS;
GRANT EXECUTE ON CUSTOMER_TYP TO CS;

Rem Create queue tables, queues for OE
Rem
connect OE/OE;

Rem Single-consumer queue table with transactional message grouping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'OE_orders_sqtab',
    comment            => 'Order Entry Single Consumer Orders queue table',
    queue_payload_type => 'BOLADM.order_typ',
    message_grouping   => DBMS_AQADM.TRANSACTIONAL,
    compatible         => '8.1',
    primary_instance   => 1,
    secondary_instance => 2);
end;
/

Rem Create a priority queue table for OE
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'OE_orders_pr_mqtab',
    sort_list          => 'priority,enq_time',
    comment            => 'Order Entry Priority MultiConsumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1',
    primary_instance   => 2,
    secondary_instance => 1);
end;
/

begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'OE_neworders_que',
    queue_table => 'OE_orders_sqtab');
end;
/

begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'OE_bookedorders_que',
    queue_table => 'OE_orders_pr_mqtab');
end;
/

Rem Orders in OE_bookedorders_que are being propagated to WS_bookedorders_que,
Rem ES_bookedorders_que and TS_bookedorders_que according to the region
Rem the books are shipped to. At the time an order is placed, the customer
Rem can request Fed-ex shipping (priority 1), priority air shipping (priority
Rem 2) and ground shipping (priority 3). A priority queue is created in
Rem each region, the shipping applications dequeue from these priority
Rem queues according to the orders' shipping priorities, process the orders
Rem and enqueue the processed orders into
Rem the shipped_orders queues or the back_orders queues. Both the shipped_
Rem orders queues and the back_orders queues are FIFO queues. However,
Rem orders put into the back_orders_queues are enqueued with delay time
Rem set to 1 day, so that each order in the back_order_queues is processed
Rem only once a day until the shipment is filled.

Rem Create queue tables, queues for WS Shipping
connect WS/WS;

Rem Create a priority queue table for WS shipping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'WS_orders_pr_mqtab',
    sort_list          => 'priority,enq_time',
    comment            => 'West Shipping Priority MultiConsumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');
end;
/

Rem Create a FIFO queue table for WS shipping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'WS_orders_mqtab',
    comment            => 'West Shipping Multi Consumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');
end;
/

Rem Booked orders are stored in the priority queue table
begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'WS_bookedorders_que',
    queue_table => 'WS_orders_pr_mqtab');
end;
/

Rem Shipped orders and backorders are stored in the FIFO queue table
begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'WS_shippedorders_que',
    queue_table => 'WS_orders_mqtab');
end;
/

begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'WS_backorders_que',
    queue_table => 'WS_orders_mqtab');
end;
/

Rem
Rem In order to test history, set retention to 1 DAY (86400 seconds)
Rem for the queues in WS
begin
  DBMS_AQADM.ALTER_QUEUE(
    queue_name     => 'WS_bookedorders_que',
    retention_time => 86400);
end;
/

begin
  DBMS_AQADM.ALTER_QUEUE(
    queue_name     => 'WS_shippedorders_que',
    retention_time => 86400);
end;
/

begin
  DBMS_AQADM.ALTER_QUEUE(
    queue_name     => 'WS_backorders_que',
    retention_time => 86400);
end;
/

Rem Create queue tables, queues for ES Shipping
connect ES/ES;

Rem Create a FIFO queue table for ES shipping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'ES_orders_mqtab',
    comment            => 'East Shipping Multi Consumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');
end;
/

Rem Create a priority queue table for ES shipping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'ES_orders_pr_mqtab',
    sort_list          => 'priority,enq_time',
    comment            => 'East Shipping Priority Multi Consumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');
end;
/

Rem Booked orders are stored in the priority queue table
begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'ES_bookedorders_que',
    queue_table => 'ES_orders_pr_mqtab');
end;
/

Rem Shipped orders and backorders are stored in the FIFO queue table
begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'ES_shippedorders_que',
    queue_table => 'ES_orders_mqtab');
end;
/

begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'ES_backorders_que',
    queue_table => 'ES_orders_mqtab');
end;
/

Rem Create queue tables, queues for Overseas Shipping
connect TS/TS;

Rem Create a priority queue table for TS shipping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'TS_orders_pr_mqtab',
    sort_list          => 'priority,enq_time',
    comment            => 'Overseas Shipping Priority MultiConsumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');
end;
/

Rem Create a FIFO queue table for TS shipping
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'TS_orders_mqtab',
    comment            => 'Overseas Shipping Multi Consumer Orders queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');
end;
/

Rem Booked orders are stored in the priority queue table
begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'TS_bookedorders_que',
    queue_table => 'TS_orders_pr_mqtab');
end;
/

Rem Shipped orders and backorders are stored in the FIFO queue table
begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'TS_shippedorders_que',
    queue_table => 'TS_orders_mqtab');
end;
/

begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'TS_backorders_que',
    queue_table => 'TS_orders_mqtab');
end;
/

Rem Create queue tables, queues for Customer Billing
connect CBADM/CBADM;

begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'CBADM_orders_sqtab',
    comment            => 'Customer Billing Single Consumer Orders queue table',
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');

  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'CBADM_orders_mqtab',
    comment            => 'Customer Billing Multi Consumer Service queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');

  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'CBADM_shippedorders_que',
    queue_table => 'CBADM_orders_sqtab');
end;
/

Rem Grant dequeue privilege on the shipped orders queue to the Customer Billing
Rem application. The CB application retrieves shipped orders (not billed yet)
Rem from the shipped orders queue.
execute DBMS_AQADM.GRANT_QUEUE_PRIVILEGE('DEQUEUE', 'CBADM_shippedorders_que', 'CB', FALSE);

begin
  DBMS_AQADM.CREATE_QUEUE (
    queue_name  => 'CBADM_billedorders_que',
    queue_table => 'CBADM_orders_mqtab');
end;
/

Rem Grant enqueue privilege on the billed orders queue to Customer Billing
Rem application. The CB application is allowed to put billed orders into
Rem this queue.
Rem Allow the CB application schema to enqueue into the billed orders queue.
begin
  DBMS_AQADM.GRANT_QUEUE_PRIVILEGE(
    privilege    => 'ENQUEUE',
    queue_name   => 'CBADM_billedorders_que',
    grantee      => 'CB',
    grant_option => FALSE);
end;
/

Rem ------------------------------------------------------------------
Rem Customer Service
Rem
Rem Customer support tracks where each customer request currently is.
Rem A request is always in exactly one of these states:
Rem   A. BOOKED   B. SHIPPED   C. BACKED   D. BILLED
Rem Given an order number, customer support reports that state, which
Rem is kept in Order_Status_Table.
Rem ------------------------------------------------------------------
connect CS/CS;

CREATE TABLE Order_Status_Table(
  customer_order  boladm.order_typ,
  status          varchar2(30));

Rem One multi-consumer queue table holding the four status queues
begin
  DBMS_AQADM.CREATE_QUEUE_TABLE(
    queue_table        => 'CS_order_status_qt',
    comment            => 'Customer Status multi consumer queue table',
    multiple_consumers => TRUE,
    queue_payload_type => 'BOLADM.order_typ',
    compatible         => '8.1');

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'CS_bookedorders_que',
    queue_table => 'CS_order_status_qt');

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'CS_backorders_que',
    queue_table => 'CS_order_status_qt');

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'CS_shippedorders_que',
    queue_table => 'CS_order_status_qt');

  DBMS_AQADM.CREATE_QUEUE(
    queue_name  => 'CS_billedorders_que',
    queue_table => 'CS_order_status_qt');
end;
/

Rem ------------------------------------------------------------------
Rem Subscribers of the OE booked orders queue
Rem ------------------------------------------------------------------
connect OE/OE;

Rem West Shipping: Western-region orders, except rush orders
Rem (rush Western orders go to East Shipping instead).
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'OE.OE_bookedorders_que',
    subscriber => aq$_agent('West_Shipping', 'WS.WS_bookedorders_que', null),
    rule       => 'tab.user_data.orderregion = ''WESTERN'' AND tab.user_data.ordertype != ''RUSH''');
end;
/

Rem East Shipping: every Eastern-region order, plus every US rush order.
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'OE.OE_bookedorders_que',
    subscriber => aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null),
    rule       => 'tab.user_data.orderregion = ''EASTERN'' OR (tab.user_data.ordertype = ''RUSH'' AND tab.user_data.customer.country = ''USA'') ');
end;
/

Rem Overseas Shipping: orders whose region is INTERNATIONAL.
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'OE.OE_bookedorders_que',
    subscriber => aq$_agent('Overseas_Shipping', 'TS.TS_bookedorders_que', null),
    rule       => 'tab.user_data.orderregion = ''INTERNATIONAL''');
end;
/

Rem ------------------------------------------------------------------
Rem The Customer Service status queues subscribe to the matching queues
Rem in Order Entry, (West) Shipping and Billing. No rule is given, so
Rem these subscribers are added without any filter.
Rem ------------------------------------------------------------------

Rem Booked orders (still connected as OE)
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'OE.OE_bookedorders_que',
    subscriber => aq$_agent('BOOKED_ORDER', 'CS.CS_bookedorders_que', null));
end;
/

connect WS/WS;

Rem WS backorders
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'WS.WS_backorders_que',
    subscriber => aq$_agent('BACK_ORDER', 'CS.CS_backorders_que', null));
end;
/

Rem WS shipped orders
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'WS.WS_shippedorders_que',
    subscriber => aq$_agent('SHIPPED_ORDER', 'CS.CS_shippedorders_que', null));
end;
/

connect CBADM/CBADM;

Rem Billing billed orders
begin
  DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name => 'CBADM.CBADM_billedorders_que',
    subscriber => aq$_agent('BILLED_ORDER', 'CS.CS_billedorders_que', null));
end;
/

Rem ------------------------------------------------------------------
Rem BOLADM starts every queue created by this script
Rem ------------------------------------------------------------------
connect BOLADM/BOLADM

execute DBMS_AQADM.START_QUEUE('OE.OE_neworders_que');
execute DBMS_AQADM.START_QUEUE('OE.OE_bookedorders_que');
execute DBMS_AQADM.START_QUEUE('WS.WS_bookedorders_que');
execute DBMS_AQADM.START_QUEUE('WS.WS_shippedorders_que');
execute DBMS_AQADM.START_QUEUE('WS.WS_backorders_que');
execute DBMS_AQADM.START_QUEUE('ES.ES_bookedorders_que');
execute DBMS_AQADM.START_QUEUE('ES.ES_shippedorders_que');
execute DBMS_AQADM.START_QUEUE('ES.ES_backorders_que');
execute DBMS_AQADM.START_QUEUE('TS.TS_bookedorders_que');
execute DBMS_AQADM.START_QUEUE('TS.TS_shippedorders_que');
execute DBMS_AQADM.START_QUEUE('TS.TS_backorders_que');
execute DBMS_AQADM.START_QUEUE('CBADM.CBADM_shippedorders_que');
execute DBMS_AQADM.START_QUEUE('CBADM.CBADM_billedorders_que');
execute DBMS_AQADM.START_QUEUE('CS.CS_bookedorders_que');
execute DBMS_AQADM.START_QUEUE('CS.CS_backorders_que');
execute DBMS_AQADM.START_QUEUE('CS.CS_shippedorders_que');
execute DBMS_AQADM.START_QUEUE('CS.CS_billedorders_que');

connect system/manager

Rem
Rem Start job_queue_processes to handle AQ propagation
Rem
alter system set job_queue_processes=4;
Rem
Rem $Header: tkaqdocd.sql 26-jan-99.17:51:23 aquser1 Exp $
Rem
Rem tkaqdocd.sql
Rem
Rem Copyright (c) Oracle 1998, 1999. All Rights Reserved.
Rem
Rem NAME
Rem   tkaqdocd.sql - examples of administrative and operational interfaces
Rem
Rem DESCRIPTION
Rem   Schedules propagation for the BooksOnLine queues, creates the
Rem   Customer Service procedures that keep Order_Status_Table up to
Rem   date, and runs two history queries against the WS queue tables.
Rem
Rem NOTES
Rem   SQL*Plus script. Expects the users, types, queues and subscribers
Rem   created by tkaqdoca.sql.
Rem

Rem
Rem Schedule propagation for the shipping, billing, order entry queues
Rem
connect OE/OE;
execute DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'OE.OE_bookedorders_que');

connect WS/WS;
execute DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'WS.WS_backorders_que');
execute DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'WS.WS_shippedorders_que');

connect CBADM/CBADM;
execute DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'CBADM.CBADM_billedorders_que');

Rem
Rem Customer service application
Rem
Rem This application monitors the status queue for messages and updates
Rem the Order_Status table.
connect CS/CS

Rem
Rem Dequeues one message from 'queue' on behalf of 'consumer'
Rem
CREATE OR REPLACE PROCEDURE DEQUEUE_MESSAGE(
  queue    IN  VARCHAR2,
  consumer IN  VARCHAR2,
  message  OUT BOLADM.order_typ)
IS
  -- queue    : name of the queue to dequeue from
  -- consumer : consumer (agent) name the message is dequeued for
  -- message  : receives the dequeued order payload
  -- The dequeue is committed before returning.
  dopt       dbms_aq.dequeue_options_t;
  mprop      dbms_aq.message_properties_t;
  deq_msgid  RAW(16);
BEGIN
  dopt.dequeue_mode  := dbms_aq.REMOVE;
  dopt.navigation    := dbms_aq.FIRST_MESSAGE;
  dopt.consumer_name := consumer;

  dbms_aq.dequeue(
    queue_name         => queue,
    dequeue_options    => dopt,
    message_properties => mprop,
    payload            => message,
    msgid              => deq_msgid);
  COMMIT;
END;
/

Rem
Rem Updates the status of the order in the status table
Rem
CREATE OR REPLACE PROCEDURE update_status(
  new_status IN VARCHAR2,
  order_msg  IN BOLADM.ORDER_TYP)
IS
  -- new_status : 'BOOKED_ORDER', 'SHIPPED_ORDER', 'BACK_ORDER' or
  --              'BILLED_ORDER'
  -- order_msg  : the order whose status row is inserted or updated
  -- Commits the insert/update before returning.
  old_status VARCHAR2(30);
BEGIN
  -- Look up the status currently recorded for this order.
  BEGIN
    SELECT st.status
      INTO old_status
      FROM order_status_table st
     WHERE st.customer_order.orderno = order_msg.orderno;
  EXCEPTION
    -- Only "no row yet" means this is the first message for the order.
    -- The original handler was WHEN OTHERS around the whole body, which
    -- turned any other failure (including a failed UPDATE) into an
    -- INSERT of a second row for the same order.
    WHEN NO_DATA_FOUND THEN
      INSERT INTO order_status_table(customer_order, status)
      VALUES (order_msg, new_status);
      COMMIT;
      RETURN;
  END;

  -- Ignore messages that describe a state the order has already left.
  IF new_status = 'SHIPPED_ORDER' THEN
    IF old_status = 'BILLED_ORDER' THEN
      RETURN;   /* message about a previous state */
    END IF;
  ELSIF new_status = 'BACK_ORDER' THEN
    IF old_status = 'SHIPPED_ORDER' OR old_status = 'BILLED_ORDER' THEN
      RETURN;   /* message about a previous state */
    END IF;
  END IF;

  /* update the order status */
  UPDATE order_status_table st
     SET st.customer_order = order_msg,
         st.status         = new_status
   WHERE st.customer_order.orderno = order_msg.orderno;
  COMMIT;
END;
/

Rem
Rem Monitors the customer service queues for 'time' seconds
Rem
CREATE OR REPLACE PROCEDURE MONITOR_STATUS_QUEUE(time IN NUMBER)
IS
  -- time : how long to keep monitoring, in seconds. The limit is only
  --        checked after each listen call returns or times out, so the
  --        procedure can overrun by up to wait_time seconds.
  agent_w_message  aq$_agent;
  agent_list       dbms_aq.aq$_agent_list_t;
  wait_time        INTEGER := 120;          -- listen timeout, seconds
  no_message       EXCEPTION;
  PRAGMA EXCEPTION_INIT(no_message, -25254); -- listen timed out
  order_msg        boladm.order_typ;
  begin_time       NUMBER;
BEGIN
  -- The agent list never changes, so build it once instead of on every
  -- pass. Agent names double as the new order status, so they must use
  -- the same spelling as the subscribers and as update_status; the
  -- original spelled the last one 'Booked_ORDER'.
  agent_list(1) := aq$_agent('BILLED_ORDER',  'CS_billedorders_que',  NULL);
  agent_list(2) := aq$_agent('SHIPPED_ORDER', 'CS_shippedorders_que', NULL);
  agent_list(3) := aq$_agent('BACK_ORDER',    'CS_backorders_que',    NULL);
  agent_list(4) := aq$_agent('BOOKED_ORDER',  'CS_bookedorders_que',  NULL);

  begin_time := dbms_utility.get_time;

  LOOP
    BEGIN
      /* wait for order status messages */
      dbms_aq.listen(agent_list, wait_time, agent_w_message);

      dbms_output.put_line('Agent' || agent_w_message.name || ' Address '||
                           agent_w_message.address);

      /* dequeue the message from the queue */
      dequeue_message(agent_w_message.address, agent_w_message.name, order_msg);

      /* the name of the agent contains the new state of the order */
      update_status(agent_w_message.name, order_msg);
    EXCEPTION
      WHEN no_message THEN
        dbms_output.put_line('No messages in the past 2 minutes');
    END;

    /* exit if we have been working long enough.
     * dbms_utility.get_time counts hundredths of a second, so the limit
     * given in seconds is scaled by 100 (the original compared the two
     * units directly and stopped 100 times too early). */
    EXIT WHEN dbms_utility.get_time - begin_time > time * 100;
  END LOOP;
END;
/

Rem
Rem History queries
Rem

Rem
Rem Average processing time for messages in western shipping:
Rem Difference between the ship-time and book-time for the order
Rem
Rem NOTE: we assume that order ID is the correlation identifier
Rem Only processed messages are considered.
Rem NOTE(review): order_enq in tkaqdocp.sql sets the correlation to the
Rem order type ('RUSH'/'NORMAL'), not the order number, so this join may
Rem pair unrelated messages - confirm which producer feeds these queues.
Connect WS/WS

SELECT SUM(SO.enq_time - BO.enq_time) / COUNT(*) AVG_PRCS_TIME
FROM WS.AQ$WS_orders_pr_mqtab BO
INNER JOIN WS.AQ$WS_orders_mqtab SO
   ON SO.corr_id = BO.corr_id
WHERE SO.msg_state = 'PROCESSED'
  AND BO.msg_state = 'PROCESSED'
  AND SO.queue = 'WS_shippedorders_que';

Rem
Rem Average backed up time (again only processed messages are considered)
Rem
SELECT SUM(BACK.deq_time - BACK.enq_time) / COUNT(*) AVG_BACK_TIME
FROM WS.AQ$WS_orders_mqtab BACK
WHERE BACK.msg_state = 'PROCESSED'
  AND BACK.queue = 'WS_backorders_que';
Rem
Rem $Header: tkaqdoce.sql 26-jan-99.17:51:28 aquser1 Exp $
Rem
Rem tkaqdoce.sql
Rem
Rem Copyright (c) Oracle 1998, 1999. All Rights Reserved.
Rem
set echo on

Rem ==================================================================
Rem Demonstrate enqueuing a backorder with delay time set
Rem to 1 day. This guarantees that each backorder is
Rem processed only once a day until the order is filled.
Rem ==================================================================

Rem Create a procedure that enqueues with delay set to one day
connect BOLADM/BOLADM

create or replace procedure requeue_unfilled_order(sale_region varchar2,
                                                   backorder   order_typ)
as
  -- sale_region : 'WEST' selects the WS backorder queue, 'EAST' the ES
  --               one; every other value (including NULL) falls through
  --               to the TS (overseas) backorder queue.
  --               NOTE(review): the subscriber rules in tkaqdoca.sql use
  --               'WESTERN'/'EASTERN' for order_typ.orderregion; passing
  --               those values here would route to TS - confirm callers.
  -- backorder   : the order to put back on the backorder queue.
  -- The enqueue is not committed here; the caller controls the transaction.
  back_order_queue_name  varchar2(62);
  enqopt                 dbms_aq.enqueue_options_t;
  msgprop                dbms_aq.message_properties_t;
  enq_msgid              raw(16);
begin
  -- Choose a backorder queue based on the region
  IF sale_region = 'WEST' THEN
    back_order_queue_name := 'WS.WS_backorders_que';
  ELSIF sale_region = 'EAST' THEN
    back_order_queue_name := 'ES.ES_backorders_que';
  ELSE
    back_order_queue_name := 'TS.TS_backorders_que';
  END IF;

  -- Enqueue the order with delay time set to 1 day (in seconds)
  msgprop.delay := 60*60*24;
  dbms_aq.enqueue(back_order_queue_name, enqopt, msgprop,
                  backorder, enq_msgid);
end;
/
show errors;
Rem Rem $Header: tkaqdocp.sql 26-jan-99.17:50:54 aquser1 Exp $ Rem Rem tkaqdocp.sql Rem Rem Copyright (c) Oracle 1998, 1999. All Rights Reserved. Rem Rem NAME Rem tkaqdocp.sql - <one-line expansion of the name> Rem set echo on; Rem ================================================================== Rem Illustrating Support for Real Application Clusters Rem ================================================================== Rem Login into OE account connect OE/OE; set serveroutput on; Rem check instance affinity of OE queue tables from AQ administrative view select queue_table, primary_instance, secondary_instance, owner_instance from user_queue_tables; Rem alter instance affinity of OE queue tables begin DBMS_AQADM.ALTER_QUEUE_TABLE( queue_table => 'OE.OE_orders_sqtab', primary_instance => 2, secondary_instance => 1); end; / begin DBMS_AQADM.ALTER_QUEUE_TABLE( queue_table => 'OE.OE_orders_pr_mqtab', primary_instance => 1, secondary_instance => 2); end; / Rem check instance affinity of OE queue tables from AQ administrative view select queue_table, primary_instance, secondary_instance, owner_instance from user_queue_tables; Rem ================================================================== Rem Illustrating Propagation Scheduling Rem ================================================================== Rem Login into OE account set echo on; connect OE/OE; set serveroutput on; Rem Rem Schedule Propagation from bookedorders_que to shipping Rem execute DBMS_AQADM.SCHEDULE_PROPAGATION(queue_name => 'OE.OE_bookedorders_que'); Rem Login into boladm account set echo on; connect boladm/boladm; set serveroutput on; Rem create a procedure to enqueue an order create or replace procedure order_enq(book_title in varchar2, book_qty in number, order_num in number, shipping_priority in number, cust_state in varchar2, cust_country in varchar2, cust_region in varchar2, cust_ord_typ in varchar2) as OE_enq_order_data BOLADM.order_typ; OE_enq_cust_data BOLADM.customer_typ; 
OE_enq_book_data BOLADM.book_typ; OE_enq_item_data BOLADM.orderitem_typ; OE_enq_item_list BOLADM.orderitemlist_vartyp; enqopt dbms_aq.enqueue_options_t; msgprop dbms_aq.message_properties_t; enq_msgid raw(16); begin msgprop.correlation := cust_ord_typ; OE_enq_cust_data := BOLADM.customer_typ(NULL, NULL, NULL, NULL, cust_state, NULL, cust_country); OE_enq_book_data := BOLADM.book_typ(book_title, NULL, NULL, NULL); OE_enq_item_data := BOLADM.orderitem_typ(book_qty, OE_enq_book_data, NULL); OE_enq_item_list := BOLADM.orderitemlist_vartyp( BOLADM.orderitem_typ(book_qty, OE_enq_book_data, NULL)); OE_enq_order_data := BOLADM.order_typ(order_num, NULL, cust_ord_typ, cust_region, OE_enq_cust_data, NULL, OE_enq_item_list, NULL); -- Put the shipping priority into message property before -- enqueuing the message msgprop.priority := shipping_priority; dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop, OE_enq_order_data, enq_msgid); end; / show errors; GRANT EXECUTE ON ORDER_ENQ TO OE; Rem now create a procedure to dequeue booked orders for shipment processing create or replace procedure shipping_bookedorder_deq( consumer in varchar2, deqmode in binary_integer) as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; qname varchar2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := consumer; dopt.wait := DBMS_AQ.NO_WAIT; dopt.dequeue_mode := deqmode; dopt.navigation := dbms_aq.FIRST_MESSAGE; IF (consumer = 'West_Shipping') THEN qname := 'WS.WS_bookedorders_que'; ELSIF (consumer = 'East_Shipping') THEN qname := 'ES.ES_bookedorders_que'; ELSE qname := 'TS.TS_bookedorders_que'; END IF; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, 
msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; deq_cust_data := deq_order_data.customer; dbms_output.put_line(' **** next booked order **** '); dbms_output.put_line('order_num: ' || deq_order_data.orderno || ' book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); dbms_output.put_line('ship_state: ' || deq_cust_data.state || ' ship_country: ' || deq_cust_data.country || ' ship_order_type: ' || deq_order_data.ordertype); dopt.navigation := dbms_aq.NEXT_MESSAGE; EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); new_orders := FALSE; END; END LOOP; end; / show errors; Rem now create a procedure to dequeue rush orders for shipment create or replace procedure get_rushtitles(consumer in varchar2) as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; qname varchar2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := consumer; dopt.wait := 1; dopt.correlation := 'RUSH'; IF (consumer = 'West_Shipping') THEN qname := 'WS.WS_bookedorders_que'; ELSIF (consumer = 'East_Shipping') THEN qname := 'ES.ES_bookedorders_que'; ELSE qname := 'TS.TS_bookedorders_que'; END IF; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; dbms_output.put_line(' rushorder book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE RUSH TITLES ---- '); new_orders := FALSE; END; END LOOP; end; / show errors; Rem now create a procedure to dequeue orders for 
handling North American Rem orders create or replace procedure get_northamerican_orders as deq_cust_data BOLADM.customer_typ; deq_book_data BOLADM.book_typ; deq_item_data BOLADM.orderitem_typ; deq_msgid RAW(16); dopt dbms_aq.dequeue_options_t; mprop dbms_aq.message_properties_t; deq_order_data BOLADM.order_typ; deq_order_nodata BOLADM.order_typ; qname varchar2(30); no_messages exception; pragma exception_init (no_messages, -25228); new_orders BOOLEAN := TRUE; begin dopt.consumer_name := 'Overseas_Shipping'; dopt.wait := DBMS_AQ.NO_WAIT; dopt.navigation := dbms_aq.FIRST_MESSAGE; dopt.dequeue_mode := DBMS_AQ.LOCKED; qname := 'TS.TS_bookedorders_que'; WHILE (new_orders) LOOP BEGIN dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_data, msgid => deq_msgid); deq_item_data := deq_order_data.items(1); deq_book_data := deq_item_data.item; deq_cust_data := deq_order_data.customer; IF (deq_cust_data.country = 'Canada' OR deq_cust_data.country = 'Mexico' ) THEN dopt.dequeue_mode := dbms_aq.REMOVE_NODATA; dopt.msgid := deq_msgid; dbms_aq.dequeue( queue_name => qname, dequeue_options => dopt, message_properties => mprop, payload => deq_order_nodata, msgid => deq_msgid); dbms_output.put_line(' **** next booked order **** '); dbms_output.put_line('order_no: ' || deq_order_data.orderno || ' book_title: ' || deq_book_data.title || ' quantity: ' || deq_item_data.quantity); dbms_output.put_line('ship_state: ' || deq_cust_data.state || ' ship_country: ' || deq_cust_data.country || ' ship_order_type: ' || deq_order_data.ordertype); END IF; commit; dopt.dequeue_mode := DBMS_AQ.LOCKED; dopt.msgid := NULL; dopt.navigation := dbms_aq.NEXT_MESSAGE; EXCEPTION WHEN no_messages THEN dbms_output.put_line (' ---- NO MORE BOOKED ORDERS ---- '); new_orders := FALSE; END; END LOOP; end; / show errors; GRANT EXECUTE ON SHIPPING_BOOKEDORDER_DEQ TO WS; GRANT EXECUTE ON SHIPPING_BOOKEDORDER_DEQ TO ES; GRANT EXECUTE ON 
SHIPPING_BOOKEDORDER_DEQ TO TS; GRANT EXECUTE ON SHIPPING_BOOKEDORDER_DEQ TO CS; GRANT EXECUTE ON GET_RUSHTITLES TO ES; GRANT EXECUTE ON GET_NORTHAMERICAN_ORDERS TO TS; Rem Login into OE account connect OE/OE; set serveroutput on; Rem Rem Enqueue some orders into OE_bookedorders_que Rem execute BOLADM.order_enq('My First Book', 1, 1001, 3,'CA', 'USA', 'WESTERN', 'NORMAL'); execute BOLADM.order_enq('My Second Book', 2, 1002, 3,'NY', 'USA', 'EASTERN', 'NORMAL'); execute BOLADM.order_enq('My Third Book', 3, 1003, 3, '', 'Canada', 'INTERNATIONAL', 'NORMAL'); execute BOLADM.order_enq('My Fourth Book', 4, 1004, 2, 'NV', 'USA', 'WESTERN', 'RUSH'); execute BOLADM.order_enq('My Fifth Book', 5, 1005, 2, 'MA', 'USA', 'EASTERN', 'RUSH'); execute BOLADM.order_enq('My Sixth Book', 6, 1006, 3,'' , 'UK', 'INTERNATIONAL', 'NORMAL'); execute BOLADM.order_enq('My Seventh Book', 7, 1007, 1,'', 'Canada', 'INTERNATIONAL', 'RUSH'); execute BOLADM.order_enq('My Eighth Book', 8, 1008, 3,'', 'Mexico', 'INTERNATIONAL', 'NORMAL'); execute BOLADM.order_enq('My Ninth Book', 9, 1009, 1, 'CA', 'USA', 'WESTERN', 'RUSH'); execute BOLADM.order_enq('My Tenth Book', 8, 1010, 3, '' , 'UK', 'INTERNATIONAL', 'NORMAL'); execute BOLADM.order_enq('My Last Book', 7, 1011, 3, '' , 'Mexico', 'INTERNATIONAL', 'NORMAL'); commit; / Rem Rem Wait for Propagation to Complete Rem execute dbms_lock.sleep(100); Rem ================================================================== Rem Illustrating Dequeue Modes/Methods Rem ================================================================== connect WS/WS; set serveroutput on; Rem Dequeue all booked orders for West_Shipping execute BOLADM.shipping_bookedorder_deq('West_Shipping', DBMS_AQ.REMOVE); commit; / connect ES/ES; set serveroutput on; Rem Browse all booked orders for East_Shipping execute BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.BROWSE); Rem Dequeue all rush order titles for East_Shipping execute BOLADM.get_rushtitles('East_Shipping'); commit; / Rem 
Rem Dequeue all remaining booked orders (normal order) for East_Shipping
execute BOLADM.shipping_bookedorder_deq('East_Shipping', DBMS_AQ.REMOVE);
commit;
/

connect TS/TS;
set serveroutput on;

Rem Dequeue all international North American orders for Overseas_Shipping
execute BOLADM.get_northamerican_orders;
commit;
/

Rem Dequeue rest of the booked orders for Overseas_Shipping
execute BOLADM.shipping_bookedorder_deq('Overseas_Shipping', DBMS_AQ.REMOVE);
commit;
/

Rem ==================================================================
Rem Illustrating Enhanced Propagation Capabilities
Rem ==================================================================
connect OE/OE;
set serveroutput on;

Rem
Rem Get propagation schedule information & statistics
Rem
Rem All queries below read the USER_QUEUE_SCHEDULES view as user OE.
Rem

Rem get averages
select avg_time, avg_number, avg_size
  from user_queue_schedules;

Rem get totals
select total_time, total_number, total_bytes
  from user_queue_schedules;

Rem get status information of schedule (present only when active)
select process_name, session_id, instance, schedule_disabled
  from user_queue_schedules;

Rem get information about last and next execution
select last_run_date, last_run_time, next_run_date, next_run_time
  from user_queue_schedules;

Rem get last error information if any
select failures, last_error_msg, last_error_date, last_error_time
  from user_queue_schedules;

Rem disable propagation schedule for booked orders
Rem (the procedure name must be one identifier: DISABLE_PROPAGATION_SCHEDULE)
execute DBMS_AQADM.DISABLE_PROPAGATION_SCHEDULE(queue_name => 'OE_bookedorders_que');
execute dbms_lock.sleep(30);
select schedule_disabled
  from user_queue_schedules;

Rem alter propagation schedule for booked orders to execute every
Rem 15 mins (900 seconds) for a window duration of 300 seconds
begin
  -- next_time is passed as a date expression in a string; 900/86400 of a
  -- day is 900 seconds.
  DBMS_AQADM.ALTER_PROPAGATION_SCHEDULE(
    queue_name => 'OE_bookedorders_que',
    duration   => 300,
    next_time  => 'SYSDATE + 900/86400',
    latency    => 25);
end;
/

execute dbms_lock.sleep(30);
select next_time, latency, propagation_window
  from user_queue_schedules;

Rem enable propagation schedule for
Rem booked orders
Rem (the procedure name must be one identifier: ENABLE_PROPAGATION_SCHEDULE)
execute DBMS_AQADM.ENABLE_PROPAGATION_SCHEDULE(queue_name => 'OE_bookedorders_que');
execute dbms_lock.sleep(30);
select schedule_disabled
  from user_queue_schedules;

Rem unschedule propagation for booked orders
execute DBMS_AQADM.UNSCHEDULE_PROPAGATION(queue_name => 'OE.OE_bookedorders_que');
set echo on;

Rem ==================================================================
Rem Illustrating Message Grouping
Rem ==================================================================

Rem Login into boladm account
set echo on;
connect boladm/boladm;
set serveroutput on;

Rem now create a procedure to handle order entry
Rem
Rem new_order_enq enqueues one message on OE.OE_neworders_que. The payload
Rem is a BOLADM.order_typ carrying order_num, a customer object built from
Rem cust_state, and a one-element item list holding book_title and book_qty.
Rem The procedure does not commit; the caller controls the transaction.
create or replace procedure new_order_enq(book_title in varchar2,
                                          book_qty   in number,
                                          order_num  in number,
                                          cust_state in varchar2) as
  OE_enq_order_data  BOLADM.order_typ;
  OE_enq_cust_data   BOLADM.customer_typ;
  OE_enq_book_data   BOLADM.book_typ;
  OE_enq_item_data   BOLADM.orderitem_typ;
  OE_enq_item_list   BOLADM.orderitemlist_vartyp;
  enqopt             dbms_aq.enqueue_options_t;
  msgprop            dbms_aq.message_properties_t;
  enq_msgid          raw(16);
begin
  OE_enq_cust_data := BOLADM.customer_typ(NULL, NULL, NULL, NULL,
                                          cust_state, NULL, NULL);
  OE_enq_book_data := BOLADM.book_typ(book_title, NULL, NULL, NULL);
  OE_enq_item_data := BOLADM.orderitem_typ(book_qty, OE_enq_book_data, NULL);
  -- Reuse the item built above instead of constructing an identical one.
  OE_enq_item_list := BOLADM.orderitemlist_vartyp(OE_enq_item_data);
  OE_enq_order_data := BOLADM.order_typ(order_num, NULL, NULL, NULL,
                                        OE_enq_cust_data, NULL,
                                        OE_enq_item_list, NULL);
  -- Enqueue options and message properties are left at their defaults.
  dbms_aq.enqueue('OE.OE_neworders_que', enqopt, msgprop,
                  OE_enq_order_data, enq_msgid);
end;
/
show errors;

Rem now create a procedure to handle order enqueue
Rem
Rem same_order_enq enqueues one more item message on OE.OE_neworders_que.
Rem Unlike new_order_enq, the payload has a NULL order number and a NULL
Rem customer; get_new_orders uses the NULL customer to tell the two kinds
Rem of message apart. The procedure does not commit.
create or replace procedure same_order_enq(book_title in varchar2,
                                           book_qty   in number) as
  OE_enq_order_data  BOLADM.order_typ;
  OE_enq_book_data   BOLADM.book_typ;
  OE_enq_item_data   BOLADM.orderitem_typ;
  OE_enq_item_list   BOLADM.orderitemlist_vartyp;
  enqopt             dbms_aq.enqueue_options_t;
  msgprop            dbms_aq.message_properties_t;
  enq_msgid          raw(16);
begin
  OE_enq_book_data := BOLADM.book_typ(book_title, NULL, NULL, NULL);
  OE_enq_item_data := BOLADM.orderitem_typ(book_qty, OE_enq_book_data, NULL);
  -- Reuse the item built above instead of constructing an identical one.
  OE_enq_item_list := BOLADM.orderitemlist_vartyp(OE_enq_item_data);
  OE_enq_order_data := BOLADM.order_typ(NULL, NULL, NULL, NULL,
                                        NULL, NULL,
                                        OE_enq_item_list, NULL);
  dbms_aq.enqueue('OE.OE_neworders_que', enqopt, msgprop,
                  OE_enq_order_data, enq_msgid);
end;
/
show errors;

GRANT EXECUTE ON NEW_ORDER_ENQ TO OE;
GRANT EXECUTE ON SAME_ORDER_ENQ TO OE;

Rem now create a procedure to get new orders by dequeuing
Rem
Rem get_new_orders dequeues from OE.OE_neworders_que until a dequeue raises
Rem ORA-25228 (no message within the 1 second wait). For each message it
Rem prints the title and quantity of the first item; when the message has a
Rem customer it first prints the order number and ship state. When a
Rem dequeue raises ORA-25235 it prints an end-of-order marker, commits, and
Rem switches navigation to NEXT_TRANSACTION.
create or replace procedure get_new_orders as
  deq_cust_data   BOLADM.customer_typ;
  deq_book_data   BOLADM.book_typ;
  deq_item_data   BOLADM.orderitem_typ;
  deq_msgid       RAW(16);
  dopt            dbms_aq.dequeue_options_t;
  mprop           dbms_aq.message_properties_t;
  deq_order_data  BOLADM.order_typ;
  qname           varchar2(30);
  no_messages     exception;
  end_of_group    exception;
  -- ORA-25228: timeout or end-of-fetch during message dequeue
  pragma exception_init (no_messages, -25228);
  -- ORA-25235: fetched all messages in current transaction
  pragma exception_init (end_of_group, -25235);
  new_orders      BOOLEAN := TRUE;
begin
  dopt.wait := 1;
  -- NOTE(review): navigation is only ever changed in the end_of_group
  -- handler below, so it stays FIRST_MESSAGE until then. Confirm this is
  -- the intended navigation for dequeuing within a message group.
  dopt.navigation := DBMS_AQ.FIRST_MESSAGE;
  qname := 'OE.OE_neworders_que';

  WHILE (new_orders) LOOP
    BEGIN
      -- The inner loop has no EXIT: it ends only when an exception other
      -- than end_of_group escapes to the handler of this enclosing block.
      LOOP
        BEGIN
          dbms_aq.dequeue(
            queue_name         => qname,
            dequeue_options    => dopt,
            message_properties => mprop,
            payload            => deq_order_data,
            msgid              => deq_msgid);

          deq_item_data := deq_order_data.items(1);
          deq_book_data := deq_item_data.item;
          deq_cust_data := deq_order_data.customer;

          IF (deq_cust_data IS NOT NULL) THEN
            dbms_output.put_line(' **** NEXT ORDER **** ');
            dbms_output.put_line('order_num: ' || deq_order_data.orderno);
            dbms_output.put_line('ship_state: ' || deq_cust_data.state);
          END IF;

          dbms_output.put_line(' ---- next book ---- ');
          dbms_output.put_line(' book_title: ' || deq_book_data.title ||
                               ' quantity: ' || deq_item_data.quantity);
        EXCEPTION
          WHEN end_of_group THEN
            dbms_output.put_line ('*** END OF ORDER ***');
            commit;
            dopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
        END;
      END LOOP;
    EXCEPTION
      WHEN no_messages THEN
        dbms_output.put_line (' ---- NO MORE NEW ORDERS ---- ');
        new_orders := FALSE;
    END;
  END LOOP;
end;
/
show errors;

GRANT EXECUTE ON GET_NEW_ORDERS TO OE;

Rem Login into OE account
connect OE/OE;
set serveroutput on;

Rem
Rem Enqueue some orders using message grouping into OE_neworders_que
Rem
Rem Each order below is enqueued and committed as its own transaction.
Rem

Rem First Order
execute BOLADM.new_order_enq('My First Book', 1, 1001, 'CA');
execute BOLADM.same_order_enq('My Second Book', 2);
commit;
/

Rem Second Order
execute BOLADM.new_order_enq('My Third Book', 1, 1002, 'WA');
commit;
/

Rem Third Order
execute BOLADM.new_order_enq('My Fourth Book', 1, 1003, 'NV');
execute BOLADM.same_order_enq('My Fifth Book', 3);
execute BOLADM.same_order_enq('My Sixth Book', 2);
commit;
/

Rem Fourth Order
execute BOLADM.new_order_enq('My Seventh Book', 1, 1004, 'MA');
execute BOLADM.same_order_enq('My Eighth Book', 3);
execute BOLADM.same_order_enq('My Ninth Book', 2);
commit;
/

Rem
Rem Dequeue the neworders
Rem
execute BOLADM.get_new_orders;
Rem
Rem $Header: tkaqdocc.sql 26-jan-99.17:51:05 aquser1 Exp $
Rem
Rem tkaqdocc.sql
Rem
Rem Copyright (c) Oracle 1998, 1999. All Rights Reserved.
Rem
Rem    NAME
Rem      tkaqdocc.sql - drops the user accounts used by the AQ examples
Rem
Rem    NOTES
Rem      Connects as system and drops each user with CASCADE, which also
Rem      removes the objects owned by that user.
Rem

set echo on;
connect system/manager
set serveroutput on;

drop user WS cascade;
drop user ES cascade;
drop user TS cascade;
drop user CB cascade;
drop user CBADM cascade;
drop user CS cascade;
drop user OE cascade;
drop user boladm cascade;