Subslot error on zoning fix - functionality to handle resending SlottedMetaPackets if the client indicates it is missing one (#252)

This commit is contained in:
Mazo 2019-04-16 14:24:13 +01:00 committed by Fate-JH
parent 473d4d14c5
commit c91b3caa99

View file

@ -1,14 +1,17 @@
// Copyright (c) 2017 PSForever
import akka.actor.{Actor, ActorRef, MDCContextAware}
import akka.actor.{Actor, ActorRef, Cancellable, MDCContextAware}
import net.psforever.packet._
import scodec.Attempt.{Failure, Successful}
import scodec.bits._
import org.log4s.MDC
import MDCContextAware.Implicits._
import net.psforever.objects.DefaultCancellable
import net.psforever.packet.control.{HandleGamePacket, _}
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
/**
* In between the network side and the higher functioning side of the simulation:
@ -39,6 +42,27 @@ class PacketCodingActor extends Actor with MDCContextAware {
private var rightRef : ActorRef = ActorRef.noSender
private[this] val log = org.log4s.getLogger
/*
Since the client can indicate missing packets when sending SlottedMetaPackets we should keep a history of them to resend to the client when requested with a RelatedA packet
Since the subslot counter can wrap around, we need to use a LinkedHashMap to maintain the order packets are inserted, then we can drop older entries as required
For example when a RelatedB packet arrives we can remove any entries to the left of the received ones without risking removing newer entries if the subslot counter wraps around back to 0
*/
private var slottedPacketLog : mutable.LinkedHashMap[Int, ByteVector] = mutable.LinkedHashMap()
// Due to the fact the client can send `RelatedA` packets out of order, we need to keep a buffer of which subslots arrived correctly, order them
// and then act accordingly to send the missing subslot packet after a specified timeout
private var relatedALog : ArrayBuffer[Int] = ArrayBuffer()
private var relatedABufferTimeout : Cancellable = DefaultCancellable.obj
def AddSlottedPacketToLog(subslot: Int, packet : ByteVector): Unit = {
val log_limit = 100 // Number of SlottedMetaPackets to keep in history
if(slottedPacketLog.size > log_limit) {
slottedPacketLog = slottedPacketLog.drop(slottedPacketLog.size - log_limit)
}
slottedPacketLog{subslot} = packet
}
override def postStop() = {
subslotOutbound = 0 //in case this `Actor` restarts
super.postStop()
@ -67,6 +91,41 @@ class PacketCodingActor extends Actor with MDCContextAware {
}
def Established : Receive = {
case PacketCodingActor.SubslotResend() => {
log.trace("Subslot resend timeout reached")
relatedABufferTimeout.cancel()
log.trace(s"Client indicated successful subslots ${relatedALog.sortBy(x => x).mkString(" ")}")
// If a non-contiguous range of RelatedA packets were received we may need to send multiple missing packets, thus split the array into contiguous ranges
val sorted_log = relatedALog.sortBy(x => x)
val split_logs : ArrayBuffer[ArrayBuffer[Int]] = new ArrayBuffer[ArrayBuffer[Int]]()
var curr : ArrayBuffer[Int] = ArrayBuffer()
for(i <- 0 to sorted_log.size - 1) {
if(i == 0 || (sorted_log(i) != sorted_log(i-1)+1)) {
curr = new ArrayBuffer()
split_logs.append(curr)
}
curr.append(sorted_log(i))
}
if(split_logs.size > 1) log.trace(s"Split successful subslots into ${split_logs.size} contiguous chunks")
for (range <- split_logs) {
log.trace(s"Processing chunk ${range.mkString(" ")}")
val first_accepted_subslot = range.min
val missing_subslot = first_accepted_subslot - 1
slottedPacketLog.get(missing_subslot) match {
case Some(packet: ByteVector) =>
log.info(s"Resending packet with subslot: $missing_subslot to session: ${sessionId}")
sendResponseLeft(packet)
case None =>
log.error(s"Couldn't find packet with subslot: ${missing_subslot} to resend to session ${sessionId}.")
}
}
relatedALog.clear()
}
case RawPacket(msg) =>
if(sender == rightRef) { //from LSA, WSA, etc., to network - encode
mtuLimit(msg)
@ -175,8 +234,10 @@ class PacketCodingActor extends Actor with MDCContextAware {
def handleSplitPacket(data : ByteVector) : Unit = {
val lim = PacketCodingActor.MTU_LIMIT_BYTES - 4 //4 bytes is the base size of SlottedMetaPacket
data.grouped(lim).foreach(bvec => {
PacketCoding.EncodePacket(SlottedMetaPacket(4, Subslot, bvec)) match {
val subslot = Subslot
PacketCoding.EncodePacket(SlottedMetaPacket(4, subslot, bvec)) match {
case Successful(bdata) =>
AddSlottedPacketToLog(subslot, bdata.toByteVector)
sendResponseLeft(bdata.toByteVector)
case f : Failure =>
log.error(s"$f")
@ -235,8 +296,10 @@ class PacketCodingActor extends Actor with MDCContextAware {
* @param data an encoded packet
*/
def handleBundlePacket(data : ByteVector) : Unit = {
PacketCoding.EncodePacket(SlottedMetaPacket(0, Subslot, data)) match {
val subslot = Subslot
PacketCoding.EncodePacket(SlottedMetaPacket(0, subslot, data)) match {
case Successful(bdata) =>
AddSlottedPacketToLog(subslot, bdata.toByteVector)
sendResponseLeft(bdata.toByteVector)
case Failure(e) =>
log.warn(s"bundling failed on SlottedMetaPacket creation: - $e")
@ -309,11 +372,25 @@ class PacketCodingActor extends Actor with MDCContextAware {
packets.foreach { UnmarshalInnerPacket(_, "the inner packet of a MultiPacketEx") }
case RelatedA(slot, subslot) =>
log.error(s"result $slot: subslot $subslot was in error")
log.trace(s"Client indicated a packet is missing prior to slot: $slot subslot: $subslot")
relatedALog += subslot
// (re)start the timeout period, if no more RelatedA packets are sent before the timeout period elapses the missing packet(s) will be resent
import scala.concurrent.ExecutionContext.Implicits.global
relatedABufferTimeout.cancel()
relatedABufferTimeout = context.system.scheduler.scheduleOnce(100 milliseconds, self, PacketCodingActor.SubslotResend())
case RelatedB(slot, subslot) =>
log.trace(s"result $slot: subslot $subslot accepted")
// The client has indicated it's received up to a certain subslot, that means we can purge the log of any subslots prior to and including the confirmed subslot
// Find where this subslot is stored in the packet log (if at all) and drop anything to the left of it, including itself
val pos = slottedPacketLog.keySet.toArray.indexOf(subslot)
if(pos != -1) {
slottedPacketLog = slottedPacketLog.drop(pos+1)
log.trace(s"Subslots left in log: ${slottedPacketLog.keySet.toString()}")
}
case _ =>
sendResponseRight(container)
}
@ -397,4 +474,6 @@ class PacketCodingActor extends Actor with MDCContextAware {
object PacketCodingActor {
final val MTU_LIMIT_BYTES : Int = 467
private final case class SubslotResend()
}