Engine directory for ticket #1

This commit is contained in:
DavidWyand-GG 2012-09-19 11:15:01 -04:00
parent 352279af7a
commit 7dbfe6994d
3795 changed files with 1363358 additions and 0 deletions

View file

@ -0,0 +1,129 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#include "platform/types.h"
#include "platform/platformAssert.h"
#ifndef _PLATFORM_THREADS_MUTEX_H_
#define _PLATFORM_THREADS_MUTEX_H_
// Forward ref used by platform code
struct PlatformMutexData;
/// Platform-independent mutual-exclusion primitive.
///
/// The real implementation lives in platform code behind the opaque
/// PlatformMutexData pointer; lock()/unlock() are virtual so platform
/// layers can override them.
class Mutex
{
protected:
   /// Opaque, platform-specific mutex state.
   PlatformMutexData *mData;

public:
   Mutex();
   virtual ~Mutex();

   /// Acquire the mutex.
   /// @param block If true (the default), wait until the mutex becomes
   ///              available; if false, fail immediately when contended.
   /// @return true if the mutex was acquired.
   virtual bool lock( bool block = true );

   /// Release the mutex; the caller must currently hold it.
   virtual void unlock();

   // Old API so that we don't have to change a load of code.
   // These wrap a heap-allocated Mutex in an opaque void* handle.

   static void *createMutex()
   {
      Mutex *mutex = new Mutex;
      return (void *)mutex;
   }

   static void destroyMutex(void *mutex)
   {
      // static_cast is the correct named cast for converting a void*
      // handle back to its original pointer type (reinterpret_cast is
      // reserved for unrelated-type reinterpretation).
      Mutex *realMutex = static_cast<Mutex *>(mutex);
      delete realMutex;
   }

   static bool lockMutex(void *mutex, bool block = true)
   {
      Mutex *realMutex = static_cast<Mutex *>(mutex);
      return realMutex->lock(block);
   }

   static void unlockMutex(void *mutex)
   {
      Mutex *realMutex = static_cast<Mutex *>(mutex);
      realMutex->unlock();
   }
};
/// Helper for simplifying mutex locking code.
///
/// This class will automatically unlock a mutex that you've
/// locked through it, saving you from managing a lot of complex
/// exit cases. For instance:
///
/// @code
/// MutexHandle handle;
/// handle.lock(myMutex);
///
/// if(error1)
/// return; // Auto-unlocked by handle if we leave here - normally would
/// // leave the mutex locked, causing much pain later.
///
/// handle.unlock();
/// @endcode
/// Helper for simplifying mutex locking code.
///
/// This class will automatically unlock a mutex that you've
/// locked through it, saving you from managing a lot of complex
/// exit cases. For instance:
///
/// @code
/// MutexHandle handle;
/// handle.lock(myMutex);
///
/// if(error1)
///    return; // Auto-unlocked by handle if we leave here - normally would
///            // leave the mutex locked, causing much pain later.
///
/// handle.unlock();
/// @endcode
///
/// @note This class is copyable by default; copying a locked handle would
///       cause both copies to unlock the same mutex on destruction. Do not
///       copy MutexHandle instances.
class MutexHandle
{
private:
   /// Raw mutex handle (as produced by Mutex::createMutex), or NULL when
   /// this handle currently holds no lock.
   void *mMutexPtr;

public:
   MutexHandle()
      : mMutexPtr(NULL)
   {
   }

   /// Releases the held mutex, if any.
   ~MutexHandle()
   {
      if(mMutexPtr)
         unlock();
   }

   /// Try to lock the given mutex and remember it on success.
   ///
   /// @param mutex    Raw mutex handle to lock.
   /// @param blocking NOTE: defaults to false (non-blocking try-lock),
   ///                 unlike Mutex::lockMutex which defaults to blocking.
   /// @return true if the lock was acquired.
   bool lock(void *mutex, bool blocking=false)
   {
      AssertFatal(!mMutexPtr, "MutexHandle::lock - shouldn't be locking things twice!");

      bool ret = Mutex::lockMutex(mutex, blocking);
      if(ret)
      {
         // We succeeded, do book-keeping.
         mMutexPtr = mutex;
      }
      return ret;
   }

   /// Unlock the held mutex; safe to call when nothing is held.
   void unlock()
   {
      if(mMutexPtr)
      {
         Mutex::unlockMutex(mMutexPtr);
         mMutexPtr = NULL;
      }
   }
};
#endif // _PLATFORM_THREADS_MUTEX_H_

View file

@ -0,0 +1,55 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _PLATFORM_THREAD_SEMAPHORE_H_
#define _PLATFORM_THREAD_SEMAPHORE_H_
#ifndef _TORQUE_TYPES_H_
#include "platform/types.h"
#endif
// Forward ref used by platform code
class PlatformSemaphore;
/// Platform-independent counting semaphore.
class Semaphore
{
protected:
   /// Opaque platform-specific semaphore state.
   PlatformSemaphore *mData;

public:
   /// Create a semaphore. initialCount defaults to 1.
   Semaphore(S32 initialCount = 1);

   /// Delete a semaphore, ignoring its count.
   ~Semaphore();

   /// Acquire the semaphore, decrementing its count.
   /// If the count is zero and block is true, wait until the count becomes
   /// positive, then acquire.
   /// Returns true if the semaphore was acquired, false if the semaphore could
   /// not be acquired and block was false.
   bool acquire(bool block = true, S32 timeoutMS = -1);

   /// Release the semaphore, incrementing its count.
   /// Never blocks.
   void release();
};
#endif

View file

@ -0,0 +1,245 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _PLATFORM_THREADS_THREAD_H_
#define _PLATFORM_THREADS_THREAD_H_
#ifndef _TORQUE_TYPES_H_
#include "platform/types.h"
#endif
#ifndef _TVECTOR_H_
#include "core/util/tVector.h"
#endif
#ifndef _PLATFORM_THREADS_MUTEX_H_
#include "platform/threads/mutex.h"
#endif
#ifndef _TSINGLETON_H_
#include "core/util/tSingleton.h"
#endif
// Forward ref used by platform code
class PlatformThreadData;
// Typedefs
typedef void (*ThreadRunFunction)(void *data);
/// Platform-independent thread abstraction.
///
/// Run code in a new thread either by passing a ThreadRunFunction to the
/// constructor or by subclassing and overriding run().
class Thread
{
public:
   typedef void Parent;

protected:
   /// Opaque platform-specific thread state.
   PlatformThreadData* mData;

   /// Used to signal threads need to stop.
   /// Threads set this flag to false in start()
   /// NOTE(review): this is a plain U32 read/written from multiple threads
   /// with no atomic/volatile qualification — presumably relied upon as a
   /// best-effort flag; confirm against platform implementations.
   U32 shouldStop;

   /// Set the name of this thread for identification in debuggers.
   /// Maybe a NOP on platforms that do not support this. Always a NOP
   /// in non-debug builds.
   void _setName( const char* name );

public:
   /// If set, the thread will delete itself once it has finished running.
   bool autoDelete;

   /// Create a thread.
   /// @param func The starting function for the thread.
   /// @param arg Data to be passed to func, when the thread starts.
   /// @param start_thread Supported for compatibility. Must be false. Starting threads from
   /// within the constructor is not allowed anymore as the run() method is virtual.
   Thread(ThreadRunFunction func = 0, void *arg = 0, bool start_thread = false, bool autodelete = false);

   /// Destroy a thread.
   /// The thread MUST be allowed to exit before it is destroyed.
   virtual ~Thread();

   /// Start a thread.
   /// Sets shouldStop to false and calls run() in a new thread of execution.
   void start( void* arg = 0 );

   /// Ask a thread to stop running.
   void stop()
   {
      shouldStop = true;
   }

   /// Block until the thread stops running.
   /// @note Don't use this in combination with auto-deletion as otherwise the thread will kill
   ///    itself while still executing the join() method on the waiting thread.
   bool join();

   /// Threads may call checkForStop() periodically to check if they've been
   /// asked to stop. As soon as checkForStop() returns true, the thread should
   /// clean up and return.
   bool checkForStop()
   {
      return shouldStop;
   }

   /// Run the Thread's entry point function.
   /// Override this method in a subclass of Thread to create threaded code in
   /// an object oriented way, and without passing a function ptr to Thread().
   /// Also, you can call this method directly to execute the thread's
   /// code in a non-threaded way.
   virtual void run(void *arg = 0);

   /// Returns true if the thread is running.
   bool isAlive();

   /// Returns the platform specific thread id for this thread.
   U32 getId();
};
///
/// Registry of running Thread objects plus main-thread identification
/// helpers. Accessed as a ManagedSingleton.
class ThreadManager
{
   /// All threads that registered themselves via addThread().
   Vector<Thread*> threadPool;

   /// Guards threadPool.
   /// NOTE(review): addThread() calls getThreadById() while already holding
   /// this lock, so the platform Mutex must support recursive locking —
   /// verify against the platform implementations.
   Mutex poolLock;

   /// Captures the main thread's id at static-initialization time.
   struct MainThreadId
   {
      U32 mId;
      MainThreadId()
      {
         mId = ThreadManager::getCurrentThreadId();
      }
      U32 get()
      {
         // Okay, this is a bit soso. The main thread ID may get queried during
         // global ctor phase before MainThreadId's ctor ran. Since global
         // ctors will/should all run on the main thread, we can sort of safely
         // assume here that we can just query the current thread's ID.
         if( !mId )
            mId = ThreadManager::getCurrentThreadId();
         return mId;
      }
   };

   static MainThreadId smMainThreadId;

public:
   ThreadManager()
   {
      VECTOR_SET_ASSOCIATION( threadPool );
   }

   /// Return true if the caller is running on the main thread.
   static bool isMainThread();

   /// Returns true if threadId is the same as the calling thread's id.
   static bool isCurrentThread(U32 threadId);

   /// Returns true if the 2 thread ids represent the same thread. Some thread
   /// APIs return an opaque object as a thread id, so the == operator cannot
   /// reliably compare thread ids.
   // this comparator is needed by pthreads and ThreadManager.
   static bool compare(U32 threadId_1, U32 threadId_2);

   /// Returns the platform specific thread id of the calling thread. Some
   /// platforms do not guarantee that this ID stays the same over the life of
   /// the thread, so use ThreadManager::compare() to compare thread ids.
   static U32 getCurrentThreadId();

   /// Returns the platform specific thread id of the main thread.
   static U32 getMainThreadId() { return smMainThreadId.get(); }

   /// Each thread should add itself to the thread pool the first time it runs.
   /// Duplicate registrations (same id) are silently ignored.
   static void addThread(Thread* thread)
   {
      ThreadManager &manager = *ManagedSingleton< ThreadManager >::instance();
      manager.poolLock.lock();
      Thread *alreadyAdded = getThreadById(thread->getId());
      if(!alreadyAdded)
         manager.threadPool.push_back(thread);
      manager.poolLock.unlock();
   }

   /// Remove the given thread from the registry, matching by thread id.
   static void removeThread(Thread* thread)
   {
      ThreadManager &manager = *ManagedSingleton< ThreadManager >::instance();
      manager.poolLock.lock();

      U32 threadID = thread->getId();
      for(U32 i = 0;i < manager.threadPool.size();++i)
      {
         if( compare( manager.threadPool[i]->getId(), threadID ) )
         {
            manager.threadPool.erase(i);
            break;
         }
      }

      manager.poolLock.unlock();
   }

   /// Searches the pool of known threads for a thread whose id is equivalent to
   /// the given threadid. Compares thread ids with ThreadManager::compare().
   static Thread* getThreadById(U32 threadid)
   {
      AssertFatal(threadid != 0, "ThreadManager::getThreadById() Searching for a bad thread id.");
      Thread* ret = NULL;

      ThreadManager &manager = *ManagedSingleton< ThreadManager >::instance();
      manager.poolLock.lock();
      Vector<Thread*> &pool = manager.threadPool;
      for( S32 i = pool.size() - 1; i >= 0; i--)
      {
         Thread* p = pool[i];
         if(compare(p->getId(), threadid))
         {
            ret = p;
            break;
         }
      }
      manager.poolLock.unlock();
      return ret;
   }

   /// Return the registered Thread object for the calling thread, or NULL
   /// if the caller never registered itself.
   static Thread* getCurrentThread()
   {
      return getThreadById(ThreadManager::getCurrentThreadId());
   }

   /// Singleton name used by ManagedSingleton.
   static const char* getSingletonName()
   {
      return "ThreadManager";
   }
};
inline bool ThreadManager::isMainThread()
{
return compare( ThreadManager::getCurrentThreadId(), smMainThreadId.get() );
}
/// Return true if threadId identifies the calling thread.
inline bool ThreadManager::isCurrentThread(U32 threadId)
{
   return compare( getCurrentThreadId(), threadId );
}
#endif // _PLATFORM_THREADS_THREAD_H_

View file

