ACE Tutorial 013
Multiple thread pools


I've been trying to justify the chain of tasks by talking about a Work object that implements a state machine. The idea is that your Work object has to perform a series of discrete steps to complete it's function. Traditionally, all of those steps would take place in one thread of execution. That thread would probably be one from a Task thread pool.

Suppose, however, that some of those steps spend a lot of time waiting for disk IO. You could find that all of your thread-pool threads are just sitting there waiting for the disk. You might then be tempted to increase the thread pool size to get more work through. However, if some of the stages are memory intensive, you could run out of memory if all of the workers get to that state at the same time.

One solution might be to have different thread pools for each state. Each pool could have it's size tuned appropriately for the work that would be done there. That's where the chain of Tasks comes in. In this tutorial's implementation I've taken the easy route and set all of the thread pools to the same size but a more realistic solution would be to set each thread pool in the chain to a specific size as needed by that state of operation.

There's not much to this header either so I've combined it with the cpp file as with task.



work.h


// page07.html,v 1.8 1999/09/22 03:13:49 jcej Exp

#ifndef WORK_H
#define WORK_H

#include "ace/Log_Msg.h"

#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif /* ACE_LACKS_PRAGMA_ONCE */

#include "ace/Synch.h"
#include "mld.h"

/*
   Our specilized message queue and thread pool will know how to do "work" on
   our Unit_Of_Work baseclass.
 */
class Unit_Of_Work
{
public:
    Unit_Of_Work (void);

    virtual ~ Unit_Of_Work (void);

        // Display the object instance value
    void who_am_i (void);

        // The baseclass can override this to show it's "type name"
    virtual void what_am_i (void);

        // This is where you do application level logic.  It will be
        // called once for each thread pool it passes through.  It
        // would typically implement a state machine and execute a
        // different state on each call.
    virtual int process (void);

        // This is called by the last Task in the series (see task.h)
        // in case our process() didn't get through all of it's states.
    virtual int fini (void);

protected:
    ACE_Atomic_Op < ACE_Mutex, int >state_;
    MLD;
};

/*
   A fairly trivial work derivative that implements an equally trivial state
   machine in process()
 */
class Work : public Unit_Of_Work
{
public:
    Work (void);

    Work (int message);

    virtual ~ Work (void);

    void what_am_i (void);

    int process (void);

    int fini (void);

protected:
    int message_;
    MLD;
};

#endif

work.cpp


// page07.html,v 1.8 1999/09/22 03:13:49 jcej Exp

#include "work.h"

/*
   Initialize the state to zero
 */
Unit_Of_Work::Unit_Of_Work (void)
        : state_ (0)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work ctor\n", (void *) this));
}

Unit_Of_Work::~Unit_Of_Work (void)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work dtor\n", (void *) this));
}

/*
   Display our instance value
 */
void Unit_Of_Work::who_am_i (void)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Unit_Of_Work instance\n", (void *) this));
}

/*
   Dispay our type name
 */
void Unit_Of_Work::what_am_i (void)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Unit_Of_Work object\n", (void *) this));
}

/*
   Return failure.  You should always derive from Unit_Of_Work...
 */
int Unit_Of_Work::process (void)
{
    return -1;
}

/*
   ditto
 */
int Unit_Of_Work::fini (void)
{
    return -1;
}

/*
   Default constructor has no "message number"
 */
Work::Work (void)
        :message_ (-1)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor\n", (void *) this));
}

/*
   The useful constructor remembers which message it is and will tell you if
   you ask.
 */
Work::Work (int message)
        : message_ (message)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work ctor for message %d\n", (void *) this, message_));
}

Work::~Work (void)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work dtor\n", (void *) this));
}

/*
   This objects type name is different from the baseclass
 */
void Work::what_am_i (void)
{
    ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x I am a Work object for message %d\n", (void *) this, message_));
}

/*
   A very simple state machine that just walks through three stages. If it is
   called more than that, it will tell you not to bother.
 */
int Work::process (void)
{
    switch (++state_)
    {
        case 1:
            ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage One\n", (void *) this));
            break;
        case 2:
            ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Two\n", (void *) this));
            break;
        case 3:
            ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Stage Three\n", (void *) this));
            break;
        default:
            ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x No work to do in state %d\n",
                        (void *) this, state_.value ()));
            break;
    }
    return (0);
}

/*
   If you don't have enough subtasks in the chain then the state machine won't
   progress to the end.  The fini() hook will allow us to  recover from that by
   executing the remaining states in the final task of the chain.
 */
int Work::fini (void)
{
    while (state_.value () < 3)
    {
        ACE_DEBUG ((LM_DEBUG, "(%P|%t) 0x%x Work::fini() state %d\n", (void *) this,state_.value()));
        if (this->process () == -1)
        {
            ACE_ERROR_RETURN ((LM_ERROR, "%p\n", "process"), -1);
        }
    }
    return (0);
}

And that is that. For a more complex machine that may want to "jump states" you would have to set some "state information" (sorry, bad choice of terminology again) so that process() could decide what to do at each call. You might also modify Task::svc() so that it will respect the return value of process() and do something useful with the information.


[Tutorial Index] [Continue This Tutorial]