Skip Headers
Oracle® Streams Advanced Queuing User's Guide and Reference
10g Release 2 (10.2)

Part Number B14257-01
Go to Documentation Home
Home
Go to Book List
Book List
Go to Table of Contents
Contents
Go to Index
Index
Go to Master Index
Master Index
Go to Feedback page
Feedback

Go to previous page
Previous
Go to next page
Next
View PDF

C OCI Examples

This appendix presents syntax and examples for the Oracle Streams Advanced Queuing (AQ) OCI operational interface.

This appendix contains these topics:

C.1 Enqueuing Messages

/* Oracle Streams AQ enqueue call. */
sword OCIAQEnq(OCISvcCtx *svch, OCIError *errh, text *queue_name,
               OCIAQEnqOptions *enqueue_options,
               OCIAQMsgProperties *message_properties,
               OCIType *payload_tdo,
               dvoid **payload, dvoid **payload_ind,
               OCIRaw **msgid, ub4 flags);

This call is used for an Oracle Streams AQ enqueue.


See Also:

"OCIAQEnq()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

Example C-1 through Example C-6 demonstrate the use of OCIAQEnq() and OCIAQDeq() in several different situations.

Example C-1 Enqueuing and Dequeuing an Object Payload

/* Payload enqueued and dequeued by this example: two OCIString attributes. */
typedef struct message
{
  OCIString *subject;
  OCIString *data;
} message;

/* Indicator structure passed with message as the payload indicator
   argument of OCIAQEnq() and OCIAQDeq(). */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
} null_message;

int main( argc, argv)
int    argc;
char * argv[];
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message  msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)(dvoid *,size_t)) 0,
                dvoid * (*)(dvoid *, dvoid *, size_t)) 0,  
                (void (*)(dvoid *, dvoid *)) 0 );

  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                  52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                    52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                    52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                  52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
                  (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  OCILogon(envhp, errhp, &svchp, (const text *)"AQ", (ub4) strlen("AQ"), 
          (const text *) "AQ", (ub4) strlen("AQ"), (const text *)0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"),
              (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"),
              (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL MESSAGE",
                 (ub4) strlen("NORMAL MESSAGE"), &mesg->subject);
  OCIStringAssignText(envhp, errhp,(CONST text *)"OCI ENQUEUE",
                 (ub4) strlen("OCI ENQUEUE"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (text *)"msg_queue", (OCIAQDeqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, (OCIRaw **)0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}

Example C-2 Enqueuing and Dequeuing Using Correlation Identifiers

/* Payload enqueued and dequeued by this example: two OCIString attributes. */
typedef struct message
{
  OCIString *subject;
  OCIString *data;
} message;

/* Indicator structure passed with message as the payload indicator
   argument of OCIAQEnq() and OCIAQDeq(). */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
} null_message;

/*
 * Example C-2 driver: enqueues two messages with different correlation
 * identifiers, then dequeues the second one by correlation identifier and
 * the first one by the message id saved at enqueue time.
 *
 * The return statuses of the setup calls are not checked.  Each dequeue
 * status is tested, because its output pointer is dereferenced
 * afterwards.  Always returns 0.
 */
int main(int argc, char *argv[])
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message  msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;
  OCIRaw *firstmsg = (OCIRaw *)0;
  OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0;
  OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0;
  text correlation1[30], correlation2[30];

  /* start OCI in object mode; the three callback pointers are passed as null */
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)(dvoid *,size_t)) 0,
                (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,
                (void (*)(dvoid *, dvoid *)) 0 );

  /* allocate and initialize the environment handle */
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  /* allocate the error and server handles */
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  /* allocate the service context and set the server handle in it */
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
                 (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* log on with user name "AQ" and password "AQ" */
  OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"),
           (const text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0);

  /* allocate message properties descriptor */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop,
                     OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
  strcpy((char *) correlation1, "1st message");
  /* the array name is passed directly, matching the dequeue-side call
     below (same address as &correlation1) */
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)correlation1,
            (ub4) strlen((const char*) correlation1), OCI_ATTR_CORRELATION,
            errhp);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4)strlen("AQ"),
                (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp, (CONST text *)"NORMAL ENQUEUE1",
                  (ub4) strlen("NORMAL ENQUEUE1"), &mesg->subject);
  OCIStringAssignText(envhp, errhp,(CONST text *)"OCI ENQUEUE",
                  (ub4) strlen("OCI ENQUEUE"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue into the msg_queue, store the message id into firstmsg */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, msgprop,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, &firstmsg, 0);

  /* enqueue into the msg_queue with a different correlation id */
  strcpy((char *)correlation2, "2nd message");
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)correlation2,
             (ub4) strlen((const char *)correlation2), OCI_ATTR_CORRELATION,
             errhp);
  OCIStringAssignText(envhp, errhp, (text *)"NORMAL ENQUEUE2",
                  (ub4) strlen("NORMAL ENQUEUE2"), &mesg->subject);
  OCIAQEnq(svchp, errhp, (text *)"msg_queue", (OCIAQEnqOptions *)0, msgprop,
                  mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);

  OCITransCommit(svchp, errhp, (ub4) 0);

  /* first dequeue by correlation id "2nd message" */
  /* allocate dequeue options descriptor and set the correlation option */
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt,
                     OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0);
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)correlation2,
            (ub4) strlen((const char *)correlation2),  OCI_ATTR_CORRELATION,
            errhp);

  /* dequeue from the msg_queue; deqmesg starts out null, so it is only
     dereferenced after a successful dequeue */
  if (OCIAQDeq(svchp, errhp, (text *)"msg_queue", deqopt,
               (OCIAQMsgProperties *)0,
               mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
               (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* second dequeue by message id */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&firstmsg,
             OCIRawSize(envhp, firstmsg), OCI_ATTR_DEQ_MSGID, errhp);
  /* clear correlation id option */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS,
             (dvoid *)correlation2, 0, OCI_ATTR_CORRELATION, errhp);

  /* dequeue from the msg_queue, again printing only on success */
  if (OCIAQDeq(svchp, errhp, (text *)"msg_queue", deqopt,
               (OCIAQMsgProperties *)0,
               mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
               (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}

Example C-3 Enqueuing and Dequeuing Messages by Correlation and Message ID Using OCI

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

/* Payload enqueued and dequeued by this example: two OCIString attributes. */
typedef struct message
{
  OCIString *subject;
  OCIString *data;
} message;

/* Indicator structure passed with message as the payload indicator
   argument of OCIAQEnq() and OCIAQDeq(). */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
} null_message;

int main()
{
  OCIEnv       *envhp;
  OCIServer    *srvhp;
  OCIError     *errhp;
  OCISvcCtx    *svchp;
  dvoid        *tmp;
  OCIType      *mesg_tdo = (OCIType *) 0;
  message      msg;
  null_message nmsg;
  message      *mesg     = &msg;
  null_message *nmesg    = &nmsg;
  message      *deqmesg  = (message *)0;
  null_message *ndeqmesg = (null_message *)0;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
                (dvoid * (*)()) 0,  (void (*)()) 0 );

  OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);

  OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
             (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
 
  OCILogon(envhp, errhp, &svchp, "AQ", strlen("AQ"), "AQ", strlen("AQ"), 0, 0);

  /* Obtain TDO of message_typ */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", strlen("AQ"),
                (CONST text *)"MESSAGE_TYP", strlen("MESSAGE_TYP"), 
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* Prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
                      (CONST text *)"NORMAL MESSAGE", strlen("NORMAL MESSAGE"), 
                      &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                      (CONST text *)"OCI ENQUEUE", strlen("OCI ENQUEUE"), 
                      &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* Enqueue into the msg_queue */
  OCIAQEnq(svchp, errhp, (CONST text *)"msg_queue", 0, 0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, 0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* Dequeue from the msg_queue */
  OCIAQDeq(svchp, errhp, (CONST text *)"msg_queue", 0, 0,
           mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg, 0, 0);
  printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
  printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  OCITransCommit(svchp, errhp, (ub4) 0);
}

Example C-4 Enqueuing and Dequeuing of a RAW Payload

int main( argc, argv)
int    argc;
char * argv[];
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  char  msg_text[100];
  OCIRaw  *mesg = (OCIRaw *)0;
  OCIRaw*deqmesg = (OCIRaw *)0;
  OCIInd   ind = 0;
  dvoid  *indptr = (dvoid *)&ind;
  int i;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)(dvoid *, size_t)) 0,
               (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,  (void (*)(dvoid *,
               dvoid *)) 0 );
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                  52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                  52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                  52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                  52, (dvoid **) &tmp);
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
                  (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"), (const
           text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0);

  /* obtain the TDO of the RAW datatype */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"SYS", (ub4) strlen("SYS"),
                (CONST text *)"RAW", (ub4) strlen("RAW"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  strcpy(msg_text, "Enqueue to a RAW queue");
  OCIRawAssignBytes(envhp, errhp, (const ub1 *)msg_text, (ub4) strlen(msg_text),
                    &mesg);

  /* enqueue the message into raw_msg_queue */
  OCIAQEnq(svchp, errhp, (text *)"raw_msg_queue", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *) 0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&indptr, (OCIRaw **)0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue the same message into C variable deqmesg */
  OCIAQDeq(svchp, errhp, (text *)"raw_msg_queue", (OCIAQDeqOptions *)0,
           (OCIAQMsgProperties *)0,
            mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&indptr, (OCIRaw **)0, 0);
  for (i = 0; i < OCIRawSize(envhp, deqmesg); i++)
    printf("%c", *(OCIRawPtr(envhp, deqmesg) + i));
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}

Example C-5 Enqueuing and Dequeuing Using OCIAQAgent

/* Payload enqueued and dequeued by this example: two OCIString attributes. */
typedef struct message
{
  OCIString *subject;
  OCIString *data;
} message;

