mirror of
https://github.com/TorqueGameEngines/Torque3D.git
synced 2026-01-20 12:44:46 +00:00
741 lines
27 KiB
C++
741 lines
27 KiB
C++
//-----------------------------------------------------------------------------
|
|
// Copyright (c) 2012 GarageGames, LLC
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
// of this software and associated documentation files (the "Software"), to
|
|
// deal in the Software without restriction, including without limitation the
|
|
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
// sell copies of the Software, and to permit persons to whom the Software is
|
|
// furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
// IN THE SOFTWARE.
|
|
//-----------------------------------------------------------------------------
|
|
|
|
#ifndef _THREADSAFEPRIORITYQUEUE_H_
|
|
#define _THREADSAFEPRIORITYQUEUE_H_
|
|
|
|
#ifndef _PLATFORMINTRINSICS_H_
|
|
#include "platform/platformIntrinsics.h"
|
|
#endif
|
|
#ifndef _THREADSAFEREFCOUNT_H_
|
|
#include "platform/threads/threadSafeRefCount.h"
|
|
#endif
|
|
#ifndef _TYPETRAITS_H_
|
|
#include "platform/typetraits.h"
|
|
#endif
|
|
|
|
|
|
// Disable TMM's new operator grabbing.
|
|
#include "platform/tmm_off.h"
|
|
|
|
|
|
//#define DEBUG_SPEW
|
|
|
|
|
|
/// @file
|
|
/// Template code for an efficient thread-safe priority queue
|
|
/// implementation. There are two alternative implementations to
|
|
/// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate
|
|
/// where the latter adds concurrent status updates of queue items to
|
|
/// the former implementation.
|
|
|
|
|
|
//--------------------------------------------------------------------------
|
|
// ThreadSafePriorityQueue.
|
|
//--------------------------------------------------------------------------
|
|
|
|
/// Fast, lock-free priority queue implementation for concurrent access.
|
|
///
|
|
/// Equal priorities are allowed and are placed <em>before</em> existing items of
|
|
/// identical priority in the queue.
|
|
///
|
|
/// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent
|
|
/// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas.
|
|
/// Parts of the skiplist code is based on work by William Pugh.
|
|
///
|
|
/// @param T The item value type. Must have a default constructor.
|
|
/// @param K The priority key type. Must be comparable, have a default constructor,
|
|
/// and be a valid template parameter to TypeTraits.
|
|
/// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or
|
|
/// the reverse if false.
|
|
/// @param MAX_LEVEL The number of levels a node can have at most.
|
|
/// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for
|
|
/// the skiplist. Multiplied by 100 and turned into int to conform to restrictions
|
|
/// on non-type template parameters.
|
|
///
|
|
/// @see TypeTraits
|
|
|
|
template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
|
|
struct ThreadSafePriorityQueue
|
|
{
|
|
typedef T ValueType;
|
|
typedef K KeyType;
|
|
|
|
enum { MAX_LEVEL_CONST = MAX_LEVEL };
|
|
|
|
ThreadSafePriorityQueue();
|
|
|
|
bool isEmpty();
|
|
void insert( KeyType priority, const T& value );
|
|
bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
|
|
|
|
protected:
|
|
struct Node;
|
|
typedef ThreadSafeRef< Node > NodePtr;
|
|
friend class ThreadSafeRefCount< Node >;
|
|
friend struct DeleteSingle;
|
|
|
|
/// A queue node.
|
|
///
|
|
/// Nodes are reference-counted to coordinate memory management
|
|
/// between the different threads. Reclamation happens on the
|
|
/// thread that releases the last reference.
|
|
///
|
|
/// Reference-counting and deletion requests are kept separate.
|
|
/// A given node is marked for deletion and will then have its references
|
|
/// progressively disappear and eventually be reclaimed once the
|
|
/// reference count drops to zero.
|
|
///
|
|
/// Note that 'Next' references are released by the destructor which
|
|
/// is only called when the reference count to the node itself drops to
|
|
/// zero. This is to avoid threads getting trapped in a node with no
|
|
/// link out.
|
|
|
|
struct Node : public ThreadSafeRefCount< Node >
|
|
{
|
|
typedef ThreadSafeRefCount< Node > Parent;
|
|
|
|
Node( KeyType priority, const ValueType& value );
|
|
~Node();
|
|
|
|
KeyType getPriority() { return mPriority; }
|
|
ValueType& getValue() { return mValue; }
|
|
U32 getLevel();
|
|
NodePtr& getNext( U32 level );
|
|
|
|
bool isMarkedForDeletion();
|
|
bool tryMarkForDeletion();
|
|
|
|
void clearValue() { mValue = ValueType(); }
|
|
|
|
static U32 randomLevel();
|
|
|
|
void* operator new( size_t size, S32 level = -1 );
|
|
void operator delete( void* ptr );
|
|
|
|
private:
|
|
KeyType mPriority; ///< Priority key.
|
|
U32 mLevel; ///< Level count and deletion bit (highest).
|
|
ValueType mValue;
|
|
Node* mNext[ 1 ]; ///< Variable-sized array of next pointers.
|
|
|
|
struct FreeList
|
|
{
|
|
bool mDestroyed;
|
|
Node* mNodes;
|
|
|
|
~FreeList();
|
|
};
|
|
|
|
static FreeList smFreeLists[ MAX_LEVEL ];
|
|
};
|
|
|
|
NodePtr mHead; ///< Artificial head node.
|
|
NodePtr mTail; ///< Artificial tail node.
|
|
|
|
void readNext( NodePtr& refPrev, NodePtr& refNext, U32 level );
|
|
void scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
|
|
void scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
|
|
void insert( KeyType priority, const T& value, NodePtr& outResult );
|
|
void helpDelete();
|
|
};
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ];
|
|
|
|
/// Construct an empty queue.
|
|
///
|
|
/// Internally, this creates a head node with maximal priority and a tail node with minimal priority,
|
|
/// both at maximum level.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue()
|
|
{
|
|
NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 )
|
|
Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) );
|
|
NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 )
|
|
Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) );
|
|
|
|
for( U32 level = 0; level < MAX_LEVEL; level ++ )
|
|
mHead->getNext( level ) = mTail;
|
|
}
|
|
|
|
/// Return true if the queue does not currently contain an item.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty()
|
|
{
|
|
return ( mHead->getNext( 0 ) == mTail );
|
|
}
|
|
|
|
/// Insert the given value into the queue at the place determined by the given priority.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
|
|
{
|
|
NodePtr result;
|
|
insert( priority, value, result );
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult )
|
|
{
|
|
// Create a new node at a random level.
|
|
|
|
outResult = NULL;
|
|
NodePtr::unsafeWrite( outResult, new Node( priority, value ) );
|
|
U32 resultNodeLevel = outResult->getLevel();
|
|
|
|
// Link up all the levels. Do this bottom-up instead of
|
|
// top-down (as would be the right way for a skiplist) so
|
|
// that our list state always remains valid. If going top-down,
|
|
// we'll insert nodes with NULL pointers at their lower levels.
|
|
|
|
U32 currentLevel = 0;
|
|
do
|
|
{
|
|
while( 1 )
|
|
{
|
|
NodePtr nextNode;
|
|
NodePtr prevNode;
|
|
|
|
scanFromHead( prevNode, nextNode, currentLevel, priority );
|
|
|
|
outResult->getNext( currentLevel ) = nextNode;
|
|
if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) )
|
|
break;
|
|
else
|
|
outResult->getNext( currentLevel ) = 0;
|
|
}
|
|
|
|
currentLevel ++;
|
|
}
|
|
while( currentLevel <= resultNodeLevel
|
|
&& !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node.
|
|
}
|
|
|
|
/// Take the item with the highest priority from the queue.
|
|
///
|
|
/// @param outValue Reference to where the resulting value should be stored.
|
|
/// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
|
|
/// @return true if there was a matching item in the queue.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
|
|
{
|
|
// Iterate through to the first unmarked node.
|
|
|
|
NodePtr prevNode = mHead;
|
|
while( 1 )
|
|
{
|
|
NodePtr node;
|
|
readNext( prevNode, node, 0 );
|
|
|
|
if( node == mTail )
|
|
return false; // End reached.
|
|
|
|
bool priorityThresholdReached = SORT_MIN_TO_MAX
|
|
? ( upToPriority >= node->getPriority() )
|
|
: ( upToPriority <= node->getPriority() );
|
|
|
|
if( !priorityThresholdReached )
|
|
return false;
|
|
else
|
|
{
|
|
// Try to mark the node for deletion. Only if that succeeds, taking the
|
|
// node was a success and we can return. If it fails, spin and try again.
|
|
|
|
if( node->tryMarkForDeletion() )
|
|
{
|
|
helpDelete();
|
|
|
|
// Node is now off the list and will disappear as soon as
|
|
// all references held by threads (including this one)
|
|
// go out of scope.
|
|
|
|
outValue = node->getValue();
|
|
node->clearValue();
|
|
|
|
return true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Update the given references to the next non-deleted node at the given level.
|
|
/// refPrev will be updated to reference the immediate predecessor of the next
|
|
/// node returned. Note that this can be a node in deleted state.
|
|
///
|
|
/// @param refPrev Reference to a node of which the successor node should be
|
|
/// returned. Updated to immediate predecessor of refNext on return.
|
|
/// @param refNext Reference to update to refer to next non-deleted node on
|
|
/// the given level.
|
|
/// @param level Skiplist level to operate on.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level )
|
|
{
|
|
while( 1 )
|
|
{
|
|
refNext = refPrev->getNext( level );
|
|
AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" );
|
|
if( !refNext->isMarkedForDeletion() || refNext == mTail )
|
|
break;
|
|
|
|
refPrev = refNext;
|
|
}
|
|
}
|
|
|
|
/// Scan for the position at which to insert a node of the given priority.
|
|
/// Upon return, the position between refPrev and refNext is the one to insert at.
|
|
///
|
|
/// @param refPrev position at which to start scanning; updated to match insert position.
|
|
/// @param refNext
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
|
|
{
|
|
while( 1 )
|
|
{
|
|
readNext( refPrev, refNext, level );
|
|
if( refNext == mTail
|
|
|| ( SORT_MIN_TO_MAX
|
|
? ( refNext->getPriority() > priority )
|
|
: ( refNext->getPriority() < priority ) ) )
|
|
break;
|
|
|
|
refPrev = refNext;
|
|
}
|
|
}
|
|
|
|
///
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
|
|
{
|
|
// Purge dead nodes at left end of queue so
|
|
// we don't get stuck hitting the same node
|
|
// in deletable state over and over again.
|
|
helpDelete();
|
|
|
|
S32 currentLevel = MAX_LEVEL - 1;
|
|
refPrev = mHead;
|
|
do
|
|
{
|
|
scan( refPrev, refNext, currentLevel, priority );
|
|
currentLevel --;
|
|
}
|
|
while( currentLevel >= S32( level ) );
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete()
|
|
{
|
|
// Clean out all the references from head.
|
|
// Spin over a given reference on each level until head
|
|
// clearly refers to a node in non-deletable state. This
|
|
// makes this code work cooperatively with other threads
|
|
// doing takeNexts on prior or later nodes while also
|
|
// guaranteeing that all next pointers to us will eventually
|
|
// disappear.
|
|
//
|
|
// Note that this is *the only place* where we will be cleaning
|
|
// out our lists.
|
|
|
|
S32 level = MAX_LEVEL - 1;
|
|
do
|
|
{
|
|
while( 1 )
|
|
{
|
|
NodePtr ptr = mHead->getNext( level );
|
|
if( !ptr->isMarkedForDeletion() )
|
|
break;
|
|
else
|
|
{
|
|
NodePtr& next = ptr->getNext( level );
|
|
next.setTag();
|
|
mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset );
|
|
AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" );
|
|
}
|
|
}
|
|
|
|
level --;
|
|
}
|
|
while( level >= 0 );
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value )
|
|
: Parent( false ),
|
|
mPriority( priority ),
|
|
mValue( value )
|
|
{
|
|
dMemset( mNext, 0, sizeof( Node* ) * (U32)( getLevel() + 1 ) );
|
|
|
|
// Level is already set by the allocation routines.
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node()
|
|
{
|
|
for( U32 level = 0; level < ( getLevel() + 1 ); level ++ )
|
|
getNext( level ) = NULL;
|
|
}
|
|
|
|
/// Return the skip list level the node is at.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel()
|
|
{
|
|
// Mask out the deletion request bit.
|
|
|
|
return ( mLevel & 0x7FFFFFFF );
|
|
}
|
|
|
|
/// Return the successor node at the given level.
|
|
/// @param level The level of the desired successor node; must be within the node's level bounds.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level )
|
|
{
|
|
return *reinterpret_cast< NodePtr* >( &mNext[ level ] );
|
|
}
|
|
|
|
/// Return true if the node is marked to be deleted.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion()
|
|
{
|
|
return ( mLevel & 0x80000000 );
|
|
}
|
|
|
|
/// Attempt to mark the node for deletion. If the mark bit has not yet been set
|
|
/// and setting it on the current thread succeeds, returns true.
|
|
///
|
|
/// @return true, if the marking succeeded.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion()
|
|
{
|
|
U32 oldVal = mLevel & 0x7FFFFFFF;
|
|
U32 newVal = oldVal | 0x80000000;
|
|
|
|
return ( dCompareAndSwap( mLevel, oldVal, newVal ) );
|
|
}
|
|
|
|
/// Choose a random level.
|
|
///
|
|
/// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL,
|
|
/// but is not affected by the actual number of nodes in a queue.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel()
|
|
{
|
|
U32 level = 0;
|
|
while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) )
|
|
level ++;
|
|
return level;
|
|
}
|
|
|
|
/// Allocate a new node.
|
|
/// The node comes with a reference count of one and its level already set.
|
|
///
|
|
/// @param level The level to allocate the node at. If this is -1, a random level is chosen.
|
|
/// @return a new node.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level )
|
|
{
|
|
if( level == -1 )
|
|
level = randomLevel();
|
|
|
|
Node* node = 0;
|
|
while( 1 )
|
|
{
|
|
// Try to take a node from the freelist. If there's none,
|
|
// allocate a new one.
|
|
|
|
if( !smFreeLists[ level ].mDestroyed )
|
|
node = Node::safeRead( smFreeLists[ level ].mNodes );
|
|
|
|
if( !node )
|
|
{
|
|
node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level );
|
|
dMemset( node, 0, sizeof( Node ) );
|
|
node->mLevel = level;
|
|
node->addRef();
|
|
break;
|
|
}
|
|
else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) )
|
|
{
|
|
node->clearLowestBit();
|
|
break;
|
|
}
|
|
else
|
|
node->release(); // Other thread was quicker than us; release.
|
|
}
|
|
|
|
AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
|
|
AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
|
|
return node;
|
|
}
|
|
|
|
/// Reclaim a node.
|
|
///
|
|
/// @param node The node to reclaim. Must refer to a Node instance.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr )
|
|
{
|
|
//TODO: limit number of nodes kept
|
|
|
|
Node* node = ( Node* ) ptr;
|
|
U32 level = node->getLevel();
|
|
node->mLevel = level; // Reset the node's deletion bit.
|
|
|
|
while( !smFreeLists[ level ].mDestroyed )
|
|
{
|
|
// Put the node on the freelist.
|
|
|
|
Node* freeList = smFreeLists[ level ].mNodes;
|
|
node->mNext[ 0 ] = freeList;
|
|
|
|
if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) )
|
|
{
|
|
node = NULL;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if( node )
|
|
dFree( node );
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList()
|
|
{
|
|
mDestroyed = true;
|
|
while( mNodes )
|
|
{
|
|
//FIXME: could leak some bytes under unfortunate circumstances (this in
|
|
// combination with mDestroyed is a dependent write)
|
|
|
|
Node* next = mNodes;
|
|
if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) )
|
|
dFree( next );
|
|
}
|
|
}
|
|
|
|
//--------------------------------------------------------------------------
|
|
// ThreadSafePriorityQueueWithUpdate.
|
|
//--------------------------------------------------------------------------
|
|
|
|
/// Fast, lock-free priority queue implementation for concurrent access that
|
|
/// performs dynamic re-prioritization of items.
|
|
///
|
|
/// Within the bounds of a set update interval UPDATE_INTERVAL, the takeNext
|
|
/// method is guaranteed to always return the item that has the highest priority
|
|
/// at the time the method is called rather than at the time items were inserted
|
|
/// into the queue.
|
|
///
|
|
/// Values placed on the queue must implement the following interface:
|
|
///
|
|
/// @code
|
|
/// template< typename K >
|
|
/// struct IThreadSafePriorityQueueItem
|
|
/// {
|
|
/// // Default constructor.
|
|
/// IThreadSafePriorityQueueItem();
|
|
///
|
|
/// // Return the current priority.
|
|
/// // This must run normally even if the item is already dead.
|
|
/// K getPriority();
|
|
///
|
|
/// // Return true if the item is still meant to be waiting in the queue.
|
|
/// bool isAlive();
|
|
/// };
|
|
/// @endcode
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
|
|
struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >
|
|
{
|
|
|
|
typedef T ValueType;
|
|
typedef K KeyType;
|
|
|
|
enum { DEFAULT_UPDATE_INTERVAL = 256 };
|
|
|
|
ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL );
|
|
|
|
void insert( KeyType priority, const T& value );
|
|
bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
|
|
|
|
U32 getUpdateInterval() const;
|
|
void setUpdateInterval( U32 value );
|
|
|
|
KeyType getTimeBasedPriorityBoost() const;
|
|
void setTimeBasedPriorityBoost( KeyType value );
|
|
|
|
void updatePriorities();
|
|
|
|
protected:
|
|
typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > Parent;
|
|
typedef U32 TickType;
|
|
typedef typename Parent::NodePtr NodePtr;
|
|
|
|
U32 mUpdateInterval;
|
|
KeyType mPriorityBoost; ///< If this is non-zero, priorities will be boosted by this amount each update. This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue.
|
|
|
|
/// Work queue for node updates.
|
|
ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS > mUpdateQueue;
|
|
|
|
TickType getTick() { return Platform::getRealMilliseconds(); }
|
|
};
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval )
|
|
: mUpdateInterval( updateInterval ),
|
|
mPriorityBoost( TypeTraits< KeyType >::ZERO )
|
|
{
|
|
}
|
|
|
|
/// Return the current update interval in milliseconds.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const
|
|
{
|
|
return mUpdateInterval;
|
|
}
|
|
|
|
/// Set update interval of queue to given value.
|
|
///
|
|
/// <em>Call this method on the main thread only.</em>
|
|
///
|
|
/// @param value Time between priority updates in milliseconds.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value )
|
|
{
|
|
mUpdateInterval = value;
|
|
}
|
|
|
|
/// Return the delta to apply to priorities on each update.
|
|
/// Set to zero to deactivate time-based priority adjustments.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const
|
|
{
|
|
return mPriorityBoost;
|
|
}
|
|
|
|
/// Set the delta for time-based priority adjustments to the given value.
|
|
///
|
|
/// <em>Call this method on the main thread only.</em>
|
|
///
|
|
/// @param value The new priority adjustment value.
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value )
|
|
{
|
|
mPriorityBoost = value;
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
|
|
{
|
|
NodePtr node;
|
|
Parent::insert( priority, value, node );
|
|
mUpdateQueue.insert( getTick() + getUpdateInterval(), node );
|
|
}
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
|
|
{
|
|
updatePriorities();
|
|
|
|
bool result = false;
|
|
do
|
|
{
|
|
result = Parent::takeNext( outValue, upToPriority );
|
|
}
|
|
while( result && !outValue.isAlive() );
|
|
|
|
return result;
|
|
}
|
|
|
|
///
|
|
|
|
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
|
|
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities()
|
|
{
|
|
TickType currentTime = getTick();
|
|
U32 numNodesUpdated = 0;
|
|
U32 numNodesDead = 0;
|
|
U32 numNodesChanged = 0;
|
|
|
|
NodePtr node;
|
|
while( mUpdateQueue.takeNext( node, currentTime ) )
|
|
{
|
|
numNodesUpdated ++;
|
|
|
|
// Since we're updating nodes on the update queue only periodically,
|
|
// their associated values or main queue nodes may have died in the
|
|
// meantime. If so, we just discard them here.
|
|
|
|
if( node->getValue().isAlive()
|
|
&& !node->isMarkedForDeletion() )
|
|
{
|
|
KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost();
|
|
if( newPriority != node->getPriority() )
|
|
{
|
|
// Node is outdated. Reinsert with new priority and mark the
|
|
// old node for deletion.
|
|
|
|
insert( newPriority, node->getValue() );
|
|
node->tryMarkForDeletion();
|
|
numNodesChanged ++;
|
|
}
|
|
else
|
|
{
|
|
// Node is still current. Just move to end.
|
|
|
|
mUpdateQueue.insert( currentTime + getUpdateInterval(), node );
|
|
}
|
|
}
|
|
else
|
|
numNodesDead ++;
|
|
}
|
|
|
|
#ifdef DEBUG_SPEW
|
|
if( numNodesUpdated )
|
|
Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)",
|
|
numNodesUpdated, numNodesChanged, numNodesDead );
|
|
#endif
|
|
}
|
|
|
|
// Re-enable TMM if necessary.
|
|
#include "platform/tmm_on.h"
|
|
|
|
#undef DEBUG_SPEW
|
|
|
|
#endif // !_THREADSAFEPRIORITYQUEUE_H_
|