Torque3D/Engine/source/platform/threads/threadPoolAsyncIO.h
marauder2k7 81a913616c revert ThreadPool rename
revert ThreadPool rename, resources ThreadPool class is already nested in namespace VHACD
2024-05-12 21:59:18 +01:00

358 lines
12 KiB
C++

//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _THREADPOOLASYNCIO_H_
#define _THREADPOOLASYNCIO_H_
#ifndef _THREADPOOL_H_
# include "platform/threads/threadPool.h"
#endif
#ifndef _RAWDATA_H_
# include "core/util/rawData.h"
#endif
#ifndef _TSTREAM_H_
# include "core/stream/tStream.h"
#endif
//RDTODO: I/O error handling
/// @file
/// Thread pool work items for asynchronous stream I/O.
/// Through the use of stream filters, this can be basically used for any
/// type of asynchronous stream processing.
//--------------------------------------------------------------------------
// AsyncIOItem.
//--------------------------------------------------------------------------
/// Abstract superclass of async I/O work items.
///
/// Supports both offset-based stream I/O as well as I/O on streams with
/// implicit positions. Note that if you use the latter type, make sure
/// that no other thread is messing with the stream at the same time or
/// chaos will ensue.
///
/// @param T Type of elements being streamed.
template< typename T, class Stream >
class AsyncIOItem : public ThreadPool::WorkItem
{
public:
typedef WorkItem Parent;
typedef T ValueType;
typedef RawDataT< ValueType > BufferType;
typedef U32 OffsetType;
typedef Stream StreamType;
protected:
/// Buffer keeping/receiving the data elements.
BufferType mBuffer;
/// The stream to read from/write to.
StreamType* mStream;
/// Number of elements to read from/write to the stream.
U32 mNumElements;
/// Offset in "mBuffer" from where to read/where to start writing to.
U32 mOffsetInBuffer;
/// Offset in stream from where to read/where to write to.
/// @note This is only meaningful if the stream is an offset I/O
/// stream. For a stream that is can do both types of I/O,
/// explicit offsets are preferred and this value is used.
OffsetType mOffsetInStream;
///
ValueType* getBufferPtr()
{
return &getBuffer().data[ getOffsetInBuffer() ];
}
public:
///
/// If the stream uses implicit positioning, then the supplied "offsetInStream"
/// is meaningless and ignored.
AsyncIOItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
ThreadContext* context = 0 )
: Parent( context ),
mStream( stream ),
mNumElements( numElements ),
mOffsetInBuffer( 0 ),
mOffsetInStream( offsetInStream ) {}
/// Construct a read item on "stream" that stores data into the given "buffer".
///
AsyncIOItem( StreamType* stream, BufferType& buffer, U32 offsetInBuffer,
U32 numElements, OffsetType offsetInStream, bool takeOwnershipOfBuffer = true,
ThreadContext* context = 0 )
: Parent( context ),
mBuffer( buffer ),
mStream( stream ),
mNumElements( numElements ),
mOffsetInBuffer( offsetInBuffer ),
mOffsetInStream( offsetInStream )
{
if( takeOwnershipOfBuffer )
mBuffer.ownMemory = true;
}
/// Return the stream being written to/read from.
StreamType* getStream()
{
return mStream;
}
/// Return the data buffer being written to/read from.
/// @note This may not yet have been allocated.
BufferType& getBuffer()
{
return mBuffer;
}
/// Return the number of elements involved in the transfer.
U32 getNumElements()
{
return mNumElements;
}
/// Return the position in the data buffer at which to start the transfer.
U32 getOffsetInBuffer()
{
return mOffsetInBuffer;
}
/// Return the position in the stream at which to start the transfer.
/// @note Only meaningful for streams that support offset I/O.
OffsetType getOffsetInStream()
{
return mOffsetInStream;
}
};
//--------------------------------------------------------------------------
// AsyncReadItem.
//--------------------------------------------------------------------------
//RDTODO: error handling
/// Work item to asynchronously read from a stream.
///
/// The given stream type may implement any of the input stream
/// interfaces. Preference is given to IAsyncInputStream, then to
/// IOffsetInputStream, and only if none of these are implemented
/// IInputStream is used.
///
/// For IAsyncInputStreams, the async read operation is issued immediately
/// on the constructing thread and then picked up on the worker thread.
/// This ensures optimal use of concurrency.
template< typename T, class Stream = IOffsetInputStream< T > >
class AsyncReadItem : public AsyncIOItem< T, Stream >
{
public:
typedef AsyncIOItem< T, Stream > Parent;
typedef typename Parent::StreamType StreamType;
typedef typename Parent::OffsetType OffsetType;
typedef typename Parent::BufferType BufferType;
typedef typename Parent::ValueType ValueType;
/// Construct a read item that reads "numElements" at "offsetInStream"
/// from "stream".
///
/// Since with this constructor no data buffer is supplied, it will be
/// dynamically allocated by the read() method. Note that this only makes
/// sense if this class is subclassed and processing is done on the buffer
/// after it has been read.
///
/// @param stream The stream to read from.
/// @param numElement The number of elements to read from the stream.
/// @param offsetInStream The offset at which to read from the stream;
/// ignored if the stream uses implicit positioning
/// @param context The tread pool context to place the item into.
AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
ThreadContext* context = 0 )
: Parent( stream, numElements, offsetInStream, context )
{
_prep();
}
AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
BufferType& buffer, bool takeOwnershipOfBuffer = false,
U32 offsetInBuffer = 0, ThreadContext* context = 0 )
: Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
{
_prep();
}
/// @return The number of elements actually read from the stream.
U32 getNumElementsRead()
{
return mNumElementsRead;
}
protected:
/// Handle of asynchronous stream read, if we are using an async interface.
void* mAsyncHandle;
/// After the read operation has completed, this holds the number of
/// elements actually read from the stream.
U32 mNumElementsRead;
void execute() override;
void _allocBuffer()
{
if( !this->getBuffer().data )
this->getBuffer().alloc( this->getNumElements() );
}
void _prep()
{
IAsyncInputStream< T >* s = dynamic_cast< IAsyncInputStream< T >* >( this->getStream() );
if( s )
{
_allocBuffer();
mAsyncHandle = s->issueReadAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
}
}
// Helper functions to differentiate between stream types.
void _read( IInputStream< T >* stream )
{
mNumElementsRead = stream->read( this->getBufferPtr(), this->getNumElements() );
}
void _read( IOffsetInputStream< T >* stream )
{
mNumElementsRead = stream->readAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
}
void _read( IAsyncInputStream< T >* stream )
{
stream->tryCompleteReadAt( mAsyncHandle, mNumElementsRead, true );
}
};
template< typename T, class Stream >
void AsyncReadItem< T, Stream >::execute()
{
_allocBuffer();
// Read the data. Do a dynamic cast for any of the
// interfaces we prefer.
if( this->cancellationPoint() ) return;
StreamType* stream = this->getStream();
if( dynamic_cast< IAsyncInputStream< T >* >( stream ) )
_read( ( IAsyncInputStream< T >* ) stream );
else if( dynamic_cast< IOffsetInputStream< T >* >( stream ) )
_read( ( IOffsetInputStream< T >* ) stream );
else
_read( stream );
}
//--------------------------------------------------------------------------
// AsyncWriteItem.
//--------------------------------------------------------------------------
/// Work item for writing to an output stream.
///
/// The stream being written to may implement any of the given output stream
/// interfaces. Preference is given to IAsyncOutputStream, then to
/// IOffsetOutputStream, and only if none of these is implemented IOutputStream
/// is used.
///
/// A useful feature is to yield ownership of the data buffer to the
/// write item. This way, this can be pretty much used in a fire-and-forget
/// manner where after submission, no further synchronization happens
/// between the client and the work item.
///
/// @note Be aware that if writing to an output stream that has an implicit
/// position property, multiple concurrent writes will interfere with each other.
template< typename T, class Stream = IOffsetOutputStream< T > >
class AsyncWriteItem : public AsyncIOItem< T, Stream >
{
public:
typedef AsyncIOItem< T, Stream > Parent;
typedef typename Parent::StreamType StreamType;
typedef typename Parent::OffsetType OffsetType;
typedef typename Parent::BufferType BufferType;
typedef typename Parent::ValueType ValueType;
AsyncWriteItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
BufferType& buffer, bool takeOwnershipOfBuffer = true,
U32 offsetInBuffer = 0, ThreadContext* context = 0 )
: Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
{
_prep( stream );
}
protected:
/// Handle of asynchronous write operation, if the stream implements IAsyncOutputStream.
void* mAsyncHandle;
virtual void execute();
void _prep( StreamType* stream )
{
IAsyncOutputStream< T >* s = dynamic_cast< IAsyncOutputStream< T >* >( stream );
if( s )
mAsyncHandle = s->issueWriteAt( this->getOffset(), this->getBufferPtr(), this->getNumElements() );
}
void _write( IOutputStream< T >* stream )
{
stream->write( this->getBufferPtr(), this->getNumElements() );
}
void _write( IOffsetOutputStream< T >* stream )
{
stream->writeAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
}
void _write( IAsyncOutputStream< T >* stream )
{
stream->tryCompleteWriteAt( mAsyncHandle, true );
}
};
template< typename T, class Stream >
void AsyncWriteItem< T, Stream >::execute()
{
if( this->cancellationPoint() ) return;
StreamType* stream = this->getStream();
if( dynamic_cast< IAsyncOutputStream< T >* >( stream ) )
_write( ( IAsyncOutputStream< T >* ) stream );
if( dynamic_cast< IOffsetOutputStream< T >* >( stream ) )
_write( ( IOffsetOutputStream< T >* ) stream );
else
_write( stream );
}
#endif // _THREADPOOLASYNCIO_H_