/* Indicator structure passed with message as the payload indicator
   argument of OCIAQEnq() and OCIAQDeq(). */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
} null_message;

/*
 * Example C-5 driver: enqueues two messages into msg_queue_multiple (one
 * with no explicit recipients, one with the recipient list RED and BLUE)
 * and then dequeues as consumers BLUE, RED and GREEN in turn, printing
 * every message each consumer receives.
 *
 * The return statuses of the setup and enqueue calls are not checked.
 * Each dequeue loop runs until OCIAQDeq() stops returning OCI_SUCCESS.
 * Always returns 0.
 */
int main(int argc, char *argv[])
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  dvoid     *tmp;
  OCIType *mesg_tdo = (OCIType *) 0;
  message  msg;
  null_message nmsg;
  message *mesg = &msg;
  null_message *nmesg = &nmsg;
  message *deqmesg = (message *)0;
  null_message *ndeqmesg = (null_message *)0;
  OCIAQMsgProperties *msgprop = (OCIAQMsgProperties *)0;
  OCIAQAgent *agents[2];
  OCIAQDeqOptions *deqopt = (OCIAQDeqOptions *)0;
  ub4 wait = OCI_DEQ_NO_WAIT;
  ub4 navigation = OCI_DEQ_FIRST_MSG;

  /* start OCI in object mode; the three callback pointers are passed as null */
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0, (dvoid * (*)(dvoid *,size_t)) 0,
                (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,
                (void (*)(dvoid *, dvoid *)) 0 );

  /* allocate and initialize the environment handle */
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  /* allocate the error and server handles */
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                   52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                   52, (dvoid **) &tmp);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  /* allocate the service context and set the server handle in it */
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                   52, (dvoid **) &tmp);

  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
              (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* log on with user name "AQ" and password "AQ" */
  OCILogon(envhp, errhp, &svchp, (const text *) "AQ", (ub4) strlen("AQ"),
           (const text *) "AQ", (ub4) strlen("AQ"), (const text *) 0, 0);

  /* obtain TDO of message_type */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"),
       (CONST text *)"MESSAGE_TYPE", (ub4) strlen("MESSAGE_TYPE"),
       (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"MESSAGE 1", (ub4) strlen("MESSAGE 1"),
                    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"mesg for queue subscribers",
                    (ub4) strlen("mesg for queue subscribers"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* enqueue MESSAGE 1 for subscribers to the queue for RED and GREEN */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);

  /* enqueue MESSAGE 2 for specified recipients for RED and BLUE */
  /* prepare message payload */
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"MESSAGE 2", (ub4) strlen("MESSAGE 2"),
                    &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                    (CONST text *)"mesg for two recipients",
                    (ub4) strlen("mesg for two recipients"), &mesg->data);

  /* allocate AQ message properties and agent descriptors */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop,
                OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[0],
                  OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[1],
                  OCI_DTYPE_AQAGENT, 0, (dvoid **)0);

  /* prepare the recipient list, RED and BLUE */
  OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, (dvoid *) "RED", (ub4) strlen("RED"),
             OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, (dvoid *)"BLUE",
             (ub4) strlen("BLUE"), OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2,
             OCI_ATTR_RECIPIENT_LIST, errhp);

  OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0,
         msgprop, mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* now dequeue the messages using different consumer names */
  /* allocate dequeue options descriptor to set the dequeue options */
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0,
                     (dvoid **)0);

  /* set wait parameter to NO_WAIT so that the dequeue returns immediately */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0,
             OCI_ATTR_WAIT, errhp);

  /* set navigation to FIRST_MESSAGE so that the dequeue resets the position */
  /* after a new consumer_name is set in the dequeue options                 */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0,
              OCI_ATTR_NAVIGATION, errhp);

  /* dequeue from the msg_queue_multiple as consumer BLUE */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE",
             (ub4)strlen("BLUE"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *) 0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
                  (OCIRaw **) 0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer RED */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED",
             (ub4)strlen("RED"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *)0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
                  (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* dequeue from the msg_queue_multiple as consumer GREEN */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"GREEN",
             (ub4)strlen("GREEN"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *)0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
                  (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}

Example C-6 Enqueuing and Dequeuing Messages for a Multiconsumer Queue

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

/* Payload enqueued and dequeued by this example: two OCIString attributes. */
typedef struct message
{
  OCIString *subject;
  OCIString *data;
} message;

/* Indicator structure passed with message as the payload indicator
   argument of OCIAQEnq() and OCIAQDeq(). */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_subject;
  OCIInd null_data;
} null_message;

/*
 * Example C-6 driver: enqueues two messages into msg_queue_multiple (one
 * with no explicit recipients, one with the recipient list RED and BLUE)
 * and then dequeues as consumers BLUE, RED and GREEN in turn, printing
 * every message each consumer receives.
 *
 * The return statuses of the setup and enqueue calls are not checked.
 * Each dequeue loop runs until OCIAQDeq() stops returning OCI_SUCCESS.
 * Always returns 0.
 */
int main(void)
{
  OCIEnv                *envhp;
  OCIServer             *srvhp;
  OCIError              *errhp;
  OCISvcCtx             *svchp;
  dvoid                 *tmp;
  OCIType               *mesg_tdo = (OCIType *) 0;
  message               msg;
  null_message          nmsg;
  message               *mesg = &msg;
  null_message          *nmesg = &nmsg;
  message               *deqmesg = (message *)0;
  null_message          *ndeqmesg = (null_message *)0;
  OCIAQMsgProperties    *msgprop = (OCIAQMsgProperties *)0;
  OCIAQAgent            *agents[2];
  OCIAQDeqOptions       *deqopt = (OCIAQDeqOptions *)0;
  ub4                   wait = OCI_DEQ_NO_WAIT;
  ub4                   navigation = OCI_DEQ_FIRST_MSG;

  /* start OCI in object mode; the three callback pointers are passed as
     null, cast to fully prototyped function pointer types */
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,
                (dvoid * (*)(dvoid *, size_t)) 0,
                (dvoid * (*)(dvoid *, dvoid *, size_t)) 0,
                (void (*)(dvoid *, dvoid *)) 0 );

  /* allocate and initialize the environment handle */
  OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  /* allocate the error and server handles */
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  /* allocate the service context and set the server handle in it */
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);

  OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
             (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* log on with user name "AQ" and password "AQ"; literals and lengths
     are cast explicitly, as in the other examples of this appendix */
  OCILogon(envhp, errhp, &svchp, (const text *)"AQ", (ub4) strlen("AQ"),
           (const text *)"AQ", (ub4) strlen("AQ"), (const text *)0, 0);

  /* Obtain TDO of message_typ */
  OCITypeByName(envhp, errhp, svchp, (CONST text *)"AQ", (ub4) strlen("AQ"),
                (CONST text *)"MESSAGE_TYP", (ub4) strlen("MESSAGE_TYP"),
                (text *)0, 0, OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo);

  /* Prepare the message payload */
  mesg->subject = (OCIString *)0;
  mesg->data = (OCIString *)0;
  OCIStringAssignText(envhp, errhp,
                      (CONST text *)"MESSAGE 1", (ub4) strlen("MESSAGE 1"),
                      &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                      (CONST text *)"mesg for queue subscribers",
                      (ub4) strlen("mesg for queue subscribers"), &mesg->data);
  nmesg->null_adt = nmesg->null_subject = nmesg->null_data = OCI_IND_NOTNULL;

  /* Enqueue MESSAGE 1 for subscribers to the queue.  The queue name is
     cast to text *, as in the other examples of this appendix. */
  OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0,
           (OCIAQMsgProperties *)0,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);

  /* Enqueue MESSAGE 2 for specified recipients. */
  /* prepare message payload */
  OCIStringAssignText(envhp, errhp,
                      (CONST text *)"MESSAGE 2", (ub4) strlen("MESSAGE 2"),
                      &mesg->subject);
  OCIStringAssignText(envhp, errhp,
                      (CONST text *)"mesg for two recipients",
                      (ub4) strlen("mesg for two recipients"), &mesg->data);

  /* Allocate AQ message properties and agent descriptors */
  OCIDescriptorAlloc(envhp, (dvoid **)&msgprop,
                     OCI_DTYPE_AQMSG_PROPERTIES, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[0],
                     OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  OCIDescriptorAlloc(envhp, (dvoid **)&agents[1],
                     OCI_DTYPE_AQAGENT, 0, (dvoid **)0);

  /* Prepare the recipient list, RED and BLUE */
  OCIAttrSet(agents[0], OCI_DTYPE_AQAGENT, (dvoid *)"RED", (ub4) strlen("RED"),
             OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(agents[1], OCI_DTYPE_AQAGENT, (dvoid *)"BLUE",
             (ub4) strlen("BLUE"), OCI_ATTR_AGENT_NAME, errhp);
  OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, (dvoid *)agents, 2,
             OCI_ATTR_RECIPIENT_LIST, errhp);

  OCIAQEnq(svchp, errhp, (text *)"msg_queue_multiple", (OCIAQEnqOptions *)0,
           msgprop,
           mesg_tdo, (dvoid **)&mesg, (dvoid **)&nmesg, (OCIRaw **)0, 0);

  OCITransCommit(svchp, errhp, (ub4) 0);

  /* Now dequeue the messages using different consumer names */
  /* Allocate dequeue options descriptor to set the dequeue options */
  OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 0,
                     (dvoid **)0);

  /* Set wait parameter to NO_WAIT so that the dequeue returns immediately */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&wait, 0,
             OCI_ATTR_WAIT, errhp);

  /* Set navigation to FIRST_MESSAGE so that the dequeue resets the position */
  /* after a new consumer_name is set in the dequeue options           */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)&navigation, 0,
             OCI_ATTR_NAVIGATION, errhp);

  /* Dequeue from the msg_queue_multiple as consumer BLUE */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"BLUE",
             (ub4) strlen("BLUE"), OCI_ATTR_CONSUMER_NAME, errhp);

  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *)0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
                  (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* Dequeue from the msg_queue_multiple as consumer RED */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"RED",
             (ub4) strlen("RED"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *)0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
                  (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);

  /* Dequeue from the msg_queue_multiple as consumer GREEN */
  OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, (dvoid *)"GREEN",
             (ub4) strlen("GREEN"), OCI_ATTR_CONSUMER_NAME, errhp);
  while (OCIAQDeq(svchp, errhp, (text *)"msg_queue_multiple", deqopt,
                  (OCIAQMsgProperties *)0,
                  mesg_tdo, (dvoid **)&deqmesg, (dvoid **)&ndeqmesg,
                  (OCIRaw **)0, 0) == OCI_SUCCESS)
  {
    printf("Subject: %s\n", OCIStringPtr(envhp, deqmesg->subject));
    printf("Text: %s\n", OCIStringPtr(envhp, deqmesg->data));
  }
  OCITransCommit(svchp, errhp, (ub4) 0);
  return 0;
}

