Middleware (#662)

* removed suspicious shadowing and clarified failure message; customized skipped bundling

* smp history is now a no-less-efficient circular array

* adjustment to bundle dispatch timing; adjustment to inbound sequence reorder queue

* adjustments to handling inbound packets with missing subslots

* unused PacketCoding features

* comments; delayed start of the queue processor task; turned sequence reorder task and subslot missing task into function literals

* optimizations to the inbound re-order by sequence routines by controlling execution flow

* the subslot request timer has been separated from the standard bundling processor; config values for bundling, sequence resolution, and subslot requests in the middleware actor have been included

* replacing func-array with conditional logic
This commit is contained in:
Fate-JH 2021-01-12 14:33:44 -05:00 committed by GitHub
parent 563afcdb19
commit b5fc2ecf70
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 649 additions and 248 deletions

View file

@ -99,6 +99,23 @@ anti-cheat {
}
network {
middleware {
# How often between executions of the outbound bundling process
packet-bundling-delay = 25 milliseconds
# Pause inbound packet transmission towards the network if the sequence number is out of order
# Packets are put aside until the sequence is restored, or this timeout passes
in-reorder-timeout = 50 milliseconds
# Wait on inbound packets if that packet is a SlottedMetaPacket and the next subslot number is greater than expected
# Does not stop the transmission of packets to the server
# but dictates how long between requests to the network (client) for missing packets with anticipated subslot numbers
in-subslot-missing-delay = 50 milliseconds
# How many attempts at resolving missing packets with anticipated subslot numbers
in-subslot-missing-attempts = 10
}
session {
# The maximum amount of time since the last inbound packet from a UDP session
# before it is dropped.

View file

@ -1,53 +1,70 @@
package net.psforever.actors.net
import java.net.InetSocketAddress
import java.security.{SecureRandom, Security}
import akka.actor.Cancellable
import akka.actor.typed.{ActorRef, ActorTags, Behavior, PostStop, Signal}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.io.Udp
import net.psforever.packet.{
CryptoPacketOpcode,
PacketCoding,
PlanetSideControlPacket,
PlanetSideCryptoPacket,
PlanetSideGamePacket,
PlanetSidePacket
}
import net.psforever.packet.control.{
ClientStart,
ConnectionClose,
ControlSync,
ControlSyncResp,
HandleGamePacket,
MultiPacket,
MultiPacketEx,
RelatedA,
RelatedB,
ServerStart,
SlottedMetaPacket,
TeardownConnection
}
import net.psforever.packet.crypto.{ClientChallengeXchg, ClientFinished, ServerChallengeXchg, ServerFinished}
import net.psforever.packet.game.{ChangeFireModeMessage, CharacterInfoMessage, KeepAliveMessage, PingMsg}
import java.net.InetSocketAddress
import java.security.{SecureRandom, Security}
import javax.crypto.spec.SecretKeySpec
import org.bouncycastle.jce.provider.BouncyCastleProvider
import scala.collection.concurrent.TrieMap
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration._
import scodec.Attempt
import scodec.Attempt.{Failure, Successful}
import scodec.bits.{BitVector, ByteVector, HexStringSyntax}
import scodec.interop.akka.EnrichedByteVector
import javax.crypto.spec.SecretKeySpec
import net.psforever.objects.Default
import net.psforever.packet.{CryptoPacketOpcode, PacketCoding, PlanetSideControlPacket, PlanetSideCryptoPacket, PlanetSideGamePacket, PlanetSidePacket}
import net.psforever.packet.control.{ClientStart, ConnectionClose, ControlSync, ControlSyncResp, HandleGamePacket, MultiPacket, MultiPacketEx, RelatedA, RelatedB, ServerStart, SlottedMetaPacket, TeardownConnection}
import net.psforever.packet.crypto.{ClientChallengeXchg, ClientFinished, ServerChallengeXchg, ServerFinished}
import net.psforever.packet.game.{ChangeFireModeMessage, CharacterInfoMessage, KeepAliveMessage, PingMsg}
import net.psforever.packet.PacketCoding.CryptoCoding
import net.psforever.util.{DiffieHellman, Md5Mac}
import org.bouncycastle.jce.provider.BouncyCastleProvider
import scodec.Attempt
import net.psforever.util.{Config, DiffieHellman, Md5Mac}
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.concurrent.duration._
/** MiddlewareActor sits between the raw UDP socket and the "main" actors (either login or session) and handles
* crypto and control packets. This means it sets up cryptography, it decodes incoming packets,
* it encodes, bundles and splits outgoing packets, it handles things like requesting/resending lost packets and more.
/**
* `MiddlewareActor` sits between the raw UDP socket and the "main" actors
* (either `LoginActor` or `SessionActor`)
* and handles crypto and control packets.
* The former side is called the outbound network (the clients); the former is called the inbound local (server).
* This service sets up cryptography, it decodes incoming packets, it encodes, bundles, or splits outgoing packets,
* and it handles things like requesting/resending lost packets.
* Accurate operation of the service is mostly a product of the network being agreeable
* and allowing packets to arrive correctly.
* Various subroutines are used to keep track of the predicted offending packets.<br>
* <br>
* The cryptographic aspect of the service resolves itself first,
* exchanging keys and passcodes and challeneges between local and the network.
* Most of this process occurs without any prior cryptographic setup,
* relying on the validation of its own exchange to operate.
* Afterwards its completion,
* all raw data arriving from the network or leaving to the network will require a cipher
* based on the previously exchanged data.
* <br>
* As packets arrive from local with the intention of being sent out towards the network terminus,
* they will be pooled into a buffer.
* Periodically, the buffer contents will be evaluated,
* and packet data of enough quantity and combined length will be assembled into a singular packet bundle.
* This bundle will be queued and dispatched towards the network.
* If the outbound packets do not arrive at the network terminus correctly,
* the network has a method of dispatching requests for identified missing packets.
* This side of the communication will keep track of its previously dispatched packets for a "reasonable" amount of time
* and will respond to those requests if possible by searching its catalog.<br>
* <br>
* If the inbound packets do not arrive correctly the first time,
* after a while, requests for identified mising packets from the network source will occur.
* Virtually all important packets have a sequence number that bestows on each packet an absolute delivery order.
* Bundled packets have a subslot number that indicates the total number of subslot packets dispatched to this client
* as well as the order in which the packets should have been received.
* If a packet is out of sequence - a future packet, compared to what is being expected - it is buffered.
* Should the expected packets show up out of order before the buffered is cleared,
* everything sorts itself out.
* Unresolved missing sequence entries will often lead to requests for missing packets with anticipated subslots.
* If these requests do not resolve, there is unfortuantely not much that can be done except grin and bear with it.
*/
object MiddlewareActor {
Security.addProvider(new BouncyCastleProvider)
@ -77,8 +94,52 @@ object MiddlewareActor {
/** Close connection */
final case class Close() extends Command
/** ... */
private case class ProcessQueue() extends Command
/** Log inbound packets that are yet to be in proper order by sequence number */
private case class InReorderEntry(packet: PlanetSidePacket, sequence: Int, time: Long)
/**
* All packets are bundled by themselves.
* May as well just waste all of the cycles on your CPU, eh?
*/
def allPacketGuard(packet: PlanetSidePacket): Boolean = true
/**
* `CharacterInfoMessage` packets are bundled by themselves.<br>
* <br>
* Super awkward special case.
* Bundling `CharacterInfoMessage` with its corresponding `ObjectCreateDetailedMesssage`,
* which can occur during otherwise careless execution of the character select screen,
* causes the character options to show blank slots and be unusable.
*/
def characterInfoMessageGuard(packet: PlanetSidePacket): Boolean = {
packet.isInstanceOf[CharacterInfoMessage]
}
/**
* `KeepAliveMessage` packets are bundled by themselves.
* They're special.
*/
def keepAliveMessageGuard(packet: PlanetSidePacket): Boolean = {
packet.isInstanceOf[KeepAliveMessage]
}
/**
* A function for blanking tasks related to inbound packet resolution.
* Do nothing.
* Wait to be told to do something.
*/
private def doNothing(): Unit = { }
}
/**
* MiddlewareActor sits between the raw UDP socket and the "main" actors (either login or session) and handles
* crypto and control packets. This means it sets up cryptography, it decodes incoming packets,
* it encodes, bundles and splits outgoing packets, it handles things like requesting/resending lost packets and more.
*/
class MiddlewareActor(
context: ActorContext[MiddlewareActor.Command],
socket: ActorRef[Udp.Command],
@ -106,16 +167,16 @@ class MiddlewareActor(
context.spawnAnonymous(next(context.self, sender, connectionId), ActorTags(s"id=$connectionId"))
/** Queue of incoming packets (plus sequence numbers and timestamps) that arrived in the wrong order */
val inReorderQueue: ListBuffer[(PlanetSidePacket, Int, Long)] = ListBuffer()
private val inReorderQueue: mutable.Queue[InReorderEntry] = mutable.Queue()
/** Latest incoming sequence number */
var inSequence = 0
var inSequence = -1
/** Latest incoming subslot number */
var inSubslot = 0
var inSubslot = -1
/** List of missing subslot numbers and attempts counter */
var inSubslotsMissing: mutable.Map[Int, Int] = mutable.Map()
val inSubslotsMissing: mutable.Map[Int, Int] = TrieMap()
/** Queue of outgoing packets used for bundling and splitting */
val outQueue: mutable.Queue[(PlanetSidePacket, BitVector)] = mutable.Queue()
@ -123,9 +184,16 @@ class MiddlewareActor(
/** Queue of outgoing packets ready for sending */
val outQueueBundled: mutable.Queue[PlanetSidePacket] = mutable.Queue()
/** Latest outgoing sequence number */
/** Latest outbound sequence number;
* the current sequence is one less than this number */
var outSequence = 0
/**
* Increment the outbound sequence number.
* The previous sequence number is returned.
* The fidelity of the sequence field in packets is 16 bits, so wrap back to 0 after 65535.
* @return
*/
def nextSequence: Int = {
val r = outSequence
if (outSequence == 0xffff) {
@ -136,9 +204,16 @@ class MiddlewareActor(
r
}
/** Latest outgoing subslot number */
/** Latest outbound subslot number;
* the current subslot is one less than this number */
var outSubslot = 0
/**
* Increment the outbound subslot number.
* The previous subslot number is returned.
* The fidelity of the subslot field in `SlottedMetapacket`'s is 16 bits, so wrap back to 0 after 65535.
* @return
*/
def nextSubslot: Int = {
val r = outSubslot
if (outSubslot == 0xffff) {
@ -149,111 +224,46 @@ class MiddlewareActor(
r
}
/** Create a new SlottedMetaPacket with the sequence number filled in and the packet added to the history */
def smp(slot: Int, data: ByteVector): SlottedMetaPacket = {
if (outSlottedMetaPackets.length > 100) {
outSlottedMetaPackets = outSlottedMetaPackets.takeRight(100)
}
val packet = SlottedMetaPacket(slot, nextSubslot, data)
outSlottedMetaPackets += packet
packet
}
/**
* Do not bundle these packets together with other packets
*/
val packetsBundledByThemselves: List[PlanetSidePacket=>Boolean] = List(
MiddlewareActor.keepAliveMessageGuard,
MiddlewareActor.characterInfoMessageGuard
)
/** History of sent SlottedMetaPackets in case the client requests missing SMP packets via a RelatedA packet. */
var outSlottedMetaPackets: ListBuffer[SlottedMetaPacket] = ListBuffer()
val smpHistoryLength: Int = 100
/** History of created `SlottedMetaPacket`s.
* In case the client does not register receiving a packet by checking against packet subslot index numbers,
* it will dispatch a `RelatedA` packet,
* and the server will hopefully locate the packet where it has been backlogged.
* The client will also dispatch a `RelatedB` packet to indicate the packet with the highest subslot received.
* All packets with subslots less than that number have been received or will no longer be requested.
* The client and server supposedly maintain reciprocating mechanisms.
*/
val preparedSlottedMetaPackets: Array[SlottedMetaPacket] = new Array[SlottedMetaPacket](smpHistoryLength)
var nextSmpIndex: Int = 0
var acceptedSmpSubslot: Int = 0
/** Timer that handles the bundling and throttling of outgoing packets and the reordering of incoming packets */
val queueProcessor: Cancellable = {
context.system.scheduler.scheduleWithFixedDelay(10.milliseconds, 10.milliseconds)(() => {
try {
/** end of life stat */
var timesInReorderQueue: Int = 0
/** end of life stat */
var timesSubslotMissing: Int = 0
/** Delay between runs of the packet bundler/resolver timer (ms);
* 250ms per network update (client upstream), so 10 runs of this bundling code every update */
val packetProcessorDelay = Config.app.network.middleware.packetBundlingDelay
/** Timer that handles the bundling and throttling of outgoing packets and resolves disorganized inbound packets */
var packetProcessor: Cancellable = Default.Cancellable
/** how long packets that are out of sequential order wait for the missing sequence before being expedited (ms) */
val inReorderTimeout = Config.app.network.middleware.inReorderTimeout
/** Timer that handles the bundling and throttling of outgoing packets requesting packets with known subslot numbers */
var subslotMissingProcessor: Cancellable = Default.Cancellable
/** how long to wait between repeated requests for packets with known missing subslot numbers (ms) */
val inSubslotMissingDelay = Config.app.network.middleware.inSubslotMissingDelay
/** how many time to repeat the request for a packet with a known missing subslot number */
val inSubslotMissingNumberOfAttempts = Config.app.network.middleware.inSubslotMissingAttempts
if (outQueue.nonEmpty && outQueueBundled.isEmpty) {
var length = 0L
val bundle = outQueue
.dequeueWhile {
case (packet, payload) =>
// packet length + MultiPacketEx prefix length
val packetLength = payload.length + (if (payload.length < 256 * 8) { 1L * 8 }
else if (payload.length < 65536 * 8) { 2L * 8 }
else { 4L * 8 })
length += packetLength
packet match {
// Super awkward special case: Bundling CharacterInfoMessage with OCDM causes the character selection
// to show blank lines and be broken. So we make sure CharacterInfoMessage is always sent as the only
// packet in a bundle.
case _: CharacterInfoMessage =>
if (length == packetLength) {
length += MTU
true
} else {
false
}
case _ =>
// Some packets may be larger than the MTU limit, in that case we dequeue anyway and split later
// We deduct some bytes to leave room for SlottedMetaPacket (4 bytes) and MultiPacketEx (2 bytes + prefix per packet)
length == packetLength || length <= (MTU - 6) * 8
}
}
.map(_._2)
if (bundle.length == 1) {
outQueueBundled.enqueueAll(splitPacket(bundle.head))
} else {
PacketCoding.encodePacket(MultiPacketEx(bundle.toVector.map(_.bytes))) match {
case Successful(data) => outQueueBundled.enqueue(smp(0, data.bytes))
case Failure(cause) => log.error(cause.message)
}
}
}
outQueueBundled.dequeueFirst(_ => true) match {
case Some(packet) => send(packet, Some(nextSequence), crypto)
case None => ()
}
if (inReorderQueue.nonEmpty) {
var currentSequence = inSequence
val currentTime = System.currentTimeMillis()
inReorderQueue
.sortBy(_._2)
.dropWhile {
case (_, sequence, time) =>
// Forward packet if next in sequence order or older than 20ms
if (sequence == currentSequence + 1 || currentTime - time > 20) {
currentSequence += 1
true
} else {
false
}
}
.foreach {
case (packet, sequence, _) =>
if (sequence > inSequence) {
inSequence = sequence
}
in(packet)
}
}
if (inSubslotsMissing.nonEmpty) {
inSubslotsMissing.foreach {
case (subslot, attempts) =>
if (attempts <= 50) {
// Slight hack to send RelatedA less frequently, might want to put this on a separate timer
if (attempts % 10 == 0) send(RelatedA(0, subslot))
inSubslotsMissing(subslot) += 1
} else {
log.warn(s"Requesting subslot '$subslot' from client failed")
inSubslotsMissing.remove(subslot)
}
}
}
} catch {
case e: Throwable => log.error(e)("Queue processing error")
}
})
}
//formerly, CryptoSessionActor
def start(): Behavior[Command] = {
Behaviors.receiveMessagePartial {
@ -269,7 +279,7 @@ class MiddlewareActor(
// TODO ResetSequence
case _ =>
log.error(s"Unexpected packet type $packet in init")
log.warn(s"Unexpected packet type $packet in start (before crypto)")
Behaviors.same
}
case Failure(_) =>
@ -283,9 +293,12 @@ class MiddlewareActor(
// reflect the packet back to the sender
send(ping)
Behaviors.same
case _: ChangeFireModeMessage =>
log.trace(s"What is this packet that just arrived? ${msg.toString}")
//ignore
Behaviors.same
case _ =>
log.error(s"Unexpected non-crypto packet type $packet in start")
Behaviors.same
@ -312,18 +325,14 @@ class MiddlewareActor(
packet match {
case (ClientChallengeXchg(time, challenge, p, g), Some(_)) =>
serverMACBuffer ++= msg.drop(3)
val dh = DiffieHellman(p.toArray, g.toArray)
val clientChallenge = ServerChallengeXchg.getCompleteChallenge(time, challenge)
val serverTime = System.currentTimeMillis() / 1000L
val randomChallenge = randomBytes(0xc)
val serverChallenge = ServerChallengeXchg.getCompleteChallenge(serverTime, randomChallenge)
serverMACBuffer ++= send(
ServerChallengeXchg(serverTime, randomChallenge, ByteVector.view(dh.publicKey))
).drop(3)
cryptoFinish(dh, clientChallenge, serverChallenge)
case _ =>
@ -350,34 +359,18 @@ class MiddlewareActor(
packet match {
case (ClientFinished(clientPubKey, _), Some(_)) =>
serverMACBuffer ++= msg.drop(3)
val agreedKey = dh.agree(clientPubKey.toArray)
val agreedMessage = ByteVector("master secret".getBytes) ++ clientChallenge ++
hex"00000000" ++ serverChallenge ++ hex"00000000"
val masterSecret = new Md5Mac(ByteVector.view(agreedKey)).updateFinal(agreedMessage)
val mac = new Md5Mac(masterSecret)
// To do? verify client challenge. The code below has always been commented out, so it probably never
// worked and it surely doesn't work now. The whole cryptography is flawed because
// of the 128bit p values for DH, so implementing security features is probably not worth it.
/*
val clientChallengeExpanded = mac.updateFinal(
ByteVector(
"client finished".getBytes
) ++ serverMACBuffer ++ hex"01" ++ clientChallengeResult ++ hex"01",
0xc
)
*/
//TODO verify client challenge?
val serverChallengeResult = mac
.updateFinal(ByteVector("server finished".getBytes) ++ serverMACBuffer ++ hex"01", 0xc)
val encExpansion = ByteVector.view("server expansion".getBytes) ++ hex"0000" ++ serverChallenge ++
hex"00000000" ++ clientChallenge ++ hex"00000000"
val decExpansion = ByteVector.view("client expansion".getBytes) ++ hex"0000" ++ serverChallenge ++
hex"00000000" ++ clientChallenge ++ hex"00000000"
val expandedEncKey = mac.updateFinal(encExpansion, 64)
val expandedDecKey = mac.updateFinal(decExpansion, 64)
@ -389,9 +382,15 @@ class MiddlewareActor(
expandedDecKey.slice(20, 36)
)
)
send(ServerFinished(serverChallengeResult))
//start the queue processor loop
packetProcessor =
context.system.scheduler.scheduleWithFixedDelay(
packetProcessorDelay,
packetProcessorDelay
)(()=> {
context.self ! ProcessQueue()
})
active()
case other =>
@ -409,20 +408,19 @@ class MiddlewareActor(
.receiveSignal(onSignal)
}
//formerly, PacketCodingActor
def active(): Behavior[Command] = {
Behaviors
.receiveMessage[Command] {
case Receive(msg) =>
PacketCoding.unmarshalPacket(msg, crypto) match {
case Successful((packet, Some(sequence))) =>
if (sequence == inSequence + 1) {
inSequence = sequence
activeSequenceFunc(packet, sequence)
case Successful((packet, None)) =>
in(packet)
} else {
inReorderQueue.addOne((packet, sequence, System.currentTimeMillis()))
}
case Successful((packet, None)) => in(packet)
case Failure(e) => log.error(s"Could not decode packet: $e")
case Failure(e) =>
log.error(s"could not decode packet: $e")
}
Behaviors.same
@ -430,6 +428,10 @@ class MiddlewareActor(
out(packet)
Behaviors.same
case ProcessQueue() =>
processQueue()
Behaviors.same
case Teardown() =>
send(TeardownConnection(clientNonce))
context.self ! Close()
@ -438,7 +440,7 @@ class MiddlewareActor(
case Close() =>
outQueue
.dequeueAll(_ => true)
.foreach(p => send(smp(0, p._2.bytes), Some(nextSequence), crypto))
.foreach(p => send(smp(slot = 0, p._2.bytes), Some(nextSequence), crypto))
connectionClose()
}
.receiveSignal(onSignal)
@ -447,7 +449,13 @@ class MiddlewareActor(
val onSignal: PartialFunction[(ActorContext[Command], Signal), Behavior[Command]] = {
case (_, PostStop) =>
context.stop(nextActor)
queueProcessor.cancel()
if(timesInReorderQueue > 0 || timesSubslotMissing > 0) {
log.trace(s"out of sequence checks: $timesInReorderQueue, subslot missing checks: $timesSubslotMissing")
}
packetProcessor.cancel()
subslotMissingProcessor.cancel()
inReorderQueue.clear()
inSubslotsMissing.clear()
Behaviors.same
}
@ -461,17 +469,7 @@ class MiddlewareActor(
case packet: PlanetSideControlPacket =>
packet match {
case SlottedMetaPacket(slot, subslot, inner) =>
if (subslot > inSubslot + 1) {
((inSubslot + 1) until subslot).foreach(s => inSubslotsMissing.addOne((s, 0)))
} else if (inSubslotsMissing.contains(subslot)) {
inSubslotsMissing.remove(subslot)
} else if (inSubslotsMissing.isEmpty) {
send(RelatedB(slot, subslot))
}
if (subslot > inSubslot) {
inSubslot = subslot
}
in(PacketCoding.decodePacket(inner))
activeSubslotsFunc(slot, subslot, inner)
Behaviors.same
case MultiPacket(packets) =>
@ -483,15 +481,19 @@ class MiddlewareActor(
Behaviors.same
case RelatedA(slot, subslot) =>
log.info(s"Client indicated a packet is missing prior to slot '$slot' and subslot '$subslot'")
outSlottedMetaPackets.find(_.subslot == subslot - 1) match {
case Some(_packet) => outQueueBundled.enqueue(_packet)
case None => log.warn(s"Client requested unknown subslot '$subslot'")
val requestedSubslot = subslot - 1
preparedSlottedMetaPackets.find(_.subslot == requestedSubslot) match {
case Some(_packet) =>
outQueueBundled.enqueue(_packet)
case None if requestedSubslot < acceptedSmpSubslot =>
log.warn(s"Client indicated an smp of slot $slot prior to $subslot that is no longer logged")
case None =>
log.warn(s"Client indicated an smp of slot $slot prior to $subslot that is not found")
}
Behaviors.same
case RelatedB(_, subslot) =>
outSlottedMetaPackets = outSlottedMetaPackets.filter(_.subslot > subslot)
acceptedSmpSubslot = subslot
Behaviors.same
case ControlSync(diff, _, _, _, _, _, fa, fb) =>
@ -523,7 +525,6 @@ class MiddlewareActor(
log.error(s"Unexpected crypto packet '$packet'")
Behaviors.same
}
}
def in(packet: Attempt[PlanetSidePacket]): Unit = {
@ -547,6 +548,391 @@ class MiddlewareActor(
}
}
/**
* Periodically deal with these concerns:
* the bundling and throttling of outgoing packets
* and the reordering of out-of-sequence incoming packets.
*/
def processQueue(): Unit = {
processOutQueueBundle()
inReorderQueueFunc()
}
/**
* Outbound packets that have not been caught by other guards need to be bundled
* and placed into `SlottedMetaPacket` messages.
* Multiple packets in one bundle have to be placed into a `MultiPacketEx` message wrapper
* before being placed into the `SlottedMetaPacket`.
* Packets that are too big for the MTU must go on to be split into smaller portions that will be wrapped individually.
* Once queued, the first bundle is dispatched to the network.
*/
def processOutQueueBundle(): Unit = {
try {
if (outQueueBundled.nonEmpty) {
sendFirstBundle()
} else if (outQueue.nonEmpty) {
val bundle = {
var length = 0L
val (_, bundle) = outQueue
.dequeueWhile {
case (packet, payload) =>
// packet length + MultiPacketEx header length
val packetLength = payload.length + (
if (payload.length < 2048) { 8L } //256 * 8; 1L * 8
else if (payload.length < 524288) { 16L } //65536 * 8; 2L * 8
else { 32L } //4L * 8
)
length += packetLength
if (packetsBundledByThemselves.exists { _(packet) }) {
if (length == packetLength) {
length += MTU
true //dequeue only packet
} else {
false //dequeue later
}
} else {
// Some packets may be larger than the MTU limit, in that case we dequeue anyway and split later
// We deduct some bytes to leave room for SlottedMetaPacket (4 bytes) and MultiPacketEx (2 bytes + prefix per packet)
length == packetLength || length <= (MTU - 6) * 8
}
}
.unzip
bundle
}
if (bundle.length == 1) {
splitPacket(bundle.head) match {
case Seq() =>
//TODO is oversized packet recovery possible?
case data =>
outQueueBundled.enqueueAll(data)
sendFirstBundle()
}
} else {
PacketCoding.encodePacket(MultiPacketEx(bundle.toVector.map(_.bytes))) match {
case Successful(data) =>
outQueueBundled.enqueue(smp(slot = 0, data.bytes))
sendFirstBundle()
case Failure(cause) =>
log.error(cause.message)
//to avoid packets being lost, unwrap bundle and queue the packets individually
bundle.foreach { packet =>
outQueueBundled.enqueue(smp(slot = 0, packet.bytes))
}
sendFirstBundle()
}
}
}
} catch {
case e: Throwable =>
log.error(s"outbound queue processing error - ${Option(e.getMessage).getOrElse(e.getClass.getSimpleName)}")
}
}
/** what to do when a packet unmarshals properly and has a sequence number
* @see `activeNormal`
* @see `activeWithReordering`
*/
private var activeSequenceFunc: (PlanetSidePacket, Int)=>Unit = activeNormal
/**
* Properly handle the newly-arrived packet based on its sequence number.
* If the packet is in the proper order, or is considered old, by its sequence number,
* act on the packet immediately.
* If the sequence if higher than the next one expected, indicating that some packets may have been missed,
* put that packet to the side and active a resolution queue.
* @param packet the packet
* @param sequence the sequence number obtained from the packet
*/
def activeNormal(packet: PlanetSidePacket, sequence: Int): Unit = {
if (sequence == inSequence + 1) {
inSequence = sequence
in(packet)
} else if(sequence < inSequence) { //expedite this packet
in(packet)
} else if (sequence == inSequence) {
//do nothing?
} else {
inReorderQueue.enqueue(InReorderEntry(packet, sequence, System.currentTimeMillis())) //first entry
inReorderQueueFunc = processInReorderQueueTimeoutOnly
activeSequenceFunc = activeWithReordering
log.trace("packet sequence in disorder")
}
}
/**
* Properly handle the newly-arrived packet based on its sequence number,
* with an ongoing resolution queue for managing out of sequence packets.
* If the packet is in the proper order, or is considered old, by its sequence number,
* act on the packet immediately.
* If the packet is considered a packet that was expected in the past,
* attempt to clear it from the queue and test the resolution queue for further work.
* If the sequence if higher than the next one expected, indicating that some packets may have been missed,
* add it to the resolution queue.
* Finally, act on the state of the resolution queue.
* @param packet the packet
* @param sequence the sequence number obtained from the packet
*/
def activeWithReordering(packet: PlanetSidePacket, sequence: Int): Unit = {
if (sequence == inSequence + 1) {
inSequence = sequence
in(packet)
processInReorderQueue()
} else if(sequence < inSequence) { //expedite this packet
inReorderQueue.filterInPlace(_.sequence == sequence)
in(packet)
inReorderQueueFunc = inReorderQueueTest
inReorderQueueTest()
} else if (sequence == inSequence) {
//do nothing?
} else {
var insertAtIndex = 0
val length = inReorderQueue.length
while (insertAtIndex < length && sequence >= inReorderQueue(insertAtIndex).sequence) {
insertAtIndex += 1
}
inReorderQueue.insert(insertAtIndex, InReorderEntry(packet, sequence, System.currentTimeMillis()))
processInReorderQueue()
}
}
/** how to periodically respond to the state of the inbound reorder queue;
* administered by the primary packet processor schedule
* @see `inReorderQueueTest`
* @see `processInReorderQueueTimeoutOnly`
*/
private var inReorderQueueFunc: ()=>Unit = doNothing
/**
* Examine inbound packets that need to be reordered by sequence number and
* pass all packets that are now in the correct order and
* pass packets that have been kept waiting for too long in the queue.
* Set the recorded inbound sequence number to belong to the greatest packet removed from the queue.
* @see `processInReorderQueueTimeoutOnly`
*/
def processInReorderQueue(): Unit = {
timesInReorderQueue += 1
var currentSequence = inSequence
val currentTime = System.currentTimeMillis()
val takenPackets = (inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis } match {
case -1 =>
inReorderQueue
.takeWhile { entry =>
// Forward packet if next in sequence order
if (entry.sequence == currentSequence + 1) {
currentSequence = entry.sequence //already ordered by sequence, stays in order during traversal
true
} else {
false
}
}
case index =>
// Forward all packets ahead of any packet that has been in the queue for 50ms
val entries = inReorderQueue.take(index + 1)
currentSequence = entries.last.sequence
entries
}).map(_.packet)
inReorderQueue.dropInPlace(takenPackets.length)
inSequence = currentSequence
takenPackets.foreach { p =>
inReorderQueueFunc = processInReorderQueueTimeoutOnly
in(p)
}
}
/**
* Examine inbound packets that need to be reordered by sequence number and
* pass packets that have been kept waiting for too long in the queue.
* Set the recorded inbound sequence number to belong to the greatest packet removed from the queue.
* This may run during the scheduled check on the in-order queue after the outbound bundling process.
*/
def processInReorderQueueTimeoutOnly(): Unit = {
timesInReorderQueue += 1
val currentTime = System.currentTimeMillis()
val index = inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis }
val takenPackets = inReorderQueue.take(index + 1)
inReorderQueue.dropInPlace(takenPackets.length)
takenPackets.foreach { p =>
inReorderQueueFunc = inReorderQueueTest
inSequence = p.sequence
in(p.packet)
}
}
/**
* If the inbound reorder queue is empty, do no more work on it until work is available.
* Otherwise, do work on whatever is at the front of the queue, and do not test again until explicitly requested.
* Test the queue for more contents after removing content from it.
*/
def inReorderQueueTest(): Unit = {
if (inReorderQueue.isEmpty) {
inReorderQueueFunc = doNothing
activeSequenceFunc = activeNormal
log.trace("normalcy with packet sequence; resuming normal workflow")
//do nothing
} else {
inReorderQueueFunc = processInReorderQueueTimeoutOnly
processInReorderQueue()
}
}
/** what to do with a `SlottedMetaPacket` control packet
* @see `inSubslotNotMissing`
* @see `inSubslotMissingRequests`
*/
private var activeSubslotsFunc: (Int, Int, ByteVector)=>Unit = inSubslotNotMissing
/**
* What to do with a `SlottedMetaPacket` control packet normally.
* The typical approach, when the subslot is the expected next number, is to merely receive the packet
* and dispatch a confirmation.
* When the subslot is farther ahead of what is expected,
* requests need to be logged for the missing subslots.
* @param slot the SMP slot (related to the opcode)
* @param subslot the SMP subslot (related to the expected order of packets)
* @param inner the contents of this SMP
* @see `askForMissingSubslots`
* @see `in`
* @see `inSubslotMissingRequests`
* @see `PacketCoding.decodePacket`
* @see `RelatedB`
*/
def inSubslotNotMissing(slot: Int, subslot: Int, inner: ByteVector): Unit = {
if (subslot == inSubslot + 1) {
in(PacketCoding.decodePacket(inner))
send(RelatedB(slot, subslot))
inSubslot = subslot
} else if (subslot > inSubslot + 1) {
in(PacketCoding.decodePacket(inner))
((inSubslot + 1) until subslot).foreach { s =>
inSubslotsMissing.addOne((s, inSubslotMissingNumberOfAttempts))
} //request missing SMP's
inSubslot = subslot
activeSubslotsFunc = inSubslotMissingRequests
askForMissingSubslots()
log.trace("packet subslots in disorder; start requests")
}
}
/**
* What to do with an inbound `SlottedMetaPacket` control packet when the subslots are in disarray.
* Whenever a subslot arrives prior to the current highest, removing that subslot from the request list is possible.
* If the proper next subslot arrives, proceeding like normal is fine.
* Do not, however, send any `ResultB` confirmations until no more requests are outstanding.
* @param slot the SMP slot (related to the opcode)
* @param subslot the SMP subslot (related to the expected order of packets)
* @param inner the contents of this SMP
* @see `in`
* @see `inSubslotsMissingRequestsFinished`
* @see `PacketCoding.decodePacket`
*/
def inSubslotMissingRequests(slot: Int, subslot: Int, inner: ByteVector): Unit = {
if (subslot < inSubslot) {
inSubslotsMissing.remove(subslot)
in(PacketCoding.decodePacket(inner))
inSubslotsMissingRequestsFinished(slot)
} else if (subslot > inSubslot + 1) {
((inSubslot + 1) until subslot).foreach { s =>
inSubslotsMissing.addOne((s, inSubslotMissingNumberOfAttempts))
} //request missing SMP's
inSubslot = subslot
in(PacketCoding.decodePacket(inner))
} else if (subslot == inSubslot + 1) {
inSubslot = subslot
in(PacketCoding.decodePacket(inner))
}
}
/**
* If there are no more requests for missing subslots,
* resume normal operations when acting upon inbound `SlottedMetaPacket` packets.
* @param slot the optional slot to report the "first" `RelatedB` in a "while"
*/
def inSubslotsMissingRequestsFinished(slot: Int = 0) : Unit = {
if (inSubslotsMissing.isEmpty) {
subslotMissingProcessor.cancel()
activeSubslotsFunc = inSubslotNotMissing
send(RelatedB(slot, inSubslot)) //send a confirmation packet after all requested packets are handled
log.trace("normalcy with packet subslot order; resuming normal workflow")
}
}
/**
* Start making requests for missing `SlotedMetaPackets`
* if no prior requests were prepared.
* Start the scheduled task and handle the dispatched requests.
* @see `inSubslotsMissingRequestFuncs`
* @see `inSubslotsMissingRequestsFinished`
* @see `RelatedA`
*/
def askForMissingSubslots(): Unit = {
if (subslotMissingProcessor.isCancelled) {
subslotMissingProcessor =
context.system.scheduler.scheduleWithFixedDelay(
initialDelay = 0.milliseconds,
inSubslotMissingDelay
)(()=> {
inSubslotsMissing.synchronized {
timesSubslotMissing += inSubslotsMissing.size
inSubslotsMissing.foreach { case (subslot, attempt) =>
val value = attempt - 1
if(value > 0) {
inSubslotsMissing(subslot) = value
} else {
inSubslotsMissing.remove(subslot)
}
send(RelatedA(0, subslot))
}
inSubslotsMissingRequestsFinished()
}
})
}
}
/**
* Split packet into multiple chunks (if necessary).
* Split packets are wrapped in a `HandleGamePacket` and sent as `SlottedMetaPacket4`.
* The purpose of `SlottedMetaPacket4` may or may not be to indicate a split packet.
*/
def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = {
if (packet.length > (MTU - 4) * 8) {
PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match {
case Successful(data) =>
data.grouped((MTU - 8) * 8).map(vec => smp(slot = 4, vec.bytes)).toSeq
case Failure(cause) =>
log.error(cause.message)
Seq()
}
} else {
Seq(smp(slot = 0, packet.bytes))
}
}
/**
* Create a new `SlottedMetaPacket` with the sequence number filled in and the packet added to the history.
* @param slot the slot for this packet, which influences the type of SMP
* @param data hexadecimal data, the encoded packets to be placed in the SMP
* @return the packet
*/
def smp(slot: Int, data: ByteVector): SlottedMetaPacket = {
val packet = SlottedMetaPacket(slot, nextSubslot, data)
preparedSlottedMetaPackets.update(nextSmpIndex, packet)
nextSmpIndex = (nextSmpIndex + 1) % smpHistoryLength
packet
}
/**
* Take the first fully-prepared packet from the queue of prepared packets and send it to the client.
* Assign the dispatched packet the latest sequence number.
* Do not call unless confident the the queue has at least one element.
* @throws NoSuchElementException if there is no packet to dequeue
* @see `SlottedMetaPacket`
*/
def sendFirstBundle(): Unit = {
send(outQueueBundled.dequeue(), Some(nextSequence), crypto)
}
def send(packet: PlanetSideControlPacket): ByteVector = {
send(packet, if (crypto.isDefined) Some(nextSequence) else None, crypto)
}
@ -566,38 +952,28 @@ class MiddlewareActor(
socket ! Udp.Send(bytes.toByteString, sender)
bytes
case Failure(e) =>
log.error(s"Failed to encode packet ${packet.getClass.getName}: $e")
log.error(s"Failed to encode packet ${packet.getClass.getSimpleName}: $e")
ByteVector.empty
}
}
/**
* A random series of bytes used as a challenge value.
* @param amount the number of bytes
* @return a random series of bytes
*/
def randomBytes(amount: Int): ByteVector = {
val array = Array.ofDim[Byte](amount)
random.nextBytes(array)
ByteVector.view(array)
}
/**
* End client-server ops.
* End messaging capabilities.
*/
def connectionClose(): Behavior[Command] = {
send(ConnectionClose())
Behaviors.stopped
}
/** Split packet into multiple chunks (if necessary)
* Split packets are wrapped in a HandleGamePacket and sent as SlottedMetaPacket4
* The purpose of SlottedMetaPacket4 may or may not be to indicate a split packet
*/
def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = {
if (packet.length > (MTU - 4) * 8) {
PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match {
case Successful(data) =>
data.grouped((MTU - 8) * 8).map(vec => smp(4, vec.bytes)).toSeq
case Failure(cause) =>
log.error(cause.message)
Seq()
}
} else {
Seq(smp(0, packet.bytes))
}
}
}

View file

@ -1,7 +1,7 @@
// Copyright (c) 2017 PSForever
package net.psforever.packet
import java.security.{Key, SecureRandom, Security}
import java.security.{Key, Security}
import javax.crypto.Cipher
import javax.crypto.spec.RC5ParameterSpec
@ -16,8 +16,6 @@ import net.psforever.util.Md5Mac
object PacketCoding {
Security.addProvider(new BouncyCastleProvider)
private val random = new SecureRandom()
val RC5_BLOCK_SIZE = 8
/** A lower bound on the packet size */
@ -40,9 +38,9 @@ object PacketCoding {
case _: PlanetSideControlPacket if crypto.isEmpty => BitVector.empty
case _ =>
sequence match {
case Some(sequence) =>
uint16L.encode(sequence) match {
case Successful(seq) => seq
case Some(_sequence) =>
uint16L.encode(_sequence) match {
case Successful(_seq) => _seq
case f @ Failure(_) => return f
}
case None =>
@ -53,8 +51,8 @@ object PacketCoding {
val (flags, payload) = packet match {
case _: PlanetSideGamePacket | _: PlanetSideControlPacket if crypto.isDefined =>
encodePacket(packet) match {
case Successful(payload) =>
val encryptedPayload = crypto.get.encrypt(payload.bytes) match {
case Successful(_payload) =>
val encryptedPayload = crypto.get.encrypt(_payload.bytes) match {
case Successful(p) => p
case f: Failure => return f
}
@ -68,29 +66,29 @@ object PacketCoding {
}
case packet: PlanetSideGamePacket =>
encodePacket(packet) match {
case Successful(payload) =>
case Successful(_payload) =>
(
PlanetSidePacketFlags.codec.encode(PlanetSidePacketFlags(PacketType.Normal, secured = false)).require,
payload
_payload
)
case f @ Failure(_) => return f
}
case packet: PlanetSideControlPacket =>
encodePacket(packet) match {
case Successful(payload) =>
case Successful(_payload) =>
(
// control packets don't have flags
BitVector.empty,
payload
_payload
)
case f @ Failure(_) => return f
}
case packet: PlanetSideCryptoPacket =>
encodePacket(packet) match {
case Successful(payload) =>
case Successful(_payload) =>
(
PlanetSidePacketFlags.codec.encode(PlanetSidePacketFlags(PacketType.Crypto, secured = false)).require,
payload
_payload
)
case f @ Failure(_) => return f
}
@ -164,7 +162,7 @@ object PacketCoding {
crypto: Option[CryptoCoding] = None
): Attempt[(PlanetSidePacket, Int)] = {
val (flags, remainder) = Codec.decode[PlanetSidePacketFlags](BitVector(msg)) match {
case Successful(DecodeResult(value, remainder)) => (value, remainder)
case Successful(DecodeResult(value, _remainder)) => (value, _remainder)
case Failure(e) => return Failure(Err(s"Failed to parse packet flags: ${e.message}"))
}
@ -184,8 +182,8 @@ object PacketCoding {
// all packets have a two byte sequence ID
val (sequence, payload) = uint16L.decode(remainder) match {
case Successful(DecodeResult(value, remainder)) =>
(value, remainder.toByteVector)
case Successful(DecodeResult(value, _remainder)) =>
(value, _remainder.toByteVector)
case Failure(e) =>
return Failure(Err(s"Failed to parse packet sequence number: ${e.message}"))
}
@ -195,9 +193,9 @@ object PacketCoding {
CryptoPacketOpcode
.getPacketDecoder(cryptoState)(payload.bits)
.map(p => (p.value.asInstanceOf[PlanetSidePacket], sequence))
case (PacketType.Normal, Some(crypto)) if flags.secured =>
case (PacketType.Normal, Some(_crypto)) if flags.secured =>
// encrypted payload is 4-byte aligned: 1b flags, 2b sequence, 1b padding
crypto.decrypt(payload.drop(1)).map(p => decodePacket(p)).flatten.map(p => (p, sequence))
_crypto.decrypt(payload.drop(1)).map(p => decodePacket(p)).flatten.map(p => (p, sequence))
case (PacketType.Normal, None) if !flags.secured =>
decodePacket(payload).map(p => (p, sequence))
case (PacketType.Normal, None) =>
@ -279,7 +277,9 @@ object PacketCoding {
Successful(ByteVector.view(rc5Encrypt.doFinal(packetWithPadding.toArray)))
}
} catch {
case e: Throwable => Failure(Err(s"encrypt error: '${e.getMessage}' data: ${packetWithPadding.toHex}"))
case e: Throwable =>
val msg = if(e.getMessage == null) e.getClass.getSimpleName else e.getMessage
Failure(Err(s"encrypt error: '$msg' data: ${packetWithPadding.toHex}"))
}
}
@ -295,7 +295,7 @@ object PacketCoding {
// last byte is the padding length
val padding = uint8L.decode(payloadDecrypted.takeRight(1).bits) match {
case Successful(padding) => padding.value
case Successful(_padding) => _padding.value
case Failure(e) => return Failure(Err(s"Failed to decode the encrypted padding length: ${e.message}"))
}
@ -304,7 +304,7 @@ object PacketCoding {
val mac = bytes(Md5Mac.MACLENGTH).decode(payloadMac.bits) match {
case Failure(e) => return Failure(Err("Failed to extract the encrypted MAC: " + e.message))
case Successful(mac) => mac.value
case Successful(_mac) => _mac.value
}
val payloadNoMac = payloadNoPadding.dropRight(Md5Mac.MACLENGTH)

View file

@ -114,7 +114,15 @@ case class AntiCheatConfig(
)
case class NetworkConfig(
session: SessionConfig
session: SessionConfig,
middleware: MiddlewareConfig
)
case class MiddlewareConfig(
packetBundlingDelay: FiniteDuration,
inReorderTimeout: FiniteDuration,
inSubslotMissingDelay: FiniteDuration,
inSubslotMissingAttempts: Int
)
case class SessionConfig(