@ -0,0 +1,479 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#include "platform/threads/threadPool.h"
#include "platform/threads/thread.h"
#include "platform/platformCPUCount.h"
#include "core/strings/stringFunctions.h"
#include "core/util/tSingleton.h"
//#define DEBUG_SPEW
//=============================================================================
// ThreadPool::Context.
//=============================================================================
// Statically-allocated root of the context hierarchy; all other contexts are
// direct or indirect children of this one. A bias of 1.0 applies no scaling.
ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 );
//--------------------------------------------------------------------------
// Construct a context and, if a parent is given, push it onto the front of
// the parent's singly-linked child list.
ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias )
   : mName( name ),
     mParent( parent ),
     mSibling( 0 ),
     mChildren( 0 ),
     mPriorityBias( priorityBias ),
     mAccumulatedPriorityBias( 0.0 )   // 0 doubles as the "not yet computed" sentinel.
{
   if( parent )
   {
      // Link ourselves onto the head of the parent's child list.
      mSibling = mParent->mChildren;
      mParent->mChildren = this;
   }
}
//--------------------------------------------------------------------------
// Destructor: unlink this context from its parent's child list.
ThreadPool::Context::~Context()
{
   if( mParent )
      for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling )
         if( context == this )
         {
            if( !prev )
               mParent->mChildren = this->mSibling;
            else
               prev->mSibling = this->mSibling;

            // A context appears in its parent's child list at most once
            // (it is linked exactly once in the ctor), so stop scanning.
            // The original kept iterating, which also left 'prev' pointing
            // at the just-unlinked node.
            break;
         }
}
//--------------------------------------------------------------------------
// Return the first direct child whose name matches case-insensitively,
// or NULL if there is no such child.
ThreadPool::Context* ThreadPool::Context::getChild( const char* name )
{
   Context* current = getChildren();
   while( current )
   {
      if( dStricmp( current->getName(), name ) == 0 )
         return current;
      current = current->getSibling();
   }
   return 0;
}
//--------------------------------------------------------------------------
// Return the cached accumulated bias, computing it lazily on first use.
// Note: 0 is used as the "unset" sentinel (see ctor), so a legitimately
// zero bias would be recomputed on every call.
F32 ThreadPool::Context::getAccumulatedPriorityBias()
{
   if( !mAccumulatedPriorityBias )
      updateAccumulatedPriorityBiases();
   return mAccumulatedPriorityBias;
}
//--------------------------------------------------------------------------
// Set this context's own bias factor and refresh the cached accumulated
// biases of this context and of its entire subtree.
void ThreadPool::Context::setPriorityBias( F32 value )
{
   mPriorityBias = value;

   // Recompute eagerly. Merely resetting our own cache to the 0 sentinel
   // (as the previous code did) left the children's nonzero caches stale,
   // so subtree items kept being scheduled with the old bias.
   updateAccumulatedPriorityBiases();
}
//--------------------------------------------------------------------------
// Recursively recompute the cached accumulated bias for this context
// (product of its own bias and all ancestor biases) and for all children.
void ThreadPool::Context::updateAccumulatedPriorityBiases()
{
   // Update our own priority bias.
   mAccumulatedPriorityBias = mPriorityBias;
   for( Context* context = getParent(); context != 0; context = context->getParent() )
      mAccumulatedPriorityBias *= context->getPriorityBias();

   // Update our children.
   for( Context* child = getChildren(); child != 0; child = child->getSibling() )
      child->updateAccumulatedPriorityBiases();
}
//=============================================================================
// ThreadPool::WorkItem.
//=============================================================================
//--------------------------------------------------------------------------
// Run the item's work by invoking the subclass-provided execute().
void ThreadPool::WorkItem::process()
{
   execute();
}
//--------------------------------------------------------------------------
// Default cancellation policy: items are never cancelled unless a
// subclass overrides this.
bool ThreadPool::WorkItem::isCancellationRequested()
{
   return false;
}
//--------------------------------------------------------------------------
// Check for a pending cancellation request. When one is pending, notify
// the item via onCancelled() and return true so execute() can bail out.
bool ThreadPool::WorkItem::cancellationPoint()
{
   if( !isCancellationRequested() )
      return false;

   onCancelled();
   return true;
}
//--------------------------------------------------------------------------
// Default base priority of a work item.
F32 ThreadPool::WorkItem::getPriority()
{
   // Use a float literal; the plain 1.0 was a double implicitly narrowed
   // to F32 on return.
   return 1.0f;
}
//=============================================================================
// ThreadPool::WorkItemWrapper.
//=============================================================================
/// Value wrapper for work items while placed on priority queue.
/// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate.
///
/// @see ThreadSafePriorityQueueWithUpdate
/// @see ThreadPool::WorkItem
///
struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem >
{
   typedef ThreadSafeRef< WorkItem > Parent;

   WorkItemWrapper() {}
   WorkItemWrapper( WorkItem* item )
      : Parent( item ) {}

   /// True while the wrapper references a live, non-cancelled item;
   /// drops the reference as a side effect when the item was cancelled.
   bool isAlive();

   /// Effective queue priority: the item's base priority scaled by its
   /// context's accumulated bias.
   F32 getPriority();
};
// A wrapper is alive while it references an item whose cancellation has
// not been requested. Cancelled items are released immediately so the
// queue drops its reference as early as possible.
inline bool ThreadPool::WorkItemWrapper::isAlive()
{
   WorkItem* item = ptr();
   if( !item )
      return false;

   if( item->isCancellationRequested() )
   {
      ( *this ) = 0;
      return false;
   }

   return true;
}
// Effective priority used by the priority queue; must only be called
// while the wrapper still references an item.
inline F32 ThreadPool::WorkItemWrapper::getPriority()
{
   WorkItem* item = ptr();
   AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" );

   // Compute a scaled priority value based on the item's context.
   return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() );
}
//=============================================================================
// ThreadPool::WorkerThread.
//=============================================================================
///
///
/// A single worker thread belonging to a ThreadPool. Workers form an
/// intrusive singly-linked list anchored at ThreadPool::mThreads.
struct ThreadPool::WorkerThread : public Thread
{
   WorkerThread( ThreadPool* pool, U32 index );

   /// Next worker in the pool's thread list.
   WorkerThread* getNext();

   /// Worker main loop; see definition below.
   virtual void run( void* arg = 0 );

private:
   /// Index of this worker within its pool (used for naming/debugging).
   U32 mIndex;

   /// Owning pool.
   ThreadPool* mPool;

   /// Intrusive list link.
   WorkerThread* mNext;
};
// Construct a worker and link it onto the head of the pool's thread list.
// NOTE(review): the list push is unsynchronized — presumably safe because
// workers are only constructed from the pool's own constructor; confirm.
ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index )
   : mPool( pool ),
     mIndex( index )
{
   // Link us to the pool's thread list.
   mNext = pool->mThreads;
   pool->mThreads = this;
}
// Return the next worker in the pool's intrusive thread list (NULL at end).
inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext()
{
   return mNext;
}
// Worker main loop: repeatedly take the highest-priority item off the
// pool's queue and process it; sleep on the pool semaphore when the queue
// is empty; exit when the pool signals stop.
//
// The mNumThreads/mNumThreadsAwake/mNumThreadsReady counters are kept
// up to date with atomic add/sub (dFetchAndAdd with (U32)-1 is an atomic
// decrement); their exact update order relative to the queue operations
// matters — see the comment in ThreadPool::queueWorkItem().
void ThreadPool::WorkerThread::run( void* arg )
{
#ifdef TORQUE_DEBUG
   {
      // Set the thread's name for debugging.
      char buffer[ 2048 ];
      dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex );
      _setName( buffer );
   }
#endif

#if defined(TORQUE_OS_XENON)
   // On Xbox 360 you must explicitly assign software threads to hardware threads.
   // This will distribute job threads across the secondary CPUs leaving both
   // primary CPU cores available to the "main" thread. This will help prevent
   // more L2 thrashing of the main thread/core.
   static U32 sCoreAssignment = 2;
   XSetThreadProcessor( GetCurrentThread(), sCoreAssignment );
   sCoreAssignment = sCoreAssignment < 6 ? sCoreAssignment + 1 : 2;
#endif

   while( 1 )
   {
      // Exit when the pool has asked us to stop (see ThreadPool::shutdown).
      if( checkForStop() )
      {
#ifdef DEBUG_SPEW
         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() );
#endif
         dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 );
         return;
      }

      // Mark us as potentially blocking.
      dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 );

      bool waitForSignal = false;
      {
         // Try to take an item from the queue. Do
         // this in a separate block, so we'll be
         // releasing the item after we have finished.
         WorkItemWrapper workItem;
         if( mPool->mWorkItemQueue.takeNext( workItem ) )
         {
            // Mark us as non-blocking as this loop definitely
            // won't wait on the semaphore.
            dFetchAndAdd( mPool->mNumThreadsReady, 1 );
#ifdef DEBUG_SPEW
            Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
#endif
            workItem->process();
         }
         else
            waitForSignal = true;
      }

      if( waitForSignal )
      {
         // Queue was empty: go to sleep on the pool semaphore until new
         // work (or shutdown) releases it.
         dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 );
#ifdef DEBUG_SPEW
         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() );
#endif
         mPool->mSemaphore.acquire();
#ifdef DEBUG_SPEW
         Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() );
#endif
         dFetchAndAdd( mPool->mNumThreadsAwake, 1 );
         dFetchAndAdd( mPool->mNumThreadsReady, 1 );
      }
   }
}
//=============================================================================
// ThreadPool.
//=============================================================================
// Static state shared by all pools: the force-single-threaded debug switch,
// the per-frame time budget for main-thread items, and the queue of items
// pinged back to the main thread. All zero-initialized.
bool ThreadPool::smForceAllMainThread;
U32 ThreadPool::smMainThreadTimeMS;
ThreadPool::QueueType ThreadPool::smMainThreadQueue;
//--------------------------------------------------------------------------
// Construct a pool and spawn its worker threads.
//
// @param name       Pool name; used for worker thread naming/debugging.
// @param numThreads Number of workers; 0 means "derive from CPU count".
ThreadPool::ThreadPool( const char* name, U32 numThreads )
   : mName( name ),
     mNumThreads( numThreads ),
     mNumThreadsAwake( 0 ),
     mThreads( 0 ),
     mSemaphore( 0 )   // Start with a zero count: workers sleep until work arrives.
{
   // Number of worker threads to create.
   if( !mNumThreads )
   {
      // Use platformCPUInfo directly as in the case of the global pool,
      // Platform::SystemInfo will not yet have been initialized.
      U32 numLogical;
      U32 numPhysical;
      U32 numCores;

      CPUInfo::CPUCount( numLogical, numCores, numPhysical );
      const U32 baseCount = getMax( numLogical, numCores );
      // Fall back to two workers if CPU detection yields nothing.
      if( baseCount )
         mNumThreads = baseCount;
      else
         mNumThreads = 2;
   }

#ifdef DEBUG_SPEW
   Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads );
#endif

   // Create the threads. Counters are set before any worker starts so the
   // workers see consistent values.
   mNumThreadsAwake = mNumThreads;
   mNumThreadsReady = mNumThreads;
   for( U32 i = 0; i < mNumThreads; i ++ )
   {
      // Workers link themselves into mThreads in their ctor; they delete
      // themselves never — the pool joins and deletes them in shutdown().
      WorkerThread* thread = new WorkerThread( this, i );
      thread->start();
   }
}
//--------------------------------------------------------------------------
// Destructor: stop and reap all workers (safe if shutdown() already ran,
// since shutdown() leaves mThreads NULL and mNumThreads 0).
ThreadPool::~ThreadPool()
{
   shutdown();
}
//--------------------------------------------------------------------------
// Stop all worker threads and wait for each to terminate. The ordering
// here matters: flags first, then semaphore releases, then joins.
void ThreadPool::shutdown()
{
   const U32 numThreads = mNumThreads;

   // Tell our worker threads to stop.
   for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() )
      thread->stop();

   // Release the semaphore as many times as there are threads.
   // Doing this separately guarantees we're not waking a thread
   // that hasn't been set its stop flag yet.
   for( U32 n = 0; n < numThreads; ++ n )
      mSemaphore.release();

   // Delete each worker thread. Wait until death as we're prone to
   // running into issues with decomposing work item lists otherwise.
   for( WorkerThread* thread = mThreads; thread != 0; )
   {
      WorkerThread* next = thread->getNext();
      thread->join();
      delete thread;
      thread = next;
   }

   // Reset pool state so a second shutdown() (e.g. from the dtor) is a no-op.
   mThreads = NULL;
   mNumThreads = 0;
}
//--------------------------------------------------------------------------
// Submit an item for background execution. If the force-all-main-thread
// debug switch is on, the item is executed synchronously instead.
void ThreadPool::queueWorkItem( WorkItem* item )
{
   bool executeRightAway = ( getForceAllMainThread() );
#ifdef DEBUG_SPEW
   Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'",
      ( executeRightAway ? "executing" : "queuing" ),
      item );
#endif

   if( executeRightAway )
      item->process();
   else
   {
      // Put the item in the queue.
      mWorkItemQueue.insert( item->getPriority(), item );

      // Wake up some thread, if we need to.
      // Use the ready count here as the wake count does
      // not correctly protect the critical section in the
      // thread's run function. This may lead us to release
      // the semaphore more often than necessary, but it avoids
      // a race condition.
      if( !dCompareAndSwap( mNumThreadsReady, mNumThreads, mNumThreads ) )
         mSemaphore.release();
   }
}
//--------------------------------------------------------------------------
// Busy-wait (sleeping 25ms per iteration) until the work item queue is
// empty or the optional timeout (in milliseconds; -1 = no timeout) expires.
//
// NOTE(review): an empty queue only means all items have been *taken*;
// items already picked up by workers may still be executing when this
// returns. Must not be called from a worker thread of this pool, or it
// can spin forever waiting on itself.
void ThreadPool::flushWorkItems( S32 timeOut )
{
   AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" );

   U32 endTime = 0;
   if( timeOut != -1 )
      endTime = Platform::getRealMilliseconds() + timeOut;

   // Spinlock until the queue is empty.
   while( !mWorkItemQueue.isEmpty() )
   {
      Platform::sleep( 25 );

      // Stop if we have exceeded our processing time budget.
      if( timeOut != -1
          && Platform::getRealMilliseconds() >= endTime )
         break;
   }
}
//--------------------------------------------------------------------------
// Queue an item for execution on the main thread; it will run during a
// later processMainThreadWorkItems() call.
void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )
{
   smMainThreadQueue.insert( item->getPriority(), item );
}
//--------------------------------------------------------------------------
void ThreadPool::processMainThreadWorkItems()
{
AssertFatal( ThreadManager::isMainThread(),
"ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" );
U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() );
do
{
WorkItemWrapper item;
if( !smMainThreadQueue.takeNext( item ) )
break;
else
item->process();
}
while( Platform::getRealMilliseconds() < timeLimit );
}

View file

