mirror of
https://github.com/TorqueGameEngines/Torque3D.git
synced 2026-01-20 12:44:46 +00:00
358 lines
12 KiB
C++
358 lines
12 KiB
C++
//-----------------------------------------------------------------------------
|
|
// Copyright (c) 2012 GarageGames, LLC
|
|
//
|
|
// Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
// of this software and associated documentation files (the "Software"), to
|
|
// deal in the Software without restriction, including without limitation the
|
|
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
|
// sell copies of the Software, and to permit persons to whom the Software is
|
|
// furnished to do so, subject to the following conditions:
|
|
//
|
|
// The above copyright notice and this permission notice shall be included in
|
|
// all copies or substantial portions of the Software.
|
|
//
|
|
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
|
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
|
// IN THE SOFTWARE.
|
|
//-----------------------------------------------------------------------------
|
|
|
|
#ifndef _THREADPOOLASYNCIO_H_
|
|
#define _THREADPOOLASYNCIO_H_
|
|
|
|
#ifndef _THREADPOOL_H_
|
|
# include "platform/threads/threadPool.h"
|
|
#endif
|
|
#ifndef _RAWDATA_H_
|
|
# include "core/util/rawData.h"
|
|
#endif
|
|
#ifndef _TSTREAM_H_
|
|
# include "core/stream/tStream.h"
|
|
#endif
|
|
|
|
|
|
//RDTODO: I/O error handling
|
|
|
|
/// @file
|
|
/// Thread pool work items for asynchronous stream I/O.
|
|
/// Through the use of stream filters, this can be basically used for any
|
|
/// type of asynchronous stream processing.
|
|
|
|
//--------------------------------------------------------------------------
|
|
// AsyncIOItem.
|
|
//--------------------------------------------------------------------------
|
|
|
|
/// Abstract superclass of async I/O work items.
|
|
///
|
|
/// Supports both offset-based stream I/O as well as I/O on streams with
|
|
/// implicit positions. Note that if you use the latter type, make sure
|
|
/// that no other thread is messing with the stream at the same time or
|
|
/// chaos will ensue.
|
|
///
|
|
/// @param T Type of elements being streamed.
|
|
template< typename T, class Stream >
|
|
class AsyncIOItem : public ThreadPool::WorkItem
|
|
{
|
|
public:
|
|
|
|
typedef WorkItem Parent;
|
|
typedef T ValueType;
|
|
typedef RawDataT< ValueType > BufferType;
|
|
typedef U32 OffsetType;
|
|
typedef Stream StreamType;
|
|
|
|
protected:
|
|
|
|
/// Buffer keeping/receiving the data elements.
|
|
BufferType mBuffer;
|
|
|
|
/// The stream to read from/write to.
|
|
StreamType* mStream;
|
|
|
|
/// Number of elements to read from/write to the stream.
|
|
U32 mNumElements;
|
|
|
|
/// Offset in "mBuffer" from where to read/where to start writing to.
|
|
U32 mOffsetInBuffer;
|
|
|
|
/// Offset in stream from where to read/where to write to.
|
|
/// @note This is only meaningful if the stream is an offset I/O
|
|
/// stream. For a stream that is can do both types of I/O,
|
|
/// explicit offsets are preferred and this value is used.
|
|
OffsetType mOffsetInStream;
|
|
|
|
///
|
|
ValueType* getBufferPtr()
|
|
{
|
|
return &getBuffer().data[ getOffsetInBuffer() ];
|
|
}
|
|
|
|
public:
|
|
|
|
///
|
|
/// If the stream uses implicit positioning, then the supplied "offsetInStream"
|
|
/// is meaningless and ignored.
|
|
AsyncIOItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
|
|
ThreadContext* context = 0 )
|
|
: Parent( context ),
|
|
mStream( stream ),
|
|
mNumElements( numElements ),
|
|
mOffsetInBuffer( 0 ),
|
|
mOffsetInStream( offsetInStream ) {}
|
|
|
|
/// Construct a read item on "stream" that stores data into the given "buffer".
|
|
///
|
|
AsyncIOItem( StreamType* stream, BufferType& buffer, U32 offsetInBuffer,
|
|
U32 numElements, OffsetType offsetInStream, bool takeOwnershipOfBuffer = true,
|
|
ThreadContext* context = 0 )
|
|
: Parent( context ),
|
|
mBuffer( buffer ),
|
|
mStream( stream ),
|
|
mNumElements( numElements ),
|
|
mOffsetInBuffer( offsetInBuffer ),
|
|
mOffsetInStream( offsetInStream )
|
|
{
|
|
if( takeOwnershipOfBuffer )
|
|
mBuffer.ownMemory = true;
|
|
}
|
|
|
|
/// Return the stream being written to/read from.
|
|
StreamType* getStream()
|
|
{
|
|
return mStream;
|
|
}
|
|
|
|
/// Return the data buffer being written to/read from.
|
|
/// @note This may not yet have been allocated.
|
|
BufferType& getBuffer()
|
|
{
|
|
return mBuffer;
|
|
|
|
}
|
|
|
|
/// Return the number of elements involved in the transfer.
|
|
U32 getNumElements()
|
|
{
|
|
return mNumElements;
|
|
}
|
|
|
|
/// Return the position in the data buffer at which to start the transfer.
|
|
U32 getOffsetInBuffer()
|
|
{
|
|
return mOffsetInBuffer;
|
|
}
|
|
|
|
/// Return the position in the stream at which to start the transfer.
|
|
/// @note Only meaningful for streams that support offset I/O.
|
|
OffsetType getOffsetInStream()
|
|
{
|
|
return mOffsetInStream;
|
|
}
|
|
};
|
|
|
|
//--------------------------------------------------------------------------
|
|
// AsyncReadItem.
|
|
//--------------------------------------------------------------------------
|
|
|
|
//RDTODO: error handling
|
|
/// Work item to asynchronously read from a stream.
|
|
///
|
|
/// The given stream type may implement any of the input stream
|
|
/// interfaces. Preference is given to IAsyncInputStream, then to
|
|
/// IOffsetInputStream, and only if none of these are implemented
|
|
/// IInputStream is used.
|
|
///
|
|
/// For IAsyncInputStreams, the async read operation is issued immediately
|
|
/// on the constructing thread and then picked up on the worker thread.
|
|
/// This ensures optimal use of concurrency.
|
|
|
|
template< typename T, class Stream = IOffsetInputStream< T > >
|
|
class AsyncReadItem : public AsyncIOItem< T, Stream >
|
|
{
|
|
public:
|
|
|
|
typedef AsyncIOItem< T, Stream > Parent;
|
|
typedef typename Parent::StreamType StreamType;
|
|
typedef typename Parent::OffsetType OffsetType;
|
|
typedef typename Parent::BufferType BufferType;
|
|
typedef typename Parent::ValueType ValueType;
|
|
|
|
/// Construct a read item that reads "numElements" at "offsetInStream"
|
|
/// from "stream".
|
|
///
|
|
/// Since with this constructor no data buffer is supplied, it will be
|
|
/// dynamically allocated by the read() method. Note that this only makes
|
|
/// sense if this class is subclassed and processing is done on the buffer
|
|
/// after it has been read.
|
|
///
|
|
/// @param stream The stream to read from.
|
|
/// @param numElement The number of elements to read from the stream.
|
|
/// @param offsetInStream The offset at which to read from the stream;
|
|
/// ignored if the stream uses implicit positioning
|
|
/// @param context The tread pool context to place the item into.
|
|
AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
|
|
ThreadContext* context = 0 )
|
|
: Parent( stream, numElements, offsetInStream, context )
|
|
{
|
|
_prep();
|
|
}
|
|
|
|
AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
|
|
BufferType& buffer, bool takeOwnershipOfBuffer = false,
|
|
U32 offsetInBuffer = 0, ThreadContext* context = 0 )
|
|
: Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
|
|
{
|
|
_prep();
|
|
}
|
|
|
|
/// @return The number of elements actually read from the stream.
|
|
U32 getNumElementsRead()
|
|
{
|
|
return mNumElementsRead;
|
|
}
|
|
|
|
protected:
|
|
|
|
/// Handle of asynchronous stream read, if we are using an async interface.
|
|
void* mAsyncHandle;
|
|
|
|
/// After the read operation has completed, this holds the number of
|
|
/// elements actually read from the stream.
|
|
U32 mNumElementsRead;
|
|
|
|
virtual void execute();
|
|
|
|
void _allocBuffer()
|
|
{
|
|
if( !this->getBuffer().data )
|
|
this->getBuffer().alloc( this->getNumElements() );
|
|
}
|
|
|
|
void _prep()
|
|
{
|
|
IAsyncInputStream< T >* s = dynamic_cast< IAsyncInputStream< T >* >( this->getStream() );
|
|
if( s )
|
|
{
|
|
_allocBuffer();
|
|
mAsyncHandle = s->issueReadAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
|
|
}
|
|
}
|
|
|
|
// Helper functions to differentiate between stream types.
|
|
|
|
void _read( IInputStream< T >* stream )
|
|
{
|
|
mNumElementsRead = stream->read( this->getBufferPtr(), this->getNumElements() );
|
|
}
|
|
void _read( IOffsetInputStream< T >* stream )
|
|
{
|
|
mNumElementsRead = stream->readAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
|
|
}
|
|
void _read( IAsyncInputStream< T >* stream )
|
|
{
|
|
stream->tryCompleteReadAt( mAsyncHandle, mNumElementsRead, true );
|
|
}
|
|
};
|
|
|
|
template< typename T, class Stream >
|
|
void AsyncReadItem< T, Stream >::execute()
|
|
{
|
|
_allocBuffer();
|
|
|
|
// Read the data. Do a dynamic cast for any of the
|
|
// interfaces we prefer.
|
|
|
|
if( this->cancellationPoint() ) return;
|
|
StreamType* stream = this->getStream();
|
|
if( dynamic_cast< IAsyncInputStream< T >* >( stream ) )
|
|
_read( ( IAsyncInputStream< T >* ) stream );
|
|
else if( dynamic_cast< IOffsetInputStream< T >* >( stream ) )
|
|
_read( ( IOffsetInputStream< T >* ) stream );
|
|
else
|
|
_read( stream );
|
|
}
|
|
|
|
//--------------------------------------------------------------------------
|
|
// AsyncWriteItem.
|
|
//--------------------------------------------------------------------------
|
|
|
|
/// Work item for writing to an output stream.
|
|
///
|
|
/// The stream being written to may implement any of the given output stream
|
|
/// interfaces. Preference is given to IAsyncOutputStream, then to
|
|
/// IOffsetOutputStream, and only if none of these is implemented IOutputStream
|
|
/// is used.
|
|
///
|
|
/// A useful feature is to yield ownership of the data buffer to the
|
|
/// write item. This way, this can be pretty much used in a fire-and-forget
|
|
/// manner where after submission, no further synchronization happens
|
|
/// between the client and the work item.
|
|
///
|
|
/// @note Be aware that if writing to an output stream that has an implicit
|
|
/// position property, multiple concurrent writes will interfere with each other.
|
|
template< typename T, class Stream = IOffsetOutputStream< T > >
|
|
class AsyncWriteItem : public AsyncIOItem< T, Stream >
|
|
{
|
|
public:
|
|
|
|
typedef AsyncIOItem< T, Stream > Parent;
|
|
typedef typename Parent::StreamType StreamType;
|
|
typedef typename Parent::OffsetType OffsetType;
|
|
typedef typename Parent::BufferType BufferType;
|
|
typedef typename Parent::ValueType ValueType;
|
|
|
|
AsyncWriteItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
|
|
BufferType& buffer, bool takeOwnershipOfBuffer = true,
|
|
U32 offsetInBuffer = 0, ThreadContext* context = 0 )
|
|
: Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
|
|
{
|
|
_prep( stream );
|
|
}
|
|
|
|
protected:
|
|
|
|
/// Handle of asynchronous write operation, if the stream implements IAsyncOutputStream.
|
|
void* mAsyncHandle;
|
|
|
|
virtual void execute();
|
|
|
|
void _prep( StreamType* stream )
|
|
{
|
|
IAsyncOutputStream< T >* s = dynamic_cast< IAsyncOutputStream< T >* >( stream );
|
|
if( s )
|
|
mAsyncHandle = s->issueWriteAt( this->getOffset(), this->getBufferPtr(), this->getNumElements() );
|
|
}
|
|
|
|
void _write( IOutputStream< T >* stream )
|
|
{
|
|
stream->write( this->getBufferPtr(), this->getNumElements() );
|
|
}
|
|
void _write( IOffsetOutputStream< T >* stream )
|
|
{
|
|
stream->writeAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
|
|
}
|
|
void _write( IAsyncOutputStream< T >* stream )
|
|
{
|
|
stream->tryCompleteWriteAt( mAsyncHandle, true );
|
|
}
|
|
};
|
|
|
|
template< typename T, class Stream >
|
|
void AsyncWriteItem< T, Stream >::execute()
|
|
{
|
|
if( this->cancellationPoint() ) return;
|
|
|
|
StreamType* stream = this->getStream();
|
|
if( dynamic_cast< IAsyncOutputStream< T >* >( stream ) )
|
|
_write( ( IAsyncOutputStream< T >* ) stream );
|
|
if( dynamic_cast< IOffsetOutputStream< T >* >( stream ) )
|
|
_write( ( IOffsetOutputStream< T >* ) stream );
|
|
else
|
|
_write( stream );
|
|
}
|
|
|
|
#endif // _THREADPOOLASYNCIO_H_
|