From 0995520d6f08174380c8a58bf1794c7c4347338c Mon Sep 17 00:00:00 2001 From: Daniel Buckmaster Date: Sun, 5 Jul 2015 12:59:16 +1000 Subject: [PATCH] Add a method to wait for all pending items in a ThreadPool. --- Engine/source/platform/threads/threadPool.cpp | 27 ++++++++++++++++++- Engine/source/platform/threads/threadPool.h | 15 +++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/Engine/source/platform/threads/threadPool.cpp b/Engine/source/platform/threads/threadPool.cpp index 15cfcd9e0..81e40894e 100644 --- a/Engine/source/platform/threads/threadPool.cpp +++ b/Engine/source/platform/threads/threadPool.cpp @@ -282,6 +282,8 @@ void ThreadPool::WorkerThread::run( void* arg ) Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem ); #endif workItem->process(); + + dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 ); } else waitForSignal = true; @@ -319,6 +321,7 @@ ThreadPool::ThreadPool( const char* name, U32 numThreads ) : mName( name ), mNumThreads( numThreads ), mNumThreadsAwake( 0 ), + mNumPendingItems( 0 ), mThreads( 0 ), mSemaphore( 0 ) { @@ -410,7 +413,7 @@ void ThreadPool::queueWorkItem( WorkItem* item ) else { // Put the item in the queue. - + dFetchAndAdd( mNumPendingItems, 1 ); mWorkItemQueue.insert( item->getPriority(), item ); mSemaphore.release(); @@ -441,6 +444,28 @@ void ThreadPool::flushWorkItems( S32 timeOut ) } } +void ThreadPool::waitForAllItems( S32 timeOut ) +{ + AssertFatal( mNumPendingItems, "ThreadPool::waitForAllItems() - no items pending" ); + + U32 endTime = 0; + if( timeOut != -1 ) + endTime = Platform::getRealMilliseconds() + timeOut; + + // Spinlock until there are no items that have not been processed. + + while( dAtomicRead( mNumPendingItems ) ) + { + Platform::sleep( 25 ); + + // Stop if we have exceeded our processing time budget. + + if( timeOut != -1 + && Platform::getRealMilliseconds() >= endTime ) + break; + } +} + //-------------------------------------------------------------------------- void ThreadPool::queueWorkItemOnMainThread( WorkItem* item ) diff --git a/Engine/source/platform/threads/threadPool.h b/Engine/source/platform/threads/threadPool.h index b77244034..5fb8cc60b 100644 --- a/Engine/source/platform/threads/threadPool.h +++ b/Engine/source/platform/threads/threadPool.h @@ -264,6 +264,9 @@ class ThreadPool /// Number of worker threads guaranteed to be non-blocking. U32 mNumThreadsReady; + + /// Number of work items that have not yet completed execution. + U32 mNumPendingItems; /// Semaphore used to wake up threads, if necessary. Semaphore mSemaphore; @@ -316,6 +319,18 @@ class ThreadPool /// the queue to flush out. -1 = infinite. void flushWorkItems( S32 timeOut = -1 ); + /// If you're using a non-global thread pool to parallelise some work, you + /// may want to block until all the parallel work is complete. As with + /// flushWorkItems, this method may block indefinitely if new items keep + /// getting added to the pool before old ones finish. + /// + /// This method will not wait for items queued on the main thread using + /// queueWorkItemOnMainThread! + /// + /// @param timeOut Soft limit on the number of milliseconds to wait for + /// all items to complete. -1 = infinite. + void waitForAllItems( S32 timeOut = -1 ); + /// Add a work item to the main thread's work queue. /// /// The main thread's work queue will be processed each frame using