@ -0,0 +1,398 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#ifndef _THREADSAFEREFCOUNT_H_
#include "platform/threads/threadSafeRefCount.h"
#endif
#ifndef _THREADSAFEPRIORITYQUEUE_H_
#include "platform/threads/threadSafePriorityQueue.h"
#endif
#ifndef _PLATFORM_THREAD_SEMAPHORE_H_
#include "platform/threads/semaphore.h"
#endif
#ifndef _TSINGLETON_H_
#include "core/util/tSingleton.h"
#endif
/// @file
/// Interface for an asynchronous work manager.
/// Asynchronous work manager.
///
/// Thread pooling allows to submit work items for background execution.
/// Each work item will be placed on a queue and, based on a total priority
/// ordering, executed when it has the highest priority and a worker thread
/// becomes available.
///
/// @note The global pool maintains the invariant that only the main thread
/// may submit items in order to be able to flush the item queue reliably
/// from the main thread itself. If other threads were issuing items to
/// the queue, the queue may never empty out and the main thread will
/// deadlock.
///
/// Flushing is the simplest method to guarantee that no asynchronous
/// operation is pending in a specific case (deletion of the target object
/// being the most common case). However, when possible, avoid this
/// situation and design your work items to operate independently,
/// e.g. by having only a single point of access to data that may have
/// disappeared in the meantime and putting a check around that single
/// access so that the item will silently die when its target object has
/// disappeared.
///
/// The cleanest safe solution to this is to create a separate concurrently
/// reference-counted structure that holds all interfacing state and
/// functionality shared between a work item and its issueing code. This way
/// the host object can safely disappear with the interfacing structure
/// automatically being released once the last concurrent work item has been
/// processed or discarded.
///
class ThreadPool
{
public:
/// A ThreadPool context defines a logical context in which WorkItems are
/// being executed. Their primary use is for biasing priorities of
/// WorkItems.
///
/// Contexts are arranged in a tree hierarchy. Each parent node's priority
/// bias scales all the priority biases underneath it.
///
/// Note that instances of this class are meant to be instantiated
/// globally only.
///
class Context
{
protected:
   /// Superordinate context; scales this context's priority bias.
   Context* mParent;

   /// First child.
   Context* mChildren;

   /// Next sibling in child chain.
   Context* mSibling;

   /// Name of this context. Should be unique in parent namespace.
   const char* mName;

   /// Priority scale factor of this context.
   F32 mPriorityBias;

   /// Accumulated scale factor (own bias times all ancestor biases);
   /// 0 doubles as the "not yet computed" sentinel, filled in lazily.
   F32 mAccumulatedPriorityBias;

   /// The root context; does not modify priorities. All contexts should be direct or indirect children of this one.
   static Context smRootContext;

   /// Recursively update cached accumulated priority biases.
   void updateAccumulatedPriorityBiases();

public:
   Context( const char* name, Context* parent, F32 priorityBias );
   ~Context();

   /// Return the name of the worker threading context.
   const char* getName() const
   {
      return mName;
   }

   /// Return the context's own work item priority bias.
   F32 getPriorityBias() const
   {
      return mPriorityBias;
   }

   /// Return the superordinate node to the current context.
   Context* getParent() const
   {
      return mParent;
   }

   /// Return the next sibling to the current context.
   Context* getSibling() const
   {
      return mSibling;
   }

   /// Return the first child context.
   Context* getChildren() const
   {
      return mChildren;
   }

   /// Return the root context.
   static Context* ROOT_CONTEXT()
   {
      return &smRootContext;
   }

   /// Return the accumulated bias, computing it lazily on first use.
   F32 getAccumulatedPriorityBias();

   /// Return the first direct child with the given (case-insensitive) name.
   Context* getChild( const char* name );

   /// Set this context's own bias factor.
   void setPriorityBias( F32 value );
};
/// An action to execute on a worker thread from the pool.
///
/// Work items are concurrently reference-counted and will be
/// automatically released once the last reference disappears.
///
class WorkItem : public ThreadSafeRefCount< WorkItem >
{
public:
   typedef ThreadSafeRefCount< WorkItem > Parent;

protected:
   /// The work context of this item.
   Context* mContext;

   /// Mark a point in a work item's execution where the item can
   /// be safely cancelled.
   ///
   /// This method should be called by subclasses' execute() methods
   /// whenever an item can be safely cancelled. When it returns true,
   /// the work item should exit from its execute() method.
   bool cancellationPoint();

   /// Called when the item has been cancelled.
   virtual void onCancelled() {}

   /// Execute the actions associated with this work item.
   /// This is the primary function to implement by subclasses.
   virtual void execute() = 0;

public:
   /// Construct a new work item.
   ///
   /// @param context The work context in which the item should be placed.
   ///    If NULL, the root context will be used.
   WorkItem( Context* context = 0 )
      : mContext( context ? context : Context::ROOT_CONTEXT() )
   {
   }

   virtual ~WorkItem() {}

   /// Return the work context associated with the work item.
   inline Context* getContext() const
   {
      return mContext;
   }

   /// Process the work item (runs execute()).
   void process();

   /// Return true if the work item should be cancelled.
   ///
   /// This method can be overridden by subclasses. Its value will be
   /// checked each time cancellationPoint() is called. When it returns
   /// true, the item's process() method will exit automatically.
   ///
   /// @return true, if item should be cancelled; default is false.
   /// @see ThreadPool::WorkItem::cancellationPoint
   virtual bool isCancellationRequested();

   /// Return the item's base priority value.
   /// @return item priority; defaults to 1.0.
   virtual F32 getPriority();
};
   /// Reference-counted pointer to a work item.
   typedef ThreadSafeRef< WorkItem > WorkItemPtr;

   struct GlobalThreadPool;

protected:

   struct WorkItemWrapper;
   struct WorkerThread;
   friend struct WorkerThread; // mSemaphore, mNumThreadsAwake, mThreads

   /// Queue type used for both the pool's work queue and the main-thread
   /// queue; item priorities (F32 keys) are updated over time.
   typedef ThreadSafePriorityQueueWithUpdate< WorkItemWrapper, F32 > QueueType;

   /// Name of this pool.  Mainly for debugging.  Used to name worker threads.
   String mName;

   /// Number of worker threads spawned by the pool.
   U32 mNumThreads;

   /// Number of worker threads in non-sleeping state.
   U32 mNumThreadsAwake;

   /// Number of worker threads guaranteed to be non-blocking.
   U32 mNumThreadsReady;

   /// Semaphore used to wake up threads, if necessary.
   Semaphore mSemaphore;

   /// Threaded priority queue for concurrent access by worker threads.
   QueueType mWorkItemQueue;

   /// List of worker threads.
   WorkerThread* mThreads;

   /// Force all work items to execute on main thread;
   /// turns this into a single-threaded system.
   /// Primarily useful to find whether malfunctions are caused
   /// by parallel execution or not.
   static bool smForceAllMainThread;

   /// Soft time limit, in milliseconds, for processMainThreadWorkItems()
   /// (see getMainThreadThresholdTimeMS()).
   static U32 smMainThreadTimeMS;

   /// Work queue for main thread; can be used to ping back work items to
   /// main thread that need processing that can only happen on main thread.
   static QueueType smMainThreadQueue;
public:

   /// Create a new thread pool with the given number of worker threads.
   ///
   /// If numThreads is zero (the default), the number of threads created
   /// will be based on the number of CPU cores available.
   ///
   /// @param name Name of the pool; used for debugging and worker thread names.
   /// @param numThreads Number of threads to create or zero for default.
   ThreadPool( const char* name, U32 numThreads = 0 );
   ~ThreadPool();

   /// Manually shutdown threads outside of static destructors.
   void shutdown();

   /// Add a work item to this pool's queue for execution on a worker thread.
   void queueWorkItem( WorkItem* item );

   /// Wait for the pool's work item queue to empty out.
   ///
   /// <em>For the global pool, it is very important to only ever call
   /// this function on the main thread and to let work items only ever
   /// come from the main thread. Otherwise this function has the potential
   /// of dead-locking as new work items may constantly be fed to the queue
   /// without it ever getting empty.</em>
   ///
   /// @param timeOut Soft limit on the number of milliseconds to wait for
   ///   the queue to flush out. -1 = infinite.
   void flushWorkItems( S32 timeOut = -1 );

   /// Add a work item to the main thread's work queue.
   ///
   /// The main thread's work queue will be processed each frame using
   /// a set timeout to limit the work being done. Nonetheless, work
   /// items will not be suspended in-midst of processing, so make sure
   /// that whatever work you issue to the main thread is light work
   /// or you may see short hangs in gameplay.
   ///
   /// To reiterate this: any code executed through this interface directly
   /// adds to frame processing time on the main thread.
   ///
   /// This method *may* (and is meant to) be called from threads
   /// other than the main thread.
   static void queueWorkItemOnMainThread( WorkItem* item );

   /// Process work items waiting on the main thread's work queue.
   ///
   /// There is a soft limit imposed on the time this method is allowed
   /// to run so as to balance frame-to-frame load. However, work
   /// items, once their processing is initiated, will not be suspended
   /// and will run for as long as they take to complete, so make sure
   /// individual items perform as little work as necessary.
   ///
   /// @see ThreadPool::getMainThreadThresholdTimeMS
   static void processMainThreadWorkItems();

   /// Return the interval in which item priorities are updated on the queue.
   /// @return update interval in milliseconds.
   U32 getQueueUpdateInterval() const
   {
      return mWorkItemQueue.getUpdateInterval();
   }

   /// Return the priority increment applied to work items on each passing of the update interval.
   F32 getQueueTimeBasedPriorityBoost() const
   {
      return mWorkItemQueue.getTimeBasedPriorityBoost();
   }

   /// Set the update interval of the work item queue to the given value.
   /// @param milliSeconds Time between updates in milliseconds.
   void setQueueUpdateInterval( U32 milliSeconds )
   {
      mWorkItemQueue.setUpdateInterval( milliSeconds );
   }

   /// Set the priority increment applied to work items on each update interval.
   /// @param value Priority increment. Set to zero to deactivate.
   void setQueueTimeBasedPriorityBoost( F32 value )
   {
      mWorkItemQueue.setTimeBasedPriorityBoost( value );
   }

   /// Return a writable reference to the soft time limit (in milliseconds)
   /// used by processMainThreadWorkItems().
   static U32& getMainThreadThresholdTimeMS()
   {
      return smMainThreadTimeMS;
   }

   /// Return a writable reference to the flag forcing all work items to
   /// run on the main thread (see smForceAllMainThread).
   static bool& getForceAllMainThread()
   {
      return smForceAllMainThread;
   }

   /// Return the global thread pool singleton.
   static ThreadPool& GLOBAL();
};
/// Shorthand for ThreadPool::Context.
typedef ThreadPool::Context ThreadContext;

/// Shorthand for ThreadPool::WorkItem.
typedef ThreadPool::WorkItem ThreadWorkItem;

/// ManagedSingleton wrapper around the process-wide thread pool instance.
struct ThreadPool::GlobalThreadPool : public ThreadPool, public ManagedSingleton< GlobalThreadPool >
{
   typedef ThreadPool Parent;

   GlobalThreadPool()
      : Parent( "GLOBAL" ) {}

   // For ManagedSingleton.
   static const char* getSingletonName() { return "GlobalThreadPool"; }
};

/// Return the global thread pool singleton.
inline ThreadPool& ThreadPool::GLOBAL()
{
   return *( GlobalThreadPool::instance() );
}
#endif // !_THREADPOOL_H_

View file

@ -0,0 +1,357 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADPOOLASYNCIO_H_
#define _THREADPOOLASYNCIO_H_
#ifndef _THREADPOOL_H_
# include "platform/threads/threadPool.h"
#endif
#ifndef _RAWDATA_H_
# include "core/util/rawData.h"
#endif
#ifndef _TSTREAM_H_
# include "core/stream/tStream.h"
#endif
//RDTODO: I/O error handling
/// @file
/// Thread pool work items for asynchronous stream I/O.
/// Through the use of stream filters, this can be basically used for any
/// type of asynchronous stream processing.
//--------------------------------------------------------------------------
// AsyncIOItem.
//--------------------------------------------------------------------------
/// Abstract superclass of async I/O work items.
///
/// Supports both offset-based stream I/O as well as I/O on streams with
/// implicit positions. Note that if you use the latter type, make sure
/// that no other thread is messing with the stream at the same time or
/// chaos will ensue.
///
/// @param T Type of elements being streamed.
/// @param Stream Type of the stream being read from/written to.
template< typename T, class Stream >
class AsyncIOItem : public ThreadPool::WorkItem
{
   public:

      typedef WorkItem Parent;
      typedef T ValueType;
      typedef RawDataT< ValueType > BufferType;
      typedef U32 OffsetType;
      typedef Stream StreamType;

   protected:

      /// Buffer keeping/receiving the data elements.
      BufferType mBuffer;

      /// The stream to read from/write to.
      StreamType* mStream;

      /// Number of elements to read from/write to the stream.
      U32 mNumElements;

      /// Offset in "mBuffer" from where to read/where to start writing to.
      U32 mOffsetInBuffer;

      /// Offset in stream from where to read/where to write to.
      /// @note This is only meaningful if the stream is an offset I/O
      ///    stream.  For a stream that can do both types of I/O,
      ///    explicit offsets are preferred and this value is used.
      OffsetType mOffsetInStream;

      /// Return a pointer to the first element in the buffer taking part
      /// in the transfer.
      ValueType* getBufferPtr()
      {
         return &getBuffer().data[ getOffsetInBuffer() ];
      }

   public:

