Main Page | Class Hierarchy | Class List | File List | Class Members | Related Pages

framework.h

00001 /* BEGIN_COPYRIGHT                                                        */
00002 /*                                                                        */
00003 /* Open Diameter: Open-source software for the Diameter and               */
00004 /*                Diameter related protocols                              */
00005 /*                                                                        */
00006 /* Copyright (C) 2002-2004 Open Diameter Project                          */
00007 /*                                                                        */
00008 /* This library is free software; you can redistribute it and/or modify   */
00009 /* it under the terms of the GNU Lesser General Public License as         */
00010 /* published by the Free Software Foundation; either version 2.1 of the   */
00011 /* License, or (at your option) any later version.                        */
00012 /*                                                                        */
00013 /* This library is distributed in the hope that it will be useful,        */
00014 /* but WITHOUT ANY WARRANTY; without even the implied warranty of         */
00015 /* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU      */
00016 /* Lesser General Public License for more details.                        */
00017 /*                                                                        */
00018 /* You should have received a copy of the GNU Lesser General Public       */
00019 /* License along with this library; if not, write to the Free Software    */
00020 /* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307    */
00021 /* USA.                                                                   */
00022 /*                                                                        */
00023 /* In addition, when you copy and redistribute some or the entire part of */
00024 /* the source code of this software with or without modification, you     */
00025 /* MUST include this copyright notice in each copy.                       */
00026 /*                                                                        */
00027 /* If you make any changes that are appeared to be useful, please send    */
00028 /* sources that include the changed part to                               */
00029 /* diameter-developers@lists.sourceforge.net so that we can reflect your  */
00030 /* changes to one unified version of this software.                       */
00031 /*                                                                        */
00032 /* END_COPYRIGHT                                                          */
00033 // $Id: framework.h,v 1.55 2004/06/17 21:03:14 yohba Exp $
00034 // framework.h: Protocol Framework API based on ACE.
00035 // Written by Yoshihiro Ohba
00036 
00037 #ifndef __FRAMEWORK_H__
00038 #define __FRAMEWORK_H__
00039 
00040 #include <list>
00041 #include <map>
00042 #include <string>
00043 #include <ace/Task.h>
00044 #include <ace/Task_T.h>
00045 #include <ace/Thread.h>
00046 #include <ace/Thread_Manager.h>
00047 #include <ace/Event_Handler.h>
00048 #include <ace/Synch.h>
00049 #include <ace/Reactor.h>
00050 #include <ace/Thread_Exit.h>
00051 #include <ace/Time_Value.h>
00052 #include <ace/Singleton.h>
00053 #include <ace/Token.h>
00054 #include <ace/Log_Msg.h>
00055 #include <ace/Atomic_Op_T.h>
00056 #include <boost/shared_ptr.hpp>
00057 #include <boost/scoped_array.hpp>
00058 #include <boost/shared_array.hpp>
00059 
00061 #define FRAMEWORK_LOG ACE_Log_Msg::instance()->log
00062 
00067 template <class T> struct Type2Type
00068 {
00069   typedef T OriginalType; 
00070 };
00071 
00073 enum AAA_Error { 
00074                 NoInitialState, 
00075                 FoundDuplicateStateTableEntry,
00076                 ActivationFailure,
00077                 TaskNotRunning,
00078                 NoData
00079         };
00080 
00081 
00088 template <class LOCK>
00089 class AAA_ScopeLock
00090 {
00091  public:
00092   AAA_ScopeLock(LOCK &l) : lock(l) {
00093     lock.acquire();
00094   }
00095   ~AAA_ScopeLock() {
00096     lock.release();
00097   }
00098 
00099  private:
00100   LOCK &lock;
00101 };
00102 
00104 typedef AAA_ScopeLock<ACE_Token> AAA_TokenScopeLock;
00105 
00107 typedef AAA_ScopeLock<ACE_Mutex> AAA_MutexScopeLock;
00108 
00110 typedef AAA_ScopeLock<ACE_Thread_Mutex> AAA_ThreadMutexScopeLock;
00111 
00112 class AAA_JobData {};
00113 
00114 class AAA_JobDeleter;
00115 class AAA_Task;
00116 
00122 
00123 
00124 class AAA_Job
00125 {
00126   friend class AAA_JobDeleter;
00127 
00128  public:
00130   AAA_Job(AAA_JobData *d=0, char* name=0) : data(d), priority(1), weight(1)
00131   {
00132     if (name) this->name = std::string(name);
00133     refcount = 1;
00134   }
00135 
00138   virtual int Serve()=0;
00139 
00146   virtual int Schedule(AAA_Job*, size_t backlogSize=1)=0;
00147 
00150   void Delete() { if (--refcount <=0 ) delete this; }
00151 
00153   virtual bool ExistBacklog() { return false; }
00154 
00156   virtual size_t BacklogSize() { return 0; }
00157 
00159   inline std::string& Name() { return name; }
00160 
00162   inline AAA_JobData* Data() { return data; }
00163 
00166   template <class T> inline T* Data(Type2Type<T>) { return (T*)data; }
00167 
00169   unsigned& Priority() { return priority; }
00170 
00172   unsigned& Weight() { return weight; }
00173 
00175   void Acquire() { ++refcount; }
00176 
00180   int Release(int n=1) 
00181   { 
00182     refcount -= n;
00183     if (refcount<0) {
00184       delete this;
00185       return -1;
00186     }
00187     return 0;
00188   }
00189 
00192   virtual bool Running() { return true; }
00193 
00194   bool operator==(AAA_Job* job) { return (this==job); }
00195 
00196   bool operator!=(AAA_Job* job) { return (this!=job); }
00197 
00198  protected:
00199 
00201   virtual ~AAA_Job() {}
00202 
00203  private:  
00204 
00206   std::string name;
00207 
00209   AAA_JobData* data;
00210 
00213   ACE_Atomic_Op<ACE_Thread_Mutex, int> refcount;
00214 
00216   unsigned priority;
00217 
00219   unsigned weight;
00220 };
00221 
00228 template <class JOB>
00229 class AAA_JobHandle : public boost::shared_ptr<JOB>
00230 {
00231  public:
00232   AAA_JobHandle(JOB* job) : boost::shared_ptr<JOB>(job, AAA_JobDeleter()) {}
00233 
00234   JOB& Job() { return (JOB&)*get(); }
00235 };
00236 
00243 class AAA_JobDeleter
00244 {
00245  public:
00246   void operator()(AAA_Job *job) { job->Delete(); }
00247   static void Delete(AAA_Job *job) { job->Delete(); }
00248 };
00249 
00250 const unsigned maxNumPriority = 3;
00251 const unsigned maxMaxWeight = 3;
00252 
00261 template <class T, class LOCK = ACE_Thread_Mutex>
00262   class AAA_QueueJob : public AAA_Job
00263 {
00264  public:
00266   AAA_QueueJob(AAA_JobData *d=0, char* name=0, 
00267                unsigned numPriority=1, unsigned maxWeight=1) throw (int)
00268  : AAA_Job(d, name), 
00269  numPriority(numPriority), maxWeight(maxWeight), total(0),
00270  cond(std::auto_ptr< ACE_Condition<LOCK> >
00271       (new ACE_Condition<LOCK>(lock))),
00272  signaled(false)
00273 
00274  {
00275    if (numPriority > maxNumPriority)
00276      {
00277        FRAMEWORK_LOG(LM_ERROR, "[%N] Too many priorities.\n");
00278        throw 1;
00279      }
00280    if (maxWeight > maxMaxWeight)
00281      {
00282        FRAMEWORK_LOG(LM_ERROR, "[%N] Too large weight.\n");
00283        throw 1;
00284      }
00285    indexQueue = boost::shared_array< std::list<unsigned> >
00286    (new std::list<unsigned>[numPriority]);
00287 
00288    dataQueue = boost::shared_array< std::list<T> >
00289    (new std::list<T>[numPriority*maxWeight]);
00290  }
00291 
00293   virtual ~AAA_QueueJob() {}
00294 
00296   virtual int Serve()=0;
00297 
00299   virtual int Schedule(AAA_Job*, size_t backlogSize=1)=0;
00300 
00302  inline bool ExistBacklog() 
00303  { 
00304    // Lock is not needed here, because this does not change the state
00305    // of the object.
00306    return (total>0);
00307  }
00308 
00309  inline size_t BacklogSize() { return total; }
00310 
00314  inline int Enqueue(T entry, bool blocking=false,
00315                     unsigned priority=1, unsigned weight=1) 
00316  { 
00317    AAA_ScopeLock<LOCK> g(lock); 
00318 
00319    // In the case where one or more threads can enter here
00320    // simultaneously (this is the case when Dequeue is called by a
00321    // task), conditional mutex is needed to avoid possible blocking
00322    // on Enqueue() when the queue is full.
00323 
00324    while (!signaled && total==MaxSize()) 
00325    { 
00326      if (!blocking)
00327        {
00328          FRAMEWORK_LOG(LM_DEBUG, "[%N] Enqueue is blocked.\n");
00329          return 0;
00330        }
00331      cond->wait(); 
00332    }
00333 
00334    if (signaled && blocking)
00335       return -1;
00336 
00337    unsigned pIndex = Index(priority);
00338    unsigned dIndex = Index(priority, weight);
00339 
00340 
00341    // The enqueueing algorithm is as follows.  
00342 
00343    // 1) Compare the weight of the job with the queue length of
00344    // the data queue where the job is enqueued.
00345 
00346    // 2) If the queue length of the data queue is less than the
00347    // weight, then enqueue the index of the data queue to the index
00348    // queue that is corresponding to the the priority of the job.
00349 
00350    // 3) Enqueue the job to the data queue.
00351 
00352    // Step (2) means that the larger the weight, the more the job
00353    // gets the chance to be served.  
00354 
00355    if (dataQueue[dIndex].size() < weight)
00356      indexQueue[pIndex].push_back(dIndex);
00357 
00358    dataQueue[dIndex].push_back(entry); 
00359    total++;
00360 
00361    // Wake up waiting threads.  This signal() call can be outside of
00362    // the scope lock.
00363    cond->signal();
00364    return total;
00365   }
00366 
00373  int Dequeue(T &entry, bool blocking=false) throw(int)
00374   { 
00375     do {
00376       AAA_ScopeLock<LOCK> g(lock);
00377       // In the case where one or more threads can enter here
00378       // simultaneously (this is the case when Dequeue is called by a
00379       // task), conditional mutex is needed to avoid possible blocking
00380       // on Dequeue() when the queue is empty.
00381 
00382       while (!signaled && total==0) 
00383         { 
00384           if (!blocking)
00385             return 0;
00386           cond->wait(); 
00387         }
00388 
00389       if (signaled && blocking)
00390         return -1;
00391 
00392       // The serving algorithm is as follows:
00393 
00394       // 1) Find the highest priority class that has a backlog.
00395 
00396       // 2) Dequeue an index from the index queue of the found
00397       // class. The dequeued index points the data queue to serve.  
00398 
00399       // 3) Dequeue an entry from the data queue corresponding to the
00400       // index.
00401 
00402       // 4) After step (3), if the queue length of the data queue is
00403       // not less than the weight given to that queue, then re-enqueue
00404       // the index to the index queue of the same priority class.
00405 
00406       // Step 2) of Enqueue() and Step 4) of Dequeue are complement
00407       // operations, which provides both "work conserving" and "O(1)
00408       // complexity" characteristics.
00409       
00410       do {
00411         bool found=false;
00412         for (unsigned p=0; p<numPriority; p++)
00413           {
00414             if (indexQueue[p].empty())
00415               continue;
00416 
00417             unsigned dIndex = indexQueue[p].front();
00418             indexQueue[p].pop_front();
00419             entry = dataQueue[dIndex].front();
00420             dataQueue[dIndex].pop_front();
00421             if (dataQueue[dIndex].size() >= (dIndex % maxWeight) + 1)
00422               indexQueue[p].push_back(dIndex);
00423             found=true;
00424             total--;
00425             break;
00426           }
00427 
00428         if (!found)
00429           {
00430             FRAMEWORK_LOG(LM_ERROR, "[%N] BUG (dequeue error).\n");
00431             return 0;
00432           }
00433 
00434       } while (0);
00435     } while (0);
00436     if (total>0) 
00437       cond->signal();
00438     return 1;
00439   }
00440 
00443  inline int Remove(T entry)
00444  { 
00445    AAA_ScopeLock<LOCK> g(lock); 
00446    int totalRemove=0;
00447    for (unsigned i=0; i<numPriority; i++)
00448      for (unsigned j=i*maxWeight; j<(i+1)*maxWeight; j++)
00449      {
00450        unsigned preSize = (unsigned)dataQueue[j].size();
00451        dataQueue[j].remove(entry);
00452        unsigned postSize = (unsigned)dataQueue[j].size();
00453        if (preSize != postSize)
00454          {
00455            totalRemove =+ preSize - postSize;
00456            total -= preSize - postSize;
00457            // Adjust the index queue so that the index for the removed entry 
00458            AdjustIndexQueue(i, j);
00459          }
00460      }
00461    if (totalRemove>0) 
00462      cond->signal();
00463    return totalRemove;
00464  }
00465 
00467   virtual void Flush() 
00468   { 
00469     AAA_ScopeLock<LOCK> g(lock); 
00470     for (unsigned i=0; i<numPriority; indexQueue[i++].clear());
00471     for (unsigned i=0; i<numPriority*maxWeight; dataQueue[i++].clear());
00472     total=0;
00473     cond->signal();
00474   }
00475 
00476  public:
00477  void Signal() 
00478  { 
00479    signaled=true;
00480    cond->broadcast(); 
00481  }
00482 
00484  inline size_t MaxSize() { return (size_t)-1; }
00485 
00486  protected:
00487 
00489   LOCK lock;
00490 
00491  private:
00492 
00493   inline unsigned Index(unsigned p)
00494     {
00495       p = (p>maxNumPriority) ? maxNumPriority : p;
00496       return p-1;
00497     }
00498 
00499  inline unsigned Index(unsigned p, unsigned w)
00500     {
00501       p = (p>maxNumPriority) ? maxNumPriority : p;
00502       w = (w>maxMaxWeight) ? maxMaxWeight : w;
00503       return (p-1)*maxWeight + (w-1);
00504     }
00505 
00506  void AdjustIndexQueue(unsigned pIndex, unsigned dIndex)
00507  {
00508    std::list<unsigned> &iQueue = indexQueue[pIndex];
00509    std::list<unsigned>::iterator i;
00510    unsigned j=0;
00511    for (i=iQueue.begin(); i!=iQueue.end(); )
00512      {
00513        if ((*i) == dIndex)
00514          i = (++j > dataQueue[dIndex].size()) ? iQueue.erase(i) : i++;
00515        else
00516          i++;
00517      }
00518  }
00519 
00520  unsigned numPriority;  // number of priority
00521  unsigned maxWeight;    // maximum weight value
00522 
00523  // This queue stores indexes for the dataQueue per priority.
00524  boost::shared_array< std::list<unsigned> > indexQueue;
00525 
00526  // This queue stores data for per priority and per weight 
00527  boost::shared_array< std::list<T> > dataQueue;
00528 
00529  unsigned total;
00530 
00532  std::auto_ptr< ACE_Condition<LOCK> > cond;
00533 
00535  bool signaled;
00536 };
00537 
00539 typedef int AAA_Event;
00540 
00547 
00548 
00549 typedef AAA_QueueJob<AAA_Event> AAA_EventQueueJob;
00550 
00557 typedef AAA_QueueJob<AAA_Job*> AAA_JobQueueJob;
00558 
00562 #define AAA_SCHED_FIFO      0  
00563 #define AAA_SCHED_WFQ       1
00564 #define AAA_SCHED_PRIORITY  2
00565 
00566 typedef ACE_UINT16 AAA_SchedulingPolicy;
00567 
00568 
00611 class AAA_GroupedJob : public AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>
00612 {
00613  public:
00615   static AAA_GroupedJob* 
00616     Create(AAA_Job &parent, AAA_JobData* d, 
00617            char *name=0, 
00618            AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00619            bool blocking=false,
00620            unsigned numPriority=1, unsigned maxWeight=1) throw(AAA_Error)
00621   {
00622     if (d==0)
00623       {
00624         FRAMEWORK_LOG(LM_ERROR, "AAA_JobData must be non-null.\n");
00625         throw NoData;
00626       }
00627     return new AAA_GroupedJob(parent, d, name, policy, blocking,
00628                               numPriority, maxWeight);
00629   }
00630 
00631   static AAA_GroupedJob* 
00632     Create(AAA_JobData* d, char *name=0, 
00633            AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00634            bool blocking=false,
00635            unsigned numPriority=1, unsigned maxWeight=1) throw(AAA_Error)
00636   {
00637     if (d==0)
00638       {
00639         FRAMEWORK_LOG(LM_ERROR, "AAA_JobData must be non-null.\n");
00640         throw NoData;
00641       }
00642     return new AAA_GroupedJob(d, name, policy, blocking,
00643                               numPriority, maxWeight);
00644   }
00645 
00646   void Start() throw(AAA_Error) { running=true; }
00647 
00648   void Stop() 
00649   { 
00650     running=false; Flush(); 
00651     Signal();
00652   }
00653   
00654   void Flush()
00655   {
00656     while (ExistBacklog())
00657       {
00658         AAA_Job* job;
00659         Dequeue(job);   // Non-blocking dequeue.
00660         job->Release();
00661       }
00662   }
00663 
00664   void Remove(AAA_Job *job)
00665   {
00666     // Root job relies on Acquire/Release mechinism instead of
00667     // Remove().
00668     int n = AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>::Remove(job);
00669     job->Release(n);
00670   }
00671         
00673   int Schedule(AAA_Job *job , size_t backlogSize=1)
00674   {
00675     if (!Running())
00676       return (-1);
00677 
00678     job->Acquire();
00679 
00680     unsigned priority, weight = 0;
00681     EnforceSchedulingPolicy(job, priority, weight);
00682 
00683     // If there is already a backlog for this session, do nothing.
00684     // Job weight is taken into account here.
00685     if (backlogSize > weight)
00686       return (0);
00687 
00688     // Since weight is taken into account by above backlog-weight
00689     // comparison, just specify the priority in Enqueue().
00690     do {
00691       int result = Enqueue(job, blocking);
00692       if (result <= 0)
00693         return (-1);
00694     } while (0);
00695 
00696     //    Enqueue(job, priority);
00697 
00698     // If this is not a root job, then ask the parent to schedule me.
00699     return (!IsRoot()) ? parent.Schedule(this) : 0;
00700   }
00701 
00703   int Serve()
00704   {
00705     AAA_Job* job;
00706 
00707     do {
00708       int result = Dequeue(job, blocking);
00709       if (result <= 0)
00710         return result;
00711     } while (0);
00712 
00713     if (job->Release() == -1)
00714       return (int)BacklogSize();
00715 
00716     unsigned childBacklogSize = job->Serve();
00717 
00718     unsigned priority, weight = 0;
00719     EnforceSchedulingPolicy(job, priority, weight);
00720 
00721     // If there is already a backlog for this session, do nothing.
00722     // Job weight is taken into account here.
00723     if (childBacklogSize < weight)
00724       return (int)BacklogSize();
00725 
00726     job->Acquire();
00727 
00728     // Since weight is taken into account by above backlog-weight
00729     // comparison, just specify the priority in Enqueue().
00730     return Enqueue(job, priority ? true : false);
00731   }
00732 
00733  protected:
00734 
00735  private:
00737    AAA_GroupedJob(AAA_Job &parent, 
00738                   AAA_JobData* d, 
00739                   char *name=0,
00740                   AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00741                   bool blocking=false, 
00742                   unsigned numPriority=1, unsigned maxWeight=1)
00743      : AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>(d, name, 
00744                                                 numPriority, maxWeight), 
00745     parent(parent), policy(policy), running(false), blocking(blocking)
00746   {
00747     // Increment the reference counter of the parent.
00748     parent.Acquire();
00749   }
00750 
00752   AAA_GroupedJob(AAA_JobData* d, char *name=0,
00753                  AAA_SchedulingPolicy policy=AAA_SCHED_FIFO, 
00754                  bool blocking=false,
00755                  unsigned numPriority=1, unsigned maxWeight=1)
00756     : AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>(d, name,
00757                                                numPriority, maxWeight), 
00758     parent(*this), policy(policy), running(false), blocking(blocking)
00759   {}
00760 
00762   ~AAA_GroupedJob() { 
00763     Stop(); 
00764     if (!IsRoot())
00765       parent.Release(); 
00766   }
00767 
00769   inline bool IsRoot() { return (parent == this); }
00770   
00771 
00774   void EnforceSchedulingPolicy(AAA_Job *job, 
00775                                unsigned &priority,
00776                                unsigned &weight)
00777   {
00779     const int infinity = 100000;
00780  
00781     priority = job->Priority();
00782     weight = job->Weight();
00783 
00784     // Adjust the priority and weight to the scheduling policy.
00785     if (policy != AAA_SCHED_WFQ)
00786       weight = infinity;
00787     
00788     if (policy != AAA_SCHED_PRIORITY)
00789       priority = 1;
00790   }
00791 
00793   AAA_Job& parent;
00794 
00798   unsigned policy;
00799 
00800   bool running;
00801 
00804   bool blocking;
00805 };
00806 
00814 
00815 
00816 class AAA_Task : public ACE_Task<ACE_MT_SYNCH>
00817 {
00818 public:
00819 
00827   AAA_Task(AAA_SchedulingPolicy policy=AAA_SCHED_FIFO, char *name=0,
00828            unsigned numPriority=1, unsigned maxWeight=1) 
00829     : handle
00830     (AAA_JobHandle<AAA_GroupedJob>
00831      (AAA_GroupedJob::Create((AAA_JobData*)this, name, policy, true,
00832                              numPriority, maxWeight))),
00833     cond(std::auto_ptr< ACE_Condition<ACE_Mutex> >
00834          (new ACE_Condition<ACE_Mutex>(mutex)))
00835     {
00836       reactor(0);
00837     }
00838 
00839   virtual ~AAA_Task() { Stop(); }
00840   
00843   void Start(int n) throw(AAA_Error)
00844   {
00845     if (Running())
00846       return;
00847 
00848     if (activate(THR_NEW_LWP|THR_JOINABLE, n+1) == -1)
00849       {
00850         throw ActivationFailure;
00851       }
00852 
00853     // Wait until the timer thread is up and running.
00854     do {
00855       AAA_MutexScopeLock guard(mutex);
00856       while (!Running()) { cond->wait(); }
00857     } while (0);
00858 
00859     Job().Start();
00860   }
00861     
00863     
00864   void Stop()
00865   {
00866     if (!Running())
00867       return;
00868 
00869     // Send delete signal to threads waiting on scheduling queue.
00870     Job().Stop();
00871 
00872     // Send signal to delete the timer thread.
00873     reactor()->schedule_timer(this, (const void*)0, ACE_Time_Value(0));
00874         
00875     // Wait for all activated threads to complete.
00876     wait(); 
00877   }
00878 
00884   inline long ScheduleTimer(ACE_Event_Handler* eventHandler, const void* arg, 
00885                             ACE_Time_Value delay, ACE_Time_Value interval=0)
00886   {
00887     if (Running())
00888       return reactor()->schedule_timer(eventHandler, arg, delay, interval);
00889     return (-1);
00890   }
00891 
00894   inline int CancelTimer(ACE_UINT32 timerHandle, const void** arg)
00895   {
00896     if (Running())
00897       return reactor()->cancel_timer(timerHandle, arg);
00898     return (-1);
00899   }
00900 
00901   inline bool Running() { return reactor() ? true : false; }
00902 
00903   inline AAA_GroupedJob& Job() { return handle.Job(); }
00904 
00905   inline AAA_JobHandle<AAA_GroupedJob>& JobHandle() { return handle; }
00906 
00907  protected:
00908 
00909  private:
00910 
00913   int svc()
00914   {
00915 #if !defined(OS_FREEBSD) && !defined(WIN32)
00916     std::auto_ptr<ACE_Thread_Exit> threadExit(new ACE_Thread_Exit);
00917     ACE_TSS<ACE_Thread_Exit> threadExitTSS(threadExit.get());
00918 #endif
00919     mutex.acquire();
00920     if (!reactor())
00921       {
00922         std::auto_ptr<ACE_Reactor> r(new ACE_Reactor);
00923         reactor(r.get());
00924         mutex.release();
00925         cond->signal();
00926         while (1) // Loop for timer thread loop.
00927           if (reactor()->handle_events() == -1)
00928             FRAMEWORK_LOG(LM_ERROR, "[%N] handle_events().\n", 
00929                           Job().Name().c_str());
00930         return 0;
00931       }
00932     mutex.release();
00933     while (1)  // Loop for serving threads.
00934       {
00935         if (Job().Serve()<0)
00936           return 0;
00937       }
00938     return 0;
00939   }
00940   
00941   int close(u_long flags=0) { 
00942     return 0; 
00943   }
00944 
00945   int handle_timeout(const ACE_Time_Value &tv, const void *arg)
00946   {
00947     reactor(0);
00948     ACE_Thread::exit();
00949     return 0;
00950   }
00951 
00952   AAA_JobHandle<AAA_GroupedJob> handle;
00953 
00955   std::auto_ptr< ACE_Condition<ACE_Mutex> > cond;
00956 
00958   ACE_Mutex mutex;
00959 };
00960 
00968 
00969 typedef int AAA_State;
00970 
00973 template <class ARG> class AAA_Action 
00974 {
00975  public:
00976   virtual void operator()(ARG&)=0;
00977  protected:
00978   virtual ~AAA_Action() {}
00979   AAA_Action() {}
00980 };
00981 
00984 template <class ARG> class AAA_NullAction_S : public AAA_Action<ARG>
00985 {
00986   friend class ACE_Singleton<AAA_NullAction_S<ARG>, 
00987     ACE_Recursive_Thread_Mutex>;
00988  public:
00989   void operator()(ARG&) {}
00990  private:
00991   AAA_NullAction_S() {}
00992   ~AAA_NullAction_S() {}
00993 };
00994 
00997 template <class ARG> class AAA_NullAction 
00998 {
00999  public:
01000   AAA_Action<ARG>& operator()() 
01001   { 
01002     return *ACE_Singleton<AAA_NullAction_S<ARG>, ACE_Recursive_Thread_Mutex>::
01003       instance();
01004   }
01005 };
01006 
01016 
01017 
01018 template <class ARG> class AAA_StateTableEntry 
01019 {
01020 public:
01022   AAA_StateTableEntry(AAA_State st1, AAA_Event ev, AAA_State st2, 
01023                       AAA_Action<ARG>& ac=AAA_NullAction<ARG>()()) : 
01024   prevState(st1), event(ev), nextState(st2), action(ac), isWildcardEvent(false)
01025   {}
01027   AAA_StateTableEntry(AAA_State st1, AAA_State st2, 
01028                       AAA_Action<ARG>& ac=AAA_NullAction<ARG>()()) :
01029   prevState(st1), nextState(st2), action(ac), isWildcardEvent(true)
01030   {}
01031 
01033   ~AAA_StateTableEntry() {}
01034 
01035   inline AAA_State PrevState() { return prevState; }
01036   inline AAA_Event Event() { return event; }
01037   inline AAA_State NextState() { return nextState; }
01038   inline AAA_Action<ARG>& Action() { return action; }
01039   inline bool IsWildcardEvent() { return isWildcardEvent; }
01040 
01041  protected:
01042   AAA_State   prevState;   
01043   AAA_Event   event;       
01044   AAA_State   nextState;
01045   AAA_Action<ARG>& action;
01046   bool isWildcardEvent;
01047 };
01048 
01062 
01063 template <class ARG> class AAA_StateTable : 
01064 public std::list< AAA_StateTableEntry <ARG> * >
01065 {
01066 public:
01067   AAA_StateTable() : isInitialStateSet(false) {}
01068 
01069   virtual ~AAA_StateTable() 
01070   { 
01071     for (typename std::list< AAA_StateTableEntry <ARG> * >::iterator 
01072            i=begin(); i!=end(); delete *(i++));
01073   }
01074 
01075   
01077   AAA_State InitialState() throw (AAA_Error)
01078   {
01079     if (! isInitialStateSet)
01080       throw NoInitialState;
01081     return initialState; 
01082   }
01083 
01085   void InitialState(AAA_State st) 
01086   {
01087     initialState = st; 
01088     isInitialStateSet = true;
01089   }
01090 
01093   bool FindStateTableEntry(AAA_State st, AAA_Event ev, 
01094                            AAA_StateTableEntry<ARG>*& entry)
01095   {
01096     bool found=false;
01097     for (typename std::list< AAA_StateTableEntry <ARG> * >::iterator 
01098            i=begin(); i!=end(); i++)
01099       {
01100         AAA_StateTableEntry<ARG> *e = *i;
01101         if (e->PrevState() != st)
01102           continue;
01103 
01104         if (e->IsWildcardEvent())
01105           {
01106             entry = e;
01107             found=true;
01108             continue;
01109           }
01110 
01111         if (e->Event() == ev)
01112           {
01113             entry = e;
01114             return true;
01115           }
01116       }
01117     return found;
01118   }
01119 
01122   bool FindStateTableEntry(AAA_State st, AAA_StateTableEntry<ARG>*& entry)
01123   {
01124     for (typename std::list< AAA_StateTableEntry <ARG> * >::iterator 
01125            i=begin(); i!=end(); i++)
01126       {
01127         entry  = *i;
01128         if (entry->PrevState() != st)
01129           continue;
01130         if (entry->IsWildcardEvent())
01131           return true;
01132       }
01133     return false;
01134   }
01135 
01136 protected:
01138   void AddStateTableEntry(AAA_State pSt, AAA_Event ev, 
01139                           AAA_State nSt, 
01140                           AAA_Action<ARG>& ac= AAA_NullAction<ARG>()())
01141     throw (AAA_Error)
01142   {
01143     AAA_StateTableEntry<ARG> *dummy;
01144     if (FindStateTableEntry(pSt, ev, dummy))
01145       {
01146         throw FoundDuplicateStateTableEntry;
01147       }
01148     push_back(new AAA_StateTableEntry<ARG>(pSt, ev, nSt, ac));
01149   }
01150 
01152   void AddWildcardStateTableEntry(AAA_State pSt, AAA_State nSt, 
01153                                   AAA_Action<ARG>& ac= AAA_NullAction<ARG>()())
01154       throw (AAA_Error)
01155   {
01156     AAA_StateTableEntry<ARG> *dummy;
01157     if (FindStateTableEntry(pSt, dummy))
01158       throw FoundDuplicateStateTableEntry;
01159 
01160     push_back(new AAA_StateTableEntry<ARG>(pSt,nSt,ac));
01161   }
01162 private:
01163   AAA_State initialState;
01164   bool isInitialStateSet;
01165 };
01166 
01178 
01179 class AAA_StateMachineBase
01180 {
01181 public:
01183   virtual void Start()=0;
01184 
01186   virtual void Stop()=0;
01187 
01189   virtual void Restart() { Stop(); Start(); }
01190 
01192   virtual bool Running()=0;
01193 
01196   virtual void Event(AAA_Event ev)=0;
01197 
01199   AAA_StateMachineBase& operator=(AAA_StateMachineBase& sm) { return sm; }
01200 
01203   std::string& Name() { return name; }
01204 
01205 protected:
01206   AAA_StateMachineBase(char *name=0) 
01207   {
01208     if (name)
01209       this->name = std::string(name);
01210   }
01211 
01213   virtual ~AAA_StateMachineBase() {}
01214 
01215  private:
01216   std::string name;
01217 };
01218 
01221 template <class ARG> 
01222 class AAA_StateMachine : public AAA_StateMachineBase
01223 {
01224 public:
01226   virtual ~AAA_StateMachine() {}
01227 
01229   virtual void Start() throw(AAA_Error)
01230   { 
01231     state = stateTable.InitialState();
01232     running = true;
01233   }
01234 
01236   inline void Stop() { running=false; }
01237 
01240   inline bool Running() { return running; }
01241 
01244 
01245   void Event(AAA_Event ev)
01246   {
01247     // Throw an exception if state machine is not started.
01248     if (!running)
01249       {
01250         FRAMEWORK_LOG(LM_ERROR, 
01251                       "StateMachine[%s] state machine is not running.\n", 
01252                       Name().c_str());
01253         return;
01254       }
01255 
01256     AAA_StateTableEntry<ARG>* entry;
01257 
01258     // Search state table for StateTableEntry containing the event
01259     // defined in the current state.
01260     if (!stateTable.FindStateTableEntry(state, ev, entry))
01261       {
01262         // Cannot found state table that accepts event.
01263         FRAMEWORK_LOG
01264           (LM_ERROR, 
01265            "StateMachine[%s] cannot accept event %d at state %d.\n", 
01266            Name().c_str(), ev, state);
01267         return;
01268       }
01269 
01270     // Change the state.
01271     state = entry->NextState();
01272 
01273     // Execute the action for the current StateTableEntry.
01274     entry->Action()(actionArg);
01275   }
01276 
01278   AAA_StateMachine<ARG>& operator=(AAA_StateMachine<ARG>& sm) { return sm; }
01279 
01280 protected:
01281   AAA_StateMachine(ARG &arg, AAA_StateTable<ARG> &table, char *name=0)
01282     : AAA_StateMachineBase(name),
01283     stateTable(table), actionArg(arg), running(true)
01284   {}
01285 
01286   AAA_StateTable<ARG>& stateTable;
01287   AAA_State state;
01288   ARG& actionArg;
01289 private:
01292   bool running;
01293 };
01294 
01302 
01303 class AAA_TimerTypeAllocator_S
01304 {
01305   friend class ACE_Singleton<AAA_TimerTypeAllocator_S, 
01306     ACE_Recursive_Thread_Mutex>;
01307  public:
01308   inline int Allocate() { return ++lastAllocatedType.value_i(); }
01309  private:
01310   AAA_TimerTypeAllocator_S() {}
01311   ~AAA_TimerTypeAllocator_S() {}
01312   ACE_Atomic_Op<ACE_Thread_Mutex, int> lastAllocatedType;
01313 };
01314 
01316 typedef ACE_Singleton<AAA_TimerTypeAllocator_S, ACE_Recursive_Thread_Mutex> 
01317 AAA_TimerTypeAllocator;
01318 
01319 typedef std::map<int,ACE_UINT32> TimerHandleMap;
01320 
01333 
01334 
01335 template <class ARG>
01336 class AAA_StateMachineWithTimer : public AAA_StateMachine<ARG>
01337 {
01338 public:
01342   void ScheduleTimer(AAA_Event ev, ACE_UINT32 sec, 
01343                      ACE_UINT32 usec=0, int type=0)
01344   {
01345     AAA_Event *eventP = new AAA_Event(ev);
01346     ACE_UINT32 timerHandle;
01347     // Cancel the already scheduled timer for this type.
01348     CancelTimer(type);
01349     timerHandle =  reactor.schedule_timer(
01350           (ACE_Event_Handler*)&timerEventHandler, 
01351           (const void*)eventP, 
01352           ACE_Time_Value(sec, usec), 
01353           ACE_Time_Value(sec, usec));
01354     timerHandleMap.insert(std::pair<int, ACE_UINT32>(type, timerHandle));
01355   }
01356 
01358   void CancelTimer(int type=0) 
01359   {
01360     AAA_Event *ev;
01361 
01362     ACE_UINT32 timerHandle;
01363     TimerHandleMap::iterator i;
01364     i = timerHandleMap.find(type);
01365     if (i == timerHandleMap.end())
01366       return;
01367 
01368     timerHandle = i->second;
01369     timerHandleMap.erase(i);
01370     if (reactor.cancel_timer(timerHandle, (const void**)&ev) == 1)
01371       delete ev;
01372   }
01373 
01375   void CancelAllTimer()
01376   {
01377     while (! timerHandleMap.empty()) {
01378       TimerHandleMap::iterator i = timerHandleMap.begin();
01379       CancelTimer(i->first);  
01380     }
01381   }
01382 
01383  protected:
01384   // separate class derivation for ACE_Event_Handler
01385   // to protect against changes to ACE_Event_Handler
01386   class AAA_FsmTimerEventHandler : public ACE_Event_Handler
01387   {
01388   public:
01389     AAA_FsmTimerEventHandler(AAA_StateMachineWithTimer<ARG> &fsm)
01390                              : fsm_ref(fsm) { }
01391 
01392   private:
01394     int handle_timeout(const ACE_Time_Value &tv, const void *arg)
01395     {
01396       AAA_Event event = *(AAA_Event*)arg;
01397 
01402       fsm_ref.Timeout(event);
01403       return (tv ? 0 : 0);
01404     }
01405     AAA_StateMachineWithTimer<ARG> &fsm_ref;
01406   };
01407     
01410   AAA_StateMachineWithTimer(ARG &arg, AAA_StateTable<ARG> &table, 
01411                             ACE_Reactor &r, char *name=0)
01412       : AAA_StateMachine<ARG>(arg, table, name), reactor(r),
01413         timerEventHandler(*this)        
01414   {}
01415 
01417   virtual ~AAA_StateMachineWithTimer()  { CancelAllTimer(); }
01418 
01421   virtual void Timeout(AAA_Event ev)=0;
01422 
01423 private:
01424   ACE_Reactor& reactor;
01425   TimerHandleMap timerHandleMap;
01426   AAA_FsmTimerEventHandler timerEventHandler;
01427 };
01428 
01429 #endif // __FRAMEWORK_H__

Generated on Fri Jun 25 17:35:03 2004 for Framework API for Task Management and Protocol State Machines by doxygen 1.3.5