Torque3D/Engine/source/platform/async/asyncPacketQueue.h
AzaezelX bd8a72005e uninitialized variables-platform
(cherry picked from commit 36fd324de7a29a8f4bb84b7622ae925acb1d3760)
2020-05-11 15:15:01 -05:00

314 lines
11 KiB
C++

//-----------------------------------------------------------------------------
// Copyright (c) 2012 GarageGames, LLC
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
// IN THE SOFTWARE.
//-----------------------------------------------------------------------------
#ifndef _ASYNCPACKETQUEUE_H_
#define _ASYNCPACKETQUEUE_H_
#ifndef _TFIXEDSIZEQUEUE_H_
#include "core/util/tFixedSizeDeque.h"
#endif
#ifndef _TSTREAM_H_
#include "core/stream/tStream.h"
#endif
#ifndef _TYPETRAITS_H_
#include "platform/typetraits.h"
#endif
//#define DEBUG_SPEW
/// @file
/// Time-based packet streaming.
///
/// The classes contained in this file can be used for any kind
/// of continuous playback that depends on discrete samplings of
/// a source stream (i.e. any kind of digital media streaming).
//--------------------------------------------------------------------------
// Async packet queue.
//--------------------------------------------------------------------------
/// Time-based packet stream queue.
///
/// AsyncPacketQueue writes data packets to a consumer stream in sync to
/// a tick time source. Outdated packets may optionally be dropped automatically
/// by the queue. A fixed maximum number of packets can reside in the queue
/// concurrently at any one time.
///
/// Be aware that using single item queues for synchronizing to a timer
/// will usually result in bad timing behavior when packet uploading takes
/// any non-trivial amount of time.
///
/// @note While the queue associates a variable tick count with each
/// individual packet, the queue fill status is measured in number of
/// packets rather than in total tick time.
///
/// @param Packet Value type of packets passed through this queue.
/// @param TimeSource Value type for time tick source to which the queue
/// is synchronized.
/// @param Consumer Value type of stream to which the packets are written.
/// @param Tick Type used for counting ticks.
///
template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 >
class AsyncPacketQueue
{
   public:

      typedef void Parent;

      /// Base type of the data packets that are streamed through this queue.
      typedef typename TypeTraits< Packet >::BaseType PacketType;

      /// Base type of the consumer that the packets are written to.
      typedef typename TypeTraits< Consumer >::BaseType ConsumerType;

      /// Base type of the time source that the queue is synchronized to.
      typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType;

      /// Type used for counting ticks.
      typedef Tick TickType;

   protected:

      /// Record of the tick range covered by a single packet that is
      /// currently on the queue.  The range is half-open: mStartTick is
      /// part of the packet, mEndTick is not.
      struct QueuedPacket
      {
         /// First tick that belongs to this packet.
         TickType mStartTick;

         /// First tick that does *not* belong to this packet anymore.
         TickType mEndTick;

         QueuedPacket( TickType start, TickType end )
            : mStartTick( start ),
              mEndTick( end )
         {
         }

         /// Return the length of this packet in ticks.
         TickType getNumTicks() const
         {
            return mEndTick - mStartTick;
         }
      };

      typedef FixedSizeDeque< QueuedPacket > PacketQueue;

      /// If true, packets whose end tick has already been passed by the time
      /// source when they are submitted are dropped.  If false, such packets
      /// are queued nonetheless.
      bool mDropPackets;

      /// Total number of ticks spanned by the whole playback.
      /// A value of zero means the playback length is indefinite.
      TickType mTotalTicks;

      /// Total number of ticks submitted to the queue so far.
      TickType mTotalQueuedTicks;

      /// Records for the packets currently in the queue.  New packets are
      /// appended at the back; expired ones are removed from the front.
      PacketQueue mPacketQueue;

      /// The time source that the queue is sync'ing to.
      TimeSource mTimeSource;

      /// The output stream that this queue feeds into.
      Consumer mConsumer;

      /// Total number of packets submitted to the queue so far.
      U32 mTotalQueuedPackets;

   public:

      /// Construct an AsyncPacketQueue of the given length.
      ///
      /// @param maxQueuedPackets The length of the queue in packets.  At most
      ///   'maxQueuedPackets' packets can be in the queue at any one time.
      /// @param timeSource The tick time source to which the queue synchronizes.
      /// @param consumer The output stream that receives the packets in sync to timeSource.
      /// @param totalTicks The total number of ticks that will be played back through
      ///   the queue; if 0, the length is considered indefinite.
      /// @param dropPackets Whether the queue should drop outdated packets; a dropped
      ///   packet is not written to the consumer.
      AsyncPacketQueue( U32 maxQueuedPackets,
                        TimeSource timeSource,
                        Consumer consumer,
                        TickType totalTicks = 0,
                        bool dropPackets = false )
         : mDropPackets( dropPackets ),
           mTotalTicks( totalTicks ),
           mTotalQueuedTicks( 0 ),
           mPacketQueue( maxQueuedPackets ),
           mTimeSource( timeSource ),
           mConsumer( consumer ),
           mTotalQueuedPackets( 0 )
      {
      }

      /// Return true if there are currently no packets in the queue.
      bool isEmpty() const { return mPacketQueue.isEmpty(); }