      /// Construct an item without supplying a buffer.
      ///
      /// If the stream uses implicit positioning, then the supplied "offsetInStream"
      /// is meaningless and ignored.
      ///
      /// @note Fix: member initializers are listed in declaration order --
      ///    the order in which C++ actually runs them (avoids -Wreorder).
      AsyncIOItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
                   ThreadContext* context = 0 )
         : Parent( context ),
           mStream( stream ),
           mNumElements( numElements ),
           mOffsetInBuffer( 0 ),
           mOffsetInStream( offsetInStream ) {}

      /// Construct a read item on "stream" that stores data into the given "buffer".
      ///
      /// @param takeOwnershipOfBuffer If true, the item's buffer takes over
      ///    ownership of the memory referenced by "buffer".
      AsyncIOItem( StreamType* stream, BufferType& buffer, U32 offsetInBuffer,
                   U32 numElements, OffsetType offsetInStream, bool takeOwnershipOfBuffer = true,
                   ThreadContext* context = 0 )
         : Parent( context ),
           mBuffer( buffer ),
           mStream( stream ),
           mNumElements( numElements ),
           mOffsetInBuffer( offsetInBuffer ),
           mOffsetInStream( offsetInStream )
      {
         if( takeOwnershipOfBuffer )
            mBuffer.ownMemory = true;
      }

      /// Return the stream being written to/read from.
      StreamType* getStream()
      {
         return mStream;
      }

      /// Return the data buffer being written to/read from.
      /// @note This may not yet have been allocated.
      BufferType& getBuffer()
      {
         return mBuffer;
      }

      /// Return the number of elements involved in the transfer.
      U32 getNumElements()
      {
         return mNumElements;
      }

      /// Return the position in the data buffer at which to start the transfer.
      U32 getOffsetInBuffer()
      {
         return mOffsetInBuffer;
      }

      /// Return the position in the stream at which to start the transfer.
      /// @note Only meaningful for streams that support offset I/O.
      OffsetType getOffsetInStream()
      {
         return mOffsetInStream;
      }
};
//--------------------------------------------------------------------------
// AsyncReadItem.
//--------------------------------------------------------------------------
//RDTODO: error handling
/// Work item to asynchronously read from a stream.
///
/// The given stream type may implement any of the input stream
/// interfaces. Preference is given to IAsyncInputStream, then to
/// IOffsetInputStream, and only if none of these are implemented
/// IInputStream is used.
///
/// For IAsyncInputStreams, the async read operation is issued immediately
/// on the constructing thread and then picked up on the worker thread.
/// This ensures optimal use of concurrency.
template< typename T, class Stream = IOffsetInputStream< T > >
class AsyncReadItem : public AsyncIOItem< T, Stream >
{
   public:

      typedef AsyncIOItem< T, Stream > Parent;
      typedef typename Parent::StreamType StreamType;
      typedef typename Parent::OffsetType OffsetType;
      typedef typename Parent::BufferType BufferType;
      typedef typename Parent::ValueType ValueType;

      /// Construct a read item that reads "numElements" at "offsetInStream"
      /// from "stream".
      ///
      /// Since with this constructor no data buffer is supplied, it will be
      /// dynamically allocated by the read() method. Note that this only makes
      /// sense if this class is subclassed and processing is done on the buffer
      /// after it has been read.
      ///
      /// @param stream The stream to read from.
      /// @param numElements The number of elements to read from the stream.
      /// @param offsetInStream The offset at which to read from the stream;
      ///    ignored if the stream uses implicit positioning.
      /// @param context The thread pool context to place the item into.
      AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
                     ThreadContext* context = 0 )
         : Parent( stream, numElements, offsetInStream, context )
      {
         _prep();
      }

      /// Construct a read item that reads into the supplied "buffer".
      AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
                     BufferType& buffer, bool takeOwnershipOfBuffer = false,
                     U32 offsetInBuffer = 0, ThreadContext* context = 0 )
         : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
      {
         _prep();
      }

      /// @return The number of elements actually read from the stream.
      U32 getNumElementsRead()
      {
         return mNumElementsRead;
      }

   protected:

      /// Handle of asynchronous stream read, if we are using an async
      /// interface; NULL otherwise.
      void* mAsyncHandle;

      /// After the read operation has completed, this holds the number of
      /// elements actually read from the stream.
      U32 mNumElementsRead;

      virtual void execute();

      /// Allocate the receive buffer unless one has already been supplied.
      void _allocBuffer()
      {
         if( !this->getBuffer().data )
            this->getBuffer().alloc( this->getNumElements() );
      }

      /// Common constructor tail.  If the stream supports async I/O, the
      /// read is issued right away on the constructing thread so it can
      /// overlap with queueing of the work item.
      void _prep()
      {
         // Fix: zero-initialize members that were previously left
         // uninitialized unless the stream implemented IAsyncInputStream.
         mAsyncHandle = NULL;
         mNumElementsRead = 0;

         IAsyncInputStream< T >* s = dynamic_cast< IAsyncInputStream< T >* >( this->getStream() );
         if( s )
         {
            _allocBuffer();
            mAsyncHandle = s->issueReadAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
         }
      }

      // Helper functions to differentiate between stream types.

      /// Blocking read at the stream's implicit position.
      void _read( IInputStream< T >* stream )
      {
         mNumElementsRead = stream->read( this->getBufferPtr(), this->getNumElements() );
      }

      /// Blocking read at an explicit stream offset.
      void _read( IOffsetInputStream< T >* stream )
      {
         mNumElementsRead = stream->readAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
      }

      /// Complete the async read issued in _prep() (blocking wait).
      void _read( IAsyncInputStream< T >* stream )
      {
         stream->tryCompleteReadAt( mAsyncHandle, mNumElementsRead, true );
      }
};
template< typename T, class Stream >
void AsyncReadItem< T, Stream >::execute()
{
   _allocBuffer();

   if( this->cancellationPoint() )
      return;

   // Dispatch to the most capable interface the stream implements,
   // preferring async I/O, then offset-based I/O, then plain input.
   StreamType* stream = this->getStream();

   IAsyncInputStream< T >* asyncStream = dynamic_cast< IAsyncInputStream< T >* >( stream );
   if( asyncStream )
      _read( asyncStream );
   else
   {
      IOffsetInputStream< T >* offsetStream = dynamic_cast< IOffsetInputStream< T >* >( stream );
      if( offsetStream )
         _read( offsetStream );
      else
         _read( stream );
   }
}
//--------------------------------------------------------------------------
// AsyncWriteItem.
//--------------------------------------------------------------------------
/// Work item for writing to an output stream.
///
/// The stream being written to may implement any of the given output stream
/// interfaces. Preference is given to IAsyncOutputStream, then to
/// IOffsetOutputStream, and only if none of these is implemented IOutputStream
/// is used.
///
/// A useful feature is to yield ownership of the data buffer to the
/// write item. This way, this can be pretty much used in a fire-and-forget
/// manner where after submission, no further synchronization happens
/// between the client and the work item.
///
/// @note Be aware that if writing to an output stream that has an implicit
/// position property, multiple concurrent writes will interfere with each other.
template< typename T, class Stream = IOffsetOutputStream< T > >
class AsyncWriteItem : public AsyncIOItem< T, Stream >
{
   public:

      typedef AsyncIOItem< T, Stream > Parent;
      typedef typename Parent::StreamType StreamType;
      typedef typename Parent::OffsetType OffsetType;
      typedef typename Parent::BufferType BufferType;
      typedef typename Parent::ValueType ValueType;

      /// Construct a write item that writes "numElements" from "buffer"
      /// to "stream" at "offsetInStream".
      AsyncWriteItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
                      BufferType& buffer, bool takeOwnershipOfBuffer = true,
                      U32 offsetInBuffer = 0, ThreadContext* context = 0 )
         : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
      {
         _prep( stream );
      }

   protected:

      /// Handle of asynchronous write operation, if the stream implements
      /// IAsyncOutputStream; NULL otherwise.
      void* mAsyncHandle;

      virtual void execute();

      /// Constructor tail: if the stream supports async I/O, issue the
      /// write immediately on the constructing thread.
      void _prep( StreamType* stream )
      {
         // Fix: mAsyncHandle was left uninitialized for non-async streams.
         mAsyncHandle = NULL;

         IAsyncOutputStream< T >* s = dynamic_cast< IAsyncOutputStream< T >* >( stream );
         if( s )
            // Fix: was "this->getOffset()" -- no such method exists on
            // AsyncIOItem.  Mirrors AsyncReadItem::_prep, which correctly
            // uses getOffsetInStream().
            mAsyncHandle = s->issueWriteAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
      }

      /// Blocking write at the stream's implicit position.
      void _write( IOutputStream< T >* stream )
      {
         stream->write( this->getBufferPtr(), this->getNumElements() );
      }

      /// Blocking write at an explicit stream offset.
      void _write( IOffsetOutputStream< T >* stream )
      {
         stream->writeAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
      }

      /// Complete the async write issued in _prep() (blocking wait).
      void _write( IAsyncOutputStream< T >* stream )
      {
         stream->tryCompleteWriteAt( mAsyncHandle, true );
      }
};
/// Perform the write, dispatching to the most capable interface the
/// stream implements: async I/O first, then offset-based I/O, then
/// plain output.
template< typename T, class Stream >
void AsyncWriteItem< T, Stream >::execute()
{
   if( this->cancellationPoint() ) return;

   StreamType* stream = this->getStream();
   if( dynamic_cast< IAsyncOutputStream< T >* >( stream ) )
      _write( ( IAsyncOutputStream< T >* ) stream );
   // Fix: this branch was a separate "if", so a stream implementing both
   // IAsyncOutputStream and IOffsetOutputStream was written TWICE (compare
   // AsyncReadItem::execute, which correctly chains with "else if").
   else if( dynamic_cast< IOffsetOutputStream< T >* >( stream ) )
      _write( ( IOffsetOutputStream< T >* ) stream );
   else
      _write( stream );
}
#endif // _THREADPOOLASYNCIO_H_

View file

@ -0,0 +1,474 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADSAFEDEQUE_H_
#define _THREADSAFEDEQUE_H_
#ifndef _THREADSAFEFREELIST_H_
# include "platform/threads/threadSafeFreeList.h"
#endif
#include "platform/tmm_off.h"
/// Fast, lock-free double-ended queue for concurrent access.
///
/// @param T Type of list elements; must have a default constructor.
template< typename T >
class ThreadSafeDeque
{
   // Lock-free deques using just single-word atomic writes are
   // very tricky as each pointer update must immediately result
   // in a fully valid list state. The idea here is to maintain the
   // deque's head and tail pointers unreliably but otherwise keep a
   // regular double-linked list (since we never insert nodes in the
   // middle, single-word writes are all we need).
   //
   // Deletions are a bit less straightforward and require the threads
   // to work cooperatively. Since failure of a pointer update depends
   // on the deletion state, the deletion flag has to be encoded into
   // the link fields. However, as there are two link fields this creates
   // two independent deletion flags for each single node, one on the
   // next link and one on the prev link.
   //
   // This will not lead to a problem, though, as it only becomes relevant
   // when there is only a single value in the list which, even if the
   // respective node gets both deleted and appended/prepended a new node,
   // will result in a valid list state.

   public:

      typedef T ValueType;

   protected:

      class Node;
      class DeleteNode;
      typedef ThreadSafeRef< Node > NodeRef;

      /// List node.
      class Node : public ThreadSafeFreeListNode< Node, DeleteNode >
      {
         public:

            friend class DeleteNode; // mFreeList;

            typedef typename ThreadSafeDeque< T >::ValueType ValueType;

            /// Thread claim flag. This is to prevent two threads who concurrently
            /// do a tryPopFront() and tryPopBack() respectively on a deque with just
            /// a single node to both claim and return the same value (which would happen
            /// without the flag as otherwise both threads would use two different
            /// deletion bits for claiming the node).
            U32 mIsClaimed;

            /// Link to the freelist that the node has been
            /// allocated from.
            ThreadSafeFreeList< Node >& mFreeList;

            /// Value contained in the node.
            ValueType mValue;

            /// Reference to next node and deletion bit.
            NodeRef mNext;

            /// Reference to previous node and deletion bit.
            NodeRef mPrev;

            /// Construct an unlinked node allocated from "freeList".
            Node( ThreadSafeFreeList< Node >& freeList, const ValueType& value )
               : mIsClaimed( 0 ), mFreeList( freeList ), mValue( value ) {}
      };

      /// Delete policy for node references: runs the destructor in place and
      /// returns the node's memory to its originating freelist rather than
      /// the heap.
      class DeleteNode
      {
         public:

            template< typename N >
            static void destroy( N* ptr )
            {
               AssertFatal( ptr->mIsClaimed,
                  "ThreadSafeDeque::DeleteNode::destroy() - deleting unclaimed node" );
               destructInPlace( ptr );
               ptr->mFreeList.free( ptr );
            }
      };

      #ifdef TORQUE_DEBUG
      S32 mNumValues;   // Debug-only count of values currently in the deque.
      #endif

      /// Reference to the head node.
      NodeRef mHead;

      /// Reference to the tail node.
      NodeRef mTail;

      /// Free list for list nodes.
      ThreadSafeFreeList< Node > mFreeList;

      /// @return the leftmost node in the list.
      /// @note Updates the list state and may purge deleted nodes.
      NodeRef getHead();

      /// @return the rightmost node in the list.
      /// @note Updates the list state and may purge deleted nodes.
      NodeRef getTail();

   public:

      /// Construct an empty deque.
      ThreadSafeDeque()
      {
         #ifdef TORQUE_DEBUG
         mNumValues = 0;
         #endif
      }

      /// Drain and destroy the deque.  Not synchronized; all concurrent
      /// access must have ceased before destruction.
      ~ThreadSafeDeque()
      {
         ValueType value;
         while( tryPopFront( value ) );
         AssertFatal( isEmpty(), "ThreadSafeDeque::~ThreadSafeDeque() - not empty" );
      }

      /// @return true if the queue is empty.
      bool isEmpty()
      {
         return ( !getHead() && !getTail() );
      }

      /// Prepend the given value to the list.
      void pushFront( const ValueType& value );

      /// Append the given value to the list.
      void pushBack( const ValueType& value );

