38#include "blocxx/BLOCXX_config.h"
// Pool-aware logging helpers.
//
// Each macro forwards to the BLOCXX_LOG_* macro of the same level, prefixing
// the message with "<m_poolName>: ". They must therefore be expanded in a
// scope where a string-like object named m_poolName is visible.
//
// `arg` is wrapped in parentheses so that a message expression containing an
// operator of lower precedence than `+` (for example `cond ? a : b`) is
// evaluated as a unit before being appended to the prefix.
//
// The do { ... } while (0) wrapper makes every macro behave as a single
// statement, so it is safe inside an unbraced if/else.
#define BLOCXX_POOL_LOG_DEBUG(logger, arg) do { BLOCXX_LOG_DEBUG(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg) do { BLOCXX_LOG_DEBUG2(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg) do { BLOCXX_LOG_DEBUG3(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_ERROR(logger, arg) do { BLOCXX_LOG_ERROR(logger, m_poolName + ": " + (arg)); } while (0)
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg) do { BLOCXX_LOG_FATAL_ERROR(logger, m_poolName + ": " + (arg)); } while (0)
87class FixedSizePoolImpl;
89class FixedSizePoolWorkerThread :
public Thread
92 FixedSizePoolWorkerThread(FixedSizePoolImpl* thePool)
99 virtual void doShutdown()
101 MutexLock lock(m_guard);
102 if (m_currentRunnable)
107 virtual void doCooperativeCancel()
109 MutexLock lock(m_guard);
110 if (m_currentRunnable)
115 virtual void doDefinitiveCancel()
117 MutexLock lock(m_guard);
118 if (m_currentRunnable)
130 FixedSizePoolWorkerThread(
const FixedSizePoolWorkerThread&);
131 FixedSizePoolWorkerThread& operator=(
const FixedSizePoolWorkerThread&);
134class CommonPoolImpl :
public ThreadPoolImpl
137 CommonPoolImpl(UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
146 virtual ~CommonPoolImpl()
151 virtual bool queueIsFull()
const
153 return ((m_maxQueueSize > 0) && (
m_queue.size() == m_maxQueueSize));
157 bool queueClosed()
const
164 NonRecursiveMutexLock l(m_queueLock);
174 if (finishWorkInQueue)
176 TimeoutTimer timer(timeout);
179 if (timer.infinite())
187 if (!
m_queueEmpty.timedWait(l, timer.asAbsoluteTimeout()))
199 virtual void waitForEmptyQueue()
201 NonRecursiveMutexLock l(m_queueLock);
212 TimeoutTimer shutdownTimer(shutdownTimeout);
213 TimeoutTimer dTimer(definitiveCancelTimeout);
214 if (!finishOffWorkInQueue(finishWorkInQueue, shutdownTimer.asAbsoluteTimeout()))
223 if (!shutdownTimer.infinite())
233 Timeout absoluteShutdownTimeout(shutdownTimer.asAbsoluteTimeout());
237 m_threads[
i]->timedWait(absoluteShutdownTimeout);
247 if (!dTimer.infinite())
250 Timeout absoluteDefinitiveTimeout(dTimer.asAbsoluteTimeout());
256 if (!m_threads[
i]->definitiveCancel(absoluteDefinitiveTimeout))
261 catch (CancellationDeniedException& e)
263 BLOCXX_POOL_LOG_ERROR(m_logger, Format(
"Caught CanacellationDeniedException: %1 for thread %2. Pool shutdown may hang.", e,
i));
281 NonRecursiveMutexLock l(m_queueLock);
282 while ((
m_queue.size() == 0) && (!m_shutdown))
311 incrementWorkerCount();
328 virtual void incrementWorkerCount()
332 virtual void decrementWorkerCount()
351class FixedSizePoolImpl :
public CommonPoolImpl
354 FixedSizePoolImpl(UInt32 numThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
355 : CommonPoolImpl(maxQueueSize, logger, poolName)
359 for (UInt32
i = 0;
i < numThreads; ++
i)
363 for (UInt32
i = 0;
i < numThreads; ++
i)
369 catch (ThreadException& e)
381 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
391 TimeoutTimer timer(timeout);
392 while ( queueIsFull() && !queueClosed() )
426 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
428 virtual ~FixedSizePoolImpl()
446 friend class FixedSizePoolWorkerThread;
455 catch (ThreadCancelledException&)
459 catch (Exception& ex)
462 std::clog <<
"!!! Exception: " << ex.type() <<
" caught in ThreadPool worker: " << ex << std::endl;
464 Logger logger(COMPONENT_NAME);
465 BLOCXX_LOG_ERROR(logger, Format(
"!!! Exception caught in ThreadPool worker: %1", ex));
467 catch(std::exception& ex)
470 std::clog <<
"!!! std::exception what = \"" << ex.what() <<
"\" caught in ThreadPool worker" << std::endl;
472 Logger logger(COMPONENT_NAME);
473 BLOCXX_LOG_ERROR(logger, Format(
"!!! std::exception caught in ThreadPool worker: %1", ex.what()));
478 std::clog <<
"!!! Unknown Exception caught in ThreadPool worker" << std::endl;
480 Logger logger(COMPONENT_NAME);
481 BLOCXX_LOG_ERROR(logger,
"!!! Unknown Exception caught in ThreadPool worker.");
484Int32 FixedSizePoolWorkerThread::run()
507class DynamicSizePoolImpl;
509class DynamicSizePoolWorkerThread :
public Thread
512 DynamicSizePoolWorkerThread(DynamicSizePoolImpl* thePool)
519 virtual void doShutdown()
521 MutexLock lock(m_guard);
522 if (m_currentRunnable)
527 virtual void doCooperativeCancel()
529 MutexLock lock(m_guard);
530 if (m_currentRunnable)
535 virtual void doDefinitiveCancel()
537 MutexLock lock(m_guard);
538 if (m_currentRunnable)
550 DynamicSizePoolWorkerThread(
const DynamicSizePoolWorkerThread&);
551 DynamicSizePoolWorkerThread& operator=(
const DynamicSizePoolWorkerThread&);
554class DynamicSizePoolImpl :
public CommonPoolImpl
557 DynamicSizePoolImpl(UInt32 maxThreads, UInt32 maxQueueSize,
const Logger& logger,
const String& poolName)
558 : CommonPoolImpl(maxQueueSize, logger, poolName)
563 virtual bool addWork(
const RunnableRef& work,
const Timeout& timeout)
598 TimeoutTimer timer(timeout);
599 while ( queueIsFull() && !queueClosed() )
635 ThreadRef theThread(
new DynamicSizePoolWorkerThread(
this));
642 catch (ThreadException& e)
656 shutdownThreads(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
658 virtual ~DynamicSizePoolImpl()
677 UInt32 getMaxThreads()
const
685 friend class DynamicSizePoolWorkerThread;
687Int32 DynamicSizePoolWorkerThread::run()
713class DynamicSizeNoQueuePoolImpl :
public DynamicSizePoolImpl
716 DynamicSizeNoQueuePoolImpl(UInt32 maxThreads,
const Logger& logger,
const String& poolName)
717 : DynamicSizePoolImpl(maxThreads, maxThreads, logger, poolName)
722 virtual ~DynamicSizeNoQueuePoolImpl()
726 virtual void incrementWorkerCount()
731 virtual void decrementWorkerCount()
740 virtual bool queueIsFull()
const
744 size_t freeThreads = getMaxThreads() -
AtomicGet(m_workingThreads);
745 return (freeThreads <=
m_queue.size());
762 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
765 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
768 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
778 m_impl =
new FixedSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
781 m_impl =
new DynamicSizePoolImpl(numThreads, maxQueueSize, logger, poolName);
784 m_impl =
new DynamicSizeNoQueuePoolImpl(numThreads, logger, poolName);
801 return m_impl->addWork(work, timeout);
811 m_impl->shutdown(finishWorkInQueue, timeout, timeout);
816 m_impl->shutdown(finishWorkInQueue, shutdownTimeout, definitiveCancelTimeout);
821 m_impl->waitForEmptyQueue();
#define BLOCXX_DEFINE_EXCEPTION(NAME)
Define a new exception class named <NAME>Exception that derives from Exception.
#define BLOCXX_GLOBAL_STRING_INIT(str)
#define BLOCXX_LOG_ERROR(logger, message)
Log message to logger with the Error level.
std::deque< RunnableRef > m_queue
FixedSizePoolImpl * m_thePool
#define BLOCXX_POOL_LOG_DEBUG(logger, arg)
NonRecursiveMutex m_queueLock
RunnableRef m_currentRunnable
#define BLOCXX_POOL_LOG_ERROR(logger, arg)
Array< ThreadRef > m_threads
#define BLOCXX_POOL_LOG_DEBUG2(logger, arg)
Condition m_queueNotEmpty
#define BLOCXX_POOL_LOG_FATAL_ERROR(logger, arg)
#define BLOCXX_POOL_LOG_DEBUG3(logger, arg)
This logger just discards all log messages.
This String class is an abstract data type that represents a NULL-terminated string of characters.
static void yield()
Voluntarily yield the processor, giving the next thread in the chain the opportunity to run.
The ThreadPool class is used to coordinate a group of threads.
bool addWork(const RunnableRef &work)
Add a RunnableRef for the pool to execute.
ThreadPool(PoolType poolType, UInt32 numThreads, UInt32 maxQueueSize, const Logger &logger, const String &poolName="")
Constructor.
bool tryAddWork(const RunnableRef &work)
Add a RunnableRef for the pool to execute.
ThreadPool & operator=(const ThreadPool &x)
void shutdown(EShutdownQueueFlag finishWorkInQueue=E_FINISH_WORK_IN_QUEUE, const Timeout &timeout=Timeout::infinite)
Instruct all threads to exit and stop working.
@ E_DISCARD_WORK_IN_QUEUE
void waitForEmptyQueue()
Wait for the queue to empty out.
IntrusiveReference< ThreadPoolImpl > m_impl
virtual void waitForEmptyQueue()=0
virtual ~ThreadPoolImpl()
virtual void shutdown(ThreadPool::EShutdownQueueFlag finishWorkInQueue, const Timeout &shutdownTimeout, const Timeout &definitiveCancelTimeout)=0
virtual bool addWork(const RunnableRef &work, const Timeout &timeout)=0
A timeout can be absolute, which means that it will happen at the specified DateTime.
static Timeout relative(float seconds)
LazyGlobal< String, char const *const > GlobalString
IntrusiveReference< Runnable > RunnableRef
IntrusiveReference< Thread > ThreadRef
class BLOCXX_COMMON_API Logger
int AtomicGet(Atomic_t const &v)