C.2 Enqueuing an Array of Messages

/* Enqueues an array of messages to a queue; every message in the array is
   enqueued with the same options and the same payload column TDO. */
sword OCIAQEnqArray(OCISvcCtx *svchp, OCIError *errhp, OraText *queue_name,
                    OCIAQEnqOptions *enqopt, ub4 *iters,
                    OCIAQMsgProperties **msgprop, OCIType *payload_tdo,
                    dvoid **payload, dvoid **payload_ind, OCIRaw **msgid,
                    dvoid *ctxp,
                    OCICallbackAQEnq (cbfp)(dvoid *ctxp, dvoid **payload,
                                            dvoid **payload_ind),
                    ub4 flags);

This call enqueues an array of messages to a queue. The array of messages is enqueued with the same options and has the same payload column TDO.


See Also:

"OCIAQEnqArray()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

Example C-7 Enqueuing an Array of Messages

/* Payload structure for this example: a single OCIString attribute. */
typedef struct message
{
  OCIString *data;
} message;
 
/* Indicator structure declared alongside message: one OCIInd named
   null_adt and one named null_data. */
typedef struct null_message
{
  OCIInd null_adt;
  OCIInd null_data;
} null_message;
 
int main( argc, argv)
int argc ;
char **argv ;{
 
  OCIEnv              *envhp;
  OCIServer           *srvhp;
  OCIError            *errhp;
  OCISvcCtx           *svchp;
  OCISession          *usrhp;
  dvoid               *tmp;
  OCIType             *mesg_tdo = (OCIType *) 0;
  message              mesg[NMESGS];
  message             *mesgp[NMESGS];
  null_message         nmesg[NMESGS];
  null_message        *nmesgp[NMESGS];
  int                  i, j, k;
  OCIInd               ind[NMESGS];
  dvoid               *indptr[NMESGS];
  ub4                  priority;
  OCIAQEnqOptions     *enqopt = (OCIAQEnqOptions *)0;
  OCIAQMsgProperties  *msgprop= (OCIAQMsgProperties *)0;
  ub4                  wait = 1;
  ub4                  navigation = OCI_DEQ_NEXT_MSG;
  ub4                  iters = 2;
  text                *qname ; 
  text                 mesgdata[30];
  ub4                  payload_size = 5;
  text                *payload = (text *)0;
  ub4                  batch_size = 2;
  ub4                  enq_size = 2;
  
  printf("session start\n");
  /* establish a session */  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
        (dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
           52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
           52, (dvoid **) &tmp);
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
           52, (dvoid **) &tmp);
  
  printf("server attach\n");
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
           52, (dvoid **) &tmp);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
           (size_t) 0, (dvoid **) 0);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), 
             OCI_ATTR_USERNAME, errhp);
           
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"),
           OCI_ATTR_PASSWORD, errhp);
  
  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
                                   OCI_DEFAULT));
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);
 
  /* get descriptor for enqueue options */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&enqopt, 
                                         OCI_DTYPE_AQENQ_OPTIONS, 0, 
                                         (dvoid **)0));
 
  printf("enq options set\n");
  /* set enqueue options - for consumer name, wait and navigation */
 
  /* construct null terminated payload string */
  payload = (text *)malloc(payload_size+1);
  for (k=0 ; k < payload_size ; k++)
    payload[k] = 'a';
  payload[payload_size] = '\0';
 
  for (k=0 ; k < batch_size ; k++)
  {
    indptr[k] = &ind[k];
    mesgp[k] = &mesg[k];
    nmesgp[k] = &nmesg[k];
    nmesg[k].null_adt = nmesg[k].null_data = OCI_IND_NOTNULL; 
    mesg[k].data = (OCIString *)0;
    OCIStringAssignText(envhp, errhp, (const unsigned char *)payload,
                        strlen((const char *)payload), &(mesg[k].data));
  }
  
  printf("check message tdo\n");
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
   (CONST text *)"AQUSER", strlen("AQUSER"),
   (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, 
   OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo));
  k=0;
 
  while (k < iters) 
  {
    enq_size = batch_size;
    checkerr(errhp, OCIAQEnqArray(svchp, errhp, 
                                  (dvoid *)"AQUSER.MY_QUEUE", 
                                  (OCIAQEnqOptions *)0, &enq_size, 
                                  0, mesg_tdo, 
                                  (dvoid **)&mesgp, 
                                  (dvoid **)&nmesgp, 0, 0, 0, 0));
    k+=batch_size;
  }
  
  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0));
 
  checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT));
 
  return 0;}

C.3 Listening to One or More Queues

sword OCIAQListen (OCISvcCtx      *svchp, 
                   OCIError       *errhp,
                   OCIAQAgent     **agent_list, 
                   ub4            num_agents,
                   sb4            wait, 
                   OCIAQAgent     **agent,
                   ub4            flags);

This call listens on one or more queues on behalf of a list of agents.


See Also:

"OCIAQListen()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

Example C-8 Listening for Single-Consumer Queues with Zero Timeout (OCI)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

/*
 * Report the outcome of an OCI call on stdout.
 *
 * errhp  - error handle used to fetch the diagnostic text for OCI_ERROR
 * status - return code of the OCI call being checked
 *
 * OCI_SUCCESS and unrecognized codes print nothing. The function only
 * reports; it never terminates the program.
 */
static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    sb4 errcode = 0;

    switch (status)
    {
    case OCI_SUCCESS:
        break;
    case OCI_SUCCESS_WITH_INFO:
        printf("Error - OCI_SUCCESS_WITH_INFO\n");
        break;
    case OCI_NEED_DATA:
        printf("Error - OCI_NEED_DATA\n");
        break;
    case OCI_NO_DATA:
        printf("Error - OCI_NO_DATA\n");
        break;
    case OCI_ERROR:
        /* start with an empty string so the printf below is safe even if
           OCIErrorGet cannot supply a message */
        errbuf[0] = '\0';
        (void) OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
            errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
        printf("Error - %s\n", (char *) errbuf);
        break;
    case OCI_INVALID_HANDLE:
        printf("Error - OCI_INVALID_HANDLE\n");
        break;
    case OCI_STILL_EXECUTING:
        printf("Error - OCI_STILL_EXECUTING\n");
        break;
    case OCI_CONTINUE:
        printf("Error - OCI_CONTINUE\n");
        break;
    default:
        break;
    }
}

/* set agent into descriptor */
void SetAgent(agent, appname, queue,errhp)

OCIAQAgent  *agent;
text        *appname;
text        *queue;
OCIError    *errhp;
{

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     appname ? (dvoid *)appname : (dvoid *)"", 
     appname ? strlen((const char *)appname) : 0,
        OCI_ATTR_AGENT_NAME, errhp);

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     queue ? (dvoid *)queue : (dvoid *)"", 
     queue ? strlen((const char *)queue) : 0,
        OCI_ATTR_AGENT_ADDRESS, errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
text      *appname;
text      *queue;
ub4       appsz;
ub4       queuesz;

  if (!agent )
  {
    printf("agent was NULL \n");
    return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

/*
 * Example C-8: listen on three single-consumer queues with a zero timeout.
 *
 * Logs on as scott, describes SCOTT.SCQ1..SCQ3 as agents, calls
 * OCIAQListen() with a wait of 0 and prints the agent it returns.
 *
 * Returns 0.
 */
int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  OCISession *usrhp;
  OCIAQAgent *agent_list[3];
  OCIAQAgent *agent = (OCIAQAgent *)0;
  int i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
     (dvoid * (*)()) 0,  (void (*)()) 0 );
 
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, 
             (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);

  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* allocate a user context handle (once; a second allocation into the
     same variable would orphan the first handle) */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
      (size_t) 0, (dvoid **) 0);

  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);

  OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);

  /* report a failed logon instead of continuing silently */
  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
     OCI_DEFAULT));

  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     agent_list[i] = (OCIAQAgent *)0;
     checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0));
  }

  /* 
   *   SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema
   */
 
  SetAgent(agent_list[0], (text *)0, (text *)"SCOTT.SCQ1", errhp);
  SetAgent(agent_list[1], (text *)0, (text *)"SCOTT.SCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, (text *)"SCOTT.SCQ3", errhp);
 
  /* wait argument of 0: zero timeout */
  checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0));

  /* agent is still NULL if the listen call did not set it; GetAgent
     handles that case */
  printf("MESSAGE for :- \n");
  GetAgent(agent, errhp);
  printf("\n");

  return 0;
}

Example C-9 Listening for Single-Consumer Queues with 120 Second Timeout (OCI)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

/*
 * Report the outcome of an OCI call on stdout.
 *
 * errhp  - error handle used to fetch the diagnostic text for OCI_ERROR
 * status - return code of the OCI call being checked
 *
 * OCI_SUCCESS and unrecognized codes print nothing. The function only
 * reports; it never terminates the program.
 */