      /// Try to take the leftmost value from the deque.
      /// Fails if the deque is empty at the time the method tries to
      /// take a node from the list.
      bool tryPopFront( ValueType& outValue );

      /// Try to take the rightmost value from the deque.
      /// Fails if the deque is empty at the time the method tries to
      /// take a node from the list.
      bool tryPopBack( ValueType& outValue );

      /// Dump debugging statistics (debug builds only).
      void dumpDebug()
      {
         #ifdef TORQUE_DEBUG
         Platform::outputDebugString( "[ThreadSafeDeque] numValues=%i", mNumValues );
         mFreeList.dumpDebug();
         #endif
      }
};
// The getHead() and getTail() code here is pretty much brute-force in order
// to keep the complexities of synchronizing it bounded. We just let each
// thread work as if it is the only thread but require each one to start from
// scratch on each iteration.
/// Find the current leftmost live node, cooperatively repairing the
/// unreliable mHead pointer and purging deleted nodes along the way.
/// Returns a NULL ref if the deque is empty.
template< typename T >
typename ThreadSafeDeque< T >::NodeRef ThreadSafeDeque< T >::getHead()
{
   // Find leftmost node.
   NodeRef result;
   while( 1 )
   {
      // Iterate through to leftmost node, moving mHead along as we go.
      {
         NodeRef head = mHead;
         while( head != NULL )
         {
            NodeRef prev = head->mPrev;
            if( prev != NULL )
               mHead.trySetFromTo( head, prev, NodeRef::TAG_Unset );
            else
               break;

            head = mHead;
         }
      }

      // Clear out dead nodes at front of list.
      {
         NodeRef head = mHead;
         // A tagged prev link marks the node as deleted by tryPopFront().
         if( head && head->mPrev.isTagged() )
         {
            NodeRef next = head->mNext;
            // Unhook the dead node from mHead/mTail and its successor, then
            // tag its next link so the node can be released.
            mHead.trySetFromTo( head, next, NodeRef::TAG_Unset );
            mTail.trySetFromTo( head, next, NodeRef::TAG_Unset );
            if( next != NULL )
               next->mPrev.trySetFromTo( head, NULL );
            head->mNext.trySetFromTo( next, NULL, NodeRef::TAG_Set );
            continue; // Restart.
         }
      }

      // Try head.
      NodeRef head = mHead;
      if( head != NULL && !head->mPrev.isTagged() )
      {
         result = head;
         break;
      }

      // Try tail: mHead may be stale/NULL while mTail still points into the list.
      if( !head )
      {
         head = mTail;
         if( !head )
            break;   // List is empty; return NULL ref.
      }

      // Update head and retry from scratch.
      NodeRef prev = head->mPrev;
      if( head->mPrev != NULL )
      {
         if( !mHead.trySetFromTo( head, prev, NodeRef::TAG_Unset ) )
            mHead.trySetFromTo( NULL, prev );
      }
      else
         mHead.trySetFromTo( NULL, head );
   }

   AssertFatal( !result.isTagged(), "ThreadSafeDeque::getHead() - head got tagged" );
   return result;
}
/// Find the current rightmost live node; mirror image of getHead().
/// Returns a NULL ref if the deque is empty.
template< typename T >
typename ThreadSafeDeque< T >::NodeRef ThreadSafeDeque< T >::getTail()
{
   // Find rightmost node.
   NodeRef result;
   while( 1 )
   {
      // Iterate through to rightmost node, moving mTail along as we go.
      {
         NodeRef tail = mTail;
         while( tail != NULL )
         {
            NodeRef next = tail->mNext;
            if( next != NULL )
               mTail.trySetFromTo( tail, next, NodeRef::TAG_Unset );
            else
               break;

            tail = mTail;
         }
      }

      // Clear out dead nodes at tail of list.
      {
         NodeRef tail = mTail;
         // A tagged next link marks the node as deleted by tryPopBack().
         if( tail != NULL && tail->mNext.isTagged() )
         {
            NodeRef prev = tail->mPrev;
            // Unhook the dead node from mHead/mTail and its predecessor, then
            // tag its prev link so the node can be released.
            mHead.trySetFromTo( tail, prev, NodeRef::TAG_Unset );
            mTail.trySetFromTo( tail, prev, NodeRef::TAG_Unset );
            if( prev != NULL )
               prev->mNext.trySetFromTo( tail, NULL );
            tail->mPrev.trySetFromTo( prev, NULL, NodeRef::TAG_Set );
            continue; // Restart.
         }
      }

      // Try tail.
      NodeRef tail = mTail;
      if( tail != NULL && !tail->mNext.isTagged() )
      {
         result = tail;
         break;
      }

      // Try head: mTail may be stale/NULL while mHead still points into the list.
      if( !tail )
      {
         tail = mHead;
         if( !tail )
            break;   // List is empty; return NULL ref.
      }

      // Update tail and retry from scratch.
      NodeRef next = tail->mNext;
      if( next != NULL )
      {
         if( !mTail.trySetFromTo( tail, next, NodeRef::TAG_Unset ) )
            mTail.trySetFromTo( NULL, next );
      }
      else
         mTail.trySetFromTo( NULL, tail );
   }

   AssertFatal( !result.isTagged(), "ThreadSafeDeque::getTail() - tail got tagged" );
   return result;
}
/// Prepend "value" to the deque, retrying until the new node has been
/// spliced in front of the current head.
template< typename T >
void ThreadSafeDeque< T >::pushFront( const ValueType& value )
{
   NodeRef nextNode;
   NodeRef newNode;

   // unsafeWrite is fine here: the node is freshly allocated and not yet
   // visible to any other thread.
   NodeRef::unsafeWrite( newNode, new ( mFreeList ) Node( mFreeList, value ) );

   while( 1 )
   {
      nextNode = getHead();
      if( !nextNode )
      {
         // List is empty; try installing the node as the sole element.
         newNode->mNext = NULL;
         if( mHead.trySetFromTo( NULL, newNode ) )
            break;
      }
      else
      {
         // Splice in before the current head; fails (and retries) if the
         // head's prev link got occupied or tagged in the meantime.
         newNode->mNext = nextNode;
         if( nextNode->mPrev.trySetFromTo( NULL, newNode, NodeRef::TAG_FailIfSet ) )
            break;
      }
   }

   #ifdef TORQUE_DEBUG
   dFetchAndAdd( mNumValues, 1 );
   #endif
}
/// Append "value" to the deque, retrying until the new node has been
/// spliced in after the current tail.
template< typename T >
void ThreadSafeDeque< T >::pushBack( const ValueType& value )
{
   NodeRef prevNode;
   NodeRef newNode;

   // unsafeWrite is fine here: the node is freshly allocated and not yet
   // visible to any other thread.
   NodeRef::unsafeWrite( newNode, new ( mFreeList ) Node( mFreeList, value ) );

   while( 1 )
   {
      prevNode = getTail();
      if( !prevNode )
      {
         // List is empty; try installing the node as the sole element.
         newNode->mPrev = NULL;
         if( mHead.trySetFromTo( NULL, newNode ) ) // use head so we synchronize with pushFront
            break;
      }
      else
      {
         // Splice in after the current tail; fails (and retries) if the
         // tail's next link got occupied or tagged in the meantime.
         newNode->mPrev = prevNode;
         if( prevNode->mNext.trySetFromTo( NULL, newNode, NodeRef::TAG_FailIfSet ) )
            break;
      }
   }

   #ifdef TORQUE_DEBUG
   dFetchAndAdd( mNumValues, 1 );
   #endif
}
/// Try to take the leftmost value off the deque.
/// @param outValue Receives the popped value on success.
/// @return false if the deque was empty when we looked.
template< typename T >
bool ThreadSafeDeque< T >::tryPopFront( ValueType& outValue )
{
   NodeRef oldHead;

   while( 1 )
   {
      oldHead = getHead();
      if( !oldHead )
         return false;

      // Try to claim the node by setting the deletion tag on its prev link.
      if( oldHead->mPrev.trySetFromTo( NULL, NULL, NodeRef::TAG_SetOrFail ) )
      {
         // The claim flag arbitrates between a concurrent tryPopFront and
         // tryPopBack that each tagged a different link of the same node;
         // only the thread that wins this CAS may return the value.
         if( dCompareAndSwap( oldHead->mIsClaimed, 0, 1 ) )
            break;
         else
            continue;
      }
   }

   outValue = oldHead->mValue;
   oldHead = NULL;   // Drop our reference to the claimed node.

   // Cleanup: let getHead() unlink and purge the node we just deleted.
   getHead();

   #ifdef TORQUE_DEBUG
   dFetchAndAdd( mNumValues, -1 );
   #endif

   return true;
}
/// Try to take the rightmost value off the deque; mirror of tryPopFront().
/// @param outValue Receives the popped value on success.
/// @return false if the deque was empty when we looked.
template< typename T >
bool ThreadSafeDeque< T >::tryPopBack( ValueType& outValue )
{
   NodeRef oldTail;

   while( 1 )
   {
      oldTail = getTail();
      if( !oldTail )
         return false;

      // Try to claim the node by setting the deletion tag on its next link.
      if( oldTail->mNext.trySetFromTo( NULL, NULL, NodeRef::TAG_SetOrFail ) )
      {
         // Losing this CAS means a concurrent tryPopFront claimed the same
         // node first; fall through and loop to retry with a new tail.
         if( dCompareAndSwap( oldTail->mIsClaimed, 0, 1 ) )
            break;
      }
   }

   outValue = oldTail->mValue;
   oldTail = NULL;   // Drop our reference to the claimed node.

   // Cleanup: let getTail() unlink and purge the node we just deleted.
   getTail();

   #ifdef TORQUE_DEBUG
   dFetchAndAdd( mNumValues, -1 );
   #endif

   return true;
}
#include "platform/tmm_on.h"
#endif // _THREADSAFEDEQUE_H_

View file

@ -0,0 +1,192 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADSAFEFREELIST_H_
#define _THREADSAFEFREELIST_H_
#ifndef _THREADSAFEREFCOUNT_H_
# include "platform/threads/threadSafeRefCount.h"
#endif
#ifndef _PLATFORMINTRINSICS_H_
# include "platform/platformIntrinsics.h"
#endif
#include "platform/tmm_off.h"
/// @file
/// Lock-free freelists for concurrent access.
/// Freelist for re-using allocations in a concurrent setting.
///
/// @note Make sure that there are no more allocations in use
///    when the free list is destructed.
/// @note Allocated instances come with a reference already counted
///    on the instance.
///
/// @param T Type of elements to allocate; must be derived from
///    ThreadSafeRefCount and define at least one additional
///    pointer-sized field.
template< class T >
class ThreadSafeFreeList
{
   protected:

      /// Head of the singly-linked list of free nodes.
      T* mFreeList;

      #ifdef TORQUE_DEBUG
      S32 mNumNodesTotal;   // Total nodes ever allocated by this list.
      S32 mNumNodesFree;    // Nodes currently sitting on the freelist.
      #endif

      /// Return a reference to the link pointer embedded in the given node.
      /// The link is stored in the last pointer-sized bytes of T -- which is
      /// why T must provide at least one additional pointer-sized field.
      T*& getNext( T* ptr )
      {
         return *( ( T** ) &( ( U8* ) ptr )[ sizeof( T ) - sizeof( T* ) ] );
      }

   public:

      /// Create the freelist.
      ///
      /// @param numPreAlloc Number of instances to pre-allocate.
      ThreadSafeFreeList( U32 numPreAlloc = 0 )
         : mFreeList( 0 )
      {
         #ifdef TORQUE_DEBUG
         mNumNodesTotal = 0;
         mNumNodesFree = 0;
         #endif
         for( U32 i = 0; i < numPreAlloc; ++ i )
            free( alloc() );
      }

      ~ThreadSafeFreeList()
      {
         #ifdef TORQUE_DEBUG
         AssertWarn( mNumNodesTotal == mNumNodesFree,
            "ThreadSafeFreeList::~ThreadSafeFreeList() - still got live instances" );
         #endif

         // Destroy remaining nodes. Not synchronized. We assume all
         // concurrent processing to have finished.
         while( mFreeList )
         {
            T* next = getNext( mFreeList );
            dFree( mFreeList );
            mFreeList = next;
         }
      }

      /// Return memory for a new instance: either a node popped off the
      /// freelist or, if the list is empty, freshly allocated zeroed memory.
      /// @note Returns raw memory; the instance is constructed in place by
      ///    the caller (see ThreadSafeFreeListNode::operator new).
      void* alloc()
      {
         T* ptr;
         while( 1 )
         {
            // safeRead takes out a reference on the head node so it cannot
            // be released underneath us while we try to unlink it (paired
            // with the release() on the retry path below).
            ptr = ThreadSafeRef< T >::safeRead( mFreeList );
            if( !ptr )
            {
               // Freelist empty; allocate a fresh, zeroed node.
               ptr = ( T* ) dMalloc( sizeof( T ) );
               dMemset( ptr, 0, sizeof( T ) );
               #ifdef TORQUE_DEBUG
               dFetchAndAdd( mNumNodesTotal, 1 );
               #endif
               ptr->addRef();
               break;
            }
            else if( dCompareAndSwap( mFreeList, ptr, getNext( ptr ) ) )
            {
               // Successfully unlinked the node from the freelist.
               #ifdef TORQUE_DEBUG
               dFetchAndAdd( mNumNodesFree, -1 );
               #endif
               // NOTE(review): presumably clears a tag bit in the refcount
               // that marked the node while on the freelist -- semantics
               // live in threadSafeRefCount.h; confirm.
               ptr->clearLowestBit();
               break;
            }
            else
               ptr->release();   // Lost the race; drop our reference and retry.
         }

         return ptr;
      }

