diff --git a/common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala b/common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala
new file mode 100644
index 00000000..c02bf15a
--- /dev/null
+++ b/common/src/main/scala/net/psforever/packet/control/MultiPacketCollector.scala
@@ -0,0 +1,103 @@
+// Copyright (c) 2017 PSForever
+package net.psforever.packet.control
+
+import net.psforever.packet.PlanetSidePacket
+
+/**
+ * Message for holding a series of packets being moved through the system (server),
+ * eventually be bundled into a `MultiPacketEx` and dispatched to the client.
+ * Invalid packets are eliminated at the time of creation.
+ * At least one packet is necessary.
+ * @param packets a series of packets to be bundled together;
+ * this list is effectively immutable;
+ * the only way to access these packets is through pattern matching
+ */
+final case class MultiPacketBundle(private var packets : List[PlanetSidePacket]) {
+ packets match {
+ case Nil =>
+ throw new IllegalArgumentException("can not create with zero packets")
+ case _ =>
+ packets = MultiPacketBundle.collectValidPackets(packets)
+ }
+}
+
+object MultiPacketBundle {
+ /**
+ * Accept a series of packets of a specific supertype (`PlanetSidePacket`)
+ * and filter out subtypes that should be excluded.
+ * Show a generic disclaimer if any packets were filtered.
+ * Two of the four subclasses of `PlanetSidePacket` are accepted - `PlanetSideGamePacket` and `PlanetSideControlPacket`.
+ * @param packets a series of packets
+ * @return the accepted packets from the original group
+ */
+ def collectValidPackets(packets : List[PlanetSidePacket]) : List[PlanetSidePacket] = {
+ import net.psforever.packet.{PlanetSideGamePacket, PlanetSideControlPacket}
+ val (good, bad) = packets.partition( {
+ case _ : PlanetSideGamePacket => true
+ case _ : PlanetSideControlPacket => true
+ case _ => false
+ })
+ if(bad.nonEmpty) {
+ org.log4s.getLogger("MultiPacketBundle")
+ .warn(s"attempted to include packet types that are on the whitelist; ${bad.size} items have been excluded")
+ }
+ good
+ }
+}
+
+/**
+ * Accumulator for packets that will eventually be bundled and submitted for composing a `MultiPacketEx` packet.
+ *
+ * The accumulator is intended to be a disposable convenience class to incrementally construct a `MultiPacketBundle`.
+ * Packets can only be added on top of the existing internal collection and can not be removed.
+ * (Overloaded methods for adding packets from various sources also exist.)
+ * Additionally, retrieving a copy of the collection via a `MultiPacketBundle` does not empty the collection.
+ */
+class MultiPacketCollector() {
+ private var bundle : List[PlanetSidePacket] = List.empty
+
+ def Add(t : PlanetSidePacket) : Unit = Add(List(t))
+
+ def Add(t : MultiPacketBundle) : Unit = t match {
+ case MultiPacketBundle(list) =>
+ Add(list)
+ }
+
+ def Add(t : List[PlanetSidePacket]) : Unit = {
+ bundle = bundle ++ t
+ }
+
+ /**
+ * Retrieve the internal collection of packets.
+ * @return a loaded `MultiPacketBundle` object
+ */
+ def Bundle : MultiPacketBundle = MultiPacketBundle(bundle)
+
+ /**
+ * A safer `Bundle` that consumes any` Exceptions` that might be thrown in the process of producing output.
+ * @see `Bundle`
+ * @return a loaded `MultiPacketBundle` object, or `None`
+ */
+ def BundleOption : Option[MultiPacketBundle] = {
+ try {
+ Some(Bundle)
+ }
+ catch {
+ case _ : Exception =>
+ None
+ }
+ }
+}
+
+object MultiPacketCollector {
+ /**
+ * Overload constructor that accepts a initial packets.
+ * @param packets a series of packets
+ * @return a `MultiPacketCollector` object
+ */
+ def apply(packets : List[PlanetSidePacket]) : MultiPacketCollector = {
+ val obj = new MultiPacketCollector()
+ obj.Add(packets)
+ obj
+ }
+}
diff --git a/pslogin/src/main/scala/PacketCodingActor.scala b/pslogin/src/main/scala/PacketCodingActor.scala
index 42d7671d..45e6ff1d 100644
--- a/pslogin/src/main/scala/PacketCodingActor.scala
+++ b/pslogin/src/main/scala/PacketCodingActor.scala
@@ -7,6 +7,9 @@ import org.log4s.MDC
import MDCContextAware.Implicits._
import net.psforever.packet.control._
+import scala.annotation.tailrec
+import scala.collection.mutable
+
/**
* In between the network side and the higher functioning side of the simulation:
* accept packets and transform them into a sequence of data (encoding), and
@@ -107,19 +110,22 @@ class PacketCodingActor extends Actor with MDCContextAware {
MDC("sessionId") = sessionId.toString
sendResponseRight(game)
}
+ //bundling packets into a SlottedMetaPacket0/MultiPacketEx
+ case msg @ MultiPacketBundle(list) =>
+ log.trace(s"BUNDLE PACKET REQUEST SEND, LEFT (always): $msg")
+ handleBundlePacket(list)
//etc
case msg =>
- log.trace(s"PACKET SEND, LEFT: $msg")
if(sender == rightRef) {
+ log.trace(s"BASE CASE PACKET SEND, LEFT: $msg")
MDC("sessionId") = sessionId.toString
leftRef !> msg
}
else {
+ log.trace(s"BASE CASE PACKET SEND, RIGHT: $msg")
MDC("sessionId") = sessionId.toString
rightRef !> msg
}
-// case default =>
-// failWithError(s"Invalid message '$default' received in state Established")
}
/**
@@ -175,8 +181,7 @@ class PacketCodingActor extends Actor with MDCContextAware {
def handleSplitPacket(data : ByteVector) : Unit = {
val lim = PacketCodingActor.MTU_LIMIT_BYTES - 4 //4 bytes is the base size of SlottedMetaPacket
data.grouped(lim).foreach(bvec => {
- val pkt = PacketCoding.CreateControlPacket(SlottedMetaPacket(4, Subslot, bvec))
- PacketCoding.EncodePacket(pkt.packet) match {
+ PacketCoding.EncodePacket(SlottedMetaPacket(4, Subslot, bvec)) match {
case Successful(bdata) =>
sendResponseLeft(bdata.toByteVector)
case f : Failure =>
@@ -185,6 +190,55 @@ class PacketCodingActor extends Actor with MDCContextAware {
})
}
+ /**
+ * Accept a `List` of packets and sequentially re-package the elements from the list into multiple container packets.
+ *
+ * The original packets are encoded then paired with their encoding lengths plus extra space to prefix the length.
+ * Encodings from these pairs are drawn from the list until into buckets that fit a maximum byte stream length.
+ * The size limitation on any bucket is the mtu limit
+ * less by the base sizes of `MultiPacketEx` (2) and of `SlottedMetaPacket` (4).
+ * @param bundle the packets to be bundled
+ */
+ def handleBundlePacket(bundle : List[PlanetSidePacket]) : Unit = {
+ val packets : List[(ByteVector, Int)] = recursiveEncode(bundle.iterator).map( pkt => {
+ pkt -> {
+ val len = pkt.length.toInt
+ len + (if(len < 256) { 1 } else if(len < 65536) { 2 } else { 4 }) //space for the prefixed length byte(s)
+ }
+ })
+ recursiveFillPacketBuckets(packets.iterator, PacketCodingActor.MTU_LIMIT_BYTES - 6)
+ .foreach( list => {
+ handleBundlePacket(list.toVector)
+ })
+ }
+
+ /**
+ * Accept a `Vector` of encoded packets and re-package them into a `MultiPacketEx`.
+ * @param vec a specific number of byte streams
+ */
+ def handleBundlePacket(vec : Vector[ByteVector]) : Unit = {
+ PacketCoding.EncodePacket(MultiPacketEx(vec)) match {
+ case Successful(bdata) =>
+ handleBundlePacket(bdata.toByteVector)
+ case Failure(e) =>
+ log.warn(s"bundling failed on MultiPacketEx creation: - $e")
+ }
+ }
+
+ /**
+ * Accept `ByteVector` data and package it into a `SlottedMetaPacket`.
+ * Send it (towards the network) upon successful encoding.
+ * @param data an encoded packet
+ */
+ def handleBundlePacket(data : ByteVector) : Unit = {
+ PacketCoding.EncodePacket(SlottedMetaPacket(0, Subslot, data)) match {
+ case Successful(bdata) =>
+ sendResponseLeft(bdata.toByteVector)
+ case Failure(e) =>
+ log.warn(s"bundling failed on SlottedMetaPacket creation: - $e")
+ }
+ }
+
/**
* Encoded sequence of data going towards the network.
* @param cont the data
@@ -270,6 +324,53 @@ class PacketCodingActor extends Actor with MDCContextAware {
MDC("sessionId") = sessionId.toString
rightRef !> cont
}
+
+ /** WIP */
+
+ @tailrec private def recursiveEncode(iter : Iterator[PlanetSidePacket], out : List[ByteVector] = List()) : List[ByteVector] = {
+ if(!iter.hasNext) {
+ out
+ }
+ else {
+ import net.psforever.packet.{PlanetSideControlPacket, PlanetSideGamePacket}
+ iter.next match {
+ case msg : PlanetSideGamePacket =>
+ PacketCoding.EncodePacket(msg) match {
+ case Successful(bytecode) =>
+ recursiveEncode(iter, out :+ bytecode.toByteVector)
+ case Failure(e) =>
+ log.warn(s"game packet $msg, part of a bundle, did not encode - $e")
+ recursiveEncode(iter, out)
+ }
+ case msg : PlanetSideControlPacket =>
+ PacketCoding.EncodePacket(msg) match {
+ case Successful(bytecode) =>
+ recursiveEncode(iter, out :+ bytecode.toByteVector)
+ case Failure(e) =>
+ log.warn(s"control packet $msg, part of a bundle, did not encode - $e")
+ recursiveEncode(iter, out)
+ }
+ case _ =>
+ recursiveEncode(iter, out)
+ }
+ }
+ }
+
+ @tailrec private def recursiveFillPacketBuckets(iter : Iterator[(ByteVector, Int)], lim : Int, currLen : Int = 0, out : List[mutable.ListBuffer[ByteVector]] = List(mutable.ListBuffer())) : List[mutable.ListBuffer[ByteVector]] = {
+ if(!iter.hasNext) {
+ out
+ }
+ else {
+ val (data, len) = iter.next
+ if(currLen + len > lim) {
+ recursiveFillPacketBuckets(iter, lim, len, out :+ mutable.ListBuffer(data))
+ }
+ else {
+ out.last += data
+ recursiveFillPacketBuckets(iter, lim, currLen + len, out)
+ }
+ }
+ }
}
object PacketCodingActor {
diff --git a/pslogin/src/main/scala/WorldSessionActor.scala b/pslogin/src/main/scala/WorldSessionActor.scala
index b19360b8..e0037493 100644
--- a/pslogin/src/main/scala/WorldSessionActor.scala
+++ b/pslogin/src/main/scala/WorldSessionActor.scala
@@ -3019,6 +3019,11 @@ class WorldSessionActor extends Actor with MDCContextAware {
sendResponse(cont.asInstanceOf[Any])
}
+ def sendResponse(cont : MultiPacketBundle) : Unit = {
+ log.trace("WORLD SEND: " + cont)
+ sendResponse(cont.asInstanceOf[Any])
+ }
+
def sendResponse(msg : Any) : Unit = {
MDC("sessionId") = sessionId.toString
rightRef !> msg