static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    sb4 errcode = 0;

    switch (status)
    {
    case OCI_SUCCESS:
        break;
    case OCI_SUCCESS_WITH_INFO:
        printf("Error - OCI_SUCCESS_WITH_INFO\n");
        break;
    case OCI_NEED_DATA:
        printf("Error - OCI_NEED_DATA\n");
        break;
    case OCI_NO_DATA:
        printf("Error - OCI_NO_DATA\n");
        break;
    case OCI_ERROR:
        /* start with an empty string so the printf below is safe even if
           OCIErrorGet cannot supply a message */
        errbuf[0] = '\0';
        (void) OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
            errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
        printf("Error - %s\n", (char *) errbuf);
        break;
    case OCI_INVALID_HANDLE:
        printf("Error - OCI_INVALID_HANDLE\n");
        break;
    case OCI_STILL_EXECUTING:
        printf("Error - OCI_STILL_EXECUTING\n");
        break;
    case OCI_CONTINUE:
        printf("Error - OCI_CONTINUE\n");
        break;
    default:
        break;
    }
}

/* set agent into descriptor */
/* void SetAgent(agent, appname, queue) */
void SetAgent(agent, appname, queue,errhp)

OCIAQAgent  *agent;
text        *appname;
text        *queue;
OCIError    *errhp;
{

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     appname ? (dvoid *)appname : (dvoid *)"", 
     appname ? strlen((const char *)appname) : 0,
        OCI_ATTR_AGENT_NAME, errhp);

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     queue ? (dvoid *)queue : (dvoid *)"", 
     queue ? strlen((const char *)queue) : 0,
        OCI_ATTR_AGENT_ADDRESS, errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
text      *appname;
text      *queue;
ub4       appsz;
ub4       queuesz;

  if (!agent )
  {
    printf("agent was NULL \n");
    return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

/*
 * Example C-9: listen on three single-consumer queues with a 120 second
 * timeout.
 *
 * Logs on as scott, describes SCOTT.SCQ1..SCQ3 as agents, calls
 * OCIAQListen() with a wait of 120 and prints the agent it returns.
 *
 * Returns 0.
 */
int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  OCISession *usrhp;
  OCIAQAgent *agent_list[3];
  OCIAQAgent *agent = (OCIAQAgent *)0;
  int i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
     (dvoid * (*)()) 0,  (void (*)()) 0 );
 
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, 
             (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);

  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);

  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);

  /* allocate a user context handle (once; a second allocation into the
     same variable would orphan the first handle) */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
      (size_t) 0, (dvoid **) 0);

  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);

  OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);

  /* report a failed logon instead of continuing silently */
  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
     OCI_DEFAULT));

  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     agent_list[i] = (OCIAQAgent *)0;
     checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0));
  }

  /* 
   *   SCQ1, SCQ2, SCQ3 are single-consumer queues in SCOTT's schema
   */
 
  SetAgent(agent_list[0], (text *)0, (text *)"SCOTT.SCQ1", errhp);
  SetAgent(agent_list[1], (text *)0, (text *)"SCOTT.SCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, (text *)"SCOTT.SCQ3", errhp);
 
  /* wait argument of 120: the 120 second timeout of this example */
  checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0));

  /* agent is still NULL if the listen call did not set it; GetAgent
     handles that case */
  printf("MESSAGE for :- \n");
  GetAgent(agent, errhp);
  printf("\n");

  return 0;
}

C.4 Dequeuing a Message

sword OCIAQDeq ( OCISvcCtx           *svch,
                 OCIError            *errh,
                 text                *queue_name,
                 OCIAQDeqOptions     *dequeue_options,
                 OCIAQMsgProperties  *message_properties,
                 OCIType             *payload_tdo,
                 dvoid               **payload,
                 dvoid               **payload_ind,
                 OCIRaw              **msgid,
                 ub4                 flags );

This call is used for an Oracle Streams AQ dequeue. Dequeue examples are in "Enqueuing Messages".


See Also:

"OCIAQDeq()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

C.5 Dequeuing an Array of Messages

sword OCIAQDeqArray ( OCISvcCtx           *svchp,
                      OCIError            *errhp,
                      OraText             *queue_name,
                      OCIAQDeqOptions     *deqopt,
                      ub4                 *iters,
                      OCIAQMsgProperties  **msgprop,
                      OCIType             *payload_tdo,
                      dvoid               **payload,
                      dvoid               **payload_ind,
                      OCIRaw              **msgid,
                      dvoid               *ctxp,
                      OCICallbackAQDeq    (cbfp) 
                                          (
                                          dvoid              *ctxp,
                                          dvoid              **payload,
                                          dvoid              **payload_ind
                                          ),
                      ub4                 flags );

This call dequeues an array of messages from a queue. All messages in the array are dequeued with the same options and have the same queue table payload column TDO.


See Also:

"OCIAQDeqArray()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

Example C-10 Array Dequeuing from a Queue of Type Message (OCI)

/* Payload structure dequeued in this example: one string attribute. */
typedef struct message
{
  OCIString *data;   /* message text */
} message;
 
/*
 * Null-indicator structure returned alongside a message: one indicator for
 * the object as a whole and one for its data attribute.
 */
typedef struct null_message
{
  OCIInd null_adt;    /* indicator for the whole object */
  OCIInd null_data;   /* indicator for the data attribute */
} null_message;
 
/*
 * Example C-10: dequeue an array of MESSAGE payloads with OCIAQDeqArray().
 *
 * Connects as AQUSER, dequeues from AQUSER.MY_QUEUE as consumer SUB1 in
 * batches of batch_size until at least iters messages have been requested,
 * then commits and detaches from the server.
 *
 * NMESGS and checkerr() are not defined in this example and must be
 * supplied by the surrounding program; NMESGS must be >= batch_size.
 *
 * Returns 0.
 */
int main(argc, argv)
 int argc;
 char **argv;{
 
  OCIEnv                *envhp;
  OCIServer             *srvhp;
  OCIError              *errhp;
  OCISvcCtx             *svchp;
  OCISession            *usrhp;
  dvoid                 *tmp;
  message               *mesgp[NMESGS];
  ub4                    k;               /* unsigned: compared with ub4 limits */
  null_message          *nmesgp[NMESGS];
  OCIAQDeqOptions       *deqopt = (OCIAQDeqOptions *)0;
  ub4                    iters = 2;       /* total number of messages to request */
  OCIType               *mesg_tdo = (OCIType *) 0;
  ub4                    batch_size = 2;  /* messages per OCIAQDeqArray call */
  ub4                    deq_size = batch_size; /* in: batch size, out: set by OCI */

 printf("session start\n");
  /* establish a session */  
  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
        (dvoid * (*)()) 0,  (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
           52, (dvoid **) &tmp);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
           52, (dvoid **) &tmp);
 
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
           52, (dvoid **) &tmp);
  
  printf("server attach\n");
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
           52, (dvoid **) &tmp);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
           (size_t) 0, (dvoid **) 0);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"), 
             OCI_ATTR_USERNAME, errhp);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"AQUSER", (ub4)strlen("AQUSER"),
           OCI_ATTR_PASSWORD, errhp);
  
  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
                                   OCI_DEFAULT));
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);
 
  /* get descriptor for dequeue options */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, 
                                         OCI_DTYPE_AQDEQ_OPTIONS, 0, 
                                         (dvoid **)0));
  
  printf("deq options set\n");
  /* set dequeue options - only the consumer name is set here */
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
                                 (dvoid *)"SUB1",
                                 (ub4)strlen("SUB1"), 
                                 OCI_ATTR_CONSUMER_NAME, errhp));
 
  /* start from null payload and indicator pointers */
  for (k=0 ; k < NMESGS ; k++)
  {
    mesgp[k] = 0;
    nmesgp[k] = 0;
  }
  
  printf("check message tdo\n");
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
        (CONST text *)"AQUSER", strlen("AQUSER"),
        (CONST text *)"MESSAGE", strlen("MESSAGE"), (text *)0, 0, 
        OCI_DURATION_SESSION, OCI_TYPEGET_ALL, &mesg_tdo));

 k=0;
 
  while (k < iters) 
  {
    /* deq_size is an in/out argument, so reset it before every call */
    deq_size = batch_size;
    checkerr(errhp, OCIAQDeqArray(svchp, errhp, 
                                  (text *)"AQUSER.MY_QUEUE",
                                  (OCIAQDeqOptions *)deqopt, 
                                  &deq_size, 0, mesg_tdo, 
                                  (dvoid **)mesgp, 
                                  (dvoid **)nmesgp, 0, 0, 0, 0));
    k+=batch_size;
  }
  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0)); 
  
  /* release the dequeue options descriptor before disconnecting */
  checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, 
                                    OCI_DTYPE_AQDEQ_OPTIONS));
  
  checkerr(errhp, OCIServerDetach( srvhp, errhp, (ub4) OCI_DEFAULT));
  return 0;}
 

C.6 Registering for Notification

ub4 OCISubscriptionRegister ( OCISvcCtx         *svchp,
                              OCISubscription   **subscrhpp,
                              ub2               count,
                              OCIError          *errhp,
                              ub4               mode );

This call registers a callback for message notification.

This interface is valid only for the asynchronous mode of message delivery. In this mode, a subscriber issues a registration call that specifies a callback. When messages are received that match the subscription criteria, the callback is invoked. The callback can then issue an explicit message_receive (dequeue) to retrieve the message.

The user must specify a subscription handle at registration time with the namespace attribute set to OCI_SUBSCR_NAMESPACE_AQ.


See Also:

"OCISubscriptionRegister()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

Example C-11 Registering for Notifications (OCI)