      /// Return the memory allocated to the given instance to the freelist.
      /// The instance is expected to have been destructed already (see
      /// ThreadSafeDeque::DeleteNode::destroy for a typical caller).
      void free( void* ptr )
      {
         AssertFatal( ptr, "ThreadSafeFreeList::free() - got a NULL pointer" );
         T* node = ( T* ) ptr;

         // CAS-loop the node onto the head of the freelist.
         while( 1 )
         {
            T* list = mFreeList;
            getNext( node ) = list;
            if( dCompareAndSwap( mFreeList, list, node ) )
               break;
         }

         #ifdef TORQUE_DEBUG
         dFetchAndAdd( mNumNodesFree, 1 );
         #endif
      }

      /// Dump debugging statistics (debug builds only).
      void dumpDebug()
      {
         #ifdef TORQUE_DEBUG
         Platform::outputDebugString( "[ThreadSafeFreeList] total=%i, free=%i",
            mNumNodesTotal, mNumNodesFree );
         #endif
      }
};
/// Baseclass for objects allocated from ThreadSafeFreeLists.
template< class T, class DeletePolicy = DeleteSingle >
class ThreadSafeFreeListNode : public ThreadSafeRefCount< T, DeletePolicy >
{
   public:

      typedef ThreadSafeRefCount< T, DeletePolicy > Parent;

      // NOTE(review): Parent( false ) passes a flag to the refcount
      // baseclass -- the memory returned by ThreadSafeFreeList::alloc()
      // already carries a counted reference; exact semantics are defined
      // in threadSafeRefCount.h, confirm there.
      ThreadSafeFreeListNode()
         : Parent( false ) {}

      /// Placement-style allocation: take the node's memory from the
      /// given freelist instead of the heap.
      static void* operator new( size_t size, ThreadSafeFreeList< T >& freeList )
      {
         AssertFatal( size <= sizeof( T ),
            "ThreadSafeFreeListNode::new() - size exceeds limit of freelist" );
         TORQUE_UNUSED( size );
         return freeList.alloc();
      }

      /// Matching operator delete; invoked automatically only if the
      /// constructor throws during the placement new above.
      static void operator delete( void* ptr, ThreadSafeFreeList< T >& freeList )
      {
         freeList.free( ptr );
      }
};
#include "platform/tmm_on.h"
#endif // _THREADSAFEFREELIST_H_

View file

@ -0,0 +1,740 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADSAFEPRIORITYQUEUE_H_
#define _THREADSAFEPRIORITYQUEUE_H_
#ifndef _PLATFORMINTRINSICS_H_
#include "platform/platformIntrinsics.h"
#endif
#ifndef _THREADSAFEREFCOUNT_H_
#include "platform/threads/threadSafeRefCount.h"
#endif
#ifndef _TYPETRAITS_H_
#include "platform/typetraits.h"
#endif
// Disable TMM's new operator grabbing.
#include "platform/tmm_off.h"
//#define DEBUG_SPEW
/// @file
/// Template code for an efficient thread-safe priority queue
/// implementation. There are two alternative implementations to
/// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate
/// where the latter adds concurrent status updates of queue items to
/// the former implementation.
//--------------------------------------------------------------------------
// ThreadSafePriorityQueue.
//--------------------------------------------------------------------------
/// Fast, lock-free priority queue implementation for concurrent access.
///
/// Equal priorities are allowed and are placed <em>before</em> existing items of
/// identical priority in the queue.
///
/// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent
/// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas.
/// Parts of the skiplist code is based on work by William Pugh.
///
/// @param T The item value type. Must have a default constructor.
/// @param K The priority key type. Must be comparable, have a default constructor,
/// and be a valid template parameter to TypeTraits.
/// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or
/// the reverse if false.
/// @param MAX_LEVEL The number of levels a node can have at most.
/// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for
/// the skiplist. Multiplied by 100 and turned into int to conform to restrictions
/// on non-type template parameters.
///
/// @see TypeTraits
template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
struct ThreadSafePriorityQueue
{
      typedef T ValueType;
      typedef K KeyType;

      enum { MAX_LEVEL_CONST = MAX_LEVEL };

      ThreadSafePriorityQueue();

      /// Return true if no item is currently queued (sentinel check only;
      /// result may be stale by the time the caller acts on it).
      bool isEmpty();

      /// Queue the given value at the position determined by priority.
      void insert( KeyType priority, const T& value );

