* make the timer for outbound packet bundling a fallback option rather than a primary control for outbound timing

* better emergency timer determination

* eh

* master merge and light changes
This commit is contained in:
Fate-JH 2023-07-26 23:07:24 -04:00 committed by GitHub
parent e391a0b3ef
commit 2b6edc25fb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 200 additions and 117 deletions

View file

@ -220,7 +220,8 @@ game {
# while additional entries with insufficient time spacing may result in no change in behavior. # while additional entries with insufficient time spacing may result in no change in behavior.
delays = [350, 600, 800] delays = [350, 600, 800]
} }
# Don't ask.
doors-can-be-opened-by-med-app-from-this-distance = 5.05 doors-can-be-opened-by-med-app-from-this-distance = 5.05
} }
@ -234,7 +235,12 @@ anti-cheat {
network { network {
middleware { middleware {
# How often between executions of the outbound bundling process # How often between executions of the outbound bundling process
packet-bundling-delay = 50 milliseconds # Used for outbound packet arrival triggers
packet-bundling-delay = 40 milliseconds
# How often between executions of the outbound bundling process
# Affects the base value on the timer
packet-bundling-delay-multiplier = 1.25
# Pause inbound packet transmission towards the network if the sequence number is out of order # Pause inbound packet transmission towards the network if the sequence number is out of order
# Packets are put aside until the sequence is restored, or this timeout passes # Packets are put aside until the sequence is restored, or this timeout passes

View file

@ -40,7 +40,7 @@ import net.psforever.util.{Config, DiffieHellman, Md5Mac}
* Various subroutines are used to keep track of the predicted offending packets.<br> * Various subroutines are used to keep track of the predicted offending packets.<br>
* <br> * <br>
* The cryptographic aspect of the service resolves itself first, * The cryptographic aspect of the service resolves itself first,
* exchanging keys and passcodes and challeneges between local and the network. * exchanging keys and passcodes and challenges between local and the network.
* Most of this process occurs without any prior cryptographic setup, * Most of this process occurs without any prior cryptographic setup,
* relying on the validation of its own exchange to operate. * relying on the validation of its own exchange to operate.
* Afterwards its completion, * Afterwards its completion,
@ -58,7 +58,7 @@ import net.psforever.util.{Config, DiffieHellman, Md5Mac}
* and will respond to those requests if possible by searching its catalog.<br> * and will respond to those requests if possible by searching its catalog.<br>
* <br> * <br>
* If the inbound packets do not arrive correctly the first time, * If the inbound packets do not arrive correctly the first time,
* after a while, requests for identified mising packets from the network source will occur. * after a while, requests for identified missing packets from the network source will occur.
* Virtually all important packets have a sequence number that bestows on each packet an absolute delivery order. * Virtually all important packets have a sequence number that bestows on each packet an absolute delivery order.
* Bundled packets have a subslot number that indicates the total number of subslot packets dispatched to this client * Bundled packets have a subslot number that indicates the total number of subslot packets dispatched to this client
* as well as the order in which the packets should have been received. * as well as the order in which the packets should have been received.
@ -66,14 +66,13 @@ import net.psforever.util.{Config, DiffieHellman, Md5Mac}
* Should the expected packets show up out of order before the buffered is cleared, * Should the expected packets show up out of order before the buffered is cleared,
* everything sorts itself out. * everything sorts itself out.
* Unresolved missing sequence entries will often lead to requests for missing packets with anticipated subslots. * Unresolved missing sequence entries will often lead to requests for missing packets with anticipated subslots.
* If these requests do not resolve, there is unfortuantely not much that can be done except grin and bear with it. * If these requests do not resolve, there is unfortunately not much that can be done except grin and bear with it.
*/ */
object MiddlewareActor { object MiddlewareActor {
Security.addProvider(new BouncyCastleProvider) Security.addProvider(new BouncyCastleProvider)
/** Maximum packet size in bytes */ /** Maximum packet size in bytes */
//final val MTU: Int = 467 final val MTU: Int = 440 //467
final val MTU: Int = 440
def apply( def apply(
socket: ActorRef[Udp.Command], socket: ActorRef[Udp.Command],
@ -98,7 +97,7 @@ object MiddlewareActor {
def apply(msg: ByteVector): Raw = Raw(msg.toBitVector, exclusive = false) def apply(msg: ByteVector): Raw = Raw(msg.toBitVector, exclusive = false)
def apply(msg: ByteVector, exclusive: Boolean): Raw = Raw(msg, exclusive) def apply(msg: ByteVector, exclusive: Boolean): Raw = Raw(msg.toBitVector, exclusive)
} }
/** Teardown connection */ /** Teardown connection */
@ -120,22 +119,23 @@ object MiddlewareActor {
* All packets are bundled by themselves. * All packets are bundled by themselves.
* May as well just waste all of the cycles on your CPU, eh? * May as well just waste all of the cycles on your CPU, eh?
*/ */
def allPacketGuard(packet: PlanetSidePacket): Boolean = true //noinspection ScalaUnusedSymbol
private def allPacketGuard(packet: PlanetSidePacket): Boolean = true
/** /**
* `CharacterInfoMessage` packets are bundled by themselves.<br> * `CharacterInfoMessage` packets are bundled by themselves.<br>
* <br> * <br>
* Super awkward special case. * Super awkward special case.
* Bundling `CharacterInfoMessage` with its corresponding `ObjectCreateDetailedMesssage`, * Bundling `CharacterInfoMessage` with its corresponding `ObjectCreateDetailedMessage`,
* which can occur during otherwise careless execution of the character select screen, * which can occur during otherwise careless execution of the character select screen,
* causes the character options to show blank slots and be unusable. * causes the character options to show blank slots and be unusable.
*/ */
def characterInfoMessageGuard(packet: PlanetSidePacket): Boolean = { private def characterInfoMessageGuard(packet: PlanetSidePacket): Boolean = {
packet.isInstanceOf[CharacterInfoMessage] packet.isInstanceOf[CharacterInfoMessage]
} }
/** `KeepAliveMessage` packets are bundled by themselves. They're special. */ /** `KeepAliveMessage` packets are bundled by themselves. They're special. */
def keepAliveMessageGuard(packet: PlanetSidePacket): Boolean = { private def keepAliveMessageGuard(packet: PlanetSidePacket): Boolean = {
packet.isInstanceOf[KeepAliveMessage] packet.isInstanceOf[KeepAliveMessage]
} }
@ -162,42 +162,42 @@ class MiddlewareActor(
import MiddlewareActor._ import MiddlewareActor._
implicit val ec: ExecutionContextExecutor = context.executionContext private implicit val ec: ExecutionContextExecutor = context.executionContext
implicit val executor: ExecutionContext = context.executionContext private implicit val executor: ExecutionContext = context.executionContext
private[this] val log = org.log4s.getLogger private[this] val log = org.log4s.getLogger
var clientNonce: Long = 0 private var clientNonce: Long = 0
var serverMACBuffer: ByteVector = ByteVector.empty private var serverMACBuffer: ByteVector = ByteVector.empty
val random = new SecureRandom() private val random = new SecureRandom()
var crypto: Option[CryptoCoding] = None private var crypto: Option[CryptoCoding] = None
val nextActor: ActorRef[PlanetSidePacket] = private val nextActor: ActorRef[PlanetSidePacket] =
context.spawnAnonymous(next(context.self, sender, connectionId), ActorTags(s"id=$connectionId")) context.spawnAnonymous(next(context.self, sender, connectionId), ActorTags(s"id=$connectionId"))
/** Queue of incoming packets (plus sequence numbers and timestamps) that arrived in the wrong order */ /** Queue of incoming packets (plus sequence numbers and timestamps) that arrived in the wrong order */
private val inReorderQueue: mutable.Queue[InReorderEntry] = mutable.Queue() private val inReorderQueue: mutable.Queue[InReorderEntry] = mutable.Queue()
/** Latest incoming sequence number */ /** Latest incoming sequence number */
var inSequence = -1 private var inSequence: Int = -1
/** Latest incoming subslot number */ /** Latest incoming subslot number */
var inSubslot = -1 private var inSubslot: Int = -1
/** List of missing subslot numbers and attempts counter */ /** List of missing subslot numbers and attempts counter */
val inSubslotsMissing: mutable.Map[Int, Int] = TrieMap() private val inSubslotsMissing: mutable.Map[Int, Int] = TrieMap()
/** Queue of outgoing packets used for bundling and splitting */ /** Queue of outgoing packets used for bundling and splitting */
val outQueue: mutable.Queue[(PlanetSidePacket, BitVector)] = mutable.Queue() private val outQueue: mutable.Queue[(PlanetSidePacket, BitVector)] = mutable.Queue()
/** Queue of outgoing packets ready for sending */ /** Queue of outgoing packets ready for sending */
val outQueueBundled: mutable.Queue[PlanetSidePacket] = mutable.Queue() private val outQueueBundled: mutable.Queue[PlanetSidePacket] = mutable.Queue()
/** Latest outbound sequence number */ /** Latest outbound sequence number */
var outSequence = -1 private var outSequence: Int = -1
/** /**
* Increment the outbound sequence number. * Increment the outbound sequence number.
@ -205,33 +205,33 @@ class MiddlewareActor(
* The fidelity of the sequence field in packets is 16 bits, so wrap back to 0 after 65535. * The fidelity of the sequence field in packets is 16 bits, so wrap back to 0 after 65535.
* @return * @return
*/ */
def nextSequence: Int = { private def nextSequence: Int = {
if (outSequence >= 0xffff) { if (outSequence >= 0xffff) {
// TODO resetting the sequence to 0 causes a client crash // TODO resetting the sequence to 0 causes a client crash
// but that does not happen when we always send the same number // but that does not happen when we always send the same number
// the solution is most likely to send the proper ResetSequence payload // the solution is most likely to send the proper ResetSequence payload
// send(ResetSequence(), None, crypto) // send(ResetSequence(), None, crypto)
// outSequence = -1 // outSequence = -1
// return nextSequence // return nextSequence
return outSequence outSequence
} else {
outSequence += 1
outSequence
} }
outSequence += 1
outSequence
} }
/** Latest outbound subslot number; /** Latest outbound subslot number;
* the current subslot is one less than this number * the current subslot is one less than this number
*/ */
var outSubslot = 0 private var outSubslot = 0
/** /**
* Increment the outbound subslot number. * Increment the outbound subslot number.
* The previous subslot number is returned. * The previous subslot number is returned.
* The fidelity of the subslot field in `SlottedMetapacket`'s is 16 bits, so wrap back to 0 after 65535. * The fidelity of the subslot field in a `SlottedMetaPacket` is 16 bits, so wrap back to 0 after 65535.
* @return * @return
*/ */
def nextSubslot: Int = { private def nextSubslot: Int = {
val r = outSubslot val r = outSubslot
if (outSubslot == 0xffff) { if (outSubslot == 0xffff) {
outSubslot = 0 outSubslot = 0
@ -244,12 +244,12 @@ class MiddlewareActor(
/** /**
* Do not bundle these packets together with other packets * Do not bundle these packets together with other packets
*/ */
val packetsBundledByThemselves: List[PlanetSidePacket => Boolean] = List( private val packetsBundledByThemselves: List[PlanetSidePacket => Boolean] = List(
MiddlewareActor.keepAliveMessageGuard, MiddlewareActor.keepAliveMessageGuard,
MiddlewareActor.characterInfoMessageGuard MiddlewareActor.characterInfoMessageGuard
) )
val smpHistoryLength: Int = 100 private val smpHistoryLength: Int = 100
/** History of created `SlottedMetaPacket`s. /** History of created `SlottedMetaPacket`s.
* In case the client does not register receiving a packet by checking against packet subslot index numbers, * In case the client does not register receiving a packet by checking against packet subslot index numbers,
@ -259,35 +259,40 @@ class MiddlewareActor(
* All packets with subslots less than that number have been received or will no longer be requested. * All packets with subslots less than that number have been received or will no longer be requested.
* The client and server supposedly maintain reciprocating mechanisms. * The client and server supposedly maintain reciprocating mechanisms.
*/ */
val preparedSlottedMetaPackets: Array[SlottedMetaPacket] = new Array[SlottedMetaPacket](smpHistoryLength) private val preparedSlottedMetaPackets: Array[SlottedMetaPacket] = new Array[SlottedMetaPacket](smpHistoryLength)
var nextSmpIndex: Int = 0 private var nextSmpIndex: Int = 0
var acceptedSmpSubslot: Int = 0 private var acceptedSmpSubslot: Int = 0
/** end of life stat */ /** end of life stat */
var timesInReorderQueue: Int = 0 private var timesInReorderQueue: Int = 0
/** end of life stat */ /** end of life stat */
var timesSubslotMissing: Int = 0 private var timesSubslotMissing: Int = 0
/** Delay for testing outbound packet evaluation (ms) */
private val packetOutboundDelay: Long = Config.app.network.middleware.packetBundlingDelay.toMillis
/** Delay between runs of the packet bundler/resolver timer (ms); /** Delay between runs of the packet bundler/resolver timer (ms);
* 250ms per network update (client upstream), so 10 runs of this bundling code every update * this is the same as `packetOutboundDelay` increased by the multiplier */
*/ private val packetProcessorDelay: FiniteDuration = Duration.apply(
val packetProcessorDelay = Config.app.network.middleware.packetBundlingDelay math.abs(packetOutboundDelay * Config.app.network.middleware.packetBundlingDelayMultiplier).toLong,
"milliseconds"
)
/** Timer that handles the bundling and throttling of outgoing packets and resolves disorganized inbound packets */ /** Timer that handles the bundling and throttling of outgoing packets and resolves disorganized inbound packets */
var packetProcessor: Cancellable = Default.Cancellable private var packetProcessor: Cancellable = Default.Cancellable
/** how long packets that are out of sequential order wait for the missing sequence before being expedited (ms) */ /** how long packets that are out of sequential order wait for the missing sequence before being expedited (ms) */
val inReorderTimeout = Config.app.network.middleware.inReorderTimeout private val inReorderTimeout: FiniteDuration = Config.app.network.middleware.inReorderTimeout
/** Timer that handles the bundling and throttling of outgoing packets requesting packets with known subslot numbers */ /** Timer that handles the bundling and throttling of outgoing packets requesting packets with known subslot numbers */
var subslotMissingProcessor: Cancellable = Default.Cancellable private var subslotMissingProcessor: Cancellable = Default.Cancellable
/** how long to wait between repeated requests for packets with known missing subslot numbers (ms) */ /** how long to wait between repeated requests for packets with known missing subslot numbers (ms) */
val inSubslotMissingDelay = Config.app.network.middleware.inSubslotMissingDelay private val inSubslotMissingDelay: FiniteDuration = Config.app.network.middleware.inSubslotMissingDelay
/** how many time to repeat the request for a packet with a known missing subslot number */ /** how many time to repeat the request for a packet with a known missing subslot number */
val inSubslotMissingNumberOfAttempts = Config.app.network.middleware.inSubslotMissingAttempts private val inSubslotMissingNumberOfAttempts: Int = Config.app.network.middleware.inSubslotMissingAttempts
//formerly, CryptoSessionActor //formerly, CryptoSessionActor
@ -308,7 +313,7 @@ class MiddlewareActor(
Unknown30 is used to reuse an existing crypto session when switching from login to world Unknown30 is used to reuse an existing crypto session when switching from login to world
When not handling it, it appears that the client will fall back to using ClientStart When not handling it, it appears that the client will fall back to using ClientStart
Do we need to implement this? Do we need to implement this?
*/ */
connectionClose() connectionClose()
case (ConnectionClose(), _) => case (ConnectionClose(), _) =>
@ -318,7 +323,6 @@ class MiddlewareActor(
*/ */
Behaviors.same Behaviors.same
// TODO ResetSequence
case _ => case _ =>
log.warn(s"Unexpected packet type $packet in start (before crypto)") log.warn(s"Unexpected packet type $packet in start (before crypto)")
Behaviors.same Behaviors.same
@ -357,7 +361,7 @@ class MiddlewareActor(
} }
def cryptoSetup(): Behavior[Command] = { private def cryptoSetup(): Behavior[Command] = {
Behaviors Behaviors
.receiveMessagePartial[Command] { .receiveMessagePartial[Command] {
case Receive(msg) => case Receive(msg) =>
@ -369,7 +373,7 @@ class MiddlewareActor(
val dh = DiffieHellman(p.toArray, g.toArray) val dh = DiffieHellman(p.toArray, g.toArray)
val clientChallenge = ServerChallengeXchg.getCompleteChallenge(time, challenge) val clientChallenge = ServerChallengeXchg.getCompleteChallenge(time, challenge)
val serverTime = System.currentTimeMillis() / 1000L val serverTime = System.currentTimeMillis() / 1000L
val randomChallenge = randomBytes(0xc) val randomChallenge = randomBytes(amount = 0xc)
val serverChallenge = ServerChallengeXchg.getCompleteChallenge(serverTime, randomChallenge) val serverChallenge = ServerChallengeXchg.getCompleteChallenge(serverTime, randomChallenge)
serverMACBuffer ++= send( serverMACBuffer ++= send(
ServerChallengeXchg(serverTime, randomChallenge, ByteVector.view(dh.publicKey)) ServerChallengeXchg(serverTime, randomChallenge, ByteVector.view(dh.publicKey))
@ -391,7 +395,7 @@ class MiddlewareActor(
.receiveSignal(onSignal) .receiveSignal(onSignal)
} }
def cryptoFinish(dh: DiffieHellman, clientChallenge: ByteVector, serverChallenge: ByteVector): Behavior[Command] = { private def cryptoFinish(dh: DiffieHellman, clientChallenge: ByteVector, serverChallenge: ByteVector): Behavior[Command] = {
Behaviors Behaviors
.receiveMessagePartial[Command] { .receiveMessagePartial[Command] {
case Receive(msg) => case Receive(msg) =>
@ -424,13 +428,8 @@ class MiddlewareActor(
) )
) )
send(ServerFinished(serverChallengeResult)) send(ServerFinished(serverChallengeResult))
//start the queue processor loop //try the queue processor loop
packetProcessor = context.system.scheduler.scheduleWithFixedDelay( processQueue()
packetProcessorDelay,
packetProcessorDelay
)(() => {
context.self ! ProcessQueue
})
active() active()
case other => case other =>
@ -450,7 +449,7 @@ class MiddlewareActor(
//formerly, PacketCodingActor //formerly, PacketCodingActor
def active(): Behavior[Command] = { private def active(): Behavior[Command] = {
Behaviors Behaviors
.receiveMessage[Command] { .receiveMessage[Command] {
case Receive(msg) => case Receive(msg) =>
@ -462,7 +461,7 @@ class MiddlewareActor(
case Successful((packet, None)) => case Successful((packet, None)) =>
packet match { packet match {
case _: PlanetSideResetSequencePacket => case _: PlanetSideResetSequencePacket =>
log.info(s"ResetSequence: ${msg.toHex}, inSeq: ${inSequence}, outSeq: ${outSequence}") log.info(s"ResetSequence: ${msg.toHex}, inSeq: $inSequence, outSeq: $outSequence")
case _ => () case _ => ()
} }
in(packet) in(packet)
@ -475,12 +474,24 @@ class MiddlewareActor(
out(packet) out(packet)
Behaviors.same Behaviors.same
case Raw(msg, exclusive) => case Raw(msg, true)
if (exclusive) { if System.currentTimeMillis() - lastOutboundEventTime > packetOutboundDelay =>
outQueue.enqueue((KeepAliveMessage(), msg)) //caught by bundling isolation filter outQueue.enqueue((KeepAliveMessage(), msg)) //caught by bundling isolation filter
} else { processOutQueueBundle()
outQueue.enqueue((ActionResultMessage.Pass, msg)) Behaviors.same
}
case Raw(msg, true) =>
outQueue.enqueue((KeepAliveMessage(), msg)) //caught by bundling isolation filter
Behaviors.same
case Raw(msg, false)
if System.currentTimeMillis() - lastOutboundEventTime > packetOutboundDelay =>
outQueue.enqueue((ActionResultMessage.Pass, msg))
processOutQueueBundle()
Behaviors.same
case Raw(msg, false) =>
outQueue.enqueue((ActionResultMessage.Pass, msg))
Behaviors.same Behaviors.same
case ProcessQueue => case ProcessQueue =>
@ -493,19 +504,24 @@ class MiddlewareActor(
case Teardown() => case Teardown() =>
send(TeardownConnection(clientNonce)) send(TeardownConnection(clientNonce))
context.self ! Close() close()
Behaviors.same Behaviors.same
case Close() => case Close() =>
outQueue close()
.dequeueAll(_ => true) Behaviors.same
.foreach(p => send(smp(slot = 0, p._2.bytes), Some(nextSequence), crypto))
connectionClose()
} }
.receiveSignal(onSignal) .receiveSignal(onSignal)
} }
val onSignal: PartialFunction[(ActorContext[Command], Signal), Behavior[Command]] = { private def close(): Unit = {
outQueue
.dequeueAll(_ => true)
.foreach(p => send(smp(slot = 0, p._2.bytes), Some(nextSequence), crypto))
connectionClose()
}
private val onSignal: PartialFunction[(ActorContext[Command], Signal), Behavior[Command]] = {
case (_, PostStop) => case (_, PostStop) =>
context.stop(nextActor) context.stop(nextActor)
if (timesInReorderQueue > 0 || timesSubslotMissing > 0) { if (timesInReorderQueue > 0 || timesSubslotMissing > 0) {
@ -519,7 +535,7 @@ class MiddlewareActor(
} }
/** Handle incoming packet */ /** Handle incoming packet */
def in(packet: PlanetSidePacket): Behavior[Command] = { private def in(packet: PlanetSidePacket): Behavior[Command] = {
packet match { packet match {
case packet: PlanetSideGamePacket => case packet: PlanetSideGamePacket =>
nextActor ! packet nextActor ! packet
@ -556,25 +572,30 @@ class MiddlewareActor(
Behaviors.same Behaviors.same
case ControlSync(diff, _, _, _, _, _, fa, fb) => case ControlSync(diff, _, _, _, _, _, fa, fb) =>
// TODO: figure out what this is what what it does for the PS client // something to do with reliable packet transmission and resending
// I believe it has something to do with reliable packet transmission and resending // TODO Work around the 2038 problem; can't we just start at 0 again?
// Work around the 2038 problem
// TODO can we just start at 0 again? what is this for?
val serverTick = math.min(System.currentTimeMillis(), 4294967295L) val serverTick = math.min(System.currentTimeMillis(), 4294967295L)
val nextDiff = if (diff == 65535) 0 else diff + 1 val nextDiff = if (diff == 65535) 0 else diff + 1
send(ControlSyncResp(nextDiff, serverTick, fa, fb, fb, fa)) send(ControlSyncResp(nextDiff, serverTick, fa, fb, fb, fa))
Behaviors.same Behaviors.same
case ConnectionClose() => case ConnectionClose() =>
Behaviors.stopped connectionClose()
case TeardownConnection(_) => case TeardownConnection(_) =>
Behaviors.stopped connectionClose()
case ClientStart(_) => case ClientStart(_) =>
start() start()
case Unknown30(_) =>
/*
Unknown30 is used to reuse an existing crypto session when switching from login to world
When not handling it, it appears that the client will fall back to using ClientStart
Do we need to implement this?
*/
connectionClose()
case other => case other =>
log.warn(s"Unhandled control packet '$other'") log.warn(s"Unhandled control packet '$other'")
Behaviors.same Behaviors.same
@ -584,35 +605,38 @@ class MiddlewareActor(
log.error(s"Unexpected crypto packet '$packet'") log.error(s"Unexpected crypto packet '$packet'")
Behaviors.same Behaviors.same
case packet: PlanetSideResetSequencePacket => case _: PlanetSideResetSequencePacket =>
// TODO This is wrong log.error(s"Unexpected crypto packet: received a PlanetSideResetSequencePacket when it should never happen")
// I suspect ResetSequence is a notification that the remote sequence has been reset
// rather than a request to reset our outgoing sequence number
// Resetting it this way causes a client crash, see nextSequence
// log.debug(s"Received sequence reset request from client: $packet.}")
// outSequence = 0
Behaviors.same Behaviors.same
} }
} }
def in(packet: Attempt[PlanetSidePacket]): Unit = { private def in(packet: Attempt[PlanetSidePacket]): Unit = {
packet match { packet match {
case Successful(_packet) => in(_packet) case Successful(_packet) => in(_packet)
case Failure(cause) => log.error(s"Could not decode packet: ${cause.message}") case Failure(cause) => log.error(s"Could not decode packet: ${cause.message}")
} }
} }
private var lastOutboundEventTime: Long = 0L
/** Handle outgoing packet */ /** Handle outgoing packet */
def out(packet: PlanetSidePacket): Unit = { private def out(packet: PlanetSidePacket): Unit = {
packet match { packet match {
case packet: KeepAliveMessage => case packet: KeepAliveMessage =>
send(packet) send(packet)
case _ => case _ =>
PacketCoding.encodePacket(packet) match { PacketCoding.encodePacket(packet) match {
case Successful(payload) => outQueue.enqueue((packet, payload)) case Successful(payload)
case Failure(cause) => log.error(s"Could not encode $packet: ${cause.message}") if System.currentTimeMillis() - lastOutboundEventTime > packetOutboundDelay =>
outQueue.enqueue((packet, payload))
processOutQueueBundle()
case Successful(payload) =>
outQueue.enqueue((packet, payload))
retimePacketProcessorIfNotRunning()
case Failure(cause) =>
log.error(s"Could not encode $packet: ${cause.message}")
} }
} }
} }
@ -622,9 +646,9 @@ class MiddlewareActor(
* the bundling and throttling of outgoing packets * the bundling and throttling of outgoing packets
* and the reordering of out-of-sequence incoming packets. * and the reordering of out-of-sequence incoming packets.
*/ */
def processQueue(): Unit = { private def processQueue(): Unit = {
processOutQueueBundle()
inReorderQueueFunc() inReorderQueueFunc()
processOutQueueBundle()
} }
/** /**
@ -635,7 +659,9 @@ class MiddlewareActor(
* Packets that are too big for the MTU must go on to be split into smaller portions that will be wrapped individually. * Packets that are too big for the MTU must go on to be split into smaller portions that will be wrapped individually.
* Once queued, the first bundle is dispatched to the network. * Once queued, the first bundle is dispatched to the network.
*/ */
def processOutQueueBundle(): Unit = { private def processOutQueueBundle(): Unit = {
packetProcessor.cancel()
packetProcessor = Default.Cancellable
try { try {
if (outQueueBundled.nonEmpty) { if (outQueueBundled.nonEmpty) {
sendFirstBundle() sendFirstBundle()
@ -671,7 +697,8 @@ class MiddlewareActor(
if (bundle.length == 1) { if (bundle.length == 1) {
splitPacket(bundle.head) match { splitPacket(bundle.head) match {
case Seq() => case Seq() =>
//TODO is oversized packet recovery possible? //TODO is over-sized packet recovery possible?
retimePacketProcessorIfWork()
case data => case data =>
outQueueBundled.enqueueAll(data) outQueueBundled.enqueueAll(data)
sendFirstBundle() sendFirstBundle()
@ -712,18 +739,19 @@ class MiddlewareActor(
* @param packet the packet * @param packet the packet
* @param sequence the sequence number obtained from the packet * @param sequence the sequence number obtained from the packet
*/ */
def activeNormal(packet: PlanetSidePacket, sequence: Int): Unit = { private def activeNormal(packet: PlanetSidePacket, sequence: Int): Unit = {
if (sequence == inSequence + 1) { if (sequence == inSequence + 1) {
inSequence = sequence inSequence = sequence
in(packet) in(packet)
} else if (sequence < inSequence) { //expedite this packet } else if (sequence < inSequence) { //expedite this packet
in(packet) in(packet)
} else if (sequence == inSequence) { } else if (sequence == inSequence) {
//do nothing? log.warn("reported packet sequence matches previous packet sequence; nothing will happen?")
} else { } else {
inReorderQueue.enqueue(InReorderEntry(packet, sequence, System.currentTimeMillis())) //first entry inReorderQueue.enqueue(InReorderEntry(packet, sequence, System.currentTimeMillis())) //first entry
inReorderQueueFunc = processInReorderQueueTimeoutOnly inReorderQueueFunc = processInReorderQueueTimeoutOnly
activeSequenceFunc = activeWithReordering activeSequenceFunc = activeWithReordering
retimePacketProcessorIfNotRunning()
log.trace("packet sequence in disorder") log.trace("packet sequence in disorder")
} }
} }
@ -741,7 +769,7 @@ class MiddlewareActor(
* @param packet the packet * @param packet the packet
* @param sequence the sequence number obtained from the packet * @param sequence the sequence number obtained from the packet
*/ */
def activeWithReordering(packet: PlanetSidePacket, sequence: Int): Unit = { private def activeWithReordering(packet: PlanetSidePacket, sequence: Int): Unit = {
if (sequence == inSequence + 1) { if (sequence == inSequence + 1) {
inSequence = sequence inSequence = sequence
in(packet) in(packet)
@ -778,7 +806,7 @@ class MiddlewareActor(
* Set the recorded inbound sequence number to belong to the greatest packet removed from the queue. * Set the recorded inbound sequence number to belong to the greatest packet removed from the queue.
* @see `processInReorderQueueTimeoutOnly` * @see `processInReorderQueueTimeoutOnly`
*/ */
def processInReorderQueue(): Unit = { private def processInReorderQueue(): Unit = {
timesInReorderQueue += 1 timesInReorderQueue += 1
var currentSequence = inSequence var currentSequence = inSequence
val currentTime = System.currentTimeMillis() val currentTime = System.currentTimeMillis()
@ -806,6 +834,7 @@ class MiddlewareActor(
inReorderQueueFunc = processInReorderQueueTimeoutOnly inReorderQueueFunc = processInReorderQueueTimeoutOnly
in(p) in(p)
} }
retimePacketProcessorIfNotRunningIfWork()
} }
/** /**
@ -814,7 +843,7 @@ class MiddlewareActor(
* Set the recorded inbound sequence number to belong to the greatest packet removed from the queue. * Set the recorded inbound sequence number to belong to the greatest packet removed from the queue.
* This may run during the scheduled check on the in-order queue after the outbound bundling process. * This may run during the scheduled check on the in-order queue after the outbound bundling process.
*/ */
def processInReorderQueueTimeoutOnly(): Unit = { private def processInReorderQueueTimeoutOnly(): Unit = {
timesInReorderQueue += 1 timesInReorderQueue += 1
val currentTime = System.currentTimeMillis() val currentTime = System.currentTimeMillis()
val index = inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis } val index = inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis }
@ -832,7 +861,7 @@ class MiddlewareActor(
* Otherwise, do work on whatever is at the front of the queue, and do not test again until explicitly requested. * Otherwise, do work on whatever is at the front of the queue, and do not test again until explicitly requested.
* Test the queue for more contents after removing content from it. * Test the queue for more contents after removing content from it.
*/ */
def inReorderQueueTest(): Unit = { private def inReorderQueueTest(): Unit = {
if (inReorderQueue.isEmpty) { if (inReorderQueue.isEmpty) {
inReorderQueueFunc = doNothing inReorderQueueFunc = doNothing
activeSequenceFunc = activeNormal activeSequenceFunc = activeNormal
@ -883,7 +912,7 @@ class MiddlewareActor(
* @see `PacketCoding.decodePacket` * @see `PacketCoding.decodePacket`
* @see `RelatedB` * @see `RelatedB`
*/ */
def inSubslotNotMissing(slot: Int, subslot: Int, inner: ByteVector): Unit = { private def inSubslotNotMissing(slot: Int, subslot: Int, inner: ByteVector): Unit = {
if (subslot == inSubslot + 1) { if (subslot == inSubslot + 1) {
in(PacketCoding.decodePacket(inner)) in(PacketCoding.decodePacket(inner))
send(RelatedB(slot, subslot)) send(RelatedB(slot, subslot))
@ -933,7 +962,7 @@ class MiddlewareActor(
* @see `inSubslotsMissingRequestsFinished` * @see `inSubslotsMissingRequestsFinished`
* @see `PacketCoding.decodePacket` * @see `PacketCoding.decodePacket`
*/ */
def inSubslotMissingRequests(slot: Int, subslot: Int, inner: ByteVector): Unit = { private def inSubslotMissingRequests(slot: Int, subslot: Int, inner: ByteVector): Unit = {
if (subslot < inSubslot) { if (subslot < inSubslot) {
inSubslotsMissing.remove(subslot) inSubslotsMissing.remove(subslot)
in(PacketCoding.decodePacket(inner)) in(PacketCoding.decodePacket(inner))
@ -955,7 +984,7 @@ class MiddlewareActor(
* resume normal operations when acting upon inbound `SlottedMetaPacket` packets. * resume normal operations when acting upon inbound `SlottedMetaPacket` packets.
* @param slot the optional slot to report the "first" `RelatedB` in a "while" * @param slot the optional slot to report the "first" `RelatedB` in a "while"
*/ */
def inSubslotsMissingRequestsFinished(slot: Int = 0): Unit = { private def inSubslotsMissingRequestsFinished(slot: Int = 0): Unit = {
if (inSubslotsMissing.isEmpty) { if (inSubslotsMissing.isEmpty) {
subslotMissingProcessor.cancel() subslotMissingProcessor.cancel()
activeSubslotsFunc = inSubslotNotMissing activeSubslotsFunc = inSubslotNotMissing
@ -969,7 +998,7 @@ class MiddlewareActor(
* Split packets are wrapped in a `HandleGamePacket` and sent as `SlottedMetaPacket4`. * Split packets are wrapped in a `HandleGamePacket` and sent as `SlottedMetaPacket4`.
* The purpose of `SlottedMetaPacket4` may or may not be to indicate a split packet. * The purpose of `SlottedMetaPacket4` may or may not be to indicate a split packet.
*/ */
def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = { private def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = {
if (packet.length > (MTU - 4) * 8) { if (packet.length > (MTU - 4) * 8) {
PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match { PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match {
case Successful(data) => case Successful(data) =>
@ -989,7 +1018,7 @@ class MiddlewareActor(
* @param data hexadecimal data, the encoded packets to be placed in the SMP * @param data hexadecimal data, the encoded packets to be placed in the SMP
* @return the packet * @return the packet
*/ */
def smp(slot: Int, data: ByteVector): SlottedMetaPacket = { private def smp(slot: Int, data: ByteVector): SlottedMetaPacket = {
val packet = SlottedMetaPacket(slot, nextSubslot, data) val packet = SlottedMetaPacket(slot, nextSubslot, data)
preparedSlottedMetaPackets.update(nextSmpIndex, packet) preparedSlottedMetaPackets.update(nextSmpIndex, packet)
nextSmpIndex = (nextSmpIndex + 1) % smpHistoryLength nextSmpIndex = (nextSmpIndex + 1) % smpHistoryLength
@ -1003,23 +1032,25 @@ class MiddlewareActor(
* @throws NoSuchElementException if there is no packet to dequeue * @throws NoSuchElementException if there is no packet to dequeue
* @see `SlottedMetaPacket` * @see `SlottedMetaPacket`
*/ */
def sendFirstBundle(): Unit = { private def sendFirstBundle(): Unit = {
lastOutboundEventTime = System.currentTimeMillis()
send(outQueueBundled.dequeue(), Some(nextSequence), crypto) send(outQueueBundled.dequeue(), Some(nextSequence), crypto)
retimePacketProcessorIfNotRunningIfWork()
} }
def send(packet: PlanetSideControlPacket): ByteVector = { private def send(packet: PlanetSideControlPacket): ByteVector = {
send(packet, if (crypto.isDefined) Some(nextSequence) else None, crypto) send(packet, if (crypto.isDefined) Some(nextSequence) else None, crypto)
} }
def send(packet: PlanetSideCryptoPacket): ByteVector = { private def send(packet: PlanetSideCryptoPacket): ByteVector = {
send(packet, Some(nextSequence), crypto) send(packet, Some(nextSequence), crypto)
} }
def send(packet: PlanetSideGamePacket): ByteVector = { private def send(packet: PlanetSideGamePacket): ByteVector = {
send(packet, Some(nextSequence), crypto) send(packet, Some(nextSequence), crypto)
} }
def send(packet: PlanetSidePacket, sequence: Option[Int], crypto: Option[CryptoCoding]): ByteVector = { private def send(packet: PlanetSidePacket, sequence: Option[Int], crypto: Option[CryptoCoding]): ByteVector = {
PacketCoding.marshalPacket(packet, sequence, crypto) match { PacketCoding.marshalPacket(packet, sequence, crypto) match {
case Successful(bits) => case Successful(bits) =>
val bytes = bits.toByteVector val bytes = bits.toByteVector
@ -1036,17 +1067,61 @@ class MiddlewareActor(
* @param amount the number of bytes * @param amount the number of bytes
* @return a random series of bytes * @return a random series of bytes
*/ */
def randomBytes(amount: Int): ByteVector = { //noinspection SameParameterValue
private def randomBytes(amount: Int): ByteVector = {
val array = Array.ofDim[Byte](amount) val array = Array.ofDim[Byte](amount)
random.nextBytes(array) random.nextBytes(array)
ByteVector.view(array) ByteVector.view(array)
} }
/**
* If the timer the timer for processing outbound packets is not currently running,
* test if there are any packets waiting in certain messaging queues
* for the chance to cancel, then restart, the timer for processing outbound packets.
*/
private def retimePacketProcessorIfNotRunningIfWork(): Unit = {
if (packetProcessor == Default.Cancellable || packetProcessor.isCancelled) {
retimePacketProcessorIfWork()
}
}
/**
* If the timer the timer for processing outbound packets is not currently running,
* test if there are any packets waiting in certain messaging queues
* for the chance to cancel, then restart, the timer for processing outbound packets.
*/
private def retimePacketProcessorIfNotRunning(): Unit = {
if (packetProcessor == Default.Cancellable || packetProcessor.isCancelled) {
retimePacketProcessor()
}
}
/**
* If there are any packets waiting in the following queues,
* cancel, then restart, the timer for processing outbound packets.
*/
private def retimePacketProcessorIfWork(): Unit = {
if (outQueueBundled.nonEmpty || outQueue.nonEmpty || inReorderQueue.nonEmpty) {
retimePacketProcessor()
}
}
/**
* Cancel, then restart, the timer for processing outbound packets.
* Sends a message to the normal mailbox to signal processing activity.
*/
private def retimePacketProcessor(): Unit = {
packetProcessor.cancel()
packetProcessor = context.system.scheduler.scheduleOnce(packetProcessorDelay, () => {
context.self ! ProcessQueue
})
}
/** /**
* End client-server ops. * End client-server ops.
* End messaging capabilities. * End messaging capabilities.
*/ */
def connectionClose(): Behavior[Command] = { private def connectionClose(): Behavior[Command] = {
send(ConnectionClose()) send(ConnectionClose())
Behaviors.stopped Behaviors.stopped
} }

View file

@ -516,7 +516,8 @@ object BlockMap {
structure: Iterable[Sector] structure: Iterable[Sector]
): Iterable[Sector] = { ): Iterable[Sector] = {
if (list.max < structure.size) { if (list.max < structure.size) {
list.toSet.map { structure.toSeq } val structureSeq = structure.toSeq
list.toSet.map { structureSeq }
} else { } else {
List[Sector]() List[Sector]()
} }

View file

@ -135,6 +135,7 @@ case class NetworkConfig(
case class MiddlewareConfig( case class MiddlewareConfig(
packetBundlingDelay: FiniteDuration, packetBundlingDelay: FiniteDuration,
packetBundlingDelayMultiplier: Float,
inReorderTimeout: FiniteDuration, inReorderTimeout: FiniteDuration,
inSubslotMissingDelay: FiniteDuration, inSubslotMissingDelay: FiniteDuration,
inSubslotMissingAttempts: Int inSubslotMissingAttempts: Int