From 7b2cb8d04f72fc0083eb039057ffd0e90ef06052 Mon Sep 17 00:00:00 2001 From: Daniel Buckmaster Date: Sun, 5 Jul 2015 12:40:50 +1000 Subject: [PATCH 1/5] Add a method to see whether a WorkItem has executed yet. --- Engine/source/platform/threads/threadPool.cpp | 1 + Engine/source/platform/threads/threadPool.h | 12 +++++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/Engine/source/platform/threads/threadPool.cpp b/Engine/source/platform/threads/threadPool.cpp index 5b96b495b..15cfcd9e0 100644 --- a/Engine/source/platform/threads/threadPool.cpp +++ b/Engine/source/platform/threads/threadPool.cpp @@ -120,6 +120,7 @@ void ThreadPool::Context::updateAccumulatedPriorityBiases() void ThreadPool::WorkItem::process() { execute(); + mExecuted = true; } //-------------------------------------------------------------------------- diff --git a/Engine/source/platform/threads/threadPool.h b/Engine/source/platform/threads/threadPool.h index 2f18a5bee..b77244034 100644 --- a/Engine/source/platform/threads/threadPool.h +++ b/Engine/source/platform/threads/threadPool.h @@ -194,6 +194,9 @@ class ThreadPool /// This is the primary function to implement by subclasses. virtual void execute() = 0; + /// This flag is set after the execute() method has completed. + bool mExecuted; + public: /// Construct a new work item. @@ -201,7 +204,8 @@ class ThreadPool /// @param context The work context in which the item should be placed. /// If NULL, the root context will be used. WorkItem( Context* context = 0 ) - : mContext( context ? context : Context::ROOT_CONTEXT() ) + : mContext( context ? context : Context::ROOT_CONTEXT() ), + mExecuted( false ) { } @@ -229,6 +233,12 @@ class ThreadPool /// Return the item's base priority value. /// @return item priority; defaults to 1.0. virtual F32 getPriority(); + + /// Has this work item been executed already? + bool hasExecuted() const + { + return mExecuted; + } }; typedef ThreadSafeRef< WorkItem > WorkItemPtr; From 0995520d6f08174380c8a58bf1794c7c4347338c Mon Sep 17 00:00:00 2001 From: Daniel Buckmaster Date: Sun, 5 Jul 2015 12:59:16 +1000 Subject: [PATCH 2/5] Add a method to wait for all pending items in a ThreadPool. --- Engine/source/platform/threads/threadPool.cpp | 27 ++++++++++++++++++- Engine/source/platform/threads/threadPool.h | 15 +++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/Engine/source/platform/threads/threadPool.cpp b/Engine/source/platform/threads/threadPool.cpp index 15cfcd9e0..81e40894e 100644 --- a/Engine/source/platform/threads/threadPool.cpp +++ b/Engine/source/platform/threads/threadPool.cpp @@ -282,6 +282,8 @@ void ThreadPool::WorkerThread::run( void* arg ) Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem ); #endif workItem->process(); + + dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 ); } else waitForSignal = true; @@ -319,6 +321,7 @@ ThreadPool::ThreadPool( const char* name, U32 numThreads ) : mName( name ), mNumThreads( numThreads ), mNumThreadsAwake( 0 ), + mNumPendingItems( 0 ), mThreads( 0 ), mSemaphore( 0 ) { @@ -410,7 +413,7 @@ void ThreadPool::queueWorkItem( WorkItem* item ) else { // Put the item in the queue. - + dFetchAndAdd( mNumPendingItems, 1 ); mWorkItemQueue.insert( item->getPriority(), item ); mSemaphore.release(); @@ -441,6 +444,28 @@ void ThreadPool::flushWorkItems( S32 timeOut ) } } +void ThreadPool::waitForAllItems( S32 timeOut ) +{ + AssertFatal( mNumPendingItems, "ThreadPool::waitForAllItems() - no items pending" ); + + U32 endTime = 0; + if( timeOut != -1 ) + endTime = Platform::getRealMilliseconds() + timeOut; + + // Spinlock until there are no items that have not been processed. + + while( dAtomicRead( mNumPendingItems ) ) + { + Platform::sleep( 25 ); + + // Stop if we have exceeded our processing time budget. + + if( timeOut != -1 + && Platform::getRealMilliseconds() >= endTime ) + break; + } +} + //-------------------------------------------------------------------------- void ThreadPool::queueWorkItemOnMainThread( WorkItem* item ) diff --git a/Engine/source/platform/threads/threadPool.h b/Engine/source/platform/threads/threadPool.h index b77244034..5fb8cc60b 100644 --- a/Engine/source/platform/threads/threadPool.h +++ b/Engine/source/platform/threads/threadPool.h @@ -264,6 +264,9 @@ class ThreadPool /// Number of worker threads guaranteed to be non-blocking. U32 mNumThreadsReady; + + /// Number of work items that have not yet completed execution. + U32 mNumPendingItems; /// Semaphore used to wake up threads, if necessary. Semaphore mSemaphore; @@ -316,6 +319,18 @@ class ThreadPool /// the queue to flush out. -1 = infinite. void flushWorkItems( S32 timeOut = -1 ); + /// If you're using a non-global thread pool to parallelise some work, you + /// may want to block until all the parallel work is complete. As with + /// flushWorkItems, this method may block indefinitely if new items keep + /// getting added to the pool before old ones finish. + /// + /// This method will not wait for items queued on the main thread using + /// queueWorkItemOnMainThread! + /// + /// @param timeOut Soft limit on the number of milliseconds to wait for + /// all items to complete. -1 = infinite. + void waitForAllItems( S32 timeOut = -1 ); + /// Add a work item to the main thread's work queue. /// /// The main thread's work queue will be processed each frame using From b491d7bbc01aa0771da8804d7c7f29ab6b12e9d5 Mon Sep 17 00:00:00 2001 From: Daniel Buckmaster Date: Sun, 5 Jul 2015 12:41:25 +1000 Subject: [PATCH 3/5] Fix ThreadPool tests to account for asynchronicity. --- .../platform/threads/test/threadPoolTest.cpp | 50 ++++++++++++++++++- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/Engine/source/platform/threads/test/threadPoolTest.cpp b/Engine/source/platform/threads/test/threadPoolTest.cpp index cfb5c5dd4..c5b035bfb 100644 --- a/Engine/source/platform/threads/test/threadPoolTest.cpp +++ b/Engine/source/platform/threads/test/threadPoolTest.cpp @@ -44,6 +44,20 @@ public: mResults[mIndex] = mIndex; } }; + + // A worker that delays for some time. We'll use this to test the ThreadPool's + // synchronous and asynchronous operations. + struct DelayItem : public ThreadPool::WorkItem + { + U32 ms; + DelayItem(U32 _ms) : ms(_ms) {} + + protected: + virtual void execute() + { + Platform::sleep(ms); + } + }; }; TEST_FIX(ThreadPool, BasicAPI) @@ -63,8 +77,7 @@ TEST_FIX(ThreadPool, BasicAPI) pool->queueWorkItem(item); } - // Wait for all items to complete. - pool->flushWorkItems(); + pool->waitForAllItems(); // Verify. for (U32 i = 0; i < numItems; i++) @@ -72,4 +85,37 @@ TEST_FIX(ThreadPool, BasicAPI) results.clear(); } +TEST_FIX(ThreadPool, Asynchronous) +{ + const U32 delay = 500; //ms + + // Launch a single delaying work item. + ThreadPool* pool = &ThreadPool::GLOBAL(); + ThreadSafeRef item(new DelayItem(delay)); + pool->queueWorkItem(item); + + // The thread should not yet be finished. + EXPECT_EQ(false, item->hasExecuted()); + + // Wait til the item should have completed. + Platform::sleep(delay * 2); + + EXPECT_EQ(true, item->hasExecuted()); +} + +TEST_FIX(ThreadPool, Synchronous) +{ + const U32 delay = 500; //ms + + // Launch a single delaying work item. + ThreadPool* pool = &ThreadPool::GLOBAL(); + ThreadSafeRef item(new DelayItem(delay)); + pool->queueWorkItem(item); + + // Wait for the item to complete. + pool->waitForAllItems(); + + EXPECT_EQ(true, item->hasExecuted()); +} + #endif \ No newline at end of file From e75a9fa0816c3c0f35b9acc5818d43577a89b5f0 Mon Sep 17 00:00:00 2001 From: Daniel Buckmaster Date: Sun, 5 Jul 2015 14:11:24 +1000 Subject: [PATCH 4/5] Don't assert; sometimes there aren't any jobs to wait for! --- Engine/source/platform/threads/threadPool.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/Engine/source/platform/threads/threadPool.cpp b/Engine/source/platform/threads/threadPool.cpp index 81e40894e..feaa52ae0 100644 --- a/Engine/source/platform/threads/threadPool.cpp +++ b/Engine/source/platform/threads/threadPool.cpp @@ -446,8 +446,6 @@ void ThreadPool::flushWorkItems( S32 timeOut ) void ThreadPool::waitForAllItems( S32 timeOut ) { - AssertFatal( mNumPendingItems, "ThreadPool::waitForAllItems() - no items pending" ); - U32 endTime = 0; if( timeOut != -1 ) endTime = Platform::getRealMilliseconds() + timeOut; From d68c9036bffa6e425747d6502ec3240857f76aa5 Mon Sep 17 00:00:00 2001 From: Daniel Buckmaster Date: Sun, 5 Jul 2015 14:11:58 +1000 Subject: [PATCH 5/5] Include thread tests in CMake. --- Tools/CMake/torque3d.cmake | 1 + 1 file changed, 1 insertion(+) diff --git a/Tools/CMake/torque3d.cmake b/Tools/CMake/torque3d.cmake index 1dce72d21..08d5fd561 100644 --- a/Tools/CMake/torque3d.cmake +++ b/Tools/CMake/torque3d.cmake @@ -200,6 +200,7 @@ if( NOT TORQUE_DEDICATED ) endif() addPath("${srcDir}/platform/test") addPath("${srcDir}/platform/threads") +addPath("${srcDir}/platform/threads/test") addPath("${srcDir}/platform/async") addPath("${srcDir}/platform/async/test") addPath("${srcDir}/platform/input")