      /// Dequeue the highest-priority item whose priority passes
      /// upToPriority (inclusive).
      bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );

   protected:
      struct Node;

      /// Reference-counted, taggable pointer to a queue node.
      typedef ThreadSafeRef< Node > NodePtr;

      friend class ThreadSafeRefCount< Node >;
      friend struct DeleteSingle;

      /// A queue node.
      ///
      /// Nodes are reference-counted to coordinate memory management
      /// between the different threads. Reclamation happens on the
      /// thread that releases the last reference.
      ///
      /// Reference-counting and deletion requests are kept separate.
      /// A given node is marked for deletion and will then have its references
      /// progressively disappear and eventually be reclaimed once the
      /// reference count drops to zero.
      ///
      /// Note that 'Next' references are released by the destructor which
      /// is only called when the reference count to the node itself drops to
      /// zero. This is to avoid threads getting trapped in a node with no
      /// link out.
      struct Node : public ThreadSafeRefCount< Node >
      {
            typedef ThreadSafeRefCount< Node > Parent;

            Node( KeyType priority, const ValueType& value );
            ~Node();

            KeyType getPriority() { return mPriority; }
            ValueType& getValue() { return mValue; }
            U32 getLevel();
            NodePtr& getNext( U32 level );
            bool isMarkedForDeletion();
            bool tryMarkForDeletion();

            /// Reset the payload to a default-constructed value so that
            /// resources held by it are released as soon as the item is taken.
            void clearValue() { mValue = ValueType(); }

            static U32 randomLevel();

            void* operator new( size_t size, S32 level = -1 );
            void operator delete( void* ptr );

         private:
            KeyType mPriority;      ///< Priority key.
            U32 mLevel;             ///< Level count and deletion bit (highest).
            ValueType mValue;
            Node* mNext[ 1 ];       ///< Variable-sized array of next pointers.

            /// Per-level freelist of recycled nodes; mDestroyed flags
            /// static-destruction so late frees fall back to dFree().
            struct FreeList
            {
               bool mDestroyed;
               Node* mNodes;
               ~FreeList();
            };

            static FreeList smFreeLists[ MAX_LEVEL ];
      };

      NodePtr mHead;    ///< Artificial head node.
      NodePtr mTail;    ///< Artificial tail node.

      void readNext( NodePtr& refPrev, NodePtr& refNext, U32 level );
      void scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
      void scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
      void insert( KeyType priority, const T& value, NodePtr& outResult );
      void helpDelete();
};
// Definition of the per-level node freelists (one static array per
// template instantiation; zero-initialized, so mDestroyed starts false).
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ];
/// Construct an empty queue.
///
/// Internally, this creates a head node with maximal priority and a tail node with minimal priority,
/// both at maximum level.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue()
{
   // unsafeWrite is fine here: the queue is not yet visible to other
   // threads, and the freshly allocated nodes already carry a reference.
   NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 )
      Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) );
   NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 )
      Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) );

   // Link head to tail on every level so scans always terminate.
   for( U32 level = 0; level < MAX_LEVEL; level ++ )
      mHead->getNext( level ) = mTail;
}
/// Check whether the queue currently holds no items.
///
/// @return true if no item is queued at the moment of the call; note the
///   answer may already be stale when the caller sees it.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty()
{
   // When nothing is queued, the head sentinel links straight to the
   // tail sentinel on the bottom (always fully linked) level.
   const bool hasItems = ( mHead->getNext( 0 ) != mTail );
   return !hasItems;
}
/// Queue the given value at the position determined by the given priority.
///
/// @param priority Sort key for the new item.
/// @param value Payload to store.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
{
   // Delegate to the three-argument overload; the node reference it
   // hands back is not needed here and simply goes out of scope.
   NodePtr newNode;
   insert( priority, value, newNode );
}
/// Insert the given value with the given priority and return a reference
/// to the newly created node in outResult.
///
/// @param priority Sort key for the new item.
/// @param value Payload to store.
/// @param outResult Receives a reference to the node created for the item.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult )
{
   // Create a new node at a random level.

   outResult = NULL;
   NodePtr::unsafeWrite( outResult, new Node( priority, value ) );
   U32 resultNodeLevel = outResult->getLevel();

   // Link up all the levels.  Do this bottom-up instead of
   // top-down (as would be the right way for a skiplist) so
   // that our list state always remains valid.  If going top-down,
   // we'll insert nodes with NULL pointers at their lower levels.

   U32 currentLevel = 0;
   do
   {
      while( 1 )
      {
         // Find the insert position on this level, point the new node
         // at its successor, then CAS the predecessor over to us.
         // TAG_FailIfSet backs off if the predecessor's link got tagged
         // (i.e. the predecessor is being unlinked by helpDelete).
         NodePtr nextNode;
         NodePtr prevNode;

         scanFromHead( prevNode, nextNode, currentLevel, priority );
         outResult->getNext( currentLevel ) = nextNode;

         if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) )
            break;
         else
            // CAS lost; drop the speculative next link and rescan.
            outResult->getNext( currentLevel ) = 0;
      }

      currentLevel ++;
   }
   while( currentLevel <= resultNodeLevel
          && !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node.
}
/// Take the item with the highest priority from the queue.
///
/// @param outValue Reference to where the resulting value should be stored.
/// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
/// @return true if there was a matching item in the queue.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
{
   // Iterate through to the first unmarked node.

   NodePtr prevNode = mHead;
   while( 1 )
   {
      NodePtr node;
      readNext( prevNode, node, 0 );

      if( node == mTail )
         return false; // End reached.

      // An item qualifies only if its priority lies within the
      // caller's threshold, in whichever direction the queue sorts.
      bool priorityThresholdReached = SORT_MIN_TO_MAX
         ? ( upToPriority >= node->getPriority() )
         : ( upToPriority <= node->getPriority() );

      if( !priorityThresholdReached )
         return false;
      else
      {
         // Try to mark the node for deletion.  Only if that succeeds, taking the
         // node was a success and we can return.  If it fails, spin and try again.

         if( node->tryMarkForDeletion() )
         {
            // Physically unlink marked nodes from the head.
            helpDelete();

            // Node is now off the list and will disappear as soon as
            // all references held by threads (including this one)
            // go out of scope.

            // Copy the payload out before clearing it on the node so
            // its resources are released promptly.
            outValue = node->getValue();
            node->clearValue();

            return true;
         }
      }
   }
}
/// Update the given references to the next non-deleted node at the given level.
/// refPrev will be updated to reference the immediate predecessor of the next
/// node returned.  Note that this can be a node in deleted state.
///
/// @param refPrev Reference to a node of which the successor node should be
///   returned.  Updated to immediate predecessor of refNext on return.
/// @param refNext Reference to update to refer to next non-deleted node on
///   the given level.
/// @param level Skiplist level to operate on.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level )
{
   while( 1 )
   {
      refNext = refPrev->getNext( level );
      AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" );

      // Stop at the first live node; the tail sentinel always
      // terminates the walk even though it never gets marked.
      if( !refNext->isMarkedForDeletion() || refNext == mTail )
         break;

      // Skip over the dead node and keep walking.
      refPrev = refNext;
   }
}
/// Scan for the position at which to insert a node of the given priority.
/// Upon return, the position between refPrev and refNext is the one to insert at.
///
/// @param refPrev position at which to start scanning; updated to match insert position.
/// @param refNext Receives the first node that sorts after the given priority
///   (or the tail sentinel) on this level.
/// @param level Skiplist level to scan on.
/// @param priority Sort key being placed.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
{
   while( 1 )
   {
      readNext( refPrev, refNext, level );

      // Strict comparison means equal priorities insert *before*
      // existing items of the same priority (see class comment).
      if( refNext == mTail
          || ( SORT_MIN_TO_MAX
               ? ( refNext->getPriority() > priority )
               : ( refNext->getPriority() < priority ) ) )
         break;

      refPrev = refNext;
   }
}
/// Scan the entire skiplist, from the top level of the head node down to
/// the given level, for the insert position of the given priority.
///
/// @param refPrev Receives the predecessor of the insert position on 'level'.
/// @param refNext Receives the successor of the insert position on 'level'.
/// @param level Lowest level to descend to.
/// @param priority Sort key being placed.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
{
   // Purge dead nodes at left end of queue so
   // we don't get stuck hitting the same node
   // in deletable state over and over again.

   helpDelete();

   // Standard skiplist descent: narrow down on each level starting at
   // the top, carrying refPrev down as the starting point of the next
   // level's scan. Signed counter so the >= comparison terminates.
   S32 currentLevel = MAX_LEVEL - 1;
   refPrev = mHead;
   do
   {
      scan( refPrev, refNext, currentLevel, priority );
      currentLevel --;
   }
   while( currentLevel >= S32( level ) );
}
/// Physically unlink nodes marked for deletion from the head of the
/// list, top level down. Cooperative: any thread may perform this work.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete()
{
   // Clean out all the references from head.
   // Spin over a given reference on each level until head
   // clearly refers to a node in non-deletable state.  This
   // makes this code work cooperatively with other threads
   // doing takeNexts on prior or later nodes while also
   // guaranteeing that all next pointers to us will eventually
   // disappear.
   //
   // Note that this is *the only place* where we will be cleaning
   // out our lists.

   S32 level = MAX_LEVEL - 1;
   do
   {
      while( 1 )
      {
         NodePtr ptr = mHead->getNext( level );
         if( !ptr->isMarkedForDeletion() )
            break;
         else
         {
            // Tag the dead node's own next link so concurrent inserts
            // behind it back off (TAG_FailIfSet in insert()), then
            // swing head past the dead node. TAG_Unset clears the tag
            // as the link moves into head.
            NodePtr& next = ptr->getNext( level );
            next.setTag();
            mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset );
            AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" );
         }
      }

      level --;
   }
   while( level >= 0 );
}
/// Construct a node carrying the given priority and payload.
/// The node's level (mLevel) has already been set by operator new
/// before the constructor runs; only the next-pointer array is cleared.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value )
   : Parent( false ),   // Don't reset the refcount; operator new set it up.
     mPriority( priority ),
     mValue( value )
{
   // Zero all next pointers for this node's levels (getLevel()+1 slots).
   dMemset( mNext, 0, sizeof( Node* ) * ( getLevel() + 1 ) );
   // Level is already set by the allocation routines.
}
/// Destroy the node, releasing all 'Next' references so that successor
/// nodes can in turn be reclaimed. This only runs once the node's own
/// reference count has dropped to zero, so no other thread can still be
/// traversing out of this node (see class comment on Node).
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node()
{
   // Hoist the loop bound: getNext() writes through a reinterpret_cast
   // of mNext, so the compiler cannot prove mLevel stays untouched and
   // would otherwise re-read getLevel() on every iteration.
   const U32 numLevels = getLevel() + 1;
   for( U32 level = 0; level < numLevels; level ++ )
      getNext( level ) = NULL;
}
/// Return the skip list level index of this node.
///
/// @return the zero-based level; the node owns getLevel()+1 next pointers.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel()
{
   // The top bit of mLevel doubles as the deletion-request flag,
   // so strip it before handing back the plain level index.
   const U32 DELETION_BIT = 0x80000000;
   return ( mLevel & ~DELETION_BIT );
}
/// Return the successor node at the given level.
/// @param level The level of the desired successor node; must be within the node's level bounds.
/// @return reference to the link slot, viewed as a NodePtr so callers get
///   refcounting/tagging semantics on reads and writes.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level )
{
   // mNext is stored as raw Node* so the variable-length array and
   // freelist code can manipulate it directly; reinterpret it as the
   // layout-compatible ThreadSafeRef wrapper for normal access.
   return *reinterpret_cast< NodePtr* >( &mNext[ level ] );
}
/// Test whether this node has been logically removed from the queue.
///
/// @return true if the deletion-request bit is set on the node.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion()
{
   // The deletion request lives in the top bit of the combined
   // level/flag field.
   const U32 DELETION_BIT = 0x80000000;
   return ( ( mLevel & DELETION_BIT ) != 0 );
}
/// Attempt to mark the node for deletion.  If the mark bit has not yet been set
/// and setting it on the current thread succeeds, returns true.
///
/// @return true, if the marking succeeded.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion()
{
   // Single CAS from "bit clear" to "bit set": if the bit is already
   // set (another thread claimed the node), oldVal won't match the
   // current mLevel and the CAS fails, returning false. Exactly one
   // thread can thus win a given node.
   U32 oldVal = mLevel & 0x7FFFFFFF;
   U32 newVal = oldVal | 0x80000000;

   return ( dCompareAndSwap( mLevel, oldVal, newVal ) );
}
/// Choose a random level.
///
/// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL,
/// but is not affected by the actual number of nodes in a queue.
///
/// @return a level in [0, MAX_LEVEL-1]; each additional level is reached
///   with probability PROBABILISTIC_BIAS/100 (geometric distribution).
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel()
{
   U32 level = 0;
   // PROBABILISTIC_BIAS is an integer percentage (template restriction);
   // convert back to a [0,1) fraction for comparison with getRandom().
   while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) )
      level ++;
   return level;
}
/// Allocate a new node.
/// The node comes with a reference count of one and its level already set.
///
/// @param level The level to allocate the node at.  If this is -1, a random level is chosen.
/// @return a new node.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level )
{
   if( level == -1 )
      level = randomLevel();

   Node* node = 0;
   while( 1 )
   {
      // Try to take a node from the freelist.  If there's none,
      // allocate a new one.

      // Skip the freelist once static destruction has begun
      // (mDestroyed set by FreeList::~FreeList).
      if( !smFreeLists[ level ].mDestroyed )
         node = Node::safeRead( smFreeLists[ level ].mNodes );
      if( !node )
      {
         // Fresh allocation: the struct already contains one next
         // pointer, so only 'level' extra slots are needed.
         node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level );
         dMemset( node, 0, sizeof( Node ) );
         node->mLevel = level;
         node->addRef();
         break;
      }
      else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) )
      {
         // Successfully popped the node off the freelist; clear the
         // claim bit left over from its previous release().
         node->clearLowestBit();
         break;
      }
      else
         node->release(); // Other thread was quicker than us; release.
   }

   // Refcount must be non-zero and even (steps of two, claim bit clear).
   AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
   AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
   return node;
}
/// Reclaim a node.
///
/// @param node The node to reclaim.  Must refer to a Node instance.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr )
{
   //TODO: limit number of nodes kept

   Node* node = ( Node* ) ptr;
   U32 level = node->getLevel();
   node->mLevel = level; // Reset the node's deletion bit.

   // Push the node onto the per-level freelist with a CAS loop, unless
   // static destruction of the freelist has already begun.
   while( !smFreeLists[ level ].mDestroyed )
   {
      // Put the node on the freelist.

      Node* freeList = smFreeLists[ level ].mNodes;
      node->mNext[ 0 ] = freeList;

      if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) )
      {
         node = NULL;
         break;
      }
   }

   // Freelist unavailable (shutdown); release the memory directly.
   if( node )
      dFree( node );
}
/// Tear down the per-level freelist at static destruction time,
/// returning all pooled nodes to the heap. Sets mDestroyed first so
/// concurrent operator new/delete calls stop touching the list.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList()
{
   mDestroyed = true;
   while( mNodes )
   {
      //FIXME: could leak some bytes under unfortunate circumstances (this in
      // combination with mDestroyed is a dependent write)

      Node* next = mNodes;
      if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) )
         dFree( next );
   }
}
//--------------------------------------------------------------------------
// ThreadSafePriorityQueueWithUpdate.
//--------------------------------------------------------------------------
/// Fast, lock-free priority queue implementation for concurrent access that
/// performs dynamic re-prioritization of items.
///
/// Within the bounds of a set update interval UPDATE_INTERVAL, the takeNext
/// method is guaranteed to always return the item that has the highest priority
/// at the time the method is called rather than at the time items were inserted
/// into the queue.
///
/// Values placed on the queue must implement the following interface:
///
/// @code
/// template&lt; typename K >
/// struct IThreadSafePriorityQueueItem
/// {
/// // Default constructor.
/// IThreadSafePriorityQueueItem();
///
/// // Return the current priority.
/// // This must run normally even if the item is already dead.
/// K getPriority();
///
/// // Return true if the item is still meant to be waiting in the queue.
/// bool isAlive();
/// };
/// @endcode
template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >
{
      typedef T ValueType;
      typedef K KeyType;

      enum { DEFAULT_UPDATE_INTERVAL = 256 };

      ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL );

      /// Queue the given value and schedule it for periodic re-prioritization.
      void insert( KeyType priority, const T& value );

      /// Dequeue the next live item; dead items (value.isAlive() false)
      /// are discarded. Runs pending priority updates first.
      bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );

      U32 getUpdateInterval() const;
      void setUpdateInterval( U32 value );

      KeyType getTimeBasedPriorityBoost() const;
      void setTimeBasedPriorityBoost( KeyType value );

      /// Process all items whose scheduled update time has arrived.
      void updatePriorities();

   protected:
      typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > Parent;

      /// Millisecond timestamp type used to schedule updates.
      typedef U32 TickType;
      typedef typename Parent::NodePtr NodePtr;

      U32 mUpdateInterval;
      KeyType mPriorityBoost;    ///< If this is non-zero, priorities will be boosted by this amount each update.  This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue.

      /// Work queue for node updates.
      /// Sorted min-to-max by scheduled update tick regardless of how
      /// the main queue sorts.
      ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS > mUpdateQueue;

      TickType getTick() { return Platform::getRealMilliseconds(); }
};
/// Construct an empty queue with the given update interval and a
/// deactivated (zero) time-based priority boost.
///
/// @param updateInterval Milliseconds between per-item priority updates.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval )
   : mUpdateInterval( updateInterval ),
     mPriorityBoost( TypeTraits< KeyType >::ZERO )
{
}
/// Return the current update interval in milliseconds.
///
/// @return the interval at which queued items get re-prioritized.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const
{
   return mUpdateInterval;
}
/// Set update interval of queue to given value.
///
/// <em>Call this method on the main thread only.</em>
///
/// @param value Time between priority updates in milliseconds.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value )
{
   // Plain write; safe only because concurrent writers are disallowed
   // (main-thread-only contract above).
   mUpdateInterval = value;
}
/// Return the delta to apply to priorities on each update.
/// Set to zero to deactivate time-based priority adjustments.
///
/// @return the per-update priority boost currently in effect.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const
{
   return mPriorityBoost;
}
/// Set the delta for time-based priority adjustments to the given value.
///
/// <em>Call this method on the main thread only.</em>
///
/// @param value The new priority adjustment value.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value )
{
   // Plain write; safe only because concurrent writers are disallowed
   // (main-thread-only contract above).
   mPriorityBoost = value;
}
/// Queue the given value and register its node on the update queue so
/// its priority gets refreshed one update interval from now.
///
/// @param priority Initial sort key for the new item.
/// @param value Payload to store; must satisfy the isAlive()/getPriority() interface.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
{
   // Use the three-argument base insert so we get hold of the node and
   // can schedule it for its first priority update.
   NodePtr node;
   Parent::insert( priority, value, node );
   mUpdateQueue.insert( getTick() + getUpdateInterval(), node );
}
/// Take the next item off the queue, skipping over items that have
/// died while waiting. Pending priority updates are processed first so
/// the returned item reflects current priorities.
///
/// @param outValue Receives the dequeued value.
/// @param upToPriority Priority limit (inclusive) up to which items are taken.
/// @return true if a live item was dequeued.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
{
   // Bring item priorities up to date before dequeuing.
   updatePriorities();

   // Keep pulling items until we either run dry or hit one that is
   // still alive; dead items are silently discarded.
   for( ;; )
   {
      if( !Parent::takeNext( outValue, upToPriority ) )
         return false;
      if( outValue.isAlive() )
         return true;
   }
}
/// Run through the update queue and re-prioritize all items whose
/// scheduled update time has arrived. Outdated items are re-inserted
/// into the main queue at their new priority; live, unchanged items are
/// rescheduled; dead or already-taken items are discarded.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities()
{
   TickType currentTime = getTick();
   U32 numNodesUpdated = 0;
   U32 numNodesDead = 0;
   U32 numNodesChanged = 0;

   // The update queue sorts min-to-max by scheduled tick, so this pulls
   // exactly the entries whose update time has passed.
   NodePtr node;
   while( mUpdateQueue.takeNext( node, currentTime ) )
   {
      numNodesUpdated ++;

      // Since we're updating nodes on the update queue only periodically,
      // their associated values or main queue nodes may have died in the
      // meantime.  If so, we just discard them here.

      if( node->getValue().isAlive()
          && !node->isMarkedForDeletion() )
      {
         // Ask the value for its current priority and apply the
         // optional anti-starvation boost.
         KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost();
         if( newPriority != node->getPriority() )
         {
            // Node is outdated.  Reinsert with new priority and mark the
            // old node for deletion.

            insert( newPriority, node->getValue() );
            node->tryMarkForDeletion();

            numNodesChanged ++;
         }
         else
         {
            // Node is still current.  Just move to end.

            mUpdateQueue.insert( currentTime + getUpdateInterval(), node );
         }
      }
      else
         numNodesDead ++;
   }

#ifdef DEBUG_SPEW
   if( numNodesUpdated )
      Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)",
         numNodesUpdated, numNodesChanged, numNodesDead );
#endif
}
// Re-enable TMM if necessary.
#include "platform/tmm_on.h"
#undef DEBUG_SPEW
#endif // !_THREADSAFEPRIORITYQUEUE_H_

View file

@ -0,0 +1,380 @@
//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADSAFEREFCOUNT_H_
#define _THREADSAFEREFCOUNT_H_
#ifndef _PLATFORMINTRINSICS_H_
# include "platform/platformIntrinsics.h"
#endif
#ifndef _TYPETRAITS_H_
# include "platform/typetraits.h"
#endif
/// @file
/// Templated code for concurrent reference-counting.
///
/// Part of this code is based on work by J.D. Valois, Michael M. Maged,
/// and Scott L. Michael.
//--------------------------------------------------------------------------
// ThreadSafeRefCount.
//--------------------------------------------------------------------------
/// Baseclass for concurrently reference-counted objects.
///
/// @note NOTE that freshly instantiated objects start out with a reference
/// count of ZERO! Depending on how this class is used, this may not
/// be desirable, so override this behavior in constructors if necessary.
///
/// @param T the class being reference counted; this is passed to this class,
/// so it can call the correct destructor without having to force users
/// to have virtual methods
template< class T, class DeletePolicy = DeleteSingle >
class ThreadSafeRefCount
{
   public:

      typedef void Parent;

      // Fresh objects start at refcount ZERO (see class comment above).
      ThreadSafeRefCount()
         : mRefCount( 0 ) {}

      // Variant that deliberately leaves mRefCount untouched; used when
      // storage is recycled (e.g. freelists) and already carries a
      // valid count. The parameter only selects this overload.
      ThreadSafeRefCount( bool noSet ) {}

      bool isShared() const;
      U32 getRefCount() const;
      void addRef();
      void release();
      void clearLowestBit();

      static T* safeRead( T* const volatile& refPtr );

   protected:

      U32 mRefCount;   ///< Reference count and claim bit.  Note that this increments in steps of two.

