Oracle® Streams Concepts and Administration
11g Release 1 (11.1)

Part Number B28321-01

14 Configuring Oracle Streams Messaging Environments

Oracle Streams enables messaging with queues, propagations, and enqueue and dequeue capabilities. The queues can be ANYDATA queues or typed queues. An ANYDATA queue stages user messages whose payloads are of ANYDATA type, and an ANYDATA payload can be a wrapper for payloads of different data types. A typed queue stages messages of one specific type.
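To make the wrapper idea concrete, the following is a minimal sketch of wrapping two values in ANYDATA and reading them back, without involving a queue. The variable names are illustrative and are not part of the chapter's examples. It assumes a SQL*Plus session with server output enabled.

```sql
SET SERVEROUTPUT ON

DECLARE
  -- Illustrative variables: one wrapper per payload type
  wrapped_text  ANYDATA := ANYDATA.ConvertVarchar2('Chemicals - SW');
  wrapped_num   ANYDATA := ANYDATA.ConvertNumber(16);
BEGIN
  -- GetTypeName reports the type held inside each wrapper
  DBMS_OUTPUT.PUT_LINE(wrapped_text.GetTypeName());
  DBMS_OUTPUT.PUT_LINE(wrapped_num.GetTypeName());
  -- The Access functions return the wrapped value itself
  DBMS_OUTPUT.PUT_LINE(wrapped_text.AccessVarchar2());
  DBMS_OUTPUT.PUT_LINE(TO_CHAR(wrapped_num.AccessNumber()));
END;
/
```

Because both variables have the same ANYDATA type, either one could be staged in the same ANYDATA queue, which is what the enqueue examples in this chapter rely on.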

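This chapter describes configuring Oracle Streams messaging environments. It covers wrapping user message payloads in an ANYDATA wrapper and enqueuing them, dequeuing a payload that is wrapped in an ANYDATA payload, and configuring a messaging client and message notification.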

Unless specified otherwise, each task described in this chapter should be completed by an Oracle Streams administrator who has been granted the appropriate privileges. The examples in this chapter assume that you have configured an Oracle Streams administrator at each database.

Wrapping User Message Payloads in an ANYDATA Wrapper and Enqueuing Them

You can wrap almost any type of payload in an ANYDATA payload. The following sections provide examples of enqueuing messages into, and dequeuing messages from, an ANYDATA queue.

The following steps illustrate how to wrap payloads of various types in an ANYDATA payload.

  1. Connect as an administrative user who can create users, grant privileges, create tablespaces, and alter users at the dbs1.net database.

  2. Grant EXECUTE privilege on the DBMS_AQ package to the oe user so that this user can run the ENQUEUE and DEQUEUE procedures in that package:

    GRANT EXECUTE ON DBMS_AQ TO oe;
    
  3. Connect as the Oracle Streams administrator, as in the following example:

    CONNECT strmadmin/user-password@dbs1.net
    
  4. Create an ANYDATA queue if one does not already exist.

    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe_q_table_any',
        queue_name   => 'oe_q_any',
        queue_user   => 'oe');
    END;
    /
    

    The oe user is configured automatically as a secure queue user of the oe_q_any queue and is given ENQUEUE and DEQUEUE privileges on the queue. In addition, an Oracle Streams AQ agent named oe is configured and is associated with the oe user. However, a message cannot be enqueued unless a subscriber who can dequeue the message is configured.

  5. Add a subscriber for the oe_q_any queue. This subscriber will perform explicit dequeues of messages.

    DECLARE
      subscriber SYS.AQ$_AGENT;
    BEGIN
      subscriber :=  SYS.AQ$_AGENT('OE', NULL, NULL);  
      SYS.DBMS_AQADM.ADD_SUBSCRIBER(
        queue_name  =>  'strmadmin.oe_q_any',
        subscriber  =>  subscriber);
    END;
    /
    
  6. Connect as the oe user.

    CONNECT oe/user-password@dbs1.net
    
  7. Create a procedure that takes as an input parameter an object of ANYDATA type and enqueues a message containing the payload into an existing ANYDATA queue.

    CREATE OR REPLACE PROCEDURE oe.enq_proc (payload ANYDATA) 
    IS
      enqopt     DBMS_AQ.ENQUEUE_OPTIONS_T;
      mprop      DBMS_AQ.MESSAGE_PROPERTIES_T;
      enq_msgid  RAW(16);
    BEGIN
      mprop.SENDER_ID := SYS.AQ$_AGENT('OE', NULL, NULL); 
      DBMS_AQ.ENQUEUE(
        queue_name          =>  'strmadmin.oe_q_any',
        enqueue_options     =>  enqopt,
        message_properties  =>  mprop,
        payload             =>  payload,
        msgid               =>  enq_msgid);
    END;
    /
    
  8. Run the procedure you created in Step 7 by specifying the appropriate ANYDATA.Convertdata_type function, where data_type is the type of the payload. The following commands enqueue messages of various types.

    VARCHAR2 type:

    EXEC oe.enq_proc(ANYDATA.ConvertVarchar2('Chemicals - SW'));
    COMMIT;
    

    NUMBER type:

    EXEC oe.enq_proc(ANYDATA.ConvertNumber(16));
    COMMIT;
    

    User-defined type:

    BEGIN
      oe.enq_proc(ANYDATA.ConvertObject(oe.cust_address_typ(
        '1646 Brazil Blvd','361168','Chennai','Tam', 'IN')));
    END;
    /
    COMMIT;
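The same pattern should extend to other built-in types that have an ANYDATA Convert function. As a small sketch, the following enqueues a DATE payload with the oe.enq_proc procedure from Step 7. The choice of SYSDATE as the value is only for illustration.

```sql
-- DATE type: wrap the current date and enqueue it with oe.enq_proc
EXEC oe.enq_proc(ANYDATA.ConvertDate(SYSDATE));
COMMIT;
```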
    

See Also:

"Viewing the Contents of Messages in a Persistent Queue" for information about viewing the contents of these enqueued messages

Dequeuing a Payload that Is Wrapped in an ANYDATA Payload

The following steps illustrate how to dequeue a payload wrapped in an ANYDATA payload. This example assumes that you have completed the steps in "Wrapping User Message Payloads in an ANYDATA Wrapper and Enqueuing Them".

To dequeue messages, you must know the consumer of the messages. To find the consumer for the messages in a queue, connect as the owner of the queue and query the AQ$queue_table_name view, where queue_table_name is the name of the queue table. For example, to find the consumers of the messages in the oe_q_any queue, run the following query:

CONNECT strmadmin/user-password@dbs1.net

SELECT MSG_ID, MSG_STATE, CONSUMER_NAME FROM AQ$OE_Q_TABLE_ANY;

  1. Connect as the oe user:

    CONNECT oe/user-password@dbs1.net
    
  2. Create a procedure that takes as an input the consumer of the messages you want to dequeue. The following example procedure dequeues messages of oe.cust_address_typ and prints the contents of the messages.

    CREATE OR REPLACE PROCEDURE oe.get_cust_address (
      consumer IN VARCHAR2) AS
      address         OE.CUST_ADDRESS_TYP;
      deq_address     ANYDATA;
      msgid           RAW(16);
      deqopt          DBMS_AQ.DEQUEUE_OPTIONS_T;
      mprop           DBMS_AQ.MESSAGE_PROPERTIES_T;
      new_addresses   BOOLEAN := TRUE;
      next_trans      EXCEPTION;
      no_messages     EXCEPTION;
      -- ORA-25235: fetched all messages in the current transaction
      PRAGMA EXCEPTION_INIT (next_trans, -25235);
      -- ORA-25228: timeout or end-of-fetch during message dequeue
      PRAGMA EXCEPTION_INIT (no_messages, -25228);
      num_var         PLS_INTEGER;
    BEGIN
      deqopt.consumer_name := consumer;
      deqopt.wait := 1;  -- wait up to 1 second for a message
      WHILE (new_addresses) LOOP
        BEGIN
          DBMS_AQ.DEQUEUE(
            queue_name          =>  'strmadmin.oe_q_any',
            dequeue_options     =>  deqopt,
            message_properties  =>  mprop,
            payload             =>  deq_address,
            msgid               =>  msgid);
          deqopt.navigation := DBMS_AQ.NEXT;
          DBMS_OUTPUT.PUT_LINE('****');
          IF (deq_address.GetTypeName() = 'OE.CUST_ADDRESS_TYP') THEN
            DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
                                 deq_address.GetTypeName());
            -- Unwrap the ANYDATA payload into the object type
            num_var := deq_address.GetObject(address);
            DBMS_OUTPUT.PUT_LINE(' **** CUSTOMER ADDRESS **** ');
            DBMS_OUTPUT.PUT_LINE(address.street_address);
            DBMS_OUTPUT.PUT_LINE(address.postal_code);
            DBMS_OUTPUT.PUT_LINE(address.city);
            DBMS_OUTPUT.PUT_LINE(address.state_province);
            DBMS_OUTPUT.PUT_LINE(address.country_id);
          ELSE
            -- Payloads of other types are dequeued but only their type is printed
            DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' ||
                                 deq_address.GetTypeName());
          END IF;
          COMMIT;
        EXCEPTION
          WHEN next_trans THEN
            deqopt.navigation := DBMS_AQ.NEXT_TRANSACTION;
          WHEN no_messages THEN
            new_addresses := FALSE;
            DBMS_OUTPUT.PUT_LINE('No more messages');
        END;
      END LOOP;
    END;
    /
    
  3. Run the procedure you created in Step 2 and specify the consumer of the messages you want to dequeue, as in the following example:

    SET SERVEROUTPUT ON SIZE 100000
    EXEC oe.get_cust_address('OE');
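The oe.get_cust_address procedure prints only the type name for payloads that are not of oe.cust_address_typ. The following is a sketch of a helper that also prints VARCHAR2 and NUMBER payloads, such as the ones enqueued earlier. The procedure name oe.print_payload is hypothetical and does not appear in the original examples. The type-name strings for the built-in types are assumed to be reported as SYS.VARCHAR2 and SYS.NUMBER.

```sql
-- Hypothetical helper, not part of the original examples
CREATE OR REPLACE PROCEDURE oe.print_payload (payload IN ANYDATA) AS
  type_name  VARCHAR2(100);
  address    oe.cust_address_typ;
  num_var    PLS_INTEGER;
BEGIN
  type_name := payload.GetTypeName();
  DBMS_OUTPUT.PUT_LINE('Message TYPE is: ' || type_name);
  IF type_name = 'SYS.VARCHAR2' THEN
    -- Built-in scalar types are read with the matching Access function
    DBMS_OUTPUT.PUT_LINE(payload.AccessVarchar2());
  ELSIF type_name = 'SYS.NUMBER' THEN
    DBMS_OUTPUT.PUT_LINE(TO_CHAR(payload.AccessNumber()));
  ELSIF type_name = 'OE.CUST_ADDRESS_TYP' THEN
    -- Object types are unwrapped with GetObject
    num_var := payload.GetObject(address);
    DBMS_OUTPUT.PUT_LINE(address.street_address || ', ' || address.city);
  END IF;
END;
/
```

A dequeue loop could call this helper on each dequeued payload instead of branching on the type inline. It can also be tried without a queue, for example with EXEC oe.print_payload(ANYDATA.ConvertNumber(16)) after SET SERVEROUTPUT ON.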
    

Configuring a Messaging Client and Message Notification

This section contains instructions for configuring a messaging client, and message notification for that messaging client, in a database.

You can query the DBA_STREAMS_MESSAGE_CONSUMERS data dictionary view for information about existing messaging clients and notifications.
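As a sketch of such a query, the following lists each messaging client with its queue, positive rule set, and notification settings. The column formatting commands are SQL*Plus conveniences and the widths are arbitrary. The query returns no rows until a messaging client has been created.

```sql
COLUMN STREAMS_NAME        FORMAT A15
COLUMN QUEUE_OWNER         FORMAT A12
COLUMN QUEUE_NAME          FORMAT A20
COLUMN RULE_SET_NAME       FORMAT A15
COLUMN NOTIFICATION_TYPE   FORMAT A10
COLUMN NOTIFICATION_ACTION FORMAT A35

-- One row per messaging client; notification columns are null
-- when no notification has been set for the client
SELECT STREAMS_NAME,
       QUEUE_OWNER,
       QUEUE_NAME,
       RULE_SET_NAME,
       NOTIFICATION_TYPE,
       NOTIFICATION_ACTION
  FROM DBA_STREAMS_MESSAGE_CONSUMERS;
```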

Complete the following steps to configure a messaging client and message notification:

  1. Connect as an administrative user who can grant privileges and execute subprograms in supplied packages.

  2. Set the host name used to send the e-mail, the mail port, and the e-mail account that sends e-mail messages for e-mail notifications using the DBMS_AQELM package. The following example sets the mail host name to smtp.fictional_company.com, the mail port to 25, and the e-mail account to Mary.Smith@fictional_company.com:

    BEGIN
      DBMS_AQELM.SET_MAILHOST('smtp.fictional_company.com');
      DBMS_AQELM.SET_MAILPORT(25);
      DBMS_AQELM.SET_SENDFROM('Mary.Smith@fictional_company.com');
    END;
    /
    

    If you are not sure about the correct mail host, mail port, and send from e-mail address, then ask the system administrator for the computer system that runs the database.

  3. Grant the necessary privileges to the users who will create the messaging client, enqueue and dequeue messages, and specify message notifications. In this example, the oe user performs all of these tasks.

    GRANT EXECUTE ON DBMS_AQ TO oe;
    GRANT EXECUTE ON DBMS_STREAMS_ADM TO oe;
    GRANT EXECUTE ON DBMS_STREAMS_MESSAGING TO oe;
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
        grantee      => 'oe', 
        grant_option => FALSE);
    END;
    /
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
        grantee      => 'oe', 
        grant_option => FALSE);
    END;
    /
    
    BEGIN 
      DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE(
        privilege    => DBMS_RULE_ADM.CREATE_EVALUATION_CONTEXT_OBJ, 
        grantee      => 'oe', 
        grant_option => FALSE);
    END;
    /
    
  4. Connect as the oe user:

    CONNECT oe/user-password
    
  5. Create an ANYDATA queue using SET_UP_QUEUE, as in the following example:

    BEGIN
      DBMS_STREAMS_ADM.SET_UP_QUEUE(
        queue_table  => 'oe.notification_queue_table',
        queue_name   => 'oe.notification_queue');
    END;
    /
    
  6. Create the type for the user messages, as in the following example:

    CREATE TYPE oe.user_msg AS OBJECT(
      object_name    VARCHAR2(30),
      object_owner   VARCHAR2(30),
      message        VARCHAR2(50));
    /
    
  7. Create a trigger that enqueues a message into the queue whenever an order is inserted into the oe.orders table, as in the following example:

    CREATE OR REPLACE TRIGGER oe.order_insert AFTER INSERT
    ON oe.orders FOR EACH ROW
    DECLARE
      msg            oe.user_msg;
      str            VARCHAR2(2000);
    BEGIN
      str := 'New Order - ' || :NEW.ORDER_ID || ' Order ID';
      msg  := oe.user_msg(
                 object_name   => 'ORDERS',
                 object_owner  => 'OE',
                 message       => str);
      DBMS_STREAMS_MESSAGING.ENQUEUE (
        queue_name   => 'oe.notification_queue',
        payload      => ANYDATA.CONVERTOBJECT(msg));
    END;
    /
    
  8. Create the messaging client that will dequeue messages from the queue and the rule used by the messaging client to determine which messages to dequeue, as in the following example:

    BEGIN
      DBMS_STREAMS_ADM.ADD_MESSAGE_RULE (
        message_type   => 'oe.user_msg',
        rule_condition => ' :msg.OBJECT_OWNER = ''OE'' AND  ' ||
                          ' :msg.OBJECT_NAME = ''ORDERS'' ',
        streams_type   => 'dequeue',
        streams_name   => 'oe',
        queue_name     => 'oe.notification_queue');
    END;
    /
    
  9. Set the message notification to send e-mail upon enqueue of messages that can be dequeued by the messaging client, as in the following example:

    BEGIN
      DBMS_STREAMS_ADM.SET_MESSAGE_NOTIFICATION (
        streams_name         => 'oe',
        notification_action  => 'Mary.Smith@fictional_company.com',
        notification_type    => 'MAIL',
        include_notification => TRUE,
        queue_name           => 'oe.notification_queue');
    END;
    /
    
  10. Create a PL/SQL procedure that dequeues messages using the messaging client, as in the following example:

    CREATE OR REPLACE PROCEDURE oe.deq_notification(consumer IN VARCHAR2) AS
      msg            ANYDATA;
      user_msg       oe.user_msg;
      num_var        PLS_INTEGER;
      more_messages  BOOLEAN := TRUE;
      navigation     VARCHAR2(30);
    BEGIN
      navigation := 'FIRST MESSAGE';
      WHILE (more_messages) LOOP
        BEGIN
          DBMS_STREAMS_MESSAGING.DEQUEUE(
            queue_name   => 'oe.notification_queue',
            streams_name => consumer,
            payload      => msg,
            navigation   => navigation,
            wait         => DBMS_STREAMS_MESSAGING.NO_WAIT);
          IF msg.GETTYPENAME() = 'OE.USER_MSG' THEN
            num_var := msg.GETOBJECT(user_msg);
            DBMS_OUTPUT.PUT_LINE(user_msg.object_name);
            DBMS_OUTPUT.PUT_LINE(user_msg.object_owner);
            DBMS_OUTPUT.PUT_LINE(user_msg.message);
          END IF;
          navigation := 'NEXT MESSAGE';
          COMMIT;
        EXCEPTION WHEN SYS.DBMS_STREAMS_MESSAGING.ENDOFCURTRANS THEN
                    navigation := 'NEXT TRANSACTION';
                  WHEN DBMS_STREAMS_MESSAGING.NOMOREMSGS THEN
                    more_messages := FALSE;
                    DBMS_OUTPUT.PUT_LINE('No more messages.');
                  WHEN OTHERS THEN
                    RAISE;  
        END;
      END LOOP;
    END;
    /
    
  11. Insert rows into the oe.orders table, as in the following example:

    INSERT INTO oe.orders VALUES(2521, 'direct', 144, 0, 922.57, 159, NULL);
    INSERT INTO oe.orders VALUES(2522, 'direct', 116, 0, 1608.29, 153, NULL);
    COMMIT;
    INSERT INTO oe.orders VALUES(2523, 'direct', 116, 0, 227.55, 155, NULL);
    COMMIT;
    

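Message notification sends a message to the e-mail address specified in Step 9 for each message that was enqueued. Each notification is an AQXmlNotification, which includes the notification options (the destination queue and the consumer name) and a message set that contains the header of each message.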

The following example shows the AQXmlNotification format sent in an e-mail notification:

<?xml version="1.0" encoding="UTF-8"?>
<Envelope xmlns="http://ns.oracle.com/AQ/schemas/envelope">
    <Body>
        <AQXmlNotification xmlns="http://ns.oracle.com/AQ/schemas/access">
            <notification_options>
                <destination>OE.NOTIFICATION_QUEUE</destination>
                <consumer_name>OE</consumer_name>
            </notification_options>
            <message_set>
                <message>
                    <message_header>
                        <message_id>CB510DDB19454731E034080020AE3E0A</message_id>
                        <expiration>-1</expiration>
                        <delay>0</delay>
                        <priority>1</priority>
                        <delivery_count>0</delivery_count>
                        <sender_id>
                            <agent_name>OE</agent_name>
                            <protocol>0</protocol>
                        </sender_id>
                        <message_state>0</message_state>
                    </message_header>
                </message>
            </message_set>
        </AQXmlNotification>
    </Body>
</Envelope>

You can dequeue the messages enqueued in this example by running the oe.deq_notification procedure:

SET SERVEROUTPUT ON SIZE 100000
EXEC oe.deq_notification('OE');

Note:

The DBMS_AQ package can also configure notifications. The DBMS_AQ package provides some notification features that are not available in the DBMS_STREAMS_ADM package, such as buffered message notifications and notification grouping by time.
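For comparison, the following is a rough sketch of registering an e-mail notification directly with DBMS_AQ.REGISTER. It is an illustration under assumptions, not a replacement for the steps above. It assumes that OE is a subscriber of oe.notification_queue, that no notification for this subscriber is already registered, and that the mail settings from Step 2 are in place. The subscription name format (queue, a colon, then the consumer) and the mailto callback follow the usual Oracle Streams AQ registration conventions. The context value is arbitrary.

```sql
DECLARE
  reginfo      SYS.AQ$_REG_INFO;
  reginfolist  SYS.AQ$_REG_INFO_LIST;
BEGIN
  -- Arguments: subscription name, namespace, callback, context
  reginfo := SYS.AQ$_REG_INFO(
               'OE.NOTIFICATION_QUEUE:OE',
               DBMS_AQ.NAMESPACE_AQ,
               'mailto://Mary.Smith@fictional_company.com',
               HEXTORAW('FF'));
  reginfolist := SYS.AQ$_REG_INFO_LIST(reginfo);
  -- Register one entry from the list
  DBMS_AQ.REGISTER(reginfolist, 1);
END;
/
```

A matching call to DBMS_AQ.UNREGISTER with the same list should remove the registration when it is no longer needed.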