/* OCISubscriptionRegister can be used by the client to register to receive  
   notifications when messages are enqueued. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static OCIEnv     *envhp;
static OCIServer  *srvhp;
static OCIError   *errhp;
static OCISvcCtx  *svchp;

/* The callback that gets invoked on notification */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;      /* subscription handle */
dvoid           *pay;           /* payload  */
ub4              payl;          /* payload length */
dvoid           *desc;          /* the AQ notification descriptor */
ub4              mode;
{
 text                *subname;
 ub4                  size;
 ub4                 *number = (ub4 *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 OCIAQMsgProperties  *msgprop;

  (*number)++;

  /* Get the subscription name */
  OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);
 printf("got notification number %d for %.*s  %d  \n", 
         *number, size, subname, payl);

 /* Get the queue name from the AQ notify descriptor */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
             OCI_ATTR_QUEUE_NAME, errhp);
 
 /* Get the consumer name for which this notification was received */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
       OCI_ATTR_CONSUMER_NAME, errhp);

 /* Get the message ID of the message for which we were notified */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
       OCI_ATTR_NFY_MSGID, errhp);

 /* Get the message properties of the message for which we were notified */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
       OCI_ATTR_MSG_PROP, errhp);
}
/*
 * Example C-11 driver: initializes OCI with event support, logs on as
 * scott and configures two subscription handles (name, callback, context
 * and namespace) that use notifyCB as their callback.
 *
 * NOTE: this listing stops after the second subscription handle has been
 * configured; the registration call and the end of main() are not shown.
 */
int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;

  /* The subscription handles */
  OCISubscription *subscrhp[5];

  /* Registrations are for AQ namespace */
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;

  /* The context for the callback: one counter per subscription,
     incremented by notifyCB */
  ub4 ctx[5] = {0,0,0,0,0};

  printf("Initializing OCI Process\n");

  /* The OCI Process Environment must be initialized  with OCI_EVENTS */
  /* OCI_OBJECT flag is set to enable us dequeue */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  /* The standard OCI setup */
  printf("Initializing OCI Env\n");
  (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, 
                (dvoid **) 0 );

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR, 
                   (size_t) 0, (dvoid **) 0);

  /* Server contexts */
  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER,
                   (size_t) 0, (dvoid **) 0);

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX,
                   (size_t) 0, (dvoid **) 0);


  printf("connecting to server\n");
  (void) OCIServerAttach( srvhp, errhp, (text *)"", strlen(""), 0);
  printf("connect successful\n");

  /* Set attribute server context in the service context */
  (void) OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
          (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp);

  /* Allocate the session handle and set the logon credentials */
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
             (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0);
 
  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "scott", (ub4) strlen("scott"),
                 (ub4) OCI_ATTR_USERNAME, errhp);
 
  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "tiger", (ub4) strlen("tiger"),
                 (ub4) OCI_ATTR_PASSWORD, errhp);

  /* NOTE(review): checkerr() is not defined in this example; presumably
     the same helper as in the listen examples - confirm */
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
           (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);

 /* Setting the subscription handle for notification on
    a NORMAL single-consumer queue */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"),
                (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 /* ctx[0] is the counter notifyCB receives for this subscription */
 printf("setting subscription context \n");
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

 /* Setting the subscription handle for notification on a NORMAL
    multiconsumer queue; the subscription name used here has the form
    queue:consumer */
  subscrhp[1] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.MCQ1:APP1", 
                 (ub4) strlen("SCOTT.MCQ1:APP1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

C.7 Enabling Subscription Registration

ub4 OCISubscriptionEnable ( OCISubscription   *subscrhp,
                            OCIError          *errhp,
                            ub4               mode );

This call enables a subscription registration that has been disabled. This turns on all notifications.


See Also:

"OCISubscriptionEnable()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

C.8 Unregistering a Subscription

ub4 OCISubscriptionUnRegister ( OCISvcCtx         *svchp,
                                OCISubscription   *subscrhp,
                                OCIError          *errhp,
                                ub4               mode );

This call unregisters a subscription, which turns off notifications.


See Also:

"OCISubscriptionUnRegister()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

C.9 Posting for Subscriber Notification

ub4 OCISubscriptionPost ( OCISvcCtx         *svchp,
                          OCISubscription   **subscrhpp,
                          ub2               count,
                          OCIError          *errhp,
                          ub4               mode );

This call posts to a subscription, which allows all clients that are registered for the subscription to receive notifications.


See Also:

"OCISubscriptionPost()" in Oracle Call Interface Programmer's Guide for parameter information and usage notes

C.10 Deploying Oracle Streams AQ with XA

You must set up the following data structures for certain examples to work:

-- Setup for the XA example: create the AQ administrator (aqadm) and the AQ
-- user (aq), then a RAW queue table and the queue aq.aqsqueue, and start it.
-- The DROP USER statements report an error when the user does not exist yet.
CONNECT system/manager;
DROP USER aqadm CASCADE;
CREATE USER aqadm IDENTIFIED BY aqadm;
-- Privileges can be granted only after the user has been created.
GRANT DBA TO aqadm;
GRANT EXECUTE ON DBMS_AQADM TO aqadm;
GRANT Aq_administrator_role TO aqadm;
DROP USER aq CASCADE;
CREATE USER aq IDENTIFIED BY aq;
GRANT DBA TO aq;
GRANT EXECUTE ON dbms_aq TO aq;

-- Anonymous blocks are used for the multi-line calls because a SQL*Plus
-- EXECUTE command cannot span lines without continuation characters.
BEGIN
   DBMS_AQADM.CREATE_QUEUE_TABLE(
      queue_table => 'aq.qtable',
      queue_payload_type => 'RAW');
END;
/

BEGIN
   DBMS_AQADM.CREATE_QUEUE(
      queue_name => 'aq.aqsqueue',
      queue_table => 'aq.qtable');
END;
/

EXECUTE DBMS_AQADM.START_QUEUE(queue_name => 'aq.aqsqueue');

The following example illustrates how to deploy Oracle Streams AQ with XA.

Example C-12 Deploying Oracle Streams AQ with XA

/* 
 * The program uses the XA interface to enqueue 1000 messages and then 
 * dequeue them.
 * Login: aq/aq
 * Requires: AQ_USER_ROLE to be granted to aq
 *        a RAW queue called "aqsqueue" to be created in aq's schema
 *        (preceding steps can be performed by running aqaq.sql)
 * Message Format: Msgno: [0-999] HELLO, WORLD!
 * Author: schandra@us.oracle.com
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#ifndef OCI_ORACLE
#include <oci.h>
#endif

#include <xa.h> 

/* XA open string; main passes it to the xa_open_entry function of the
   switch below */
char xaoinfo[] =  "oracle_xa+ACC=P/AQ/AQ+SESTM=30+Objects=T"; 

/* template for generating XA XIDs; xidgen copies it and overwrites
   data[5..7] with the serial number.
   NOTE(review): the two length fields add up to 20 while the data string
   has 16 characters - presumably 8 and 8 were intended; confirm */
XID  xidtempl = { 0x1e0a0a1e, 12, 8, "GTRID001BQual001" }; 

/* Pointer to Oracle XA function table; xaosw is defined outside this file */
extern struct xa_switch_t xaosw;                         /* Oracle XA switch */
static struct xa_switch_t *xafunc = &xaosw; 

/*
 * Dummy stub for ax_reg (ax_unreg follows): sets the formatID of the
 * supplied XID to -1 and reports success. rmid and flags are ignored.
 */
int ax_reg(rmid, xid, flags)
int  rmid;
XID *xid;
long flags;
{
  int rc = 0;

  (*xid).formatID = -1;
  return rc;
}
 
/* Dummy stub for ax_unreg: ignores both arguments and reports success. */
int ax_unreg(rmid, flags)
int     rmid;
long    flags;
{
  int rc = 0;

  return rc;
}

/*
 * Generate an XID for a given serial number.
 *
 * xid      - receives a copy of xidtempl in which the three-character
 *            field at data[5..7] is replaced by the serial number
 * serialno - serial number to encode
 *
 * Only the first three characters of the decimal form of serialno are
 * stored, so values outside 0..999 do not produce distinct XIDs. Shorter
 * numbers are padded with NUL bytes.
 */
void xidgen(xid, serialno) 
XID *xid;
int  serialno;
{ 
  /* large enough for any int: at most 3 digits per byte, sign and NUL */
  char seq [3 * sizeof(int) + 2]; 
 
  snprintf(seq, sizeof(seq), "%d", serialno); 
  memcpy((void *)xid, (void *)&xidtempl, sizeof(XID));
  /* fixed-width field: strncpy writes exactly three bytes and does not
     need to NUL-terminate */
  strncpy((&xid->data[5]), seq, 3); 
} 

/*
 * check if XA operation succeeded
 *
 * action   - expression that performs the XA call; compared with XA_OK
 * funcname - name printed in the failure message
 *
 * On failure the macro prints "<funcname> failed!" and exits with -1.
 * The expansion ends in a dangling "else", so the macro must be used as a
 * statement followed by a semicolon: the semicolon becomes the empty else
 * branch, which keeps the macro safe inside an unbraced if/else.
 */
#define checkXAerr(action, funcname)    \
    if ((action) != XA_OK)        \
    {                  \
      printf("%s failed!\n", funcname);   \
      exit(-1);            \
    } else

/*
 * Check if an OCI operation succeeded.
 *
 * errhp  - error handle from which the error text is fetched.
 * status - return code of the OCI call.
 *
 * Returns on OCI_SUCCESS. For OCI_ERROR it prints the text obtained from
 * OCIErrorGet(); for any other status it prints the numeric code. Both
 * failure paths exit with -1.
 */
static void checkOCIerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4  errcode = 0;

  if (status == OCI_SUCCESS) return;

  if (status == OCI_ERROR)
  {
    errbuf[0] = '\0';       /* stay printable if OCIErrorGet fills nothing */
    OCIErrorGet((dvoid *) errhp, 1, (text *)0, &errcode, errbuf,
      (ub4)sizeof(errbuf), OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
  }
  else
    printf("Error - %d\n", status);
  exit (-1);
}


void main(argc, argv)
int    argc;
char **argv;
{ 
  int         msgno = 0;             /* message being enqueued */
  OCIEnv       *envhp;                /* OCI environment handle */
  OCIError     *errhp;                 /* OCI Error handle */
  OCISvcCtx    *svchp;                    /* OCI Service handle */
  char      message[128];                /* message buffer */
  ub4      mesglen;             /* length of message */
  OCIRaw       *rawmesg = (OCIRaw *)0;       /* message in OCI RAW format */
  OCIInd      ind = 0;                 /* OCI null indicator */
  dvoid          *indptr = (dvoid *)&ind;       /* null indicator pointer */
  OCIType      *mesg_tdo = (OCIType *) 0;         /* TDO for RAW datatype */
  XID      xid;                 /* XA's global transaction id */
  ub4      i;                      /* array index */

  checkXAerr(xafunc->xa_open_entry(xaoinfo, 1, TMNOFLAGS), "xaoopen");

  svchp = xaoSvcCtx((text *)0);           /* get service handle from XA */
  envhp = xaoEnv((text *)0);          /* get enviornment handle from XA */

  if (!svchp || !envhp)
  {
    printf("Unable to obtain OCI Handles from XA!\n");
    exit (-1);
  }

  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&errhp, 
       OCI_HTYPE_ERROR, 0, (dvoid **)0);  /* allocate error handle */

  /* enqueue 1000 messages, 1 message for each XA transaction */
  for (msgno = 0; msgno < 1000; msgno++)
  {
    sprintf((const char *)message, "Msgno: %d, Hello, World!", msgno);
    mesglen = (ub4)strlen((const char *)message);
    xidgen(&xid, msgno);                 /* generate an XA xid */

    checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart");

    checkOCIerr(errhp, OCIRawAssignBytes(envhp, errhp, (ub1 *)message, mesglen,
                &rawmesg));

    if (!mesg_tdo)          /* get Type descriptor (TDO) for RAW type */
      checkOCIerr(errhp, OCITypeByName(envhp, errhp, svchp, 
                      (CONST text *)"AQADM", strlen("AQADM"),
                       (CONST text *)"RAW", strlen("RAW"), 
                   (text *)0, 0, OCI_DURATION_SESSION, 
                   OCI_TYPEGET_ALL, &mesg_tdo));

    checkOCIerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"aqsqueue",
               0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 
            0, 0)); 

    checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend");
    checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit");
    printf("%s Enqueued\n", message);
  }

  /* dequeue 1000 messages within one XA transaction */
  xidgen(&xid, msgno);                           /* generate an XA xid */
  checkXAerr(xafunc->xa_start_entry(&xid, 1, TMNOFLAGS), "xaostart");
  for (msgno = 0; msgno < 1000; msgno++)
  {
    checkOCIerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"aqsqueue", 
             0, 0, mesg_tdo, (dvoid **)&rawmesg, &indptr, 
            0, 0));
    if (ind)
      printf("Null Raw Message");
    else
      for (i = 0; i < OCIRawSize(envhp, rawmesg); i++)
   printf("%c", *(OCIRawPtr(envhp, rawmesg) + i));
    printf("\n");

  }
  checkXAerr(xafunc->xa_end_entry(&xid, 1, TMSUCCESS), "xaoend");
  checkXAerr(xafunc->xa_commit_entry(&xid, 1, TMONEPHASE), "xaocommit");
} 