      /// Return true if all packets have been streamed.
      bool isAtEnd() const;

      /// Return true if the queue needs one or more new packets to be submitted.
      bool needPacket();

      /// Submit a data packet to the queue.
      ///
      /// @param packet The data packet.
      /// @param packetTicks The duration of the packet in ticks.
      /// @param isLast If true, the packet is the last one in the stream.
      /// @param packetPos The absolute position of the packet in the stream; if not
      ///   supplied, the packet is assumed to immediately follow the preceding packet.
      ///
      /// @return true if the packet has been queued or false if it has been dropped.
      bool submitPacket( Packet packet,
                         TickType packetTicks,
                         bool isLast = false,
                         TickType packetPos = TypeTraits< TickType >::MAX );

      /// Return the current playback position as reported by the time source.
      TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); }

      /// Return the total number of ticks that have been queued so far.
      TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; }

      /// Return the total number of packets that have been queued so far.
      U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; }
};
// Report whether playback through the queue has finished.
//
// A queue with an indefinite length (mTotalTicks == 0) never finishes.
// Otherwise the time source must have reached the total tick count and,
// unless packet dropping is enabled, at least that many ticks must have
// been submitted.
template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const
{
   // Indefinite length: there is no end to reach.
   if( !mTotalTicks )
      return false;

   // Playback has not yet advanced to the total tick count.
   if( !( getCurrentTick() >= mTotalTicks ) )
      return false;

   // With dropping enabled the playback position alone decides; without it,
   // all ticks must have been submitted as well.
   return ( mDropPackets || mTotalQueuedTicks >= mTotalTicks );
}
// Report whether the queue wants another packet to be submitted.
//
// Returns false once the end of the stream has been reached.  Otherwise
// returns true whenever the queue has free room, after first discarding
// records of packets whose end tick the time source has already passed
// when the queue is found to be full.
template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket()
{
   // Nothing more is needed once playback has reached the end.
   if( isAtEnd() )
      return false;

   // Expired packets only need to be unqueued when the queue is full;
   // a queue with free room always accepts more packets.
   if( mPacketQueue.capacity() == 0 )
   {
      const TickType now = getCurrentTick();
      while( mPacketQueue.size() && now >= mPacketQueue.front().mEndTick )
      {
         #ifdef DEBUG_SPEW
         Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)",
            mTotalQueuedPackets - mPacketQueue.size(),
            U32( mPacketQueue.front().mStartTick ),
            U32( mPacketQueue.front().mEndTick ),
            U32( now ),
            mPacketQueue.size() );
         #endif

         mPacketQueue.popFront();
      }
   }

   // More packets are needed if the queue is not (or no longer) full.
   return ( mPacketQueue.capacity() != 0 );
}
// Submit a packet covering 'packetTicks' ticks to the queue.
//
// The packet starts at 'packetPos' if one is given; if 'packetPos' is left at
// TypeTraits< TickType >::MAX, it starts where the previously submitted
// packet ended (mTotalQueuedTicks).  Unless the packet is dropped, a record
// of its tick range is appended to the queue and the packet is written to
// the consumer.  The queued tick and packet totals are advanced in either
// case.  If 'isLast' is set and the total length is still indefinite, the
// total length is fixed to the end of this packet.
//
// The queue must not be full when this is called (asserted).
//
// Returns true if the packet has been queued, false if it has been dropped.
template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos )
{
   AssertFatal( mPacketQueue.capacity() != 0,
      "AsyncPacketQueue::submitPacket() - Queue is full!" );

   // TypeTraits< TickType >::MAX is the "no explicit position" marker; in that
   // case the packet directly follows everything submitted so far.
   const bool hasExplicitPos = ( packetPos != TypeTraits< TickType >::MAX );
   const TickType packetStartPos = hasExplicitPos ? packetPos : mTotalQueuedTicks;
   const TickType packetEndPos = packetStartPos + packetTicks;

   // If dropping is enabled, a packet is outdated when the time source has
   // already reached or passed its end.  The time source is only queried
   // when dropping is enabled.
   const bool dropPacket = mDropPackets && ( getCurrentTick() >= packetEndPos );

   #ifdef DEBUG_SPEW
   // Report the packet's actual start tick, which differs from
   // mTotalQueuedTicks when an explicit packetPos has been supplied.
   Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s",
      mTotalQueuedPackets,
      U32( packetStartPos ),
      U32( packetEndPos ),
      U32( packetTicks ),
      U32( getCurrentTick() ),
      mPacketQueue.size(),
      dropPacket ? " !! DROPPED !!" : "" );
   #endif

   // Queue the packet and hand it to the consumer.
   if( !dropPacket )
   {
      mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) );
      Deref( mConsumer ).write( &packet, 1 );
   }

   // The stream position advances even when the packet has been dropped.
   mTotalQueuedTicks = packetEndPos;

   // The last packet determines the total length if it was indefinite so far.
   if( isLast && !mTotalTicks )
      mTotalTicks = mTotalQueuedTicks;

   ++mTotalQueuedPackets;

   return !dropPacket;
}
#undef DEBUG_SPEW
#endif // _ASYNCPACKETQUEUE_H_