From b5fc2ecf7082da6a2dfae23027c0d2062e633b2c Mon Sep 17 00:00:00 2001 From: Fate-JH Date: Tue, 12 Jan 2021 14:33:44 -0500 Subject: [PATCH] Middleware (#662) * removed suspicious shadowing and clarified failure message; customized skipped bundling * smp history is now a no-less-efficient circular array * adjustment to bundle dispatch timing; adjustment to inbound sequence reorder queue * adjustments to handling inbound packets with missing subslots * unused PacketCoding features * comments; delayed start of the queue processor task; turned sequence reorder task and subslot missing task into function literals * optimizations to the inbound re-order by sequence routines by controlling execution flow * the subslot request timer has been separated from the standard bundling processor; config values for bundling, sequence resolution, and subslot requests in the middleware actor have been included * replacing func-array with conditional logic --- src/main/resources/application.conf | 17 + .../actors/net/MiddlewareActor.scala | 826 +++++++++++++----- .../net/psforever/packet/PacketCoding.scala | 44 +- .../scala/net/psforever/util/Config.scala | 10 +- 4 files changed, 649 insertions(+), 248 deletions(-) diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 1f5f34349..ae2052134 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -99,6 +99,23 @@ anti-cheat { } network { + middleware { + # How often between executions of the outbound bundling process + packet-bundling-delay = 25 milliseconds + + # Pause inbound packet transmission towards the network if the sequence number is out of order + # Packets are put aside until the sequence is restored, or this timeout passes + in-reorder-timeout = 50 milliseconds + + # Wait on inbound packets if that packet is a SlottedMetaPacket and the next subslot number is greater than expected + # Does not stop the transmission of packets to the server + # but dictates how long between requests to the network (client) for missing packets with anticipated subslot numbers + in-subslot-missing-delay = 50 milliseconds + + # How many attempts at resolving missing packets with anticipated subslot numbers + in-subslot-missing-attempts = 10 + } + session { # The maximum amount of time since the last inbound packet from a UDP session # before it is dropped. diff --git a/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala b/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala index 0b793974d..bd56e9f8f 100644 --- a/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala +++ b/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala @@ -1,53 +1,70 @@ package net.psforever.actors.net -import java.net.InetSocketAddress -import java.security.{SecureRandom, Security} import akka.actor.Cancellable import akka.actor.typed.{ActorRef, ActorTags, Behavior, PostStop, Signal} import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.io.Udp -import net.psforever.packet.{ - CryptoPacketOpcode, - PacketCoding, - PlanetSideControlPacket, - PlanetSideCryptoPacket, - PlanetSideGamePacket, - PlanetSidePacket -} -import net.psforever.packet.control.{ - ClientStart, - ConnectionClose, - ControlSync, - ControlSyncResp, - HandleGamePacket, - MultiPacket, - MultiPacketEx, - RelatedA, - RelatedB, - ServerStart, - SlottedMetaPacket, - TeardownConnection -} -import net.psforever.packet.crypto.{ClientChallengeXchg, ClientFinished, ServerChallengeXchg, ServerFinished} -import net.psforever.packet.game.{ChangeFireModeMessage, CharacterInfoMessage, KeepAliveMessage, PingMsg} +import java.net.InetSocketAddress +import java.security.{SecureRandom, Security} + +import javax.crypto.spec.SecretKeySpec +import org.bouncycastle.jce.provider.BouncyCastleProvider + +import scala.collection.concurrent.TrieMap +import scala.collection.mutable +import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} +import scala.concurrent.duration._ +import scodec.Attempt import scodec.Attempt.{Failure, Successful} import scodec.bits.{BitVector, ByteVector, HexStringSyntax} import scodec.interop.akka.EnrichedByteVector - -import javax.crypto.spec.SecretKeySpec +import net.psforever.objects.Default +import net.psforever.packet.{CryptoPacketOpcode, PacketCoding, PlanetSideControlPacket, PlanetSideCryptoPacket, PlanetSideGamePacket, PlanetSidePacket} +import net.psforever.packet.control.{ClientStart, ConnectionClose, ControlSync, ControlSyncResp, HandleGamePacket, MultiPacket, MultiPacketEx, RelatedA, RelatedB, ServerStart, SlottedMetaPacket, TeardownConnection} +import net.psforever.packet.crypto.{ClientChallengeXchg, ClientFinished, ServerChallengeXchg, ServerFinished} +import net.psforever.packet.game.{ChangeFireModeMessage, CharacterInfoMessage, KeepAliveMessage, PingMsg} import net.psforever.packet.PacketCoding.CryptoCoding -import net.psforever.util.{DiffieHellman, Md5Mac} -import org.bouncycastle.jce.provider.BouncyCastleProvider -import scodec.Attempt +import net.psforever.util.{Config, DiffieHellman, Md5Mac} -import scala.collection.mutable -import scala.collection.mutable.ListBuffer -import scala.concurrent.{ExecutionContext, ExecutionContextExecutor} -import scala.concurrent.duration._ - -/** MiddlewareActor sits between the raw UDP socket and the "main" actors (either login or session) and handles - * crypto and control packets. This means it sets up cryptography, it decodes incoming packets, - * it encodes, bundles and splits outgoing packets, it handles things like requesting/resending lost packets and more. +/** + * `MiddlewareActor` sits between the raw UDP socket and the "main" actors + * (either `LoginActor` or `SessionActor`) + * and handles crypto and control packets. + * The former side is called the outbound network (the clients); the former is called the inbound local (server). + * This service sets up cryptography, it decodes incoming packets, it encodes, bundles, or splits outgoing packets, + * and it handles things like requesting/resending lost packets. + * Accurate operation of the service is mostly a product of the network being agreeable + * and allowing packets to arrive correctly. + * Various subroutines are used to keep track of the predicted offending packets.
+ *
+ * The cryptographic aspect of the service resolves itself first, + * exchanging keys and passcodes and challeneges between local and the network. + * Most of this process occurs without any prior cryptographic setup, + * relying on the validation of its own exchange to operate. + * Afterwards its completion, + * all raw data arriving from the network or leaving to the network will require a cipher + * based on the previously exchanged data. + *
+ * As packets arrive from local with the intention of being sent out towards the network terminus, + * they will be pooled into a buffer. + * Periodically, the buffer contents will be evaluated, + * and packet data of enough quantity and combined length will be assembled into a singular packet bundle. + * This bundle will be queued and dispatched towards the network. + * If the outbound packets do not arrive at the network terminus correctly, + * the network has a method of dispatching requests for identified missing packets. + * This side of the communication will keep track of its previously dispatched packets for a "reasonable" amount of time + * and will respond to those requests if possible by searching its catalog.
+ *
+ * If the inbound packets do not arrive correctly the first time, + * after a while, requests for identified mising packets from the network source will occur. + * Virtually all important packets have a sequence number that bestows on each packet an absolute delivery order. + * Bundled packets have a subslot number that indicates the total number of subslot packets dispatched to this client + * as well as the order in which the packets should have been received. + * If a packet is out of sequence - a future packet, compared to what is being expected - it is buffered. + * Should the expected packets show up out of order before the buffered is cleared, + * everything sorts itself out. + * Unresolved missing sequence entries will often lead to requests for missing packets with anticipated subslots. + * If these requests do not resolve, there is unfortuantely not much that can be done except grin and bear with it. */ object MiddlewareActor { Security.addProvider(new BouncyCastleProvider) @@ -77,8 +94,52 @@ object MiddlewareActor { /** Close connection */ final case class Close() extends Command + + /** ... */ + private case class ProcessQueue() extends Command + + /** Log inbound packets that are yet to be in proper order by sequence number */ + private case class InReorderEntry(packet: PlanetSidePacket, sequence: Int, time: Long) + + /** + * All packets are bundled by themselves. + * May as well just waste all of the cycles on your CPU, eh? + */ + def allPacketGuard(packet: PlanetSidePacket): Boolean = true + + /** + * `CharacterInfoMessage` packets are bundled by themselves.
+ *
+ * Super awkward special case. + * Bundling `CharacterInfoMessage` with its corresponding `ObjectCreateDetailedMesssage`, + * which can occur during otherwise careless execution of the character select screen, + * causes the character options to show blank slots and be unusable. + */ + def characterInfoMessageGuard(packet: PlanetSidePacket): Boolean = { + packet.isInstanceOf[CharacterInfoMessage] + } + + /** + * `KeepAliveMessage` packets are bundled by themselves. + * They're special. + */ + def keepAliveMessageGuard(packet: PlanetSidePacket): Boolean = { + packet.isInstanceOf[KeepAliveMessage] + } + + /** + * A function for blanking tasks related to inbound packet resolution. + * Do nothing. + * Wait to be told to do something. + */ + private def doNothing(): Unit = { } } +/** + * MiddlewareActor sits between the raw UDP socket and the "main" actors (either login or session) and handles + * crypto and control packets. This means it sets up cryptography, it decodes incoming packets, + * it encodes, bundles and splits outgoing packets, it handles things like requesting/resending lost packets and more. + */ class MiddlewareActor( context: ActorContext[MiddlewareActor.Command], socket: ActorRef[Udp.Command], @@ -106,16 +167,16 @@ class MiddlewareActor( context.spawnAnonymous(next(context.self, sender, connectionId), ActorTags(s"id=$connectionId")) /** Queue of incoming packets (plus sequence numbers and timestamps) that arrived in the wrong order */ - val inReorderQueue: ListBuffer[(PlanetSidePacket, Int, Long)] = ListBuffer() + private val inReorderQueue: mutable.Queue[InReorderEntry] = mutable.Queue() /** Latest incoming sequence number */ - var inSequence = 0 + var inSequence = -1 /** Latest incoming subslot number */ - var inSubslot = 0 + var inSubslot = -1 /** List of missing subslot numbers and attempts counter */ - var inSubslotsMissing: mutable.Map[Int, Int] = mutable.Map() + val inSubslotsMissing: mutable.Map[Int, Int] = TrieMap() /** Queue of outgoing packets used for bundling and splitting */ val outQueue: mutable.Queue[(PlanetSidePacket, BitVector)] = mutable.Queue() @@ -123,9 +184,16 @@ class MiddlewareActor( /** Queue of outgoing packets ready for sending */ val outQueueBundled: mutable.Queue[PlanetSidePacket] = mutable.Queue() - /** Latest outgoing sequence number */ + /** Latest outbound sequence number; + * the current sequence is one less than this number */ var outSequence = 0 + /** + * Increment the outbound sequence number. + * The previous sequence number is returned. + * The fidelity of the sequence field in packets is 16 bits, so wrap back to 0 after 65535. + * @return + */ def nextSequence: Int = { val r = outSequence if (outSequence == 0xffff) { @@ -136,9 +204,16 @@ class MiddlewareActor( r } - /** Latest outgoing subslot number */ + /** Latest outbound subslot number; + * the current subslot is one less than this number */ var outSubslot = 0 + /** + * Increment the outbound subslot number. + * The previous subslot number is returned. + * The fidelity of the subslot field in `SlottedMetapacket`'s is 16 bits, so wrap back to 0 after 65535. + * @return + */ def nextSubslot: Int = { val r = outSubslot if (outSubslot == 0xffff) { @@ -149,111 +224,46 @@ class MiddlewareActor( r } - /** Create a new SlottedMetaPacket with the sequence number filled in and the packet added to the history */ - def smp(slot: Int, data: ByteVector): SlottedMetaPacket = { - if (outSlottedMetaPackets.length > 100) { - outSlottedMetaPackets = outSlottedMetaPackets.takeRight(100) - } - val packet = SlottedMetaPacket(slot, nextSubslot, data) - outSlottedMetaPackets += packet - packet - } + /** + * Do not bundle these packets together with other packets + */ + val packetsBundledByThemselves: List[PlanetSidePacket=>Boolean] = List( + MiddlewareActor.keepAliveMessageGuard, + MiddlewareActor.characterInfoMessageGuard + ) - /** History of sent SlottedMetaPackets in case the client requests missing SMP packets via a RelatedA packet. */ - var outSlottedMetaPackets: ListBuffer[SlottedMetaPacket] = ListBuffer() + val smpHistoryLength: Int = 100 + /** History of created `SlottedMetaPacket`s. + * In case the client does not register receiving a packet by checking against packet subslot index numbers, + * it will dispatch a `RelatedA` packet, + * and the server will hopefully locate the packet where it has been backlogged. + * The client will also dispatch a `RelatedB` packet to indicate the packet with the highest subslot received. + * All packets with subslots less than that number have been received or will no longer be requested. + * The client and server supposedly maintain reciprocating mechanisms. + */ + val preparedSlottedMetaPackets: Array[SlottedMetaPacket] = new Array[SlottedMetaPacket](smpHistoryLength) + var nextSmpIndex: Int = 0 + var acceptedSmpSubslot: Int = 0 - /** Timer that handles the bundling and throttling of outgoing packets and the reordering of incoming packets */ - val queueProcessor: Cancellable = { - context.system.scheduler.scheduleWithFixedDelay(10.milliseconds, 10.milliseconds)(() => { - try { + /** end of life stat */ + var timesInReorderQueue: Int = 0 + /** end of life stat */ + var timesSubslotMissing: Int = 0 + /** Delay between runs of the packet bundler/resolver timer (ms); + * 250ms per network update (client upstream), so 10 runs of this bundling code every update */ + val packetProcessorDelay = Config.app.network.middleware.packetBundlingDelay + /** Timer that handles the bundling and throttling of outgoing packets and resolves disorganized inbound packets */ + var packetProcessor: Cancellable = Default.Cancellable + /** how long packets that are out of sequential order wait for the missing sequence before being expedited (ms) */ + val inReorderTimeout = Config.app.network.middleware.inReorderTimeout + /** Timer that handles the bundling and throttling of outgoing packets requesting packets with known subslot numbers */ + var subslotMissingProcessor: Cancellable = Default.Cancellable + /** how long to wait between repeated requests for packets with known missing subslot numbers (ms) */ + val inSubslotMissingDelay = Config.app.network.middleware.inSubslotMissingDelay + /** how many time to repeat the request for a packet with a known missing subslot number */ + val inSubslotMissingNumberOfAttempts = Config.app.network.middleware.inSubslotMissingAttempts - if (outQueue.nonEmpty && outQueueBundled.isEmpty) { - var length = 0L - val bundle = outQueue - .dequeueWhile { - case (packet, payload) => - // packet length + MultiPacketEx prefix length - val packetLength = payload.length + (if (payload.length < 256 * 8) { 1L * 8 } - else if (payload.length < 65536 * 8) { 2L * 8 } - else { 4L * 8 }) - length += packetLength - - packet match { - // Super awkward special case: Bundling CharacterInfoMessage with OCDM causes the character selection - // to show blank lines and be broken. So we make sure CharacterInfoMessage is always sent as the only - // packet in a bundle. - case _: CharacterInfoMessage => - if (length == packetLength) { - length += MTU - true - } else { - false - } - case _ => - // Some packets may be larger than the MTU limit, in that case we dequeue anyway and split later - // We deduct some bytes to leave room for SlottedMetaPacket (4 bytes) and MultiPacketEx (2 bytes + prefix per packet) - length == packetLength || length <= (MTU - 6) * 8 - } - } - .map(_._2) - - if (bundle.length == 1) { - outQueueBundled.enqueueAll(splitPacket(bundle.head)) - } else { - PacketCoding.encodePacket(MultiPacketEx(bundle.toVector.map(_.bytes))) match { - case Successful(data) => outQueueBundled.enqueue(smp(0, data.bytes)) - case Failure(cause) => log.error(cause.message) - } - } - } - - outQueueBundled.dequeueFirst(_ => true) match { - case Some(packet) => send(packet, Some(nextSequence), crypto) - case None => () - } - - if (inReorderQueue.nonEmpty) { - var currentSequence = inSequence - val currentTime = System.currentTimeMillis() - inReorderQueue - .sortBy(_._2) - .dropWhile { - case (_, sequence, time) => - // Forward packet if next in sequence order or older than 20ms - if (sequence == currentSequence + 1 || currentTime - time > 20) { - currentSequence += 1 - true - } else { - false - } - } - .foreach { - case (packet, sequence, _) => - if (sequence > inSequence) { - inSequence = sequence - } - in(packet) - } - } - - if (inSubslotsMissing.nonEmpty) { - inSubslotsMissing.foreach { - case (subslot, attempts) => - if (attempts <= 50) { - // Slight hack to send RelatedA less frequently, might want to put this on a separate timer - if (attempts % 10 == 0) send(RelatedA(0, subslot)) - inSubslotsMissing(subslot) += 1 - } else { - log.warn(s"Requesting subslot '$subslot' from client failed") - inSubslotsMissing.remove(subslot) - } - } - } - } catch { - case e: Throwable => log.error(e)("Queue processing error") - } - }) - } +//formerly, CryptoSessionActor def start(): Behavior[Command] = { Behaviors.receiveMessagePartial { @@ -269,7 +279,7 @@ class MiddlewareActor( // TODO ResetSequence case _ => - log.error(s"Unexpected packet type $packet in init") + log.warn(s"Unexpected packet type $packet in start (before crypto)") Behaviors.same } case Failure(_) => @@ -283,9 +293,12 @@ class MiddlewareActor( // reflect the packet back to the sender send(ping) Behaviors.same + case _: ChangeFireModeMessage => - // ignore + log.trace(s"What is this packet that just arrived? ${msg.toString}") + //ignore Behaviors.same + case _ => log.error(s"Unexpected non-crypto packet type $packet in start") Behaviors.same @@ -312,18 +325,14 @@ class MiddlewareActor( packet match { case (ClientChallengeXchg(time, challenge, p, g), Some(_)) => serverMACBuffer ++= msg.drop(3) - - val dh = DiffieHellman(p.toArray, g.toArray) - + val dh = DiffieHellman(p.toArray, g.toArray) val clientChallenge = ServerChallengeXchg.getCompleteChallenge(time, challenge) val serverTime = System.currentTimeMillis() / 1000L val randomChallenge = randomBytes(0xc) val serverChallenge = ServerChallengeXchg.getCompleteChallenge(serverTime, randomChallenge) - serverMACBuffer ++= send( ServerChallengeXchg(serverTime, randomChallenge, ByteVector.view(dh.publicKey)) ).drop(3) - cryptoFinish(dh, clientChallenge, serverChallenge) case _ => @@ -350,34 +359,18 @@ class MiddlewareActor( packet match { case (ClientFinished(clientPubKey, _), Some(_)) => serverMACBuffer ++= msg.drop(3) - - val agreedKey = dh.agree(clientPubKey.toArray) + val agreedKey = dh.agree(clientPubKey.toArray) val agreedMessage = ByteVector("master secret".getBytes) ++ clientChallenge ++ hex"00000000" ++ serverChallenge ++ hex"00000000" - - val masterSecret = new Md5Mac(ByteVector.view(agreedKey)).updateFinal(agreedMessage) - val mac = new Md5Mac(masterSecret) - - // To do? verify client challenge. The code below has always been commented out, so it probably never - // worked and it surely doesn't work now. The whole cryptography is flawed because - // of the 128bit p values for DH, so implementing security features is probably not worth it. - /* - val clientChallengeExpanded = mac.updateFinal( - ByteVector( - "client finished".getBytes - ) ++ serverMACBuffer ++ hex"01" ++ clientChallengeResult ++ hex"01", - 0xc - ) - */ - + val masterSecret = new Md5Mac(ByteVector.view(agreedKey)).updateFinal(agreedMessage) + val mac = new Md5Mac(masterSecret) + //TODO verify client challenge? val serverChallengeResult = mac .updateFinal(ByteVector("server finished".getBytes) ++ serverMACBuffer ++ hex"01", 0xc) - - val encExpansion = ByteVector.view("server expansion".getBytes) ++ hex"0000" ++ serverChallenge ++ + val encExpansion = ByteVector.view("server expansion".getBytes) ++ hex"0000" ++ serverChallenge ++ hex"00000000" ++ clientChallenge ++ hex"00000000" - val decExpansion = ByteVector.view("client expansion".getBytes) ++ hex"0000" ++ serverChallenge ++ + val decExpansion = ByteVector.view("client expansion".getBytes) ++ hex"0000" ++ serverChallenge ++ hex"00000000" ++ clientChallenge ++ hex"00000000" - val expandedEncKey = mac.updateFinal(encExpansion, 64) val expandedDecKey = mac.updateFinal(decExpansion, 64) @@ -389,9 +382,15 @@ class MiddlewareActor( expandedDecKey.slice(20, 36) ) ) - send(ServerFinished(serverChallengeResult)) - + //start the queue processor loop + packetProcessor = + context.system.scheduler.scheduleWithFixedDelay( + packetProcessorDelay, + packetProcessorDelay + )(()=> { + context.self ! ProcessQueue() + }) active() case other => @@ -409,20 +408,19 @@ class MiddlewareActor( .receiveSignal(onSignal) } +//formerly, PacketCodingActor + def active(): Behavior[Command] = { Behaviors .receiveMessage[Command] { case Receive(msg) => PacketCoding.unmarshalPacket(msg, crypto) match { case Successful((packet, Some(sequence))) => - if (sequence == inSequence + 1) { - inSequence = sequence - in(packet) - } else { - inReorderQueue.addOne((packet, sequence, System.currentTimeMillis())) - } - case Successful((packet, None)) => in(packet) - case Failure(e) => log.error(s"Could not decode packet: $e") + activeSequenceFunc(packet, sequence) + case Successful((packet, None)) => + in(packet) + case Failure(e) => + log.error(s"could not decode packet: $e") } Behaviors.same @@ -430,6 +428,10 @@ class MiddlewareActor( out(packet) Behaviors.same + case ProcessQueue() => + processQueue() + Behaviors.same + case Teardown() => send(TeardownConnection(clientNonce)) context.self ! Close() @@ -438,7 +440,7 @@ class MiddlewareActor( case Close() => outQueue .dequeueAll(_ => true) - .foreach(p => send(smp(0, p._2.bytes), Some(nextSequence), crypto)) + .foreach(p => send(smp(slot = 0, p._2.bytes), Some(nextSequence), crypto)) connectionClose() } .receiveSignal(onSignal) @@ -447,7 +449,13 @@ class MiddlewareActor( val onSignal: PartialFunction[(ActorContext[Command], Signal), Behavior[Command]] = { case (_, PostStop) => context.stop(nextActor) - queueProcessor.cancel() + if(timesInReorderQueue > 0 || timesSubslotMissing > 0) { + log.trace(s"out of sequence checks: $timesInReorderQueue, subslot missing checks: $timesSubslotMissing") + } + packetProcessor.cancel() + subslotMissingProcessor.cancel() + inReorderQueue.clear() + inSubslotsMissing.clear() Behaviors.same } @@ -461,17 +469,7 @@ class MiddlewareActor( case packet: PlanetSideControlPacket => packet match { case SlottedMetaPacket(slot, subslot, inner) => - if (subslot > inSubslot + 1) { - ((inSubslot + 1) until subslot).foreach(s => inSubslotsMissing.addOne((s, 0))) - } else if (inSubslotsMissing.contains(subslot)) { - inSubslotsMissing.remove(subslot) - } else if (inSubslotsMissing.isEmpty) { - send(RelatedB(slot, subslot)) - } - if (subslot > inSubslot) { - inSubslot = subslot - } - in(PacketCoding.decodePacket(inner)) + activeSubslotsFunc(slot, subslot, inner) Behaviors.same case MultiPacket(packets) => @@ -483,15 +481,19 @@ class MiddlewareActor( Behaviors.same case RelatedA(slot, subslot) => - log.info(s"Client indicated a packet is missing prior to slot '$slot' and subslot '$subslot'") - outSlottedMetaPackets.find(_.subslot == subslot - 1) match { - case Some(_packet) => outQueueBundled.enqueue(_packet) - case None => log.warn(s"Client requested unknown subslot '$subslot'") + val requestedSubslot = subslot - 1 + preparedSlottedMetaPackets.find(_.subslot == requestedSubslot) match { + case Some(_packet) => + outQueueBundled.enqueue(_packet) + case None if requestedSubslot < acceptedSmpSubslot => + log.warn(s"Client indicated an smp of slot $slot prior to $subslot that is no longer logged") + case None => + log.warn(s"Client indicated an smp of slot $slot prior to $subslot that is not found") } Behaviors.same case RelatedB(_, subslot) => - outSlottedMetaPackets = outSlottedMetaPackets.filter(_.subslot > subslot) + acceptedSmpSubslot = subslot Behaviors.same case ControlSync(diff, _, _, _, _, _, fa, fb) => @@ -523,7 +525,6 @@ class MiddlewareActor( log.error(s"Unexpected crypto packet '$packet'") Behaviors.same } - } def in(packet: Attempt[PlanetSidePacket]): Unit = { @@ -547,6 +548,391 @@ class MiddlewareActor( } } + /** + * Periodically deal with these concerns: + * the bundling and throttling of outgoing packets + * and the reordering of out-of-sequence incoming packets. + */ + def processQueue(): Unit = { + processOutQueueBundle() + inReorderQueueFunc() + } + + /** + * Outbound packets that have not been caught by other guards need to be bundled + * and placed into `SlottedMetaPacket` messages. + * Multiple packets in one bundle have to be placed into a `MultiPacketEx` message wrapper + * before being placed into the `SlottedMetaPacket`. + * Packets that are too big for the MTU must go on to be split into smaller portions that will be wrapped individually. + * Once queued, the first bundle is dispatched to the network. + */ + def processOutQueueBundle(): Unit = { + try { + if (outQueueBundled.nonEmpty) { + sendFirstBundle() + } else if (outQueue.nonEmpty) { + val bundle = { + var length = 0L + val (_, bundle) = outQueue + .dequeueWhile { + case (packet, payload) => + // packet length + MultiPacketEx header length + val packetLength = payload.length + ( + if (payload.length < 2048) { 8L } //256 * 8; 1L * 8 + else if (payload.length < 524288) { 16L } //65536 * 8; 2L * 8 + else { 32L } //4L * 8 + ) + length += packetLength + + if (packetsBundledByThemselves.exists { _(packet) }) { + if (length == packetLength) { + length += MTU + true //dequeue only packet + } else { + false //dequeue later + } + } else { + // Some packets may be larger than the MTU limit, in that case we dequeue anyway and split later + // We deduct some bytes to leave room for SlottedMetaPacket (4 bytes) and MultiPacketEx (2 bytes + prefix per packet) + length == packetLength || length <= (MTU - 6) * 8 + } + } + .unzip + bundle + } + + if (bundle.length == 1) { + splitPacket(bundle.head) match { + case Seq() => + //TODO is oversized packet recovery possible? + case data => + outQueueBundled.enqueueAll(data) + sendFirstBundle() + } + } else { + PacketCoding.encodePacket(MultiPacketEx(bundle.toVector.map(_.bytes))) match { + case Successful(data) => + outQueueBundled.enqueue(smp(slot = 0, data.bytes)) + sendFirstBundle() + case Failure(cause) => + log.error(cause.message) + //to avoid packets being lost, unwrap bundle and queue the packets individually + bundle.foreach { packet => + outQueueBundled.enqueue(smp(slot = 0, packet.bytes)) + } + sendFirstBundle() + } + } + } + } catch { + case e: Throwable => + log.error(s"outbound queue processing error - ${Option(e.getMessage).getOrElse(e.getClass.getSimpleName)}") + } + } + + /** what to do when a packet unmarshals properly and has a sequence number + * @see `activeNormal` + * @see `activeWithReordering` + */ + private var activeSequenceFunc: (PlanetSidePacket, Int)=>Unit = activeNormal + + /** + * Properly handle the newly-arrived packet based on its sequence number. + * If the packet is in the proper order, or is considered old, by its sequence number, + * act on the packet immediately. + * If the sequence if higher than the next one expected, indicating that some packets may have been missed, + * put that packet to the side and active a resolution queue. + * @param packet the packet + * @param sequence the sequence number obtained from the packet + */ + def activeNormal(packet: PlanetSidePacket, sequence: Int): Unit = { + if (sequence == inSequence + 1) { + inSequence = sequence + in(packet) + } else if(sequence < inSequence) { //expedite this packet + in(packet) + } else if (sequence == inSequence) { + //do nothing? + } else { + inReorderQueue.enqueue(InReorderEntry(packet, sequence, System.currentTimeMillis())) //first entry + inReorderQueueFunc = processInReorderQueueTimeoutOnly + activeSequenceFunc = activeWithReordering + log.trace("packet sequence in disorder") + } + } + + /** + * Properly handle the newly-arrived packet based on its sequence number, + * with an ongoing resolution queue for managing out of sequence packets. + * If the packet is in the proper order, or is considered old, by its sequence number, + * act on the packet immediately. + * If the packet is considered a packet that was expected in the past, + * attempt to clear it from the queue and test the resolution queue for further work. + * If the sequence if higher than the next one expected, indicating that some packets may have been missed, + * add it to the resolution queue. + * Finally, act on the state of the resolution queue. + * @param packet the packet + * @param sequence the sequence number obtained from the packet + */ + def activeWithReordering(packet: PlanetSidePacket, sequence: Int): Unit = { + if (sequence == inSequence + 1) { + inSequence = sequence + in(packet) + processInReorderQueue() + } else if(sequence < inSequence) { //expedite this packet + inReorderQueue.filterInPlace(_.sequence == sequence) + in(packet) + inReorderQueueFunc = inReorderQueueTest + inReorderQueueTest() + } else if (sequence == inSequence) { + //do nothing? + } else { + var insertAtIndex = 0 + val length = inReorderQueue.length + while (insertAtIndex < length && sequence >= inReorderQueue(insertAtIndex).sequence) { + insertAtIndex += 1 + } + inReorderQueue.insert(insertAtIndex, InReorderEntry(packet, sequence, System.currentTimeMillis())) + processInReorderQueue() + } + } + + /** how to periodically respond to the state of the inbound reorder queue; + * administered by the primary packet processor schedule + * @see `inReorderQueueTest` + * @see `processInReorderQueueTimeoutOnly` + */ + private var inReorderQueueFunc: ()=>Unit = doNothing + + /** + * Examine inbound packets that need to be reordered by sequence number and + * pass all packets that are now in the correct order and + * pass packets that have been kept waiting for too long in the queue. + * Set the recorded inbound sequence number to belong to the greatest packet removed from the queue. + * @see `processInReorderQueueTimeoutOnly` + */ + def processInReorderQueue(): Unit = { + timesInReorderQueue += 1 + var currentSequence = inSequence + val currentTime = System.currentTimeMillis() + val takenPackets = (inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis } match { + case -1 => + inReorderQueue + .takeWhile { entry => + // Forward packet if next in sequence order + if (entry.sequence == currentSequence + 1) { + currentSequence = entry.sequence //already ordered by sequence, stays in order during traversal + true + } else { + false + } + } + case index => + // Forward all packets ahead of any packet that has been in the queue for 50ms + val entries = inReorderQueue.take(index + 1) + currentSequence = entries.last.sequence + entries + }).map(_.packet) + inReorderQueue.dropInPlace(takenPackets.length) + inSequence = currentSequence + takenPackets.foreach { p => + inReorderQueueFunc = processInReorderQueueTimeoutOnly + in(p) + } + } + + /** + * Examine inbound packets that need to be reordered by sequence number and + * pass packets that have been kept waiting for too long in the queue. + * Set the recorded inbound sequence number to belong to the greatest packet removed from the queue. + * This may run during the scheduled check on the in-order queue after the outbound bundling process. + */ + def processInReorderQueueTimeoutOnly(): Unit = { + timesInReorderQueue += 1 + val currentTime = System.currentTimeMillis() + val index = inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis } + val takenPackets = inReorderQueue.take(index + 1) + inReorderQueue.dropInPlace(takenPackets.length) + takenPackets.foreach { p => + inReorderQueueFunc = inReorderQueueTest + inSequence = p.sequence + in(p.packet) + } + } + + /** + * If the inbound reorder queue is empty, do no more work on it until work is available. + * Otherwise, do work on whatever is at the front of the queue, and do not test again until explicitly requested. + * Test the queue for more contents after removing content from it. + */ + def inReorderQueueTest(): Unit = { + if (inReorderQueue.isEmpty) { + inReorderQueueFunc = doNothing + activeSequenceFunc = activeNormal + log.trace("normalcy with packet sequence; resuming normal workflow") + //do nothing + } else { + inReorderQueueFunc = processInReorderQueueTimeoutOnly + processInReorderQueue() + } + } + + /** what to do with a `SlottedMetaPacket` control packet + * @see `inSubslotNotMissing` + * @see `inSubslotMissingRequests` + */ + private var activeSubslotsFunc: (Int, Int, ByteVector)=>Unit = inSubslotNotMissing + + /** + * What to do with a `SlottedMetaPacket` control packet normally. + * The typical approach, when the subslot is the expected next number, is to merely receive the packet + * and dispatch a confirmation. + * When the subslot is farther ahead of what is expected, + * requests need to be logged for the missing subslots. + * @param slot the SMP slot (related to the opcode) + * @param subslot the SMP subslot (related to the expected order of packets) + * @param inner the contents of this SMP + * @see `askForMissingSubslots` + * @see `in` + * @see `inSubslotMissingRequests` + * @see `PacketCoding.decodePacket` + * @see `RelatedB` + */ + def inSubslotNotMissing(slot: Int, subslot: Int, inner: ByteVector): Unit = { + if (subslot == inSubslot + 1) { + in(PacketCoding.decodePacket(inner)) + send(RelatedB(slot, subslot)) + inSubslot = subslot + } else if (subslot > inSubslot + 1) { + in(PacketCoding.decodePacket(inner)) + ((inSubslot + 1) until subslot).foreach { s => + inSubslotsMissing.addOne((s, inSubslotMissingNumberOfAttempts)) + } //request missing SMP's + inSubslot = subslot + activeSubslotsFunc = inSubslotMissingRequests + askForMissingSubslots() + log.trace("packet subslots in disorder; start requests") + } + } + + /** + * What to do with an inbound `SlottedMetaPacket` control packet when the subslots are in disarray. + * Whenever a subslot arrives prior to the current highest, removing that subslot from the request list is possible. + * If the proper next subslot arrives, proceeding like normal is fine. + * Do not, however, send any `ResultB` confirmations until no more requests are outstanding. + * @param slot the SMP slot (related to the opcode) + * @param subslot the SMP subslot (related to the expected order of packets) + * @param inner the contents of this SMP + * @see `in` + * @see `inSubslotsMissingRequestsFinished` + * @see `PacketCoding.decodePacket` + */ + def inSubslotMissingRequests(slot: Int, subslot: Int, inner: ByteVector): Unit = { + if (subslot < inSubslot) { + inSubslotsMissing.remove(subslot) + in(PacketCoding.decodePacket(inner)) + inSubslotsMissingRequestsFinished(slot) + } else if (subslot > inSubslot + 1) { + ((inSubslot + 1) until subslot).foreach { s => + inSubslotsMissing.addOne((s, inSubslotMissingNumberOfAttempts)) + } //request missing SMP's + inSubslot = subslot + in(PacketCoding.decodePacket(inner)) + } else if (subslot == inSubslot + 1) { + inSubslot = subslot + in(PacketCoding.decodePacket(inner)) + } + } + + /** + * If there are no more requests for missing subslots, + * resume normal operations when acting upon inbound `SlottedMetaPacket` packets. + * @param slot the optional slot to report the "first" `RelatedB` in a "while" + */ + def inSubslotsMissingRequestsFinished(slot: Int = 0) : Unit = { + if (inSubslotsMissing.isEmpty) { + subslotMissingProcessor.cancel() + activeSubslotsFunc = inSubslotNotMissing + send(RelatedB(slot, inSubslot)) //send a confirmation packet after all requested packets are handled + log.trace("normalcy with packet subslot order; resuming normal workflow") + } + } + + /** + * Start making requests for missing `SlotedMetaPackets` + * if no prior requests were prepared. + * Start the scheduled task and handle the dispatched requests. + * @see `inSubslotsMissingRequestFuncs` + * @see `inSubslotsMissingRequestsFinished` + * @see `RelatedA` + */ + def askForMissingSubslots(): Unit = { + if (subslotMissingProcessor.isCancelled) { + subslotMissingProcessor = + context.system.scheduler.scheduleWithFixedDelay( + initialDelay = 0.milliseconds, + inSubslotMissingDelay + )(()=> { + inSubslotsMissing.synchronized { + timesSubslotMissing += inSubslotsMissing.size + inSubslotsMissing.foreach { case (subslot, attempt) => + val value = attempt - 1 + if(value > 0) { + inSubslotsMissing(subslot) = value + } else { + inSubslotsMissing.remove(subslot) + } + send(RelatedA(0, subslot)) + } + inSubslotsMissingRequestsFinished() + } + }) + } + } + + /** + * Split packet into multiple chunks (if necessary). + * Split packets are wrapped in a `HandleGamePacket` and sent as `SlottedMetaPacket4`. + * The purpose of `SlottedMetaPacket4` may or may not be to indicate a split packet. + */ + def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = { + if (packet.length > (MTU - 4) * 8) { + PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match { + case Successful(data) => + data.grouped((MTU - 8) * 8).map(vec => smp(slot = 4, vec.bytes)).toSeq + case Failure(cause) => + log.error(cause.message) + Seq() + } + } else { + Seq(smp(slot = 0, packet.bytes)) + } + } + + /** + * Create a new `SlottedMetaPacket` with the sequence number filled in and the packet added to the history. + * @param slot the slot for this packet, which influences the type of SMP + * @param data hexadecimal data, the encoded packets to be placed in the SMP + * @return the packet + */ + def smp(slot: Int, data: ByteVector): SlottedMetaPacket = { + val packet = SlottedMetaPacket(slot, nextSubslot, data) + preparedSlottedMetaPackets.update(nextSmpIndex, packet) + nextSmpIndex = (nextSmpIndex + 1) % smpHistoryLength + packet + } + + /** + * Take the first fully-prepared packet from the queue of prepared packets and send it to the client. + * Assign the dispatched packet the latest sequence number. + * Do not call unless confident the the queue has at least one element. + * @throws NoSuchElementException if there is no packet to dequeue + * @see `SlottedMetaPacket` + */ + def sendFirstBundle(): Unit = { + send(outQueueBundled.dequeue(), Some(nextSequence), crypto) + } + def send(packet: PlanetSideControlPacket): ByteVector = { send(packet, if (crypto.isDefined) Some(nextSequence) else None, crypto) } @@ -566,38 +952,28 @@ class MiddlewareActor( socket ! Udp.Send(bytes.toByteString, sender) bytes case Failure(e) => - log.error(s"Failed to encode packet ${packet.getClass.getName}: $e") + log.error(s"Failed to encode packet ${packet.getClass.getSimpleName}: $e") ByteVector.empty } } + /** + * A random series of bytes used as a challenge value. + * @param amount the number of bytes + * @return a random series of bytes + */ def randomBytes(amount: Int): ByteVector = { val array = Array.ofDim[Byte](amount) random.nextBytes(array) ByteVector.view(array) } + /** + * End client-server ops. + * End messaging capabilities. + */ def connectionClose(): Behavior[Command] = { send(ConnectionClose()) Behaviors.stopped } - - /** Split packet into multiple chunks (if necessary) - * Split packets are wrapped in a HandleGamePacket and sent as SlottedMetaPacket4 - * The purpose of SlottedMetaPacket4 may or may not be to indicate a split packet - */ - def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = { - if (packet.length > (MTU - 4) * 8) { - PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match { - case Successful(data) => - data.grouped((MTU - 8) * 8).map(vec => smp(4, vec.bytes)).toSeq - case Failure(cause) => - log.error(cause.message) - Seq() - } - } else { - Seq(smp(0, packet.bytes)) - } - } - } diff --git a/src/main/scala/net/psforever/packet/PacketCoding.scala b/src/main/scala/net/psforever/packet/PacketCoding.scala index e76d35415..9da785fcc 100644 --- a/src/main/scala/net/psforever/packet/PacketCoding.scala +++ b/src/main/scala/net/psforever/packet/PacketCoding.scala @@ -1,7 +1,7 @@ // Copyright (c) 2017 PSForever package net.psforever.packet -import java.security.{Key, SecureRandom, Security} +import java.security.{Key, Security} import javax.crypto.Cipher import javax.crypto.spec.RC5ParameterSpec @@ -16,8 +16,6 @@ import net.psforever.util.Md5Mac object PacketCoding { Security.addProvider(new BouncyCastleProvider) - private val random = new SecureRandom() - val RC5_BLOCK_SIZE = 8 /** A lower bound on the packet size */ @@ -40,9 +38,9 @@ object PacketCoding { case _: PlanetSideControlPacket if crypto.isEmpty => BitVector.empty case _ => sequence match { - case Some(sequence) => - uint16L.encode(sequence) match { - case Successful(seq) => seq + case Some(_sequence) => + uint16L.encode(_sequence) match { + case Successful(_seq) => _seq case f @ Failure(_) => return f } case None => @@ -53,8 +51,8 @@ object PacketCoding { val (flags, payload) = packet match { case _: PlanetSideGamePacket | _: PlanetSideControlPacket if crypto.isDefined => encodePacket(packet) match { - case Successful(payload) => - val encryptedPayload = crypto.get.encrypt(payload.bytes) match { + case Successful(_payload) => + val encryptedPayload = crypto.get.encrypt(_payload.bytes) match { case Successful(p) => p case f: Failure => return f } @@ -68,29 +66,29 @@ object PacketCoding { } case packet: PlanetSideGamePacket => encodePacket(packet) match { - case Successful(payload) => + case Successful(_payload) => ( PlanetSidePacketFlags.codec.encode(PlanetSidePacketFlags(PacketType.Normal, secured = false)).require, - payload + _payload ) case f @ Failure(_) => return f } case packet: PlanetSideControlPacket => encodePacket(packet) match { - case Successful(payload) => + case Successful(_payload) => ( // control packets don't have flags BitVector.empty, - payload + _payload ) case f @ Failure(_) => return f } case packet: PlanetSideCryptoPacket => encodePacket(packet) match { - case Successful(payload) => + case Successful(_payload) => ( PlanetSidePacketFlags.codec.encode(PlanetSidePacketFlags(PacketType.Crypto, secured = false)).require, - payload + _payload ) case f @ Failure(_) => return f } @@ -164,7 +162,7 @@ object PacketCoding { crypto: Option[CryptoCoding] = None ): Attempt[(PlanetSidePacket, Int)] = { val (flags, remainder) = Codec.decode[PlanetSidePacketFlags](BitVector(msg)) match { - case Successful(DecodeResult(value, remainder)) => (value, remainder) + case Successful(DecodeResult(value, _remainder)) => (value, _remainder) case Failure(e) => return Failure(Err(s"Failed to parse packet flags: ${e.message}")) } @@ -184,8 +182,8 @@ object PacketCoding { // all packets have a two byte sequence ID val (sequence, payload) = uint16L.decode(remainder) match { - case Successful(DecodeResult(value, remainder)) => - (value, remainder.toByteVector) + case Successful(DecodeResult(value, _remainder)) => + (value, _remainder.toByteVector) case Failure(e) => return Failure(Err(s"Failed to parse packet sequence number: ${e.message}")) } @@ -195,9 +193,9 @@ object PacketCoding { CryptoPacketOpcode .getPacketDecoder(cryptoState)(payload.bits) .map(p => (p.value.asInstanceOf[PlanetSidePacket], sequence)) - case (PacketType.Normal, Some(crypto)) if flags.secured => + case (PacketType.Normal, Some(_crypto)) if flags.secured => // encrypted payload is 4-byte aligned: 1b flags, 2b sequence, 1b padding - crypto.decrypt(payload.drop(1)).map(p => decodePacket(p)).flatten.map(p => (p, sequence)) + _crypto.decrypt(payload.drop(1)).map(p => decodePacket(p)).flatten.map(p => (p, sequence)) case (PacketType.Normal, None) if !flags.secured => decodePacket(payload).map(p => (p, sequence)) case (PacketType.Normal, None) => @@ -279,7 +277,9 @@ object PacketCoding { Successful(ByteVector.view(rc5Encrypt.doFinal(packetWithPadding.toArray))) } } catch { - case e: Throwable => Failure(Err(s"encrypt error: '${e.getMessage}' data: ${packetWithPadding.toHex}")) + case e: Throwable => + val msg = if(e.getMessage == null) e.getClass.getSimpleName else e.getMessage + Failure(Err(s"encrypt error: '$msg' data: ${packetWithPadding.toHex}")) } } @@ -295,7 +295,7 @@ object PacketCoding { // last byte is the padding length val padding = uint8L.decode(payloadDecrypted.takeRight(1).bits) match { - case Successful(padding) => padding.value + case Successful(_padding) => _padding.value case Failure(e) => return Failure(Err(s"Failed to decode the encrypted padding length: ${e.message}")) } @@ -304,7 +304,7 @@ object PacketCoding { val mac = bytes(Md5Mac.MACLENGTH).decode(payloadMac.bits) match { case Failure(e) => return Failure(Err("Failed to extract the encrypted MAC: " + e.message)) - case Successful(mac) => mac.value + case Successful(_mac) => _mac.value } val payloadNoMac = payloadNoPadding.dropRight(Md5Mac.MACLENGTH) diff --git a/src/main/scala/net/psforever/util/Config.scala b/src/main/scala/net/psforever/util/Config.scala index 08b72739b..f85e30ff5 100644 --- a/src/main/scala/net/psforever/util/Config.scala +++ b/src/main/scala/net/psforever/util/Config.scala @@ -114,7 +114,15 @@ case class AntiCheatConfig( ) case class NetworkConfig( - session: SessionConfig + session: SessionConfig, + middleware: MiddlewareConfig +) + +case class MiddlewareConfig( + packetBundlingDelay: FiniteDuration, + inReorderTimeout: FiniteDuration, + inSubslotMissingDelay: FiniteDuration, + inSubslotMissingAttempts: Int ) case class SessionConfig(