C.11 Oracle Streams AQ and Memory Usage

You must set up the following data structures for certain examples to work:

/* Create_types.sql */
/* Grants the AQ roles to scott, then creates the MESSAGE object type and */
/* the queue table qt with a started queue msgqueue in the scott schema.  */
CONNECT system/manager
GRANT AQ_ADMINISTRATOR_ROLE, AQ_USER_ROLE TO scott;
CONNECT scott/tiger
/* SQL*Plus runs CREATE TYPE only when it reaches the slash on its own line */
CREATE TYPE MESSAGE AS OBJECT (id NUMBER, data VARCHAR2(80));
/
/* EXECUTE is a single-line command; the trailing hyphen continues it */
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE( -
   queue_table        => 'qt', -
   queue_payload_type => 'message');
EXECUTE DBMS_AQADM.CREATE_QUEUE('msgqueue', 'qt');
EXECUTE DBMS_AQADM.START_QUEUE('msgqueue');

The following examples illustrate Oracle Streams AQ memory usage:

Example C-13 Enqueuing Messages, Freeing Memory After Every Call (OCI)

Despite the example title, the listing that follows is a dequeue program equivalent to deqnoreuse.c in Example C-15, not the enqueue program enqnoreuse.c. It dequeues each line of text from a queue 'msgqueue' that has been created in the scott schema using create_types.sql. Messages are enqueued using enqnoreuse.c or enqreuse.c (see the following). If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine does not reuse client side objects' memory. It allocates the required memory before dequeue and frees it after the dequeue is complete.

#ifndef OCI_ORACLE
#include <oci.h>
#endif

#include <stdio.h>

static void checkerr(OCIError *errhp, sword status);
static void deqmesg(text *buf, ub4 *buflen);

OCIEnv       *envhp;
OCIError     *errhp;
OCISvcCtx    *svchp;

struct message
{
  OCINumber    id;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_id;
  OCIInd    null_data;
};
typedef struct null_message null_message;

/*
 * Dequeue one message from 'msgqueue' and copy its text to the caller.
 *
 * buf    - output buffer for the payload text. No size is passed in, so it
 *          must hold the longest possible payload; main() supplies 80 bytes
 *          and MESSAGE.data is declared VARCHAR2(80).
 * buflen - output; number of bytes stored in buf (0 for a null payload).
 *
 * All client-side memory (object instance, dequeue options descriptor) is
 * allocated at the start and freed before returning. Every OCI call goes
 * through checkerr(), which exits the program on any status other than
 * OCI_SUCCESS.
 */
static void deqmesg(text *buf, ub4 *buflen)
{
  OCIType         *mesgtdo = (OCIType *)0;   /* type descr of SCOTT.MESSAGE */
  message         *mesg    = (dvoid *)0;     /* instance of SCOTT.MESSAGE */
  null_message    *mesgind = (dvoid *)0;     /* null indicator */
  OCIAQDeqOptions *deqopt  = (OCIAQDeqOptions *)0;
  ub4              wait    = 60;             /* timeout after 60 seconds */
  ub4              navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */

  /* Get the type descriptor object for the type SCOTT.MESSAGE: */
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
           (CONST text *)"SCOTT", (ub4)strlen("SCOTT"),
           (CONST text *)"MESSAGE", (ub4)strlen("MESSAGE"), 
           (text *)0, 0, OCI_DURATION_SESSION, 
           OCI_TYPEGET_ALL, &mesgtdo));

  /* Allocate an instance of SCOTT.MESSAGE, and get its null indicator: */
  checkerr(errhp, OCIObjectNew(envhp, errhp, svchp, OCI_TYPECODE_OBJECT,
           mesgtdo, (dvoid *)0, OCI_DURATION_SESSION,
           TRUE, (dvoid **)&mesg));
  checkerr(errhp, OCIObjectGetInd(envhp, errhp, (dvoid *)mesg, 
           (dvoid **)&mesgind));

  /* Allocate a descriptor for dequeue options and set wait time, navigation: */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, 
           OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0));
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
           (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp));
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
           (dvoid *)&navigation, 0, 
           OCI_ATTR_NAVIGATION, errhp));

  /* Dequeue the message and commit: */
  checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", 
           deqopt, 0, mesgtdo, (dvoid **)&mesg, 
           (dvoid **)&mesgind, 0, 0));

  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0));

  /* Copy the message payload text into the user buffer: */
  if (mesgind->null_data)
    *buflen = 0;
  else
  {
    *buflen = OCIStringSize(envhp, mesg->data);
    memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), 
           (size_t)*buflen);
  }

  /* Free the dequeue options descriptor: */
  checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, OCI_DTYPE_AQDEQ_OPTIONS));

  /* Free the memory for the objects: */
  checkerr(errhp, OCIObjectFree(envhp, errhp, (dvoid *)mesg, 
           OCI_OBJECTFREE_FORCE));
}                           /* end deqmesg */

/*
 * Logs on to the database as scott/tiger, then dequeues and prints messages
 * in an endless loop. The loop never ends normally: the program terminates
 * inside checkerr() when an OCI call, such as a dequeue that finds no
 * message within its wait time, does not return OCI_SUCCESS.
 */
int main(void)
{
  OCIServer     *srvhp;
  OCISession    *usrhp;
  dvoid         *tmp;
  text           buf[80];                 /* payload text */
  ub4            buflen;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
                (dvoid * (*)()) 0,  (void (*)()) 0 );

  OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  /* From here on the error handle exists, so failures are reported: */
  checkerr(errhp, OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0,
           (ub4) OCI_DEFAULT));

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
           (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp));

  /* Allocate a user context handle: */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
                 (size_t) 0, (dvoid **) 0);

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp));

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp));

  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
           OCI_DEFAULT));

  checkerr(errhp, OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp));

  for (;;)
  {
    deqmesg(buf, &buflen);
    /* buf is not NUL-terminated; the precision limits the output */
    printf("%.*s\n", (int)buflen, (char *)buf);
  }
}                         /* end main */

/*
 * Check the status of an OCI call.
 *
 * errhp  - error handle from which the error text is fetched.
 * status - return code of the OCI call.
 *
 * Returns on OCI_SUCCESS. Any other status prints a diagnostic (the OCI
 * error text for OCI_ERROR, a fixed string for OCI_INVALID_HANDLE, the
 * numeric code otherwise) and exits with -1.
 */
