00001 #ifndef __THREAD_POOL_H
00002 #define __THREAD_POOL_H
00003
00004 #include "RakMemoryOverride.h"
00005 #include "DS_Queue.h"
00006 #include "SimpleMutex.h"
00007 #include "Export.h"
00008 #include "RakThread.h"
00009 #include "SignaledEvent.h"
00010
00011 #ifdef _MSC_VER
00012 #pragma warning( push )
00013 #endif
00014
00015 class ThreadDataInterface
00016 {
00017 public:
00018 ThreadDataInterface() {}
00019 virtual ~ThreadDataInterface() {}
00020
00021 virtual void* PerThreadFactory(void *context)=0;
00022 virtual void PerThreadDestructor(void* factoryResult, void *context)=0;
00023 };
00028 template <class InputType, class OutputType>
00029 struct RAK_DLL_EXPORT ThreadPool
00030 {
00031 ThreadPool();
00032 ~ThreadPool();
00033
00040 bool StartThreads(int numThreads, int stackSize, void* (*_perThreadInit)()=0, void (*_perThreadDeinit)(void*)=0);
00041
00042
00043 void SetThreadDataInterface(ThreadDataInterface *tdi, void *context);
00044
00046 void StopThreads(void);
00047
00056 void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData);
00057
00061 void AddOutput(OutputType outputData);
00062
00065 bool HasOutput(void);
00066
00069 bool HasOutputFast(void);
00070
00073 bool HasInput(void);
00074
00077 bool HasInputFast(void);
00078
00082 OutputType GetOutput(void);
00083
00085 void Clear(void);
00086
00089 void LockInput(void);
00090
00092 void UnlockInput(void);
00093
00095 unsigned InputSize(void);
00096
00098 InputType GetInputAtIndex(unsigned index);
00099
00101 void RemoveInputAtIndex(unsigned index);
00102
00105 void LockOutput(void);
00106
00108 void UnlockOutput(void);
00109
00111 unsigned OutputSize(void);
00112
00114 OutputType GetOutputAtIndex(unsigned index);
00115
00117 void RemoveOutputAtIndex(unsigned index);
00118
00120 void ClearInput(void);
00121
00123 void ClearOutput(void);
00124
00126 bool IsWorking(void);
00127
00129 int NumThreadsWorking(void);
00130
00132 bool WasStarted(void);
00133
00134
00135 bool Pause(void);
00136
00137
00138 void Resume(void);
00139
00140 protected:
00141
00142
00143 SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex;
00144
00145 void* (*perThreadDataFactory)();
00146 void (*perThreadDataDestructor)(void*);
00147
00148
00149
00150 DataStructures::Queue<OutputType (*)(InputType, bool *, void*)> inputFunctionQueue;
00151 DataStructures::Queue<InputType> inputQueue;
00152 DataStructures::Queue<OutputType> outputQueue;
00153
00154 ThreadDataInterface *threadDataInterface;
00155 void *tdiContext;
00156
00157
00158 template <class ThreadInputType, class ThreadOutputType>
00159 friend RAK_THREAD_DECLARATION(WorkerThread);
00160
00161
00162
00163
00164
00165
00166
00167
00168
00170 bool runThreads;
00172 int numThreadsRunning;
00174 int numThreadsWorking;
00176 SimpleMutex numThreadsRunningMutex;
00177
00178 SignaledEvent quitAndIncomingDataEvents;
00179 };
00180
00181 #include "ThreadPool.h"
00182 #include "RakSleep.h"
00183 #ifdef _WIN32
00184 #else
00185 #include <unistd.h>
00186 #endif
00187
00188 #ifdef _MSC_VER
00189 #pragma warning(disable:4127)
00190 #pragma warning( disable : 4701 ) // potentially uninitialized local variable 'inputData' used
00191 #endif
00192
00193 template <class ThreadInputType, class ThreadOutputType>
00194 RAK_THREAD_DECLARATION(WorkerThread)
00195
00196
00197
00198
00199
00200
00201
00202 {
00203 bool returnOutput;
00204 ThreadPool<ThreadInputType, ThreadOutputType> *threadPool = (ThreadPool<ThreadInputType, ThreadOutputType>*) arguments;
00205 ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
00206 ThreadInputType inputData;
00207 ThreadOutputType callbackOutput;
00208
00209 userCallback=0;
00210
00211 void *perThreadData;
00212 if (threadPool->perThreadDataFactory)
00213 perThreadData=threadPool->perThreadDataFactory();
00214 else if (threadPool->threadDataInterface)
00215 perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
00216 else
00217 perThreadData=0;
00218
00219
00220 threadPool->numThreadsRunningMutex.Lock();
00221 ++threadPool->numThreadsRunning;
00222 threadPool->numThreadsRunningMutex.Unlock();
00223
00224 while (1)
00225 {
00226 #ifdef _WIN32
00227 if (userCallback==0)
00228 {
00229 threadPool->quitAndIncomingDataEvents.WaitOnEvent(INFINITE);
00230 }
00231 #endif
00232
00233 threadPool->runThreadsMutex.Lock();
00234 if (threadPool->runThreads==false)
00235 {
00236 threadPool->runThreadsMutex.Unlock();
00237 break;
00238 }
00239 threadPool->runThreadsMutex.Unlock();
00240
00241 threadPool->workingThreadCountMutex.Lock();
00242 ++threadPool->numThreadsWorking;
00243 threadPool->workingThreadCountMutex.Unlock();
00244
00245
00246 userCallback=0;
00247 threadPool->inputQueueMutex.Lock();
00248 if (threadPool->inputFunctionQueue.Size())
00249 {
00250 userCallback=threadPool->inputFunctionQueue.Pop();
00251 inputData=threadPool->inputQueue.Pop();
00252 }
00253 threadPool->inputQueueMutex.Unlock();
00254
00255 if (userCallback)
00256 {
00257 callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
00258 if (returnOutput)
00259 {
00260 threadPool->outputQueueMutex.Lock();
00261 threadPool->outputQueue.Push(callbackOutput, __FILE__, __LINE__ );
00262 threadPool->outputQueueMutex.Unlock();
00263 }
00264 }
00265
00266 threadPool->workingThreadCountMutex.Lock();
00267 --threadPool->numThreadsWorking;
00268 threadPool->workingThreadCountMutex.Unlock();
00269 }
00270
00271
00272 threadPool->numThreadsRunningMutex.Lock();
00273 --threadPool->numThreadsRunning;
00274 threadPool->numThreadsRunningMutex.Unlock();
00275
00276 if (threadPool->perThreadDataDestructor)
00277 threadPool->perThreadDataDestructor(perThreadData);
00278 else if (threadPool->threadDataInterface)
00279 threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);
00280
00281 return 0;
00282 }
00283 template <class InputType, class OutputType>
00284 ThreadPool<InputType, OutputType>::ThreadPool()
00285 {
00286 runThreads=false;
00287 numThreadsRunning=0;
00288 threadDataInterface=0;
00289 tdiContext=0;
00290 numThreadsWorking=0;
00291
00292 }
00293 template <class InputType, class OutputType>
00294 ThreadPool<InputType, OutputType>::~ThreadPool()
00295 {
00296 StopThreads();
00297 Clear();
00298 }
00299 template <class InputType, class OutputType>
00300 bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *))
00301 {
00302 (void) stackSize;
00303
00304 runThreadsMutex.Lock();
00305 if (runThreads==true)
00306 {
00307 runThreadsMutex.Unlock();
00308 return false;
00309 }
00310 runThreadsMutex.Unlock();
00311
00312 quitAndIncomingDataEvents.InitEvent();
00313
00314 perThreadDataFactory=_perThreadDataFactory;
00315 perThreadDataDestructor=_perThreadDataDestructor;
00316
00317 runThreadsMutex.Lock();
00318 runThreads=true;
00319 runThreadsMutex.Unlock();
00320
00321 numThreadsWorking=0;
00322 unsigned threadId = 0;
00323 (void) threadId;
00324 int i;
00325 for (i=0; i < numThreads; i++)
00326 {
00327 int errorCode = RakNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
00328 if (errorCode!=0)
00329 {
00330 StopThreads();
00331 return false;
00332 }
00333 }
00334
00335 bool done=false;
00336 while (done==false)
00337 {
00338 RakSleep(50);
00339 numThreadsRunningMutex.Lock();
00340 if (numThreadsRunning==numThreads)
00341 done=true;
00342 numThreadsRunningMutex.Unlock();
00343 }
00344
00345 return true;
00346 }
00347 template <class InputType, class OutputType>
00348 void ThreadPool<InputType, OutputType>::SetThreadDataInterface(ThreadDataInterface *tdi, void *context)
00349 {
00350 threadDataInterface=tdi;
00351 tdiContext=context;
00352 }
00353 template <class InputType, class OutputType>
00354 void ThreadPool<InputType, OutputType>::StopThreads(void)
00355 {
00356 runThreadsMutex.Lock();
00357 if (runThreads==false)
00358 {
00359 runThreadsMutex.Unlock();
00360 return;
00361 }
00362
00363 runThreads=false;
00364 runThreadsMutex.Unlock();
00365
00366
00367 bool done=false;
00368 while (done==false)
00369 {
00370 quitAndIncomingDataEvents.SetEvent();
00371
00372 RakSleep(50);
00373 numThreadsRunningMutex.Lock();
00374 if (numThreadsRunning==0)
00375 done=true;
00376 numThreadsRunningMutex.Unlock();
00377 }
00378
00379 quitAndIncomingDataEvents.CloseEvent();
00380 }
00381 template <class InputType, class OutputType>
00382 void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData)
00383 {
00384 inputQueueMutex.Lock();
00385 inputQueue.Push(inputData, __FILE__, __LINE__ );
00386 inputFunctionQueue.Push(workerThreadCallback, __FILE__, __LINE__ );
00387 inputQueueMutex.Unlock();
00388
00389 quitAndIncomingDataEvents.SetEvent();
00390 }
00391 template <class InputType, class OutputType>
00392 void ThreadPool<InputType, OutputType>::AddOutput(OutputType outputData)
00393 {
00394 outputQueueMutex.Lock();
00395 outputQueue.Push(outputData, __FILE__, __LINE__ );
00396 outputQueueMutex.Unlock();
00397 }
00398 template <class InputType, class OutputType>
00399 bool ThreadPool<InputType, OutputType>::HasOutputFast(void)
00400 {
00401 return outputQueue.IsEmpty()==false;
00402 }
00403 template <class InputType, class OutputType>
00404 bool ThreadPool<InputType, OutputType>::HasOutput(void)
00405 {
00406 bool res;
00407 outputQueueMutex.Lock();
00408 res=outputQueue.IsEmpty()==false;
00409 outputQueueMutex.Unlock();
00410 return res;
00411 }
00412 template <class InputType, class OutputType>
00413 bool ThreadPool<InputType, OutputType>::HasInputFast(void)
00414 {
00415 return inputQueue.IsEmpty()==false;
00416 }
00417 template <class InputType, class OutputType>
00418 bool ThreadPool<InputType, OutputType>::HasInput(void)
00419 {
00420 bool res;
00421 inputQueueMutex.Lock();
00422 res=inputQueue.IsEmpty()==false;
00423 inputQueueMutex.Unlock();
00424 return res;
00425 }
00426 template <class InputType, class OutputType>
00427 OutputType ThreadPool<InputType, OutputType>::GetOutput(void)
00428 {
00429
00430 OutputType output;
00431 outputQueueMutex.Lock();
00432 output=outputQueue.Pop();
00433 outputQueueMutex.Unlock();
00434 return output;
00435 }
00436 template <class InputType, class OutputType>
00437 void ThreadPool<InputType, OutputType>::Clear(void)
00438 {
00439 runThreadsMutex.Lock();
00440 if (runThreads)
00441 {
00442 runThreadsMutex.Unlock();
00443 inputQueueMutex.Lock();
00444 inputFunctionQueue.Clear(__FILE__, __LINE__);
00445 inputQueue.Clear(__FILE__, __LINE__);
00446 inputQueueMutex.Unlock();
00447
00448 outputQueueMutex.Lock();
00449 outputQueue.Clear(__FILE__, __LINE__);
00450 outputQueueMutex.Unlock();
00451 }
00452 else
00453 {
00454 inputFunctionQueue.Clear(__FILE__, __LINE__);
00455 inputQueue.Clear(__FILE__, __LINE__);
00456 outputQueue.Clear(__FILE__, __LINE__);
00457 }
00458 }
00459 template <class InputType, class OutputType>
00460 void ThreadPool<InputType, OutputType>::LockInput(void)
00461 {
00462 inputQueueMutex.Lock();
00463 }
00464 template <class InputType, class OutputType>
00465 void ThreadPool<InputType, OutputType>::UnlockInput(void)
00466 {
00467 inputQueueMutex.Unlock();
00468 }
00469 template <class InputType, class OutputType>
00470 unsigned ThreadPool<InputType, OutputType>::InputSize(void)
00471 {
00472 return inputQueue.Size();
00473 }
00474 template <class InputType, class OutputType>
00475 InputType ThreadPool<InputType, OutputType>::GetInputAtIndex(unsigned index)
00476 {
00477 return inputQueue[index];
00478 }
00479 template <class InputType, class OutputType>
00480 void ThreadPool<InputType, OutputType>::RemoveInputAtIndex(unsigned index)
00481 {
00482 inputQueue.RemoveAtIndex(index);
00483 inputFunctionQueue.RemoveAtIndex(index);
00484 }
00485 template <class InputType, class OutputType>
00486 void ThreadPool<InputType, OutputType>::LockOutput(void)
00487 {
00488 outputQueueMutex.Lock();
00489 }
00490 template <class InputType, class OutputType>
00491 void ThreadPool<InputType, OutputType>::UnlockOutput(void)
00492 {
00493 outputQueueMutex.Unlock();
00494 }
00495 template <class InputType, class OutputType>
00496 unsigned ThreadPool<InputType, OutputType>::OutputSize(void)
00497 {
00498 return outputQueue.Size();
00499 }
00500 template <class InputType, class OutputType>
00501 OutputType ThreadPool<InputType, OutputType>::GetOutputAtIndex(unsigned index)
00502 {
00503 return outputQueue[index];
00504 }
00505 template <class InputType, class OutputType>
00506 void ThreadPool<InputType, OutputType>::RemoveOutputAtIndex(unsigned index)
00507 {
00508 outputQueue.RemoveAtIndex(index);
00509 }
00510 template <class InputType, class OutputType>
00511 void ThreadPool<InputType, OutputType>::ClearInput(void)
00512 {
00513 inputQueue.Clear(__FILE__,__LINE__);
00514 inputFunctionQueue.Clear(__FILE__,__LINE__);
00515 }
00516
00517 template <class InputType, class OutputType>
00518 void ThreadPool<InputType, OutputType>::ClearOutput(void)
00519 {
00520 outputQueue.Clear(__FILE__,__LINE__);
00521 }
00522 template <class InputType, class OutputType>
00523 bool ThreadPool<InputType, OutputType>::IsWorking(void)
00524 {
00525 bool isWorking;
00526
00527
00528
00529
00530
00531
00532
00533
00534
00535
00536 if (HasOutputFast() && HasOutput())
00537 return true;
00538
00539 if (HasInputFast() && HasInput())
00540 return true;
00541
00542
00543 workingThreadCountMutex.Lock();
00544 isWorking=numThreadsWorking!=0;
00545 workingThreadCountMutex.Unlock();
00546
00547 return isWorking;
00548 }
00549
00550 template <class InputType, class OutputType>
00551 int ThreadPool<InputType, OutputType>::NumThreadsWorking(void)
00552 {
00553 return numThreadsWorking;
00554 }
00555
00556 template <class InputType, class OutputType>
00557 bool ThreadPool<InputType, OutputType>::WasStarted(void)
00558 {
00559 bool b;
00560 runThreadsMutex.Lock();
00561 b = runThreads;
00562 runThreadsMutex.Unlock();
00563 return b;
00564 }
00565 template <class InputType, class OutputType>
00566 bool ThreadPool<InputType, OutputType>::Pause(void)
00567 {
00568 if (WasStarted()==false)
00569 return false;
00570
00571 workingThreadCountMutex.Lock();
00572 while (numThreadsWorking>0)
00573 {
00574 RakSleep(30);
00575 }
00576 return true;
00577 }
00578 template <class InputType, class OutputType>
00579 void ThreadPool<InputType, OutputType>::Resume(void)
00580 {
00581 workingThreadCountMutex.Unlock();
00582 }
00583
00584 #ifdef _MSC_VER
00585 #pragma warning( pop )
00586 #endif
00587
00588 #endif
00589