00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037 #ifndef __FRAMEWORK_H__
00038 #define __FRAMEWORK_H__
00039
00040 #include <list>
00041 #include <map>
00042 #include <string>
00043 #include <ace/Task.h>
00044 #include <ace/Task_T.h>
00045 #include <ace/Thread.h>
00046 #include <ace/Thread_Manager.h>
00047 #include <ace/Event_Handler.h>
00048 #include <ace/Synch.h>
00049 #include <ace/Reactor.h>
00050 #include <ace/Thread_Exit.h>
00051 #include <ace/Time_Value.h>
00052 #include <ace/Singleton.h>
00053 #include <ace/Token.h>
00054 #include <ace/Log_Msg.h>
00055 #include <ace/Atomic_Op_T.h>
00056 #include <boost/shared_ptr.hpp>
00057 #include <boost/scoped_array.hpp>
00058 #include <boost/shared_array.hpp>
00059
00061 #define FRAMEWORK_LOG ACE_Log_Msg::instance()->log
00062
00067 template <class T> struct Type2Type
00068 {
00069 typedef T OriginalType;
00070 };
00071
00073 enum AAA_Error {
00074 NoInitialState,
00075 FoundDuplicateStateTableEntry,
00076 ActivationFailure,
00077 TaskNotRunning,
00078 NoData
00079 };
00080
00081
00088 template <class LOCK>
00089 class AAA_ScopeLock
00090 {
00091 public:
00092 AAA_ScopeLock(LOCK &l) : lock(l) {
00093 lock.acquire();
00094 }
00095 ~AAA_ScopeLock() {
00096 lock.release();
00097 }
00098
00099 private:
00100 LOCK &lock;
00101 };
00102
00104 typedef AAA_ScopeLock<ACE_Token> AAA_TokenScopeLock;
00105
00107 typedef AAA_ScopeLock<ACE_Mutex> AAA_MutexScopeLock;
00108
00110 typedef AAA_ScopeLock<ACE_Thread_Mutex> AAA_ThreadMutexScopeLock;
00111
00112 class AAA_JobData {};
00113
00114 class AAA_JobDeleter;
00115 class AAA_Task;
00116
00122
00123
00124 class AAA_Job
00125 {
00126 friend class AAA_JobDeleter;
00127
00128 public:
00130 AAA_Job(AAA_JobData *d=0, char* name=0) : data(d), priority(1), weight(1)
00131 {
00132 if (name) this->name = std::string(name);
00133 refcount = 1;
00134 }
00135
00138 virtual int Serve()=0;
00139
00146 virtual int Schedule(AAA_Job*, size_t backlogSize=1)=0;
00147
00150 void Delete() { if (--refcount <=0 ) delete this; }
00151
00153 virtual bool ExistBacklog() { return false; }
00154
00156 virtual size_t BacklogSize() { return 0; }
00157
00159 inline std::string& Name() { return name; }
00160
00162 inline AAA_JobData* Data() { return data; }
00163
00166 template <class T> inline T* Data(Type2Type<T>) { return (T*)data; }
00167
00169 unsigned& Priority() { return priority; }
00170
00172 unsigned& Weight() { return weight; }
00173
00175 void Acquire() { ++refcount; }
00176
00180 int Release(int n=1)
00181 {
00182 refcount -= n;
00183 if (refcount<0) {
00184 delete this;
00185 return -1;
00186 }
00187 return 0;
00188 }
00189
00192 virtual bool Running() { return true; }
00193
00194 bool operator==(AAA_Job* job) { return (this==job); }
00195
00196 bool operator!=(AAA_Job* job) { return (this!=job); }
00197
00198 protected:
00199
00201 virtual ~AAA_Job() {}
00202
00203 private:
00204
00206 std::string name;
00207
00209 AAA_JobData* data;
00210
00213 ACE_Atomic_Op<ACE_Thread_Mutex, int> refcount;
00214
00216 unsigned priority;
00217
00219 unsigned weight;
00220 };
00221
00228 template <class JOB>
00229 class AAA_JobHandle : public boost::shared_ptr<JOB>
00230 {
00231 public:
00232 AAA_JobHandle(JOB* job) : boost::shared_ptr<JOB>(job, AAA_JobDeleter()) {}
00233
00234 JOB& Job() { return (JOB&)*get(); }
00235 };
00236
00243 class AAA_JobDeleter
00244 {
00245 public:
00246 void operator()(AAA_Job *job) { job->Delete(); }
00247 static void Delete(AAA_Job *job) { job->Delete(); }
00248 };
00249
00250 const unsigned maxNumPriority = 3;
00251 const unsigned maxMaxWeight = 3;
00252
00261 template <class T, class LOCK = ACE_Thread_Mutex>
00262 class AAA_QueueJob : public AAA_Job
00263 {
00264 public:
00266 AAA_QueueJob(AAA_JobData *d=0, char* name=0,
00267 unsigned numPriority=1, unsigned maxWeight=1) throw (int)
00268 : AAA_Job(d, name),
00269 numPriority(numPriority), maxWeight(maxWeight), total(0),
00270 cond(std::auto_ptr< ACE_Condition<LOCK> >
00271 (new ACE_Condition<LOCK>(lock))),
00272 signaled(false)
00273
00274 {
00275 if (numPriority > maxNumPriority)
00276 {
00277 FRAMEWORK_LOG(LM_ERROR, "[%N] Too many priorities.\n");
00278 throw 1;
00279 }
00280 if (maxWeight > maxMaxWeight)
00281 {
00282 FRAMEWORK_LOG(LM_ERROR, "[%N] Too large weight.\n");
00283 throw 1;
00284 }
00285 indexQueue = boost::shared_array< std::list<unsigned> >
00286 (new std::list<unsigned>[numPriority]);
00287
00288 dataQueue = boost::shared_array< std::list<T> >
00289 (new std::list<T>[numPriority*maxWeight]);
00290 }
00291
00293 virtual ~AAA_QueueJob() {}
00294
00296 virtual int Serve()=0;
00297
00299 virtual int Schedule(AAA_Job*, size_t backlogSize=1)=0;
00300
00302 inline bool ExistBacklog()
00303 {
00304
00305
00306 return (total>0);
00307 }
00308
00309 inline size_t BacklogSize() { return total; }
00310
00314 inline int Enqueue(T entry, bool blocking=false,
00315 unsigned priority=1, unsigned weight=1)
00316 {
00317 AAA_ScopeLock<LOCK> g(lock);
00318
00319
00320
00321
00322
00323
00324 while (!signaled && total==MaxSize())
00325 {
00326 if (!blocking)
00327 {
00328 FRAMEWORK_LOG(LM_DEBUG, "[%N] Enqueue is blocked.\n");
00329 return 0;
00330 }
00331 cond->wait();
00332 }
00333
00334 if (signaled && blocking)
00335 return -1;
00336
00337 unsigned pIndex = Index(priority);
00338 unsigned dIndex = Index(priority, weight);
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352
00353
00354
00355 if (dataQueue[dIndex].size() < weight)
00356 indexQueue[pIndex].push_back(dIndex);
00357
00358 dataQueue[dIndex].push_back(entry);
00359 total++;
00360
00361
00362
00363 cond->signal();
00364 return total;
00365 }
00366
00373 int Dequeue(T &entry, bool blocking=false) throw(int)
00374 {
00375 do {
00376 AAA_ScopeLock<LOCK> g(lock);
00377
00378
00379
00380
00381
00382 while (!signaled && total==0)
00383 {
00384 if (!blocking)
00385 return 0;
00386 cond->wait();
00387 }
00388
00389 if (signaled && blocking)
00390 return -1;
00391
00392
00393
00394
00395
00396
00397
00398
00399
00400
00401
00402
00403
00404
00405
00406
00407
00408
00409
00410 do {
00411 bool found=false;
00412 for (unsigned p=0; p<numPriority; p++)
00413 {
00414 if (indexQueue[p].empty())
00415 continue;
00416
00417 unsigned dIndex = indexQueue[p].front();
00418 indexQueue[p].pop_front();
00419 entry = dataQueue[dIndex].front();
00420 dataQueue[dIndex].pop_front();
00421 if (dataQueue[dIndex].size() >= (dIndex % maxWeight) + 1)
00422 indexQueue[p].push_back(dIndex);
00423 found=true;
00424 total--;
00425 break;
00426 }
00427
00428 if (!found)
00429 {
00430 FRAMEWORK_LOG(LM_ERROR, "[%N] BUG (dequeue error).\n");
00431 return 0;
00432 }
00433
00434 } while (0);
00435 } while (0);
00436 if (total>0)
00437 cond->signal();
00438 return 1;
00439 }
00440
00443 inline int Remove(T entry)
00444 {
00445 AAA_ScopeLock<LOCK> g(lock);
00446 int totalRemove=0;
00447 for (unsigned i=0; i<numPriority; i++)
00448 for (unsigned j=i*maxWeight; j<(i+1)*maxWeight; j++)
00449 {
00450 unsigned preSize = (unsigned)dataQueue[j].size();
00451 dataQueue[j].remove(entry);
00452 unsigned postSize = (unsigned)dataQueue[j].size();
00453 if (preSize != postSize)
00454 {
00455 totalRemove =+ preSize - postSize;
00456 total -= preSize - postSize;
00457
00458 AdjustIndexQueue(i, j);
00459 }
00460 }
00461 if (totalRemove>0)
00462 cond->signal();
00463 return totalRemove;
00464 }
00465
00467 virtual void Flush()
00468 {
00469 AAA_ScopeLock<LOCK> g(lock);
00470 for (unsigned i=0; i<numPriority; indexQueue[i++].clear());
00471 for (unsigned i=0; i<numPriority*maxWeight; dataQueue[i++].clear());
00472 total=0;
00473 cond->signal();
00474 }
00475
00476 public:
00477 void Signal()
00478 {
00479 signaled=true;
00480 cond->broadcast();
00481 }
00482
00484 inline size_t MaxSize() { return (size_t)-1; }
00485
00486 protected:
00487
00489 LOCK lock;
00490
00491 private:
00492
00493 inline unsigned Index(unsigned p)
00494 {
00495 p = (p>maxNumPriority) ? maxNumPriority : p;
00496 return p-1;
00497 }
00498
00499 inline unsigned Index(unsigned p, unsigned w)
00500 {
00501 p = (p>maxNumPriority) ? maxNumPriority : p;
00502 w = (w>maxMaxWeight) ? maxMaxWeight : w;
00503 return (p-1)*maxWeight + (w-1);
00504 }
00505
00506 void AdjustIndexQueue(unsigned pIndex, unsigned dIndex)
00507 {
00508 std::list<unsigned> &iQueue = indexQueue[pIndex];
00509 std::list<unsigned>::iterator i;
00510 unsigned j=0;
00511 for (i=iQueue.begin(); i!=iQueue.end(); )
00512 {
00513 if ((*i) == dIndex)
00514 i = (++j > dataQueue[dIndex].size()) ? iQueue.erase(i) : i++;
00515 else
00516 i++;
00517 }
00518 }
00519
00520 unsigned numPriority;
00521 unsigned maxWeight;
00522
00523
00524 boost::shared_array< std::list<unsigned> > indexQueue;
00525
00526
00527 boost::shared_array< std::list<T> > dataQueue;
00528
00529 unsigned total;
00530
00532 std::auto_ptr< ACE_Condition<LOCK> > cond;
00533
00535 bool signaled;
00536 };
00537
00539 typedef int AAA_Event;
00540
00547
00548
00549 typedef AAA_QueueJob<AAA_Event> AAA_EventQueueJob;
00550
00557 typedef AAA_QueueJob<AAA_Job*> AAA_JobQueueJob;
00558
00562 #define AAA_SCHED_FIFO 0
00563 #define AAA_SCHED_WFQ 1
00564 #define AAA_SCHED_PRIORITY 2
00565
00566 typedef ACE_UINT16 AAA_SchedulingPolicy;
00567
00568
00611 class AAA_GroupedJob : public AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>
00612 {
00613 public:
00615 static AAA_GroupedJob*
00616 Create(AAA_Job &parent, AAA_JobData* d,
00617 char *name=0,
00618 AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00619 bool blocking=false,
00620 unsigned numPriority=1, unsigned maxWeight=1) throw(AAA_Error)
00621 {
00622 if (d==0)
00623 {
00624 FRAMEWORK_LOG(LM_ERROR, "AAA_JobData must be non-null.\n");
00625 throw NoData;
00626 }
00627 return new AAA_GroupedJob(parent, d, name, policy, blocking,
00628 numPriority, maxWeight);
00629 }
00630
00631 static AAA_GroupedJob*
00632 Create(AAA_JobData* d, char *name=0,
00633 AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00634 bool blocking=false,
00635 unsigned numPriority=1, unsigned maxWeight=1) throw(AAA_Error)
00636 {
00637 if (d==0)
00638 {
00639 FRAMEWORK_LOG(LM_ERROR, "AAA_JobData must be non-null.\n");
00640 throw NoData;
00641 }
00642 return new AAA_GroupedJob(d, name, policy, blocking,
00643 numPriority, maxWeight);
00644 }
00645
00646 void Start() throw(AAA_Error) { running=true; }
00647
00648 void Stop()
00649 {
00650 running=false; Flush();
00651 Signal();
00652 }
00653
00654 void Flush()
00655 {
00656 while (ExistBacklog())
00657 {
00658 AAA_Job* job;
00659 Dequeue(job);
00660 job->Release();
00661 }
00662 }
00663
00664 void Remove(AAA_Job *job)
00665 {
00666
00667
00668 int n = AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>::Remove(job);
00669 job->Release(n);
00670 }
00671
00673 int Schedule(AAA_Job *job , size_t backlogSize=1)
00674 {
00675 if (!Running())
00676 return (-1);
00677
00678 job->Acquire();
00679
00680 unsigned priority, weight = 0;
00681 EnforceSchedulingPolicy(job, priority, weight);
00682
00683
00684
00685 if (backlogSize > weight)
00686 return (0);
00687
00688
00689
00690 do {
00691 int result = Enqueue(job, blocking);
00692 if (result <= 0)
00693 return (-1);
00694 } while (0);
00695
00696
00697
00698
00699 return (!IsRoot()) ? parent.Schedule(this) : 0;
00700 }
00701
00703 int Serve()
00704 {
00705 AAA_Job* job;
00706
00707 do {
00708 int result = Dequeue(job, blocking);
00709 if (result <= 0)
00710 return result;
00711 } while (0);
00712
00713 if (job->Release() == -1)
00714 return (int)BacklogSize();
00715
00716 unsigned childBacklogSize = job->Serve();
00717
00718 unsigned priority, weight = 0;
00719 EnforceSchedulingPolicy(job, priority, weight);
00720
00721
00722
00723 if (childBacklogSize < weight)
00724 return (int)BacklogSize();
00725
00726 job->Acquire();
00727
00728
00729
00730 return Enqueue(job, priority ? true : false);
00731 }
00732
00733 protected:
00734
00735 private:
00737 AAA_GroupedJob(AAA_Job &parent,
00738 AAA_JobData* d,
00739 char *name=0,
00740 AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00741 bool blocking=false,
00742 unsigned numPriority=1, unsigned maxWeight=1)
00743 : AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>(d, name,
00744 numPriority, maxWeight),
00745 parent(parent), policy(policy), running(false), blocking(blocking)
00746 {
00747
00748 parent.Acquire();
00749 }
00750
00752 AAA_GroupedJob(AAA_JobData* d, char *name=0,
00753 AAA_SchedulingPolicy policy=AAA_SCHED_FIFO,
00754 bool blocking=false,
00755 unsigned numPriority=1, unsigned maxWeight=1)
00756 : AAA_QueueJob<AAA_Job*, ACE_Thread_Mutex>(d, name,
00757 numPriority, maxWeight),
00758 parent(*this), policy(policy), running(false), blocking(blocking)
00759 {}
00760
00762 ~AAA_GroupedJob() {
00763 Stop();
00764 if (!IsRoot())
00765 parent.Release();
00766 }
00767
00769 inline bool IsRoot() { return (parent == this); }
00770
00771
00774 void EnforceSchedulingPolicy(AAA_Job *job,
00775 unsigned &priority,
00776 unsigned &weight)
00777 {
00779 const int infinity = 100000;
00780
00781 priority = job->Priority();
00782 weight = job->Weight();
00783
00784
00785 if (policy != AAA_SCHED_WFQ)
00786 weight = infinity;
00787
00788 if (policy != AAA_SCHED_PRIORITY)
00789 priority = 1;
00790 }
00791
00793 AAA_Job& parent;
00794
00798 unsigned policy;
00799
00800 bool running;
00801
00804 bool blocking;
00805 };
00806
00814
00815
00816 class AAA_Task : public ACE_Task<ACE_MT_SYNCH>
00817 {
00818 public:
00819
00827 AAA_Task(AAA_SchedulingPolicy policy=AAA_SCHED_FIFO, char *name=0,
00828 unsigned numPriority=1, unsigned maxWeight=1)
00829 : handle
00830 (AAA_JobHandle<AAA_GroupedJob>
00831 (AAA_GroupedJob::Create((AAA_JobData*)this, name, policy, true,
00832 numPriority, maxWeight))),
00833 cond(std::auto_ptr< ACE_Condition<ACE_Mutex> >
00834 (new ACE_Condition<ACE_Mutex>(mutex)))
00835 {
00836 reactor(0);
00837 }
00838
00839 virtual ~AAA_Task() { Stop(); }
00840
00843 void Start(int n) throw(AAA_Error)
00844 {
00845 if (Running())
00846 return;
00847
00848 if (activate(THR_NEW_LWP|THR_JOINABLE, n+1) == -1)
00849 {
00850 throw ActivationFailure;
00851 }
00852
00853
00854 do {
00855 AAA_MutexScopeLock guard(mutex);
00856 while (!Running()) { cond->wait(); }
00857 } while (0);
00858
00859 Job().Start();
00860 }
00861
00863
00864 void Stop()
00865 {
00866 if (!Running())
00867 return;
00868
00869
00870 Job().Stop();
00871
00872
00873 reactor()->schedule_timer(this, (const void*)0, ACE_Time_Value(0));
00874
00875
00876 wait();
00877 }
00878
00884 inline long ScheduleTimer(ACE_Event_Handler* eventHandler, const void* arg,
00885 ACE_Time_Value delay, ACE_Time_Value interval=0)
00886 {
00887 if (Running())
00888 return reactor()->schedule_timer(eventHandler, arg, delay, interval);
00889 return (-1);
00890 }
00891
00894 inline int CancelTimer(ACE_UINT32 timerHandle, const void** arg)
00895 {
00896 if (Running())
00897 return reactor()->cancel_timer(timerHandle, arg);
00898 return (-1);
00899 }
00900
00901 inline bool Running() { return reactor() ? true : false; }
00902
00903 inline AAA_GroupedJob& Job() { return handle.Job(); }
00904
00905 inline AAA_JobHandle<AAA_GroupedJob>& JobHandle() { return handle; }
00906
00907 protected:
00908
00909 private:
00910
00913 int svc()
00914 {
00915 #if !defined(OS_FREEBSD) && !defined(WIN32)
00916 std::auto_ptr<ACE_Thread_Exit> threadExit(new ACE_Thread_Exit);
00917 ACE_TSS<ACE_Thread_Exit> threadExitTSS(threadExit.get());
00918 #endif
00919 mutex.acquire();
00920 if (!reactor())
00921 {
00922 std::auto_ptr<ACE_Reactor> r(new ACE_Reactor);
00923 reactor(r.get());
00924 mutex.release();
00925 cond->signal();
00926 while (1)
00927 if (reactor()->handle_events() == -1)
00928 FRAMEWORK_LOG(LM_ERROR, "[%N] handle_events().\n",
00929 Job().Name().c_str());
00930 return 0;
00931 }
00932 mutex.release();
00933 while (1)
00934 {
00935 if (Job().Serve()<0)
00936 return 0;
00937 }
00938 return 0;
00939 }
00940
00941 int close(u_long flags=0) {
00942 return 0;
00943 }
00944
00945 int handle_timeout(const ACE_Time_Value &tv, const void *arg)
00946 {
00947 reactor(0);
00948 ACE_Thread::exit();
00949 return 0;
00950 }
00951
00952 AAA_JobHandle<AAA_GroupedJob> handle;
00953
00955 std::auto_ptr< ACE_Condition<ACE_Mutex> > cond;
00956
00958 ACE_Mutex mutex;
00959 };
00960
00968
00969 typedef int AAA_State;
00970
00973 template <class ARG> class AAA_Action
00974 {
00975 public:
00976 virtual void operator()(ARG&)=0;
00977 protected:
00978 virtual ~AAA_Action() {}
00979 AAA_Action() {}
00980 };
00981
00984 template <class ARG> class AAA_NullAction_S : public AAA_Action<ARG>
00985 {
00986 friend class ACE_Singleton<AAA_NullAction_S<ARG>,
00987 ACE_Recursive_Thread_Mutex>;
00988 public:
00989 void operator()(ARG&) {}
00990 private:
00991 AAA_NullAction_S() {}
00992 ~AAA_NullAction_S() {}
00993 };
00994
00997 template <class ARG> class AAA_NullAction
00998 {
00999 public:
01000 AAA_Action<ARG>& operator()()
01001 {
01002 return *ACE_Singleton<AAA_NullAction_S<ARG>, ACE_Recursive_Thread_Mutex>::
01003 instance();
01004 }
01005 };
01006
01016
01017
01018 template <class ARG> class AAA_StateTableEntry
01019 {
01020 public:
01022 AAA_StateTableEntry(AAA_State st1, AAA_Event ev, AAA_State st2,
01023 AAA_Action<ARG>& ac=AAA_NullAction<ARG>()()) :
01024 prevState(st1), event(ev), nextState(st2), action(ac), isWildcardEvent(false)
01025 {}
01027 AAA_StateTableEntry(AAA_State st1, AAA_State st2,
01028 AAA_Action<ARG>& ac=AAA_NullAction<ARG>()()) :
01029 prevState(st1), nextState(st2), action(ac), isWildcardEvent(true)
01030 {}
01031
01033 ~AAA_StateTableEntry() {}
01034
01035 inline AAA_State PrevState() { return prevState; }
01036 inline AAA_Event Event() { return event; }
01037 inline AAA_State NextState() { return nextState; }
01038 inline AAA_Action<ARG>& Action() { return action; }
01039 inline bool IsWildcardEvent() { return isWildcardEvent; }
01040
01041 protected:
01042 AAA_State prevState;
01043 AAA_Event event;
01044 AAA_State nextState;
01045 AAA_Action<ARG>& action;
01046 bool isWildcardEvent;
01047 };
01048
01062
01063 template <class ARG> class AAA_StateTable :
01064 public std::list< AAA_StateTableEntry <ARG> * >
01065 {
01066 public:
01067 AAA_StateTable() : isInitialStateSet(false) {}
01068
01069 virtual ~AAA_StateTable()
01070 {
01071 for (typename std::list< AAA_StateTableEntry <ARG> * >::iterator
01072 i=begin(); i!=end(); delete *(i++));
01073 }
01074
01075
01077 AAA_State InitialState() throw (AAA_Error)
01078 {
01079 if (! isInitialStateSet)
01080 throw NoInitialState;
01081 return initialState;
01082 }
01083
01085 void InitialState(AAA_State st)
01086 {
01087 initialState = st;
01088 isInitialStateSet = true;
01089 }
01090
01093 bool FindStateTableEntry(AAA_State st, AAA_Event ev,
01094 AAA_StateTableEntry<ARG>*& entry)
01095 {
01096 bool found=false;
01097 for (typename std::list< AAA_StateTableEntry <ARG> * >::iterator
01098 i=begin(); i!=end(); i++)
01099 {
01100 AAA_StateTableEntry<ARG> *e = *i;
01101 if (e->PrevState() != st)
01102 continue;
01103
01104 if (e->IsWildcardEvent())
01105 {
01106 entry = e;
01107 found=true;
01108 continue;
01109 }
01110
01111 if (e->Event() == ev)
01112 {
01113 entry = e;
01114 return true;
01115 }
01116 }
01117 return found;
01118 }
01119
01122 bool FindStateTableEntry(AAA_State st, AAA_StateTableEntry<ARG>*& entry)
01123 {
01124 for (typename std::list< AAA_StateTableEntry <ARG> * >::iterator
01125 i=begin(); i!=end(); i++)
01126 {
01127 entry = *i;
01128 if (entry->PrevState() != st)
01129 continue;
01130 if (entry->IsWildcardEvent())
01131 return true;
01132 }
01133 return false;
01134 }
01135
01136 protected:
01138 void AddStateTableEntry(AAA_State pSt, AAA_Event ev,
01139 AAA_State nSt,
01140 AAA_Action<ARG>& ac= AAA_NullAction<ARG>()())
01141 throw (AAA_Error)
01142 {
01143 AAA_StateTableEntry<ARG> *dummy;
01144 if (FindStateTableEntry(pSt, ev, dummy))
01145 {
01146 throw FoundDuplicateStateTableEntry;
01147 }
01148 push_back(new AAA_StateTableEntry<ARG>(pSt, ev, nSt, ac));
01149 }
01150
01152 void AddWildcardStateTableEntry(AAA_State pSt, AAA_State nSt,
01153 AAA_Action<ARG>& ac= AAA_NullAction<ARG>()())
01154 throw (AAA_Error)
01155 {
01156 AAA_StateTableEntry<ARG> *dummy;
01157 if (FindStateTableEntry(pSt, dummy))
01158 throw FoundDuplicateStateTableEntry;
01159
01160 push_back(new AAA_StateTableEntry<ARG>(pSt,nSt,ac));
01161 }
01162 private:
01163 AAA_State initialState;
01164 bool isInitialStateSet;
01165 };
01166
01178
01179 class AAA_StateMachineBase
01180 {
01181 public:
01183 virtual void Start()=0;
01184
01186 virtual void Stop()=0;
01187
01189 virtual void Restart() { Stop(); Start(); }
01190
01192 virtual bool Running()=0;
01193
01196 virtual void Event(AAA_Event ev)=0;
01197
01199 AAA_StateMachineBase& operator=(AAA_StateMachineBase& sm) { return sm; }
01200
01203 std::string& Name() { return name; }
01204
01205 protected:
01206 AAA_StateMachineBase(char *name=0)
01207 {
01208 if (name)
01209 this->name = std::string(name);
01210 }
01211
01213 virtual ~AAA_StateMachineBase() {}
01214
01215 private:
01216 std::string name;
01217 };
01218
01221 template <class ARG>
01222 class AAA_StateMachine : public AAA_StateMachineBase
01223 {
01224 public:
01226 virtual ~AAA_StateMachine() {}
01227
01229 virtual void Start() throw(AAA_Error)
01230 {
01231 state = stateTable.InitialState();
01232 running = true;
01233 }
01234
01236 inline void Stop() { running=false; }
01237
01240 inline bool Running() { return running; }
01241
01244
01245 void Event(AAA_Event ev)
01246 {
01247
01248 if (!running)
01249 {
01250 FRAMEWORK_LOG(LM_ERROR,
01251 "StateMachine[%s] state machine is not running.\n",
01252 Name().c_str());
01253 return;
01254 }
01255
01256 AAA_StateTableEntry<ARG>* entry;
01257
01258
01259
01260 if (!stateTable.FindStateTableEntry(state, ev, entry))
01261 {
01262
01263 FRAMEWORK_LOG
01264 (LM_ERROR,
01265 "StateMachine[%s] cannot accept event %d at state %d.\n",
01266 Name().c_str(), ev, state);
01267 return;
01268 }
01269
01270
01271 state = entry->NextState();
01272
01273
01274 entry->Action()(actionArg);
01275 }
01276
01278 AAA_StateMachine<ARG>& operator=(AAA_StateMachine<ARG>& sm) { return sm; }
01279
01280 protected:
01281 AAA_StateMachine(ARG &arg, AAA_StateTable<ARG> &table, char *name=0)
01282 : AAA_StateMachineBase(name),
01283 stateTable(table), actionArg(arg), running(true)
01284 {}
01285
01286 AAA_StateTable<ARG>& stateTable;
01287 AAA_State state;
01288 ARG& actionArg;
01289 private:
01292 bool running;
01293 };
01294
01302
01303 class AAA_TimerTypeAllocator_S
01304 {
01305 friend class ACE_Singleton<AAA_TimerTypeAllocator_S,
01306 ACE_Recursive_Thread_Mutex>;
01307 public:
01308 inline int Allocate() { return ++lastAllocatedType.value_i(); }
01309 private:
01310 AAA_TimerTypeAllocator_S() {}
01311 ~AAA_TimerTypeAllocator_S() {}
01312 ACE_Atomic_Op<ACE_Thread_Mutex, int> lastAllocatedType;
01313 };
01314
01316 typedef ACE_Singleton<AAA_TimerTypeAllocator_S, ACE_Recursive_Thread_Mutex>
01317 AAA_TimerTypeAllocator;
01318
01319 typedef std::map<int,ACE_UINT32> TimerHandleMap;
01320
01333
01334
01335 template <class ARG>
01336 class AAA_StateMachineWithTimer : public AAA_StateMachine<ARG>
01337 {
01338 public:
01342 void ScheduleTimer(AAA_Event ev, ACE_UINT32 sec,
01343 ACE_UINT32 usec=0, int type=0)
01344 {
01345 AAA_Event *eventP = new AAA_Event(ev);
01346 ACE_UINT32 timerHandle;
01347
01348 CancelTimer(type);
01349 timerHandle = reactor.schedule_timer(
01350 (ACE_Event_Handler*)&timerEventHandler,
01351 (const void*)eventP,
01352 ACE_Time_Value(sec, usec),
01353 ACE_Time_Value(sec, usec));
01354 timerHandleMap.insert(std::pair<int, ACE_UINT32>(type, timerHandle));
01355 }
01356
01358 void CancelTimer(int type=0)
01359 {
01360 AAA_Event *ev;
01361
01362 ACE_UINT32 timerHandle;
01363 TimerHandleMap::iterator i;
01364 i = timerHandleMap.find(type);
01365 if (i == timerHandleMap.end())
01366 return;
01367
01368 timerHandle = i->second;
01369 timerHandleMap.erase(i);
01370 if (reactor.cancel_timer(timerHandle, (const void**)&ev) == 1)
01371 delete ev;
01372 }
01373
01375 void CancelAllTimer()
01376 {
01377 while (! timerHandleMap.empty()) {
01378 TimerHandleMap::iterator i = timerHandleMap.begin();
01379 CancelTimer(i->first);
01380 }
01381 }
01382
01383 protected:
01384
01385
01386 class AAA_FsmTimerEventHandler : public ACE_Event_Handler
01387 {
01388 public:
01389 AAA_FsmTimerEventHandler(AAA_StateMachineWithTimer<ARG> &fsm)
01390 : fsm_ref(fsm) { }
01391
01392 private:
01394 int handle_timeout(const ACE_Time_Value &tv, const void *arg)
01395 {
01396 AAA_Event event = *(AAA_Event*)arg;
01397
01402 fsm_ref.Timeout(event);
01403 return (tv ? 0 : 0);
01404 }
01405 AAA_StateMachineWithTimer<ARG> &fsm_ref;
01406 };
01407
01410 AAA_StateMachineWithTimer(ARG &arg, AAA_StateTable<ARG> &table,
01411 ACE_Reactor &r, char *name=0)
01412 : AAA_StateMachine<ARG>(arg, table, name), reactor(r),
01413 timerEventHandler(*this)
01414 {}
01415
01417 virtual ~AAA_StateMachineWithTimer() { CancelAllTimer(); }
01418
01421 virtual void Timeout(AAA_Event ev)=0;
01422
01423 private:
01424 ACE_Reactor& reactor;
01425 TimerHandleMap timerHandleMap;
01426 AAA_FsmTimerEventHandler timerEventHandler;
01427 };
01428
01429 #endif // __FRAMEWORK_H__