• Main Page
  • Related Pages
  • Modules
  • Namespaces
  • Classes
  • Files
  • File List
  • File Members

ThreadPool.h

Go to the documentation of this file.
00001 #ifndef __THREAD_POOL_H
00002 #define __THREAD_POOL_H
00003 
00004 #include "RakMemoryOverride.h"
00005 #include "DS_Queue.h"
00006 #include "SimpleMutex.h"
00007 #include "Export.h"
00008 #include "RakThread.h"
00009 #include "SignaledEvent.h"
00010 
00011 #ifdef _MSC_VER
00012 #pragma warning( push )
00013 #endif
00014 
/// Callback interface used to create and destroy the opaque per-thread data that
/// worker threads hand to every job callback.
/// Register an implementation with ThreadPool::SetThreadDataInterface(). A worker only
/// consults it when no factory / destructor function pointer was given to StartThreads().
class ThreadDataInterface
{
public:
	ThreadDataInterface() {}
	virtual ~ThreadDataInterface() {}

	/// Invoked by a worker thread once, as it starts up.
	/// \param[in] context The context pointer that was registered together with this interface.
	/// \return Opaque data for that thread; it is passed to each job callback the thread runs.
	virtual void* PerThreadFactory(void *context)=0;

	/// Invoked by a worker thread once, as it shuts down.
	/// \param[in] factoryResult The value PerThreadFactory() returned for this thread.
	/// \param[in] context The context pointer that was registered together with this interface.
	virtual void PerThreadDestructor(void* factoryResult, void *context)=0;
};
00028 template <class InputType, class OutputType>
00029 struct RAK_DLL_EXPORT ThreadPool
00030 {
00031         ThreadPool();
00032         ~ThreadPool();
00033 
00040         bool StartThreads(int numThreads, int stackSize, void* (*_perThreadInit)()=0, void (*_perThreadDeinit)(void*)=0);
00041 
00042         // Alternate form of _perThreadDataFactory, _perThreadDataDestructor
00043         void SetThreadDataInterface(ThreadDataInterface *tdi, void *context);
00044 
00046         void StopThreads(void);
00047 
00056         void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData);
00057 
00061         void AddOutput(OutputType outputData);
00062 
00065         bool HasOutput(void);
00066 
00069         bool HasOutputFast(void);
00070 
00073         bool HasInput(void);
00074 
00077         bool HasInputFast(void);
00078 
00082         OutputType GetOutput(void);
00083 
00085         void Clear(void);
00086 
00089         void LockInput(void);
00090 
00092         void UnlockInput(void);
00093 
00095         unsigned InputSize(void);
00096 
00098         InputType GetInputAtIndex(unsigned index);
00099 
00101         void RemoveInputAtIndex(unsigned index);
00102 
00105         void LockOutput(void);
00106         
00108         void UnlockOutput(void);
00109 
00111         unsigned OutputSize(void);
00112 
00114         OutputType GetOutputAtIndex(unsigned index);
00115 
00117         void RemoveOutputAtIndex(unsigned index);
00118 
00120         void ClearInput(void);
00121 
00123         void ClearOutput(void);
00124 
00126         bool IsWorking(void);
00127 
00129         int NumThreadsWorking(void);
00130 
00132         bool WasStarted(void);
00133 
00134         // Block until all threads are stopped.
00135         bool Pause(void);
00136 
00137         // Continue running
00138         void Resume(void);
00139 
00140 protected:
00141         // It is valid to cancel input before it is processed.  To do so, lock the inputQueue with inputQueueMutex,
00142         // Scan the list, and remove the item you don't want.
00143         SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex;
00144 
00145         void* (*perThreadDataFactory)();
00146         void (*perThreadDataDestructor)(void*);
00147 
00148         // inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other
00149         // at the same index
00150         DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> inputFunctionQueue;
00151         DataStructures::Queue<InputType> inputQueue;
00152         DataStructures::Queue<OutputType> outputQueue;
00153 
00154         ThreadDataInterface *threadDataInterface;
00155         void *tdiContext;
00156 
00157         
00158         template <class ThreadInputType, class ThreadOutputType>
00159         friend RAK_THREAD_DECLARATION(WorkerThread);
00160 
00161         /*
00162 #ifdef _WIN32
00163         friend unsigned __stdcall WorkerThread( LPVOID arguments );
00164 #else
00165         friend void* WorkerThread( void* arguments );
00166 #endif
00167         */
00168 
00170         bool runThreads;
00172         int numThreadsRunning;
00174         int numThreadsWorking;
00176         SimpleMutex numThreadsRunningMutex;
00177 
00178         SignaledEvent quitAndIncomingDataEvents;
00179 };
00180 
00181 #include "ThreadPool.h"
00182 #include "RakSleep.h"
00183 #ifdef _WIN32
00184 #else
00185 #include <unistd.h>
00186 #endif
00187 
00188 #ifdef _MSC_VER
00189 #pragma warning(disable:4127)
00190 #pragma warning( disable : 4701 )  // potentially uninitialized local variable 'inputData' used
00191 #endif
00192 
00193 template <class ThreadInputType, class ThreadOutputType>
00194 RAK_THREAD_DECLARATION(WorkerThread)
00195 /*
00196 #ifdef _WIN32
00197 unsigned __stdcall WorkerThread( LPVOID arguments )
00198 #else
00199 void* WorkerThread( void* arguments )
00200 #endif
00201 */
00202 {
00203         bool returnOutput;
00204         ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;
00205         ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
00206         ThreadInputType inputData;
00207         ThreadOutputType callbackOutput;
00208 
00209         userCallback=0;
00210 
00211         void *perThreadData;
00212         if (threadPool->perThreadDataFactory)
00213                 perThreadData=threadPool->perThreadDataFactory();
00214         else if (threadPool->threadDataInterface)
00215                 perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
00216         else
00217                 perThreadData=0;
00218 
00219         // Increase numThreadsRunning
00220         threadPool->numThreadsRunningMutex.Lock();
00221         ++threadPool->numThreadsRunning;
00222         threadPool->numThreadsRunningMutex.Unlock();
00223 
00224         while (1)
00225         {
00226 #ifdef _WIN32
00227                 if (userCallback==0)
00228                 {
00229                         threadPool->quitAndIncomingDataEvents.WaitOnEvent(INFINITE);
00230                 }               
00231 #endif
00232 
00233                 threadPool->runThreadsMutex.Lock();
00234                 if (threadPool->runThreads==false)
00235                 {
00236                         threadPool->runThreadsMutex.Unlock();
00237                         break;
00238                 }
00239                 threadPool->runThreadsMutex.Unlock();
00240 
00241                 threadPool->workingThreadCountMutex.Lock();
00242                 ++threadPool->numThreadsWorking;
00243                 threadPool->workingThreadCountMutex.Unlock();
00244 
00245                 // Read input data
00246                 userCallback=0;
00247                 threadPool->inputQueueMutex.Lock();
00248                 if (threadPool->inputFunctionQueue.Size())
00249                 {
00250                         userCallback=threadPool->inputFunctionQueue.Pop();
00251                         inputData=threadPool->inputQueue.Pop();
00252                 }
00253                 threadPool->inputQueueMutex.Unlock();
00254 
00255                 if (userCallback)
00256                 {
00257                         callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
00258                         if (returnOutput)
00259                         {
00260                                 threadPool->outputQueueMutex.Lock();
00261                                 threadPool->outputQueue.Push(callbackOutput, __FILE__, __LINE__ );
00262                                 threadPool->outputQueueMutex.Unlock();
00263                         }                       
00264                 }
00265 
00266                 threadPool->workingThreadCountMutex.Lock();
00267                 --threadPool->numThreadsWorking;
00268                 threadPool->workingThreadCountMutex.Unlock();
00269         }
00270 
00271         // Decrease numThreadsRunning
00272         threadPool->numThreadsRunningMutex.Lock();
00273         --threadPool->numThreadsRunning;
00274         threadPool->numThreadsRunningMutex.Unlock();
00275         
00276         if (threadPool->perThreadDataDestructor)
00277                 threadPool->perThreadDataDestructor(perThreadData);
00278         else if (threadPool->threadDataInterface)
00279                 threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);
00280 
00281         return 0;
00282 }
00283 template <class InputType, class OutputType>
00284 ThreadPool<InputType, OutputType>::ThreadPool()
00285 {
00286         runThreads=false;
00287         numThreadsRunning=0;
00288         threadDataInterface=0;
00289         tdiContext=0;
00290         numThreadsWorking=0;
00291 
00292 }
00293 template <class InputType, class OutputType>
00294 ThreadPool<InputType, OutputType>::~ThreadPool()
00295 {
00296         StopThreads();
00297         Clear();
00298 }
00299 template <class InputType, class OutputType>
00300 bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *))
00301 {
00302         (void) stackSize;
00303 
00304         runThreadsMutex.Lock();
00305         if (runThreads==true)
00306         {
00307                 runThreadsMutex.Unlock();
00308                 return false;
00309         }
00310         runThreadsMutex.Unlock();
00311 
00312         quitAndIncomingDataEvents.InitEvent();
00313 
00314         perThreadDataFactory=_perThreadDataFactory;
00315         perThreadDataDestructor=_perThreadDataDestructor;
00316 
00317         runThreadsMutex.Lock();
00318         runThreads=true;
00319         runThreadsMutex.Unlock();
00320 
00321         numThreadsWorking=0;
00322         unsigned threadId = 0;
00323         (void) threadId;
00324         int i;
00325         for (i=0; i < numThreads; i++)
00326         {
00327                 int errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
00328                 if (errorCode!=0)
00329                 {
00330                         StopThreads();
00331                         return false;
00332                 }
00333         }
00334         // Wait for number of threads running to increase to numThreads
00335         bool done=false;
00336         while (done==false)
00337         {
00338                 RakSleep(50);
00339                 numThreadsRunningMutex.Lock();
00340                 if (numThreadsRunning==numThreads)
00341                         done=true;
00342                 numThreadsRunningMutex.Unlock();
00343         }
00344 
00345         return true;
00346 }
00347 template <class InputType, class OutputType>
00348 void ThreadPool<InputType, OutputType>::SetThreadDataInterface(ThreadDataInterface *tdi, void *context)
00349 {
00350         threadDataInterface=tdi;
00351         tdiContext=context;
00352 }
00353 template <class InputType, class OutputType>
00354 void ThreadPool<InputType, OutputType>::StopThreads(void)
00355 {
00356         runThreadsMutex.Lock();
00357         if (runThreads==false)
00358         {
00359                 runThreadsMutex.Unlock();
00360                 return;
00361         }
00362 
00363         runThreads=false;
00364         runThreadsMutex.Unlock();
00365 
00366         // Wait for number of threads running to decrease to 0
00367         bool done=false;
00368         while (done==false)
00369         {
00370                 quitAndIncomingDataEvents.SetEvent();
00371 
00372                 RakSleep(50);
00373                 numThreadsRunningMutex.Lock();
00374                 if (numThreadsRunning==0)
00375                         done=true;
00376                 numThreadsRunningMutex.Unlock();
00377         }
00378 
00379         quitAndIncomingDataEvents.CloseEvent();
00380 }
00381 template <class InputType, class OutputType>
00382 void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData)
00383 {
00384         inputQueueMutex.Lock();
00385         inputQueue.Push(inputData, __FILE__, __LINE__ );
00386         inputFunctionQueue.Push(workerThreadCallback, __FILE__, __LINE__ );
00387         inputQueueMutex.Unlock();
00388 
00389         quitAndIncomingDataEvents.SetEvent();
00390 }
00391 template <class InputType, class OutputType>
00392 void ThreadPool<InputType, OutputType>::AddOutput(OutputType outputData)
00393 {
00394         outputQueueMutex.Lock();
00395         outputQueue.Push(outputData, __FILE__, __LINE__ );
00396         outputQueueMutex.Unlock();
00397 }
00398 template <class InputType, class OutputType>
00399 bool ThreadPool<InputType, OutputType>::HasOutputFast(void)
00400 {
00401         return outputQueue.IsEmpty()==false;
00402 }
00403 template <class InputType, class OutputType>
00404 bool ThreadPool<InputType, OutputType>::HasOutput(void)
00405 {
00406         bool res;
00407         outputQueueMutex.Lock();
00408         res=outputQueue.IsEmpty()==false;
00409         outputQueueMutex.Unlock();
00410         return res;
00411 }
00412 template <class InputType, class OutputType>
00413 bool ThreadPool<InputType, OutputType>::HasInputFast(void)
00414 {
00415         return inputQueue.IsEmpty()==false;
00416 }
00417 template <class InputType, class OutputType>
00418 bool ThreadPool<InputType, OutputType>::HasInput(void)
00419 {
00420         bool res;
00421         inputQueueMutex.Lock();
00422         res=inputQueue.IsEmpty()==false;
00423         inputQueueMutex.Unlock();
00424         return res;
00425 }
00426 template <class InputType, class OutputType>
00427 OutputType ThreadPool<InputType, OutputType>::GetOutput(void)
00428 {
00429         // Real output check
00430         OutputType output;
00431         outputQueueMutex.Lock();
00432         output=outputQueue.Pop();
00433         outputQueueMutex.Unlock();
00434         return output;
00435 }
00436 template <class InputType, class OutputType>
00437 void ThreadPool<InputType, OutputType>::Clear(void)
00438 {
00439         runThreadsMutex.Lock();
00440         if (runThreads)
00441         {
00442                 runThreadsMutex.Unlock();
00443                 inputQueueMutex.Lock();
00444                 inputFunctionQueue.Clear(__FILE__, __LINE__);
00445                 inputQueue.Clear(__FILE__, __LINE__);
00446                 inputQueueMutex.Unlock();
00447 
00448                 outputQueueMutex.Lock();
00449                 outputQueue.Clear(__FILE__, __LINE__);
00450                 outputQueueMutex.Unlock();
00451         }
00452         else
00453         {
00454                 inputFunctionQueue.Clear(__FILE__, __LINE__);
00455                 inputQueue.Clear(__FILE__, __LINE__);
00456                 outputQueue.Clear(__FILE__, __LINE__);
00457         }
00458 }
00459 template <class InputType, class OutputType>
00460 void ThreadPool<InputType, OutputType>::LockInput(void)
00461 {
00462         inputQueueMutex.Lock();
00463 }
00464 template <class InputType, class OutputType>
00465 void ThreadPool<InputType, OutputType>::UnlockInput(void)
00466 {
00467         inputQueueMutex.Unlock();
00468 }
00469 template <class InputType, class OutputType>
00470 unsigned ThreadPool<InputType, OutputType>::InputSize(void)
00471 {
00472         return inputQueue.Size();
00473 }
00474 template <class InputType, class OutputType>
00475 InputType ThreadPool<InputType, OutputType>::GetInputAtIndex(unsigned index)
00476 {
00477         return inputQueue[index];
00478 }
00479 template <class InputType, class OutputType>
00480 void ThreadPool<InputType, OutputType>::RemoveInputAtIndex(unsigned index)
00481 {
00482         inputQueue.RemoveAtIndex(index);
00483         inputFunctionQueue.RemoveAtIndex(index);
00484 }
00485 template <class InputType, class OutputType>
00486 void ThreadPool<InputType, OutputType>::LockOutput(void)
00487 {
00488         outputQueueMutex.Lock();
00489 }
00490 template <class InputType, class OutputType>
00491 void ThreadPool<InputType, OutputType>::UnlockOutput(void)
00492 {
00493         outputQueueMutex.Unlock();
00494 }
00495 template <class InputType, class OutputType>
00496 unsigned ThreadPool<InputType, OutputType>::OutputSize(void)
00497 {
00498         return outputQueue.Size();
00499 }
00500 template <class InputType, class OutputType>
00501 OutputType ThreadPool<InputType, OutputType>::GetOutputAtIndex(unsigned index)
00502 {
00503         return outputQueue[index];
00504 }
00505 template <class InputType, class OutputType>
00506 void ThreadPool<InputType, OutputType>::RemoveOutputAtIndex(unsigned index)
00507 {
00508         outputQueue.RemoveAtIndex(index);
00509 }
00510 template <class InputType, class OutputType>
00511 void ThreadPool<InputType, OutputType>::ClearInput(void)
00512 {
00513         inputQueue.Clear(__FILE__,__LINE__);
00514         inputFunctionQueue.Clear(__FILE__,__LINE__);
00515 }
00516 
00517 template <class InputType, class OutputType>
00518 void ThreadPool<InputType, OutputType>::ClearOutput(void)
00519 {
00520         outputQueue.Clear(__FILE__,__LINE__);
00521 }
00522 template <class InputType, class OutputType>
00523 bool ThreadPool<InputType, OutputType>::IsWorking(void)
00524 {
00525         bool isWorking;
00526 //      workingThreadCountMutex.Lock();
00527 //      isWorking=numThreadsWorking!=0;
00528 //      workingThreadCountMutex.Unlock();
00529 
00530 //      if (isWorking)
00531 //              return true;
00532 
00533         // Bug fix: Originally the order of these two was reversed.
00534         // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
00535         // here and sees there is no data.  So it thinks the thread is not working when it was.
00536         if (HasOutputFast() && HasOutput())
00537                 return true;
00538 
00539         if (HasInputFast() && HasInput())
00540                 return true;
00541 
00542         // Need to check is working again, in case the thread was between the first and second checks
00543         workingThreadCountMutex.Lock();
00544         isWorking=numThreadsWorking!=0;
00545         workingThreadCountMutex.Unlock();
00546 
00547         return isWorking;
00548 }
00549 
00550 template <class InputType, class OutputType>
00551 int ThreadPool<InputType, OutputType>::NumThreadsWorking(void)
00552 {
00553         return numThreadsWorking;
00554 }
00555 
00556 template <class InputType, class OutputType>
00557 bool ThreadPool<InputType, OutputType>::WasStarted(void)
00558 {
00559         bool b;
00560         runThreadsMutex.Lock();
00561         b = runThreads;
00562         runThreadsMutex.Unlock();
00563         return b;
00564 }
00565 template <class InputType, class OutputType>
00566 bool ThreadPool<InputType, OutputType>::Pause(void)
00567 {
00568         if (WasStarted()==false)
00569                 return false;
00570 
00571         workingThreadCountMutex.Lock();
00572         while (numThreadsWorking>0)
00573         {
00574                 RakSleep(30);
00575         }
00576         return true;
00577 }
00578 template <class InputType, class OutputType>
00579 void ThreadPool<InputType, OutputType>::Resume(void)
00580 {
00581         workingThreadCountMutex.Unlock();
00582 }
00583 
00584 #ifdef _MSC_VER
00585 #pragma warning( pop )
00586 #endif
00587 
00588 #endif
00589 

Generated on Thu Sep 30 2010 01:27:28 for RakNet by  doxygen 1.7.1