added utility classes and functionality to compose SlottedMetaPacket0/MultiPacketEx combos

This commit is contained in:
FateJH 2018-03-13 23:00:47 -04:00
parent 68e3377d2e
commit 87d314942a
3 changed files with 214 additions and 5 deletions

View file

@ -0,0 +1,103 @@
// Copyright (c) 2017 PSForever
package net.psforever.packet.control
import net.psforever.packet.PlanetSidePacket
/**
* Message for holding a series of packets being moved through the system (server),
* eventually be bundled into a `MultiPacketEx` and dispatched to the client.
* Invalid packets are eliminated at the time of creation.
* At least one packet is necessary.
* @param packets a series of packets to be bundled together;
* this list is effectively immutable;
* the only way to access these packets is through pattern matching
*/
final case class MultiPacketBundle(private var packets : List[PlanetSidePacket]) {
packets match {
case Nil =>
throw new IllegalArgumentException("can not create with zero packets")
case _ =>
packets = MultiPacketBundle.collectValidPackets(packets)
}
}
object MultiPacketBundle {
/**
* Accept a series of packets of a specific supertype (`PlanetSidePacket`)
* and filter out subtypes that should be excluded.
* Show a generic disclaimer if any packets were filtered.
* Two of the four subclasses of `PlanetSidePacket` are accepted - `PlanetSideGamePacket` and `PlanetSideControlPacket`.
* @param packets a series of packets
* @return the accepted packets from the original group
*/
def collectValidPackets(packets : List[PlanetSidePacket]) : List[PlanetSidePacket] = {
import net.psforever.packet.{PlanetSideGamePacket, PlanetSideControlPacket}
val (good, bad) = packets.partition( {
case _ : PlanetSideGamePacket => true
case _ : PlanetSideControlPacket => true
case _ => false
})
if(bad.nonEmpty) {
org.log4s.getLogger("MultiPacketBundle")
.warn(s"attempted to include packet types that are on the whitelist; ${bad.size} items have been excluded")
}
good
}
}
/**
* Accumulator for packets that will eventually be bundled and submitted for composing a `MultiPacketEx` packet.<br>
* <br>
* The accumulator is intended to be a disposable convenience class to incrementally construct a `MultiPacketBundle`.
* Packets can only be added on top of the existing internal collection and can not be removed.
* (Overloaded methods for adding packets from various sources also exist.)
* Additionally, retrieving a copy of the collection via a `MultiPacketBundle` does not empty the collection.
*/
class MultiPacketCollector() {
private var bundle : List[PlanetSidePacket] = List.empty
def Add(t : PlanetSidePacket) : Unit = Add(List(t))
def Add(t : MultiPacketBundle) : Unit = t match {
case MultiPacketBundle(list) =>
Add(list)
}
def Add(t : List[PlanetSidePacket]) : Unit = {
bundle = bundle ++ t
}
/**
* Retrieve the internal collection of packets.
* @return a loaded `MultiPacketBundle` object
*/
def Bundle : MultiPacketBundle = MultiPacketBundle(bundle)
/**
* A safer `Bundle` that consumes any` Exceptions` that might be thrown in the process of producing output.
* @see `Bundle`
* @return a loaded `MultiPacketBundle` object, or `None`
*/
def BundleOption : Option[MultiPacketBundle] = {
try {
Some(Bundle)
}
catch {
case _ : Exception =>
None
}
}
}
object MultiPacketCollector {
/**
* Overload constructor that accepts a initial packets.
* @param packets a series of packets
* @return a `MultiPacketCollector` object
*/
def apply(packets : List[PlanetSidePacket]) : MultiPacketCollector = {
val obj = new MultiPacketCollector()
obj.Add(packets)
obj
}
}

View file

