From c91b3caa99220a7221caffc4f1ae5b9946c7297a Mon Sep 17 00:00:00 2001 From: Mazo Date: Tue, 16 Apr 2019 14:24:13 +0100 Subject: [PATCH] Subslot error on zoning fix - functionality to handle resending SlottedMetaPackets if the client indicates it is missing one (#252) --- .../src/main/scala/PacketCodingActor.scala | 87 ++++++++++++++++++- 1 file changed, 83 insertions(+), 4 deletions(-) diff --git a/pslogin/src/main/scala/PacketCodingActor.scala b/pslogin/src/main/scala/PacketCodingActor.scala index 2dc2687fc..96e91a9bc 100644 --- a/pslogin/src/main/scala/PacketCodingActor.scala +++ b/pslogin/src/main/scala/PacketCodingActor.scala @@ -1,14 +1,17 @@ // Copyright (c) 2017 PSForever -import akka.actor.{Actor, ActorRef, MDCContextAware} +import akka.actor.{Actor, ActorRef, Cancellable, MDCContextAware} import net.psforever.packet._ import scodec.Attempt.{Failure, Successful} import scodec.bits._ import org.log4s.MDC import MDCContextAware.Implicits._ +import net.psforever.objects.DefaultCancellable import net.psforever.packet.control.{HandleGamePacket, _} import scala.annotation.tailrec import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.duration._ /** * In between the network side and the higher functioning side of the simulation: @@ -39,6 +42,27 @@ class PacketCodingActor extends Actor with MDCContextAware { private var rightRef : ActorRef = ActorRef.noSender private[this] val log = org.log4s.getLogger + /* + Since the client can indicate missing packets when sending SlottedMetaPackets we should keep a history of them to resend to the client when requested with a RelatedA packet + Since the subslot counter can wrap around, we need to use a LinkedHashMap to maintain the order packets are inserted, then we can drop older entries as required + For example when a RelatedB packet arrives we can remove any entries to the left of the received ones without risking removing newer entries if the subslot counter wraps around back to 0 + */ + private var slottedPacketLog : mutable.LinkedHashMap[Int, ByteVector] = mutable.LinkedHashMap() + + // Due to the fact the client can send `RelatedA` packets out of order, we need to keep a buffer of which subslots arrived correctly, order them + // and then act accordingly to send the missing subslot packet after a specified timeout + private var relatedALog : ArrayBuffer[Int] = ArrayBuffer() + private var relatedABufferTimeout : Cancellable = DefaultCancellable.obj + + def AddSlottedPacketToLog(subslot: Int, packet : ByteVector): Unit = { + val log_limit = 100 // Number of SlottedMetaPackets to keep in history + if(slottedPacketLog.size > log_limit) { + slottedPacketLog = slottedPacketLog.drop(slottedPacketLog.size - log_limit) + } + + slottedPacketLog{subslot} = packet + } + override def postStop() = { subslotOutbound = 0 //in case this `Actor` restarts super.postStop() @@ -67,6 +91,41 @@ class PacketCodingActor extends Actor with MDCContextAware { } def Established : Receive = { + case PacketCodingActor.SubslotResend() => { + log.trace("Subslot resend timeout reached") + relatedABufferTimeout.cancel() + log.trace(s"Client indicated successful subslots ${relatedALog.sortBy(x => x).mkString(" ")}") + + // If a non-contiguous range of RelatedA packets were received we may need to send multiple missing packets, thus split the array into contiguous ranges + val sorted_log = relatedALog.sortBy(x => x) + + val split_logs : ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer[ArrayBuffer[Int]]() + var curr : ArrayBuffer[Int] = ArrayBuffer() + for(i <- 0 to sorted_log.size - 1) { + if(i == 0 || (sorted_log(i) != sorted_log(i-1)+1)) { + curr = new ArrayBuffer() + split_logs.append(curr) + } + curr.append(sorted_log(i)) + } + + if(split_logs.size > 1) log.trace(s"Split successful subslots into ${split_logs.size} contiguous chunks") + + for (range <- split_logs) { + log.trace(s"Processing chunk ${range.mkString(" ")}") + val first_accepted_subslot = range.min + val missing_subslot = first_accepted_subslot - 1 + slottedPacketLog.get(missing_subslot) match { + case Some(packet: ByteVector) => + log.info(s"Resending packet with subslot: $missing_subslot to session: ${sessionId}") + sendResponseLeft(packet) + case None => + log.error(s"Couldn't find packet with subslot: ${missing_subslot} to resend to session ${sessionId}.") + } + } + + relatedALog.clear() + } case RawPacket(msg) => if(sender == rightRef) { //from LSA, WSA, etc., to network - encode mtuLimit(msg) @@ -175,8 +234,10 @@ class PacketCodingActor extends Actor with MDCContextAware { def handleSplitPacket(data : ByteVector) : Unit = { val lim = PacketCodingActor.MTU_LIMIT_BYTES - 4 //4 bytes is the base size of SlottedMetaPacket data.grouped(lim).foreach(bvec => { - PacketCoding.EncodePacket(SlottedMetaPacket(4, Subslot, bvec)) match { + val subslot = Subslot + PacketCoding.EncodePacket(SlottedMetaPacket(4, subslot, bvec)) match { case Successful(bdata) => + AddSlottedPacketToLog(subslot, bdata.toByteVector) sendResponseLeft(bdata.toByteVector) case f : Failure => log.error(s"$f") @@ -235,8 +296,10 @@ class PacketCodingActor extends Actor with MDCContextAware { * @param data an encoded packet */ def handleBundlePacket(data : ByteVector) : Unit = { - PacketCoding.EncodePacket(SlottedMetaPacket(0, Subslot, data)) match { + val subslot = Subslot + PacketCoding.EncodePacket(SlottedMetaPacket(0, subslot, data)) match { case Successful(bdata) => + AddSlottedPacketToLog(subslot, bdata.toByteVector) sendResponseLeft(bdata.toByteVector) case Failure(e) => log.warn(s"bundling failed on SlottedMetaPacket creation: - $e") @@ -309,11 +372,25 @@ class PacketCodingActor extends Actor with MDCContextAware { packets.foreach { UnmarshalInnerPacket(_, "the inner packet of a MultiPacketEx") } case RelatedA(slot, subslot) => - log.error(s"result $slot: subslot $subslot was in error") + log.trace(s"Client indicated a packet is missing prior to slot: $slot subslot: $subslot") + + relatedALog += subslot + + // (re)start the timeout period, if no more RelatedA packets are sent before the timeout period elapses the missing packet(s) will be resent + import scala.concurrent.ExecutionContext.Implicits.global + relatedABufferTimeout.cancel() + relatedABufferTimeout = context.system.scheduler.scheduleOnce(100 milliseconds, self, PacketCodingActor.SubslotResend()) case RelatedB(slot, subslot) => log.trace(s"result $slot: subslot $subslot accepted") + // The client has indicated it's received up to a certain subslot, that means we can purge the log of any subslots prior to and including the confirmed subslot + // Find where this subslot is stored in the packet log (if at all) and drop anything to the left of it, including itself + val pos = slottedPacketLog.keySet.toArray.indexOf(subslot) + if(pos != -1) { + slottedPacketLog = slottedPacketLog.drop(pos+1) + log.trace(s"Subslots left in log: ${slottedPacketLog.keySet.toString()}") + } case _ => sendResponseRight(container) } @@ -397,4 +474,6 @@ class PacketCodingActor extends Actor with MDCContextAware { object PacketCodingActor { final val MTU_LIMIT_BYTES : Int = 467 + + private final case class SubslotResend() }