      static U32 decrementAndTestAndSet( U32& refCount );
};
/// Test whether more than one reference to this object exists.
///
/// @return true if the object is held by at least two references.
template< class T, class DeletePolicy >
inline bool ThreadSafeRefCount< T, DeletePolicy >::isShared() const
{
   // The count advances in steps of two (the lowest bit is the claim
   // flag), so a single reference reads as 2 or 3; anything at or
   // above 4 means two or more references exist.
   return ( mRefCount >= 4 );
}
/// Get the current reference count.  This method is mostly meant for
/// debugging and should not normally be used.
///
/// @return the raw counter value, including the claim bit in bit 0; the
///   actual number of references is this value divided by two.
template< class T, class DeletePolicy >
inline U32 ThreadSafeRefCount< T, DeletePolicy >::getRefCount() const
{
   return mRefCount;
}
/// Increase the reference count on the object.
template< class T, class DeletePolicy >
inline void ThreadSafeRefCount< T, DeletePolicy >::addRef()
{
   // Atomic add of 2: the count moves in steps of two so bit 0 stays
   // free for the claim flag.
   dFetchAndAdd( mRefCount, 2 );
}
/// Decrease the object's reference count and delete the object, if the count
/// drops to zero and claiming the object by the current thread succeeds.
template< class T, class DeletePolicy >
inline void ThreadSafeRefCount< T, DeletePolicy >::release()
{
   AssertFatal( mRefCount != 0, "ThreadSafeRefCount::release() - refcount of zero" );
   // decrementAndTestAndSet returns non-zero only for the single thread
   // that both dropped the count to zero and won the claim bit, so the
   // destroy below runs exactly once.
   if( decrementAndTestAndSet( mRefCount ) != 0 )
      DeletePolicy::destroy( ( T* ) this );
}
/// Dereference a reference-counted pointer in a multi-thread safe way.
///
/// @param refPtr The shared pointer cell to read from (may be tagged).
/// @return the untagged pointer with one reference added on behalf of the
///   caller, or 0 if the cell held a null pointer.
template< class T, class DeletePolicy >
T* ThreadSafeRefCount< T, DeletePolicy >::safeRead( T* const volatile& refPtr )
{
   while( 1 )
   {
      // Support tagged pointers here.

      T* ptr = TypeTraits< T* >::getUntaggedPtr( refPtr );
      if( !ptr )
         return 0;

      // Take a reference first, then verify the cell still holds the
      // same pointer.  If it does, our reference is valid; if the cell
      // changed in between, undo the reference and retry.
      ptr->addRef();
      if( ptr == TypeTraits< T* >::getUntaggedPtr( refPtr ) )
         return ptr;
      else
         ptr->release();
   }
}
/// Decrement the given reference count.  Return 1 if the count dropped to zero
/// and the claim bit has been successfully set; return 0 otherwise.
///
/// @param refCount The counter cell to operate on.
template< class T, class DeletePolicy >
U32 ThreadSafeRefCount< T, DeletePolicy >::decrementAndTestAndSet( U32& refCount )
{
   U32 oldVal;
   U32 newVal;

   do
   {
      oldVal = refCount;
      newVal = oldVal - 2;   // Counts move in steps of two.

      AssertFatal( oldVal >= 2,
         "ThreadSafeRefCount::decrementAndTestAndSet() - invalid refcount" );

      // Reaching zero: atomically set the claim bit (bit 0) in the same
      // CAS so exactly one thread wins the right to destroy the object.
      if( newVal == 0 )
         newVal = 1;
   }
   while( !dCompareAndSwap( refCount, oldVal, newVal ) );

   // Bit 0 of (oldVal - newVal) is 1 only when we went 2 -> 1 above,
   // i.e. only for the claiming thread.
   return ( ( oldVal - newVal ) & 1 );
}
/// Clear the claim bit (bit 0) of the reference count.  Used when a
/// claimed object is resurrected, e.g. when it is recycled off a
/// freelist instead of being destroyed.
template< class T, class DeletePolicy >
inline void ThreadSafeRefCount< T, DeletePolicy >::clearLowestBit()
{
   AssertFatal( mRefCount % 2 != 0, "ThreadSafeRefCount::clearLowestBit() - invalid refcount" );

   // CAS loop so concurrent addRef()s (which add 2) are not lost while
   // we flip bit 0 off.
   U32 oldVal;
   U32 newVal;

   do
   {
      oldVal = mRefCount;
      newVal = oldVal - 1;
   }
   while( !dCompareAndSwap( mRefCount, oldVal, newVal ) );
}
//--------------------------------------------------------------------------
// ThreadSafeRef.
//--------------------------------------------------------------------------
/// Reference to a concurrently reference-counted object.
///
/// This class takes care of the reference-counting as well as protecting
/// the reference itself from concurrent operations.
///
/// Tagging allows the pointer contained in the reference to be flagged.
/// Tag state is preserved through updates to the reference.
///
/// @note If you directly assign a freshly created object with a reference
/// count of zero to a ThreadSafeRef, make absolutely sure the ThreadSafeRef
/// is accessed only by a single thread. Otherwise there's a risk of the
/// object being released and freed in midst of trying to set the reference.
template< class T >
class ThreadSafeRef
{
   public:

      /// Tag-handling policy for pointer updates; tag state lives in the
      /// low bit(s) of the stored pointer (see TypeTraits tagged-pointer
      /// helpers).
      enum ETag
      {
         TAG_PreserveOld,  ///< Preserve existing tagging state when changing pointer.
         TAG_PreserveNew,  ///< Preserve tagging state of new pointer when changing pointer.
         TAG_Set,          ///< Set tag when changing pointer; okay if already set.
         TAG_Unset,        ///< Unset tag when changing pointer; okay if already unset.
         TAG_SetOrFail,    ///< Set tag when changing pointer; fail if already set.
         TAG_UnsetOrFail,  ///< Unset tag when changing pointer; fail if already unset.
         TAG_FailIfSet,    ///< Fail changing pointer when currently tagged.
         TAG_FailIfUnset   ///< Fail changing pointer when currently untagged.
      };

      typedef ThreadSafeRef< T > ThisType;

      ThreadSafeRef() : mPtr( 0 ) {}
      ThreadSafeRef( T* ptr ) : mPtr( ThreadSafeRefCount< T >::safeRead( ptr ) ) {}
      ThreadSafeRef( const ThisType& ref ) : mPtr( ThreadSafeRefCount< T >::safeRead( ref.mPtr ) ) {}

      // Swap a NULL in via CAS (releasing the old reference inside
      // trySetFromTo); retry until the exchange sticks.
      ~ThreadSafeRef()
      {
         T* ptr = NULL;
         while( !trySetFromTo( mPtr, ptr ) );
      }

      /// Return the (untagged) pointer currently held.
      T* ptr() const { return getUntaggedPtr( mPtr ) ; }

      /// Set the tag bit, retrying until the tagged value is published.
      void setTag() { while( !trySetFromTo( mPtr, mPtr, TAG_Set ) ); }
      bool isTagged() const { return isTaggedPtr( mPtr ); }

      bool trySetFromTo( T* oldVal, T* const volatile& newVal, ETag tag = TAG_PreserveOld );
      bool trySetFromTo( T* oldVal, const ThisType& newVal, ETag tag = TAG_PreserveOld );
      bool trySetFromTo( const ThisType& oldVal, const ThisType& newVal, ETag tag = TAG_PreserveOld );

      /// Store ptr without refcounting or CAS protection; only safe
      /// while the ref is not yet visible to other threads (see class
      /// comment about zero-refcount objects).
      static void unsafeWrite( ThisType& ref, T* ptr );
      static T* safeRead( T* const volatile& refPtr ) { return ThreadSafeRefCount< T >::safeRead( refPtr ); }

      bool operator ==( T* ptr ) const;
      bool operator ==( const ThisType& ref ) const;
      bool operator !=( T* ptr ) const { return !( *this == ptr ); }
      bool operator !=( const ThisType& ref ) const { return !( *this == ref ); }
      ThisType& operator =( T* ptr );
      ThisType& operator =( const ThisType& ref );

      bool operator !() const { return ( ptr() == 0 ); }
      T& operator *() const { return *ptr(); }
      T* operator ->() const { return ptr(); }
      operator T*() const { return ptr(); }

   protected:

      T* volatile mPtr;   ///< Stored pointer; low bit doubles as the tag.

      static bool isTaggedPtr( T* ptr ) { return TypeTraits< T* >::isTaggedPtr( ptr ); }
      static T* getTaggedPtr( T* ptr ) { return TypeTraits< T* >::getTaggedPtr( ptr ); }
      static T* getUntaggedPtr( T* ptr ) { return TypeTraits< T* >::getUntaggedPtr( ptr ); }
};
/// Update the reference from pointing to oldVal to point to newVal.
/// Do so in a thread-safe way.
///
/// This operation will only succeed, if, when doing the pointer-swapping,
/// the reference still points to oldVal.  If, however, the reference
/// has been changed in the meantime by another thread, the operation will
/// fail.
///
/// @param oldVal The pointer assumed to currently be contained in this ThreadSafeRef.
/// @param newVal The pointer to store in this ThreadSafeRef.
/// @param tag Operation to perform on the reference's tag field.
///
/// @return true, if the reference now points to newVal.
template< class T >
bool ThreadSafeRef< T >::trySetFromTo( T* oldVal, T* const volatile& newVal, ETag tag )
{
   bool setTag = false;    // Tag state the stored pointer should have after the swap.
   bool getTag = false;    // If true, the CAS comparand is forced to the tag state
                           // opposite of setTag, so the op fails when the tag is
                           // already in the requested state (the *OrFail modes).
   bool isTagged = isTaggedPtr( oldVal );

   switch( tag )
   {
   case TAG_PreserveOld:   setTag = isTaggedPtr( oldVal ); break;
   case TAG_PreserveNew:   setTag = isTaggedPtr( newVal ); break;
   case TAG_Set:           setTag = true; break;
   case TAG_Unset:         setTag = false; break;
   case TAG_SetOrFail:     setTag = true; getTag = true; break;
   case TAG_UnsetOrFail:   setTag = false; getTag = true; break;
   case TAG_FailIfSet:     if( isTagged ) return false; break;
   case TAG_FailIfUnset:   if( !isTagged ) return false; break;
   }

   // Take out a reference on newVal first (safeRead), then apply the
   // requested tag state to the value that will be stored.
   T* newValPtr = ( setTag
                    ? getTaggedPtr( ThreadSafeRefCount< T >::safeRead( newVal ) )
                    : getUntaggedPtr( ThreadSafeRefCount< T >::safeRead( newVal ) ) );

   if( dCompareAndSwap( mPtr,
                        ( getTag
                          ? ( setTag
                              ? getUntaggedPtr( oldVal )
                              : getTaggedPtr( oldVal ) )
                          : oldVal ),
                        newValPtr ) )
   {
      // Swap succeeded: drop the reference previously held through oldVal.
      if( getUntaggedPtr( oldVal ) )
         getUntaggedPtr( oldVal )->release();
      return true;
   }
   else
   {
      // Swap failed: drop the reference we just took on newVal above.
      if( getUntaggedPtr( newValPtr ) )
         getUntaggedPtr( newValPtr )->release();
      return false;
   }
}
/// Convenience overload: update from oldVal to the pointer held by the
/// given reference.  Forwards to the pointer-based overload.
template< class T >
inline bool ThreadSafeRef< T >::trySetFromTo( T* oldVal, const ThisType& newVal, ETag tag )
{
   const bool succeeded = trySetFromTo( oldVal, newVal.mPtr, tag );
   return succeeded;
}
/// Convenience overload: update from the pointer held by oldVal to the
/// pointer held by newVal.  Forwards to the pointer-based overload.
template< class T >
inline bool ThreadSafeRef< T >::trySetFromTo( const ThisType& oldVal, const ThisType& newVal, ETag tag )
{
   const bool succeeded = trySetFromTo( oldVal.mPtr, newVal.mPtr, tag );
   return succeeded;
}
/// Update ref to point to ptr but <em>do not</em> release an existing
/// reference held by ref nor do the operation in a thread-safe way.
///
/// This method is <em>only</em> for when you absolutely know that your
/// thread is the only thread operating on a reference <em>and</em> you
/// are keeping track of reference counts yourself.
///
/// @param ref The reference to update.
/// @param ptr The new pointer to store in ref.
template< class T >
inline void ThreadSafeRef< T >::unsafeWrite( ThisType& ref, T* ptr )
{
   // Plain store: no safeRead, no release, no compare-and-swap.
   ref.mPtr = ptr;
}
/// Compare the (untagged) referenced pointer against a raw pointer.
template< class T >
inline bool ThreadSafeRef< T >::operator ==( T* p ) const
{
   T* const current = ptr();
   return current == p;
}
/// Compare the (untagged) referenced pointers of two references.
template< class T >
inline bool ThreadSafeRef< T >::operator ==( const ThisType& ref ) const
{
   T* const mine   = ptr();
   T* const theirs = ref.ptr();
   return mine == theirs;
}
/// Assign a raw pointer to this reference, taking the tag state of the
/// new pointer.  Spins until the compare-and-swap update succeeds.
template< class T >
inline ThreadSafeRef< T >& ThreadSafeRef< T >::operator =( T* ptr )
{
   for( ;; )
   {
      if( trySetFromTo( mPtr, ptr, TAG_PreserveNew ) )
         break;
   }
   return *this;
}
/// Assign another reference to this one, taking the tag state of the
/// new pointer.  Spins until the compare-and-swap update succeeds.
template< class T >
inline ThreadSafeRef< T >& ThreadSafeRef< T >::operator =( const ThisType& ref )
{
   for( ;; )
   {
      if( trySetFromTo( mPtr, ref, TAG_PreserveNew ) )
         break;
   }
   return *this;
}
/// TypeTraits for ThreadSafeRef<T> simply reuse the traits of the
/// underlying raw pointer type.
template< typename T >
struct TypeTraits< ThreadSafeRef< T > > : public TypeTraits< T* > {};
/// Dereference helper: return the object a ThreadSafeRef points at.
template< typename T >
inline T& Deref( ThreadSafeRef< T >& ref )
{
   return *( ref.ptr() );
}
/// Dereference helper for const references: return the object a
/// ThreadSafeRef points at.
template< typename T >
inline T& Deref( const ThreadSafeRef< T >& ref )
{
   return *( ref.ptr() );
}
#endif // _THREADSAFEREFCOUNT_H_