static void checkerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4  errcode = 0;

  if (status == OCI_SUCCESS) return;

  switch (status)
  {
  case OCI_ERROR:
    errbuf[0] = '\0';       /* stay printable if OCIErrorGet fills nothing */
    OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                 errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
    break;
  case OCI_INVALID_HANDLE:
    printf("Error - OCI_INVALID_HANDLE\n");
    break;
  default:
    printf("Error - %d\n", status);
    break;
  }
  exit(-1);
}                          /* end checkerr */

Example C-14 Enqueuing Messages, Reusing Memory (OCI)

This program, enqreuse.c, enqueues each line of text into a queue 'msgqueue' that has been created in the scott schema by executing create_types.sql. Each line of text entered by the user is stored in the queue until the user enters EOF. In this program, the enqueue subroutine reuses the memory for the message payload, as well as the Oracle Streams AQ message properties descriptor.

#ifndef OCI_ORACLE
#include <oci.h>
#endif

#include <stdio.h>

static void checkerr(OCIError *errhp, sword status);
static void enqmesg(ub4 msgno, text *buf);

struct message
{
  OCINumber     id;
  OCIString    *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_id;
  OCIInd    null_data;
};
typedef struct null_message null_message;

/* Global data reused on calls to enqueue: */
OCIEnv             *envhp;
OCIError           *errhp;
OCISvcCtx          *svchp;
message             msg;
null_message        nmsg;
OCIAQMsgProperties *msgprop;

/*
 * Enqueue one line of text into 'msgqueue' and commit.
 *
 * msgno - message number; stored in MESSAGE.id and used to build the
 *         correlation identifier "Msg#: <msgno>".
 * buf   - NUL-terminated text stored in MESSAGE.data.
 *
 * The payload (msg, nmsg) and the message properties descriptor (msgprop)
 * are globals that are refilled on every call rather than reallocated.
 * Every OCI call goes through checkerr(), which exits the program on any
 * status other than OCI_SUCCESS.
 */
static void enqmesg(ub4 msgno, text *buf)
{
  OCIType          *mesgtdo = (OCIType *)0; /* type descr of SCOTT.MESSAGE */
  message          *mesg = &msg;            /* instance of SCOTT.MESSAGE */
  null_message     *mesgind = &nmsg;        /* null indicator */
  text              corrid[128];            /* correlation identifier */

  /* Get the type descriptor object for the type SCOTT.MESSAGE: */
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
           (CONST text *)"SCOTT", (ub4)strlen("SCOTT"),
           (CONST text *)"MESSAGE", (ub4)strlen("MESSAGE"), 
           (text *)0, 0, OCI_DURATION_SESSION, 
           OCI_TYPEGET_ALL, &mesgtdo));

  /* Fill in the attributes of SCOTT.MESSAGE: */
  checkerr(errhp, OCINumberFromInt(errhp, (dvoid *)&msgno, sizeof(msgno),
           OCI_NUMBER_UNSIGNED, &mesg->id));
  checkerr(errhp, OCIStringAssignText(envhp, errhp, buf,
           (ub4)strlen((char *)buf), &mesg->data));
  mesgind->null_adt = mesgind->null_id = mesgind->null_data = 0;

  /* Set the correlation id in the message properties descriptor: */
  snprintf((char *)corrid, sizeof(corrid), "Msg#: %u", (unsigned int)msgno);
  checkerr(errhp, OCIAttrSet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES, 
           (dvoid *)corrid, (ub4)strlen((char *)corrid), 
           OCI_ATTR_CORRELATION, errhp));

  /* Enqueue the message and commit: */
  checkerr(errhp, OCIAQEnq(svchp, errhp, (CONST text *)"msgqueue", 
           0, msgprop, mesgtdo, (dvoid **)&mesg, 
           (dvoid **)&mesgind, 0, 0));

  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0));
}                           /* end enqmesg */

/*
 * Logs on to the database as scott/tiger, then reads lines from standard
 * input and enqueues each one until end of file. Lines longer than 80
 * characters are truncated to 80, the size of MESSAGE.data.
 *
 * Returns 0 after EOF; OCI failures exit with -1 through checkerr().
 */
int main(void)
{
  OCIServer    *srvhp;
  OCISession   *usrhp;
  dvoid        *tmp;
  text          buf[82];        /* user supplied text: 80 chars + '\n' + NUL */
  int           msgno = 0;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
                (dvoid * (*)()) 0,  (void (*)()) 0 );

  OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  /* From here on the error handle exists, so failures are reported: */
  checkerr(errhp, OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0,
           (ub4) OCI_DEFAULT));

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
           (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp));

  /* Allocate a user context handle: */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
                 (size_t) 0, (dvoid **) 0);

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp));

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp));

  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
           OCI_DEFAULT));

  checkerr(errhp, OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp));

  /* Allocate a message properties descriptor to fill in correlation ID :*/
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&msgprop, 
           OCI_DTYPE_AQMSG_PROPERTIES, 
           0, (dvoid **)0));

  for (;;)
  {
    size_t len;

    printf("Enter a line of text (max 80 chars):");
    /* fgets is bounded by the buffer size, unlike gets */
    if (!fgets((char *)buf, (int)sizeof(buf), stdin))
      break;

    len = strlen((char *)buf);
    if (len > 0 && buf[len - 1] == '\n')
      buf[--len] = '\0';                  /* drop the line terminator */
    else
    {
      /* Over-long line: discard the rest so it is not read as a new one */
      int c;
      while ((c = getchar()) != '\n' && c != EOF)
        ;
    }
    if (len > 80)
      buf[80] = '\0';                     /* MESSAGE.data is VARCHAR2(80) */

    enqmesg((ub4)msgno++, buf);
  }

  /* Free the message properties descriptor: */
  checkerr(errhp, OCIDescriptorFree((dvoid *)msgprop, 
           OCI_DTYPE_AQMSG_PROPERTIES));

  return 0;
}                         /* end main */

/*
 * Check the status of an OCI call.
 *
 * errhp  - error handle from which the error text is fetched.
 * status - return code of the OCI call.
 *
 * Returns on OCI_SUCCESS. Any other status prints a diagnostic (the OCI
 * error text for OCI_ERROR, a fixed string for OCI_INVALID_HANDLE, the
 * numeric code otherwise) and exits with -1.
 */
static void checkerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4  errcode = 0;

  if (status == OCI_SUCCESS) return;

  switch (status)
  {
  case OCI_ERROR:
    errbuf[0] = '\0';       /* stay printable if OCIErrorGet fills nothing */
    OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                 errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
    break;
  case OCI_INVALID_HANDLE:
    printf("Error - OCI_INVALID_HANDLE\n");
    break;
  default:
    printf("Error - %d\n", status);
    break;
  }
  exit(-1);
}                          /* end checkerr */

Example C-15 Dequeuing Messages, Freeing Memory After Every Call (OCI)

This program, deqnoreuse.c, dequeues each line of text from a queue 'msgqueue' that has been created in the scott schema by executing create_types.sql. Messages are enqueued using enqnoreuse or enqreuse. If there are no messages, then it waits for 60 seconds before timing out. In this program the dequeue subroutine does not reuse client side objects' memory. It allocates the required memory before dequeue and frees it after the dequeue is complete.

#ifndef OCI_ORACLE
#include <oci.h>
#endif

#include <stdio.h>

static void checkerr(OCIError *errhp, sword status);
static void deqmesg(text *buf, ub4 *buflen);

OCIEnv       *envhp;
OCIError     *errhp;
OCISvcCtx    *svchp;

struct message
{
  OCINumber     id;
  OCIString    *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_id;
  OCIInd    null_data;
};
typedef struct null_message null_message;

/*
 * Dequeue one message from 'msgqueue' and copy its text to the caller.
 *
 * buf    - output buffer for the payload text. No size is passed in, so it
 *          must hold the longest possible payload; main() supplies 80 bytes
 *          and MESSAGE.data is declared VARCHAR2(80).
 * buflen - output; number of bytes stored in buf (0 for a null payload).
 *
 * Nothing is reused between calls: the object instance and the dequeue
 * options descriptor are allocated here and freed before returning. Every
 * OCI call goes through checkerr(), which exits the program on any status
 * other than OCI_SUCCESS.
 */
static void deqmesg(text *buf, ub4 *buflen)
{
  OCIType          *mesgtdo = (OCIType *)0;  /* type descr of SCOTT.MESSAGE */
  message          *mesg = (dvoid *)0;       /* instance of SCOTT.MESSAGE */
  null_message     *mesgind   = (dvoid *)0;       /* null indicator */
  OCIAQDeqOptions  *deqopt    = (OCIAQDeqOptions *)0;
  ub4               wait      = 60;               /* timeout after 60 seconds */
  ub4              navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */

  /* Get the type descriptor object for the type SCOTT.MESSAGE: */
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
           (CONST text *)"SCOTT", (ub4)strlen("SCOTT"),
           (CONST text *)"MESSAGE", (ub4)strlen("MESSAGE"), 
           (text *)0, 0, OCI_DURATION_SESSION, 
           OCI_TYPEGET_ALL, &mesgtdo));

  /* Allocate an instance of SCOTT.MESSAGE, and get its null indicator: */
  checkerr(errhp, OCIObjectNew(envhp, errhp, svchp, OCI_TYPECODE_OBJECT,
           mesgtdo, (dvoid *)0, OCI_DURATION_SESSION,
           TRUE, (dvoid **)&mesg));
  checkerr(errhp, OCIObjectGetInd(envhp, errhp, (dvoid *)mesg, 
           (dvoid **)&mesgind));

  /* Allocate a descriptor for dequeue options and set wait time, navigation: */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, 
           OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0));
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
           (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp));
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
           (dvoid *)&navigation, 0, 
           OCI_ATTR_NAVIGATION, errhp));

  /* Dequeue the message and commit: */
  checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", 
           deqopt, 0, mesgtdo, (dvoid **)&mesg, 
           (dvoid **)&mesgind, 0, 0));

  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0));

  /* Copy the message payload text into the user buffer: */
  if (mesgind->null_data)
    *buflen = 0;
  else
  {
    *buflen = OCIStringSize(envhp, mesg->data);
    memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), 
           (size_t)*buflen);
  }

  /* Free the dequeue options descriptor: */
  checkerr(errhp, OCIDescriptorFree((dvoid *)deqopt, OCI_DTYPE_AQDEQ_OPTIONS));

  /* Free the memory for the objects: */
  checkerr(errhp, OCIObjectFree(envhp, errhp, (dvoid *)mesg, 
           OCI_OBJECTFREE_FORCE));
}                                       /* end deqmesg */