@ -7,6 +7,9 @@ import org.log4s.MDC
import MDCContextAware.Implicits._
import net.psforever.packet.control._
import scala.annotation.tailrec
import scala.collection.mutable
/**
* In between the network side and the higher functioning side of the simulation:
* accept packets and transform them into a sequence of data (encoding), and
@ -107,19 +110,22 @@ class PacketCodingActor extends Actor with MDCContextAware {
MDC("sessionId") = sessionId.toString
sendResponseRight(game)
}
//bundling packets into a SlottedMetaPacket0/MultiPacketEx
case msg @ MultiPacketBundle(list) =>
log.trace(s"BUNDLE PACKET REQUEST SEND, LEFT (always): $msg")
handleBundlePacket(list)
//etc
case msg =>
log.trace(s"PACKET SEND, LEFT: $msg")
if(sender == rightRef) {
log.trace(s"BASE CASE PACKET SEND, LEFT: $msg")
MDC("sessionId") = sessionId.toString
leftRef !> msg
}
else {
log.trace(s"BASE CASE PACKET SEND, RIGHT: $msg")
MDC("sessionId") = sessionId.toString
rightRef !> msg
}
// case default =>
// failWithError(s"Invalid message '$default' received in state Established")
}
/**
@ -175,8 +181,7 @@ class PacketCodingActor extends Actor with MDCContextAware {
def handleSplitPacket(data : ByteVector) : Unit = {
val lim = PacketCodingActor.MTU_LIMIT_BYTES - 4 //4 bytes is the base size of SlottedMetaPacket
data.grouped(lim).foreach(bvec => {
val pkt = PacketCoding.CreateControlPacket(SlottedMetaPacket(4, Subslot, bvec))
PacketCoding.EncodePacket(pkt.packet) match {
PacketCoding.EncodePacket(SlottedMetaPacket(4, Subslot, bvec)) match {
case Successful(bdata) =>
sendResponseLeft(bdata.toByteVector)
case f : Failure =>
@ -185,6 +190,55 @@ class PacketCodingActor extends Actor with MDCContextAware {
})
}
/**
* Accept a `List` of packets and sequentially re-package the elements from the list into multiple container packets.<br>
* <br>
* The original packets are encoded then paired with their encoding lengths plus extra space to prefix the length.
* Encodings from these pairs are drawn from the list until into buckets that fit a maximum byte stream length.
* The size limitation on any bucket is the mtu limit
* less by the base sizes of `MultiPacketEx` (2) and of `SlottedMetaPacket` (4).
* @param bundle the packets to be bundled
*/
def handleBundlePacket(bundle : List[PlanetSidePacket]) : Unit = {
val packets : List[(ByteVector, Int)] = recursiveEncode(bundle.iterator).map( pkt => {
pkt -> {
val len = pkt.length.toInt
len + (if(len < 256) { 1 } else if(len < 65536) { 2 } else { 4 }) //space for the prefixed length byte(s)
}
})
recursiveFillPacketBuckets(packets.iterator, PacketCodingActor.MTU_LIMIT_BYTES - 6)
.foreach( list => {
handleBundlePacket(list.toVector)
})
}
/**
* Accept a `Vector` of encoded packets and re-package them into a `MultiPacketEx`.
* @param vec a specific number of byte streams
*/
def handleBundlePacket(vec : Vector[ByteVector]) : Unit = {
PacketCoding.EncodePacket(MultiPacketEx(vec)) match {
case Successful(bdata) =>
handleBundlePacket(bdata.toByteVector)
case Failure(e) =>
log.warn(s"bundling failed on MultiPacketEx creation: - $e")
}
}
/**
* Accept `ByteVector` data and package it into a `SlottedMetaPacket`.
* Send it (towards the network) upon successful encoding.
* @param data an encoded packet
*/
def handleBundlePacket(data : ByteVector) : Unit = {
PacketCoding.EncodePacket(SlottedMetaPacket(0, Subslot, data)) match {
case Successful(bdata) =>
sendResponseLeft(bdata.toByteVector)
case Failure(e) =>
log.warn(s"bundling failed on SlottedMetaPacket creation: - $e")
}
}
/**
* Encoded sequence of data going towards the network.
* @param cont the data
@ -270,6 +324,53 @@ class PacketCodingActor extends Actor with MDCContextAware {
MDC("sessionId") = sessionId.toString
rightRef !> cont
}
/** WIP */
@tailrec private def recursiveEncode(iter : Iterator[PlanetSidePacket], out : List[ByteVector] = List()) : List[ByteVector] = {
if(!iter.hasNext) {
out
}
else {
import net.psforever.packet.{PlanetSideControlPacket, PlanetSideGamePacket}
iter.next match {
case msg : PlanetSideGamePacket =>
PacketCoding.EncodePacket(msg) match {
case Successful(bytecode) =>
recursiveEncode(iter, out :+ bytecode.toByteVector)
case Failure(e) =>
log.warn(s"game packet $msg, part of a bundle, did not encode - $e")
recursiveEncode(iter, out)
}
case msg : PlanetSideControlPacket =>
PacketCoding.EncodePacket(msg) match {
case Successful(bytecode) =>
recursiveEncode(iter, out :+ bytecode.toByteVector)
case Failure(e) =>
log.warn(s"control packet $msg, part of a bundle, did not encode - $e")
recursiveEncode(iter, out)
}
case _ =>
recursiveEncode(iter, out)
}
}
}
@tailrec private def recursiveFillPacketBuckets(iter : Iterator[(ByteVector, Int)], lim : Int, currLen : Int = 0, out : List[mutable.ListBuffer[ByteVector]] = List(mutable.ListBuffer())) : List[mutable.ListBuffer[ByteVector]] = {
if(!iter.hasNext) {
out
}
else {
val (data, len) = iter.next
if(currLen + len > lim) {
recursiveFillPacketBuckets(iter, lim, len, out :+ mutable.ListBuffer(data))
}
else {
out.last += data
recursiveFillPacketBuckets(iter, lim, currLen + len, out)
}
}
}
}
object PacketCodingActor {

View file

@ -3019,6 +3019,11 @@ class WorldSessionActor extends Actor with MDCContextAware {
sendResponse(cont.asInstanceOf[Any])
}
def sendResponse(cont : MultiPacketBundle) : Unit = {
log.trace("WORLD SEND: " + cont)
sendResponse(cont.asInstanceOf[Any])
}
def sendResponse(msg : Any) : Unit = {
MDC("sessionId") = sessionId.toString
rightRef !> msg