diff --git a/Engine/source/platform/threads/test/threadPoolTest.cpp b/Engine/source/platform/threads/test/threadPoolTest.cpp index cfb5c5dd4..c5b035bfb 100644 --- a/Engine/source/platform/threads/test/threadPoolTest.cpp +++ b/Engine/source/platform/threads/test/threadPoolTest.cpp @@ -44,6 +44,20 @@ public: mResults[mIndex] = mIndex; } }; + + // A worker that delays for some time. We'll use this to test the ThreadPool's + // synchronous and asynchronous operations. + struct DelayItem : public ThreadPool::WorkItem + { + U32 ms; + DelayItem(U32 _ms) : ms(_ms) {} + + protected: + virtual void execute() + { + Platform::sleep(ms); + } + }; }; TEST_FIX(ThreadPool, BasicAPI) @@ -63,8 +77,7 @@ TEST_FIX(ThreadPool, BasicAPI) pool->queueWorkItem(item); } - // Wait for all items to complete. - pool->flushWorkItems(); + pool->waitForAllItems(); // Verify. for (U32 i = 0; i < numItems; i++) @@ -72,4 +85,37 @@ TEST_FIX(ThreadPool, BasicAPI) results.clear(); } +TEST_FIX(ThreadPool, Asynchronous) +{ + const U32 delay = 500; //ms + + // Launch a single delaying work item. + ThreadPool* pool = &ThreadPool::GLOBAL(); + ThreadSafeRef item(new DelayItem(delay)); + pool->queueWorkItem(item); + + // The thread should not yet be finished. + EXPECT_EQ(false, item->hasExecuted()); + + // Wait til the item should have completed. + Platform::sleep(delay * 2); + + EXPECT_EQ(true, item->hasExecuted()); +} + +TEST_FIX(ThreadPool, Synchronous) +{ + const U32 delay = 500; //ms + + // Launch a single delaying work item. + ThreadPool* pool = &ThreadPool::GLOBAL(); + ThreadSafeRef item(new DelayItem(delay)); + pool->queueWorkItem(item); + + // Wait for the item to complete. + pool->waitForAllItems(); + + EXPECT_EQ(true, item->hasExecuted()); +} + #endif \ No newline at end of file diff --git a/Engine/source/platform/threads/threadPool.cpp b/Engine/source/platform/threads/threadPool.cpp index 5b96b495b..feaa52ae0 100644 --- a/Engine/source/platform/threads/threadPool.cpp +++ b/Engine/source/platform/threads/threadPool.cpp @@ -120,6 +120,7 @@ void ThreadPool::Context::updateAccumulatedPriorityBiases() void ThreadPool::WorkItem::process() { execute(); + mExecuted = true; } //-------------------------------------------------------------------------- @@ -281,6 +282,8 @@ void ThreadPool::WorkerThread::run( void* arg ) Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem ); #endif workItem->process(); + + dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 ); } else waitForSignal = true; @@ -318,6 +321,7 @@ ThreadPool::ThreadPool( const char* name, U32 numThreads ) : mName( name ), mNumThreads( numThreads ), mNumThreadsAwake( 0 ), + mNumPendingItems( 0 ), mThreads( 0 ), mSemaphore( 0 ) { @@ -409,7 +413,7 @@ void ThreadPool::queueWorkItem( WorkItem* item ) else { // Put the item in the queue. - + dFetchAndAdd( mNumPendingItems, 1 ); mWorkItemQueue.insert( item->getPriority(), item ); mSemaphore.release(); @@ -440,6 +444,26 @@ void ThreadPool::flushWorkItems( S32 timeOut ) } } +void ThreadPool::waitForAllItems( S32 timeOut ) +{ + U32 endTime = 0; + if( timeOut != -1 ) + endTime = Platform::getRealMilliseconds() + timeOut; + + // Spinlock until there are no items that have not been processed. + + while( dAtomicRead( mNumPendingItems ) ) + { + Platform::sleep( 25 ); + + // Stop if we have exceeded our processing time budget. + + if( timeOut != -1 + && Platform::getRealMilliseconds() >= endTime ) + break; + } +} + //-------------------------------------------------------------------------- void ThreadPool::queueWorkItemOnMainThread( WorkItem* item ) diff --git a/Engine/source/platform/threads/threadPool.h b/Engine/source/platform/threads/threadPool.h index 2f18a5bee..5fb8cc60b 100644 --- a/Engine/source/platform/threads/threadPool.h +++ b/Engine/source/platform/threads/threadPool.h @@ -194,6 +194,9 @@ class ThreadPool /// This is the primary function to implement by subclasses. virtual void execute() = 0; + /// This flag is set after the execute() method has completed. + bool mExecuted; + public: /// Construct a new work item. @@ -201,7 +204,8 @@ class ThreadPool /// @param context The work context in which the item should be placed. /// If NULL, the root context will be used. WorkItem( Context* context = 0 ) - : mContext( context ? context : Context::ROOT_CONTEXT() ) + : mContext( context ? context : Context::ROOT_CONTEXT() ), + mExecuted( false ) { } @@ -229,6 +233,12 @@ class ThreadPool /// Return the item's base priority value. /// @return item priority; defaults to 1.0. virtual F32 getPriority(); + + /// Has this work item been executed already? + bool hasExecuted() const + { + return mExecuted; + } }; typedef ThreadSafeRef< WorkItem > WorkItemPtr; @@ -254,6 +264,9 @@ class ThreadPool /// Number of worker threads guaranteed to be non-blocking. U32 mNumThreadsReady; + + /// Number of work items that have not yet completed execution. + U32 mNumPendingItems; /// Semaphore used to wake up threads, if necessary. Semaphore mSemaphore; @@ -306,6 +319,18 @@ class ThreadPool /// the queue to flush out. -1 = infinite. void flushWorkItems( S32 timeOut = -1 ); + /// If you're using a non-global thread pool to parallelise some work, you + /// may want to block until all the parallel work is complete. As with + /// flushWorkItems, this method may block indefinitely if new items keep + /// getting added to the pool before old ones finish. + /// + /// This method will not wait for items queued on the main thread using + /// queueWorkItemOnMainThread! + /// + /// @param timeOut Soft limit on the number of milliseconds to wait for + /// all items to complete. -1 = infinite. + void waitForAllItems( S32 timeOut = -1 ); + /// Add a work item to the main thread's work queue. /// /// The main thread's work queue will be processed each frame using diff --git a/Tools/CMake/torque3d.cmake b/Tools/CMake/torque3d.cmake index 7e853ed7c..14d4873b1 100644 --- a/Tools/CMake/torque3d.cmake +++ b/Tools/CMake/torque3d.cmake @@ -200,6 +200,7 @@ if( NOT TORQUE_DEDICATED ) endif() addPath("${srcDir}/platform/test") addPath("${srcDir}/platform/threads") +addPath("${srcDir}/platform/threads/test") addPath("${srcDir}/platform/async") addPath("${srcDir}/platform/async/test") addPath("${srcDir}/platform/input")