/*
 * Logs on to the database as scott/tiger, then dequeues and prints messages
 * in an endless loop. The loop never ends normally: the program terminates
 * inside checkerr() when an OCI call, such as a dequeue that finds no
 * message within its wait time, does not return OCI_SUCCESS.
 */
int main(void)
{
  OCIServer    *srvhp;
  OCISession   *usrhp;
  dvoid        *tmp;
  text          buf[80];                 /* payload text */
  ub4           buflen;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
                (dvoid * (*)()) 0,  (void (*)()) 0 );

  OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  /* From here on the error handle exists, so failures are reported: */
  checkerr(errhp, OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0,
           (ub4) OCI_DEFAULT));

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);

  /* Set attribute server context in the service context: */
  checkerr(errhp, OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
           (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp));

  /* Allocate a user context handle: */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
                 (size_t) 0, (dvoid **) 0);

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp));

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp));

  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
           OCI_DEFAULT));

  checkerr(errhp, OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp));

  for (;;)
  {
    deqmesg(buf, &buflen);
    /* buf is not NUL-terminated; the precision limits the output */
    printf("%.*s\n", (int)buflen, (char *)buf);
  }
}                         /* end main */

/*
 * Check the status of an OCI call.
 *
 * errhp  - error handle from which the error text is fetched.
 * status - return code of the OCI call.
 *
 * Returns on OCI_SUCCESS. Any other status prints a diagnostic (the OCI
 * error text for OCI_ERROR, a fixed string for OCI_INVALID_HANDLE, the
 * numeric code otherwise) and exits with -1.
 */
static void checkerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4  errcode = 0;

  if (status == OCI_SUCCESS) return;

  switch (status)
  {
  case OCI_ERROR:
    errbuf[0] = '\0';       /* stay printable if OCIErrorGet fills nothing */
    OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                 errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
    break;
  case OCI_INVALID_HANDLE:
    printf("Error - OCI_INVALID_HANDLE\n");
    break;
  default:
    printf("Error - %d\n", status);
    break;
  }
  exit(-1);
}                          /* end checkerr */

Example C-16 Dequeuing Messages, Reusing Memory (OCI)

This program, deqreuse.c, dequeues each line of text from a queue 'msgqueue' that has been created in the scott schema by executing create_types.sql. Messages are enqueued using enqnoreuse.c or enqreuse.c. If there are no messages, then it waits for 60 seconds before timing out. In this program, the dequeue subroutine reuses client side objects' memory between invocations of OCIAQDeq:

  • During the first call to OCIAQDeq, OCI automatically allocates the memory for the message payload.

  • During subsequent calls to OCIAQDeq, the same payload pointers are passed and OCI automatically resizes the payload memory if necessary.

#ifndef OCI_ORACLE
#include <oci.h>
#endif

#include <stdio.h>

static void checkerr(OCIError *errhp, sword status);
static void deqmesg(text *buf, ub4 *buflen);

struct message
{
  OCINumber    id;
  OCIString   *data;
};
typedef struct message message;

struct null_message
{
  OCIInd    null_adt;
  OCIInd    null_id;
  OCIInd    null_data;
};
typedef struct null_message null_message;

/* Global data reused on calls to enqueue: */
OCIEnv          *envhp;
OCIError        *errhp;
OCISvcCtx       *svchp;
OCIAQDeqOptions *deqopt;
message         *mesg = (message *)0;
null_message    *mesgind = (null_message *)0;

/*
 * Dequeue one message from 'msgqueue' and copy its text to the caller.
 *
 * buf    - output buffer for the payload text. No size is passed in, so it
 *          must hold the longest possible payload; main() supplies 80 bytes
 *          and MESSAGE.data is declared VARCHAR2(80).
 * buflen - output; number of bytes stored in buf (0 for a null payload).
 *
 * The dequeue options descriptor (deqopt) and the payload pointers (mesg,
 * mesgind) are globals that are passed to OCIAQDeq again on every call and
 * are not freed here. Every OCI call goes through checkerr(), which exits
 * the program on any status other than OCI_SUCCESS.
 */
static void deqmesg(text *buf, ub4 *buflen)
{
  OCIType        *mesgtdo    = (OCIType *)0; /* type descr of SCOTT.MESSAGE */
  ub4             wait       = 60;           /* timeout after 60 seconds */
  ub4             navigation = OCI_DEQ_FIRST_MSG;/* always get head of q */

  /* Get the type descriptor object for the type SCOTT.MESSAGE: */
  checkerr(errhp, OCITypeByName(envhp, errhp, svchp, 
           (CONST text *)"SCOTT", (ub4)strlen("SCOTT"),
           (CONST text *)"MESSAGE", (ub4)strlen("MESSAGE"), 
           (text *)0, 0, OCI_DURATION_SESSION, 
           OCI_TYPEGET_ALL, &mesgtdo));

  /* Set wait time, navigation in dequeue options: */
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
           (dvoid *)&wait, 0, OCI_ATTR_WAIT, errhp));
  checkerr(errhp, OCIAttrSet(deqopt, OCI_DTYPE_AQDEQ_OPTIONS, 
           (dvoid *)&navigation, 0, 
           OCI_ATTR_NAVIGATION, errhp));

  /* 
   * Dequeue the message and commit. The memory for the payload is
   * automatically allocated/resized by OCI:
   */
  checkerr(errhp, OCIAQDeq(svchp, errhp, (CONST text *)"msgqueue", 
           deqopt, 0, mesgtdo, (dvoid **)&mesg, 
           (dvoid **)&mesgind, 0, 0));

  checkerr(errhp, OCITransCommit(svchp, errhp, (ub4) 0));

  /* Copy the message payload text into the user buffer: */
  if (mesgind->null_data)
    *buflen = 0;
  else
  {
    *buflen = OCIStringSize(envhp, mesg->data);
    memcpy((dvoid *)buf, (dvoid *)OCIStringPtr(envhp, mesg->data), 
           (size_t)*buflen);
  }
}                           /* end deqmesg */

/*
 * Logs on to the database as scott/tiger, allocates the dequeue options
 * descriptor once, then dequeues and prints messages in an endless loop.
 * The loop never ends normally: the program terminates inside checkerr()
 * when the dequeue times out.
 */
int main(void)
{
  OCIServer    *srvhp;
  OCISession   *usrhp;
  dvoid        *tmp;
  text          buf[80];                 /* payload text */
  ub4           buflen;

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
                (dvoid * (*)()) 0,  (void (*)()) 0 );

  OCIHandleAlloc((dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
                 52, (dvoid **) &tmp);

  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 21, (dvoid **) &tmp  );

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
                 52, (dvoid **) &tmp);
  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
                 52, (dvoid **) &tmp);

  /* from here on the error handle exists, so failures are reported */
  checkerr(errhp, OCIServerAttach(srvhp, errhp, (text *) 0, (sb4) 0,
           (ub4) OCI_DEFAULT));

  OCIHandleAlloc((dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
                 52, (dvoid **) &tmp);

  /* set attribute server context in the service context */
  checkerr(errhp, OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
           (dvoid *)srvhp, (ub4) 0,
           (ub4) OCI_ATTR_SERVER, (OCIError *) errhp));

  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
                 (size_t) 0, (dvoid **) 0);

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp));

  checkerr(errhp, OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
           (dvoid *)"tiger", (ub4)strlen("tiger"), OCI_ATTR_PASSWORD, errhp));

  checkerr(errhp, OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, 
           OCI_DEFAULT));

  checkerr(errhp, OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
           (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp));

  /* allocate the dequeue options descriptor */
  checkerr(errhp, OCIDescriptorAlloc(envhp, (dvoid **)&deqopt, 
           OCI_DTYPE_AQDEQ_OPTIONS, 0, (dvoid **)0));

  for (;;)
  {
    deqmesg(buf, &buflen);
    /* buf is not NUL-terminated; the precision limits the output */
    printf("%.*s\n", (int)buflen, (char *)buf);
  }

  /* 
   * This program never reaches this point as the dequeue times out & exits.
   * If it does reach here, it is a good place to free the dequeue
   * options descriptor using OCIDescriptorFree and free the memory allocated
   * by OCI for the payload using OCIObjectFree
   */
}                         /* end main */

/*
 * Check the status of an OCI call.
 *
 * errhp  - error handle from which the error text is fetched.
 * status - return code of the OCI call.
 *
 * Returns on OCI_SUCCESS. Any other status prints a diagnostic (the OCI
 * error text for OCI_ERROR, a fixed string for OCI_INVALID_HANDLE, the
 * numeric code otherwise) and exits with -1.
 */
static void checkerr(OCIError *errhp, sword status)
{
  text errbuf[512];
  sb4  errcode = 0;

  if (status == OCI_SUCCESS) return;

  switch (status)
  {
  case OCI_ERROR:
    errbuf[0] = '\0';       /* stay printable if OCIErrorGet fills nothing */
    OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
                 errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
    printf("Error - %s\n", errbuf);
    break;
  case OCI_INVALID_HANDLE:
    printf("Error - OCI_INVALID_HANDLE\n");
    break;
  default:
    printf("Error - %d\n", status);
    break;
  }
  exit(-1);
}                          /* end checkerr */