From 87d314942a0e64543331edfea979fb68035dd520 Mon Sep 17 00:00:00 2001 From: FateJH Date: Tue, 13 Mar 2018 23:00:47 -0400 Subject: [PATCH] added utility classes and functionality to compose SlottedMetaPacket0/MultiPacketEx combos --- .../packet/control/MultiPacketCollector.scala | 103 ++++++++++++++++ .../src/main/scala/PacketCodingActor.scala | 111 +++++++++++++++++- .../src/main/scala/WorldSessionActor.scala | 5 + 3 files changed, 214 insertions(+), 5 deletions(-) create mode 100644 common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala diff --git a/common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala b/common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala new file mode 100644 index 00000000..c02bf15a --- /dev/null +++ b/common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala @@ -0,0 +1,103 @@ +// Copyright (c) 2017 PSForever +package net.psforever.packet.control + +import net.psforever.packet.PlanetSidePacket + +/** + * Message for holding a series of packets being moved through the system (server), + * eventually be bundled into a `MultiPacketEx` and dispatched to the client. + * Invalid packets are eliminated at the time of creation. + * At least one packet is necessary. + * @param packets a series of packets to be bundled together; + * this list is effectively immutable; + * the only way to access these packets is through pattern matching + */ +final case class MultiPacketBundle(private var packets : List[PlanetSidePacket]) { + packets match { + case Nil => + throw new IllegalArgumentException("can not create with zero packets") + case _ => + packets = MultiPacketBundle.collectValidPackets(packets) + } +} + +object MultiPacketBundle { + /** + * Accept a series of packets of a specific supertype (`PlanetSidePacket`) + * and filter out subtypes that should be excluded. + * Show a generic disclaimer if any packets were filtered. + * Two of the four subclasses of `PlanetSidePacket` are accepted - `PlanetSideGamePacket` and `PlanetSideControlPacket`. + * @param packets a series of packets + * @return the accepted packets from the original group + */ + def collectValidPackets(packets : List[PlanetSidePacket]) : List[PlanetSidePacket] = { + import net.psforever.packet.{PlanetSideGamePacket, PlanetSideControlPacket} + val (good, bad) = packets.partition( { + case _ : PlanetSideGamePacket => true + case _ : PlanetSideControlPacket => true + case _ => false + }) + if(bad.nonEmpty) { + org.log4s.getLogger("MultiPacketBundle") + .warn(s"attempted to include packet types that are on the whitelist; ${bad.size} items have been excluded") + } + good + } +} + +/** + * Accumulator for packets that will eventually be bundled and submitted for composing a `MultiPacketEx` packet.
+ *
+ * The accumulator is intended to be a disposable convenience class to incrementally construct a `MultiPacketBundle`. + * Packets can only be added on top of the existing internal collection and can not be removed. + * (Overloaded methods for adding packets from various sources also exist.) + * Additionally, retrieving a copy of the collection via a `MultiPacketBundle` does not empty the collection. + */ +class MultiPacketCollector() { + private var bundle : List[PlanetSidePacket] = List.empty + + def Add(t : PlanetSidePacket) : Unit = Add(List(t)) + + def Add(t : MultiPacketBundle) : Unit = t match { + case MultiPacketBundle(list) => + Add(list) + } + + def Add(t : List[PlanetSidePacket]) : Unit = { + bundle = bundle ++ t + } + + /** + * Retrieve the internal collection of packets. + * @return a loaded `MultiPacketBundle` object + */ + def Bundle : MultiPacketBundle = MultiPacketBundle(bundle) + + /** + * A safer `Bundle` that consumes any` Exceptions` that might be thrown in the process of producing output. + * @see `Bundle` + * @return a loaded `MultiPacketBundle` object, or `None` + */ + def BundleOption : Option[MultiPacketBundle] = { + try { + Some(Bundle) + } + catch { + case _ : Exception => + None + } + } +} + +object MultiPacketCollector { + /** + * Overload constructor that accepts a initial packets. + * @param packets a series of packets + * @return a `MultiPacketCollector` object + */ + def apply(packets : List[PlanetSidePacket]) : MultiPacketCollector = { + val obj = new MultiPacketCollector() + obj.Add(packets) + obj + } +} diff --git a/pslogin/src/main/scala/PacketCodingActor.scala b/pslogin/src/main/scala/PacketCodingActor.scala index 42d7671d..45e6ff1d 100644 --- a/pslogin/src/main/scala/PacketCodingActor.scala +++ b/pslogin/src/main/scala/PacketCodingActor.scala @@ -7,6 +7,9 @@ import org.log4s.MDC import MDCContextAware.Implicits._ import net.psforever.packet.control._ +import scala.annotation.tailrec +import scala.collection.mutable + /** * In between the network side and the higher functioning side of the simulation: * accept packets and transform them into a sequence of data (encoding), and @@ -107,19 +110,22 @@ class PacketCodingActor extends Actor with MDCContextAware { MDC("sessionId") = sessionId.toString sendResponseRight(game) } + //bundling packets into a SlottedMetaPacket0/MultiPacketEx + case msg @ MultiPacketBundle(list) => + log.trace(s"BUNDLE PACKET REQUEST SEND, LEFT (always): $msg") + handleBundlePacket(list) //etc case msg => - log.trace(s"PACKET SEND, LEFT: $msg") if(sender == rightRef) { + log.trace(s"BASE CASE PACKET SEND, LEFT: $msg") MDC("sessionId") = sessionId.toString leftRef !> msg } else { + log.trace(s"BASE CASE PACKET SEND, RIGHT: $msg") MDC("sessionId") = sessionId.toString rightRef !> msg } -// case default => -// failWithError(s"Invalid message '$default' received in state Established") } /** @@ -175,8 +181,7 @@ class PacketCodingActor extends Actor with MDCContextAware { def handleSplitPacket(data : ByteVector) : Unit = { val lim = PacketCodingActor.MTU_LIMIT_BYTES - 4 //4 bytes is the base size of SlottedMetaPacket data.grouped(lim).foreach(bvec => { - val pkt = PacketCoding.CreateControlPacket(SlottedMetaPacket(4, Subslot, bvec)) - PacketCoding.EncodePacket(pkt.packet) match { + PacketCoding.EncodePacket(SlottedMetaPacket(4, Subslot, bvec)) match { case Successful(bdata) => sendResponseLeft(bdata.toByteVector) case f : Failure => @@ -185,6 +190,55 @@ class PacketCodingActor extends Actor with MDCContextAware { }) } + /** + * Accept a `List` of packets and sequentially re-package the elements from the list into multiple container packets.
+ *
+ * The original packets are encoded then paired with their encoding lengths plus extra space to prefix the length. + * Encodings from these pairs are drawn from the list until into buckets that fit a maximum byte stream length. + * The size limitation on any bucket is the mtu limit + * less by the base sizes of `MultiPacketEx` (2) and of `SlottedMetaPacket` (4). + * @param bundle the packets to be bundled + */ + def handleBundlePacket(bundle : List[PlanetSidePacket]) : Unit = { + val packets : List[(ByteVector, Int)] = recursiveEncode(bundle.iterator).map( pkt => { + pkt -> { + val len = pkt.length.toInt + len + (if(len < 256) { 1 } else if(len < 65536) { 2 } else { 4 }) //space for the prefixed length byte(s) + } + }) + recursiveFillPacketBuckets(packets.iterator, PacketCodingActor.MTU_LIMIT_BYTES - 6) + .foreach( list => { + handleBundlePacket(list.toVector) + }) + } + + /** + * Accept a `Vector` of encoded packets and re-package them into a `MultiPacketEx`. + * @param vec a specific number of byte streams + */ + def handleBundlePacket(vec : Vector[ByteVector]) : Unit = { + PacketCoding.EncodePacket(MultiPacketEx(vec)) match { + case Successful(bdata) => + handleBundlePacket(bdata.toByteVector) + case Failure(e) => + log.warn(s"bundling failed on MultiPacketEx creation: - $e") + } + } + + /** + * Accept `ByteVector` data and package it into a `SlottedMetaPacket`. + * Send it (towards the network) upon successful encoding. + * @param data an encoded packet + */ + def handleBundlePacket(data : ByteVector) : Unit = { + PacketCoding.EncodePacket(SlottedMetaPacket(0, Subslot, data)) match { + case Successful(bdata) => + sendResponseLeft(bdata.toByteVector) + case Failure(e) => + log.warn(s"bundling failed on SlottedMetaPacket creation: - $e") + } + } + /** * Encoded sequence of data going towards the network. * @param cont the data @@ -270,6 +324,53 @@ class PacketCodingActor extends Actor with MDCContextAware { MDC("sessionId") = sessionId.toString rightRef !> cont } + + /** WIP */ + + @tailrec private def recursiveEncode(iter : Iterator[PlanetSidePacket], out : List[ByteVector] = List()) : List[ByteVector] = { + if(!iter.hasNext) { + out + } + else { + import net.psforever.packet.{PlanetSideControlPacket, PlanetSideGamePacket} + iter.next match { + case msg : PlanetSideGamePacket => + PacketCoding.EncodePacket(msg) match { + case Successful(bytecode) => + recursiveEncode(iter, out :+ bytecode.toByteVector) + case Failure(e) => + log.warn(s"game packet $msg, part of a bundle, did not encode - $e") + recursiveEncode(iter, out) + } + case msg : PlanetSideControlPacket => + PacketCoding.EncodePacket(msg) match { + case Successful(bytecode) => + recursiveEncode(iter, out :+ bytecode.toByteVector) + case Failure(e) => + log.warn(s"control packet $msg, part of a bundle, did not encode - $e") + recursiveEncode(iter, out) + } + case _ => + recursiveEncode(iter, out) + } + } + } + + @tailrec private def recursiveFillPacketBuckets(iter : Iterator[(ByteVector, Int)], lim : Int, currLen : Int = 0, out : List[mutable.ListBuffer[ByteVector]] = List(mutable.ListBuffer())) : List[mutable.ListBuffer[ByteVector]] = { + if(!iter.hasNext) { + out + } + else { + val (data, len) = iter.next + if(currLen + len > lim) { + recursiveFillPacketBuckets(iter, lim, len, out :+ mutable.ListBuffer(data)) + } + else { + out.last += data + recursiveFillPacketBuckets(iter, lim, currLen + len, out) + } + } + } } object PacketCodingActor { diff --git a/pslogin/src/main/scala/WorldSessionActor.scala b/pslogin/src/main/scala/WorldSessionActor.scala index b19360b8..e0037493 100644 --- a/pslogin/src/main/scala/WorldSessionActor.scala +++ b/pslogin/src/main/scala/WorldSessionActor.scala @@ -3019,6 +3019,11 @@ class WorldSessionActor extends Actor with MDCContextAware { sendResponse(cont.asInstanceOf[Any]) } + def sendResponse(cont : MultiPacketBundle) : Unit = { + log.trace("WORLD SEND: " + cont) + sendResponse(cont.asInstanceOf[Any]) + } + def sendResponse(msg : Any) : Unit = { MDC("sessionId") = sessionId.toString rightRef !> msg