Torque3D/Engine/source/platform/async/asyncBufferedStream.h
marauder2k7 eb33fe04af working vhacd
renamed ThreadPool to TorqueThreadPool to avoid conflics
fixed data transmission between stages of convexDecome and trimesh creation
TODO: re-add our own functions for generating sphere/cylinder/box
2024-05-12 14:43:56 +01:00

418 lines
14 KiB
C++

//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _ASYNCBUFFEREDSTREAM_H_
#define _ASYNCBUFFEREDSTREAM_H_
#ifndef _TSTREAM_H_
#include "core/stream/tStream.h"
#endif
#ifndef _THREADPOOL_H_
#include "platform/threads/threadPool.h"
#endif
#ifndef _THREADSAFEDEQUE_H_
#include "platform/threads/threadSafeDeque.h"
#endif
// Disable nonsense warning about unreferenced
// local function on VC.
#ifdef TORQUE_COMPILER_VISUALC
#pragma warning( disable: 4505 )
#endif
template< typename T, class Stream >
class AsyncBufferedReadItem;
//=============================================================================
// AsyncBufferedInputStream.
//=============================================================================
///
template< typename T, class Stream = IInputStream< T >* >
class AsyncBufferedInputStream : public IInputStreamFilter< T, Stream >,
public ThreadSafeRefCount< AsyncBufferedInputStream< T, Stream > >
{
public:
typedef IInputStreamFilter< T, Stream > Parent;
/// Type of elements read, buffered, and returned by this stream.
typedef typename Parent::ElementType ElementType;
/// Type of the source stream being read by this stream.
typedef typename Parent::SourceStreamType SourceStreamType;
/// Type of elements being read from the source stream.
///
/// @note This does not need to correspond to the type of elements buffered
/// in this stream.
typedef typename Parent::SourceElementType SourceElementType;
enum
{
/// The number of elements to buffer in advance by default.
DEFAULT_STREAM_LOOKAHEAD = 3
};
friend class AsyncBufferedReadItem< T, Stream >; // _onArrival
protected:
/// Stream elements are kept on deques that can be concurrently
/// accessed by multiple threads.
typedef ThreadSafeDeque< ElementType > ElementList;
/// If true, the stream will restart over from the beginning once
/// it has been read in entirety.
bool mIsLooping;
/// If true, no further requests should be issued on this stream.
/// @note This in itself doesn't say anything about pending requests.
bool mIsStopped;
/// Number of source elements remaining in the source stream.
U32 mNumRemainingSourceElements;
/// Number of elements currently on buffer list.
U32 mNumBufferedElements;
/// Maximum number of elements allowed on buffer list.
U32 mMaxBufferedElements;
/// List of buffered elements.
ElementList mBufferedElements;
/// The thread pool to which read items are queued.
TorqueThreadPool* mThreadPool;
/// The thread context used for prioritizing read items in the pool.
ThreadContext* mThreadContext;
/// Request the next element from the underlying stream.
virtual void _requestNext() = 0;
/// Called when an element read has been completed on the underlying stream.
virtual void _onArrival( const ElementType& element );
public:
/// Construct a new buffered stream reading from "source".
///
/// @param stream The source stream from which to read the actual data elements.
/// @param numSourceElementsToRead Total number of elements to read from "stream".
/// @param numReadAhead Number of packets to read and buffer in advance.
/// @param isLooping If true, the packet stream will loop infinitely over the source stream.
/// @param pool The ThreadPool to use for asynchronous packet reads.
/// @param context The ThreadContext to place asynchronous packet reads in.
AsyncBufferedInputStream( const Stream& stream,
U32 numSourceElementsToRead = 0,
U32 numReadAhead = DEFAULT_STREAM_LOOKAHEAD,
bool isLooping = false,
TorqueThreadPool* pool = &TorqueThreadPool::GLOBAL(),
ThreadContext* context = ThreadContext::ROOT_CONTEXT() );
virtual ~AsyncBufferedInputStream();
/// @return true if the stream is looping infinitely.
bool isLooping() const { return mIsLooping; }
/// @return the number of elements that will be read and buffered in advance.
U32 getReadAhead() const { return mMaxBufferedElements; }
/// Initiate the request chain of the element stream.
void start() { _requestNext(); }
/// Call for the request chain of the element stream to stop at the next
/// synchronization point.
void stop() { mIsStopped = true; }
// IInputStream.
U32 read( ElementType* buffer, U32 num ) override;
};
//-----------------------------------------------------------------------------
template< typename T, typename Stream >
AsyncBufferedInputStream< T, Stream >::AsyncBufferedInputStream
( const Stream& stream,
U32 numSourceElementsToRead,
U32 numReadAhead,
bool isLooping,
TorqueThreadPool* threadPool,
ThreadContext* threadContext )
: Parent( stream ),
mIsLooping( isLooping ),
mIsStopped( false ),
mNumRemainingSourceElements( numSourceElementsToRead ),
mNumBufferedElements( 0 ),
mMaxBufferedElements( numReadAhead ),
mThreadPool( threadPool ),
mThreadContext( threadContext )
{
if( mIsLooping )
{
// Stream is looping so we don't count down source elements.
mNumRemainingSourceElements = 0;
}
else if( !mNumRemainingSourceElements )
{
// If not given number of elements to read, see if the source
// stream is sizeable. If so, take its size as the number of elements.
if( dynamic_cast< ISizeable<>* >( &Deref( stream ) ) )
mNumRemainingSourceElements = ( ( ISizeable<>* ) &Deref( stream ) )->getSize();
else
{
// Can't tell how many source elements there are.
mNumRemainingSourceElements = U32_MAX;
}
}
}
//-----------------------------------------------------------------------------
template< typename T, typename Stream >
AsyncBufferedInputStream< T, Stream >::~AsyncBufferedInputStream()
{
ElementType element;
while( mBufferedElements.tryPopFront( element ) )
destructSingle( element );
}
//-----------------------------------------------------------------------------
template< typename T, typename Stream >
void AsyncBufferedInputStream< T, Stream >::_onArrival( const ElementType& element )
{
mBufferedElements.pushBack( element );
// Adjust buffer count.
while( 1 )
{
S32 numBuffered = mNumBufferedElements;
if( dCompareAndSwap( mNumBufferedElements, numBuffered, numBuffered + 1 ) )
{
// If we haven't run against the lookahead limit and haven't reached
// the end of the stream, immediately trigger a new request.
if( !mIsStopped && ( numBuffered + 1 ) < mMaxBufferedElements )
_requestNext();
break;
}
}
}
//-----------------------------------------------------------------------------
template< typename T, typename Stream >
U32 AsyncBufferedInputStream< T, Stream >::read( ElementType* buffer, U32 num )
{
if( !num )
return 0;
U32 numRead = 0;
for( U32 i = 0; i < num; ++ i )
{
// Try to pop a element off the buffered element list.
ElementType element;
if( mBufferedElements.tryPopFront( element ) )
{
buffer[ i ] = element;
numRead ++;
}
else
break;
}
// Get the request chain going again, if it has stopped.
while( 1 )
{
U32 numBuffered = mNumBufferedElements;
U32 newNumBuffered = numBuffered - numRead;
if( dCompareAndSwap( mNumBufferedElements, numBuffered, newNumBuffered ) )
{
if( numBuffered == mMaxBufferedElements )
_requestNext();
break;
}
}
return numRead;
}
//=============================================================================
// AsyncSingleBufferedInputStream.
//=============================================================================
/// Asynchronous work item for reading an element from the source stream.
template< typename T, typename Stream = IInputStream< T >* >
class AsyncBufferedReadItem : public ThreadWorkItem
{
public:
typedef ThreadWorkItem Parent;
typedef ThreadSafeRef< AsyncBufferedInputStream< T, Stream > > AsyncStreamRef;
protected:
/// The issueing async state.
AsyncStreamRef mAsyncStream;
///
Stream mSourceStream;
/// The element read from the stream.
T mElement;
// WorkItem
void execute() override
{
if( Deref( mSourceStream ).read( &mElement, 1 ) )
{
// Buffer the element.
if( this->cancellationPoint() ) return;
mAsyncStream->_onArrival( mElement );
}
}
void onCancelled() override
{
Parent::onCancelled();
destructSingle( mElement );
mAsyncStream = NULL;
}
public:
///
AsyncBufferedReadItem(
const AsyncStreamRef& asyncStream,
TorqueThreadPool::Context* context = NULL
)
: Parent( context ),
mAsyncStream( asyncStream ),
mSourceStream( asyncStream->getSourceStream() )
{
}
};
/// A stream filter that performs background read-aheads on its source stream
/// and buffers the results.
///
/// As each element is read in an independent threaded operation, reading an
/// element should invole a certain amount of work for using this class to
/// make sense.
///
/// @note For looping streams, the stream must implement the IResettable interface.
///
template< typename T, typename Stream = IInputStream< T >*, class ReadItem = AsyncBufferedReadItem< T, Stream > >
class AsyncSingleBufferedInputStream : public AsyncBufferedInputStream< T, Stream >
{
public:
typedef AsyncBufferedInputStream< T, Stream > Parent;
typedef typename Parent::ElementType ElementType;
typedef typename Parent::SourceElementType SourceElementType;
typedef typename Parent::SourceStreamType SourceStreamType;
protected:
// AsyncBufferedInputStream.
void _requestNext() override;
/// Create a new work item that reads the next element.
virtual void _newReadItem( ThreadSafeRef< ThreadWorkItem >& outRef )
{
outRef = new ReadItem( this, this->mThreadContext );
}
public:
/// Construct a new buffered stream reading from "source".
///
/// @param stream The source stream from which to read the actual data elements.
/// @param numSourceElementsToRead Total number of elements to read from "stream".
/// @param numReadAhead Number of packets to read and buffer in advance.
/// @param isLooping If true, the packet stream will loop infinitely over the source stream.
/// @param pool The ThreadPool to use for asynchronous packet reads.
/// @param context The ThreadContext to place asynchronous packet reads in.
AsyncSingleBufferedInputStream( const Stream& stream,
U32 numSourceElementsToRead = 0,
U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD,
bool isLooping = false,
TorqueThreadPool* pool = &TorqueThreadPool::GLOBAL(),
ThreadContext* context = ThreadContext::ROOT_CONTEXT() )
: Parent( stream,
numSourceElementsToRead,
numReadAhead,
isLooping,
pool,
context ) {}
};
//-----------------------------------------------------------------------------
template< typename T, typename Stream, class ReadItem >
void AsyncSingleBufferedInputStream< T, Stream, ReadItem >::_requestNext()
{
Stream& stream = this->getSourceStream();
bool isEOS = !this->mNumRemainingSourceElements;
if( isEOS && this->mIsLooping )
{
SourceStreamType* s = &Deref( stream );
dynamic_cast< IResettable* >( s )->reset();
isEOS = false;
}
else if( isEOS )
return;
//TODO: could scale priority depending on feed status
// Queue a stream packet work item.
if( !this->mIsLooping && this->mNumRemainingSourceElements != U32_MAX )
-- this->mNumRemainingSourceElements;
ThreadSafeRef< ThreadWorkItem > workItem;
_newReadItem( workItem );
this->mThreadPool->queueWorkItem( workItem );
}
#endif // !_ASYNCBUFFEREDSTREAM_H_