diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 83da8966..2396ea92 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -220,7 +220,8 @@ game { # while additional entries with insufficient time spacing may result in no change in behavior. delays = [350, 600, 800] } - + + # Don't ask. doors-can-be-opened-by-med-app-from-this-distance = 5.05 } @@ -234,7 +235,12 @@ anti-cheat { network { middleware { # How often between executions of the outbound bundling process - packet-bundling-delay = 50 milliseconds + # Used for outbound packet arrival triggers + packet-bundling-delay = 40 milliseconds + + # How often between executions of the outbound bundling process + # Affects the base value on the timer + packet-bundling-delay-multiplier = 1.25 # Pause inbound packet transmission towards the network if the sequence number is out of order # Packets are put aside until the sequence is restored, or this timeout passes diff --git a/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala b/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala index 1c77da57..edf525a8 100644 --- a/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala +++ b/src/main/scala/net/psforever/actors/net/MiddlewareActor.scala @@ -40,7 +40,7 @@ import net.psforever.util.{Config, DiffieHellman, Md5Mac} * Various subroutines are used to keep track of the predicted offending packets.
*
* The cryptographic aspect of the service resolves itself first, - * exchanging keys and passcodes and challeneges between local and the network. + * exchanging keys and passcodes and challenges between local and the network. * Most of this process occurs without any prior cryptographic setup, * relying on the validation of its own exchange to operate. * Afterwards its completion, @@ -58,7 +58,7 @@ import net.psforever.util.{Config, DiffieHellman, Md5Mac} * and will respond to those requests if possible by searching its catalog.
*
* If the inbound packets do not arrive correctly the first time, - * after a while, requests for identified mising packets from the network source will occur. + * after a while, requests for identified missing packets from the network source will occur. * Virtually all important packets have a sequence number that bestows on each packet an absolute delivery order. * Bundled packets have a subslot number that indicates the total number of subslot packets dispatched to this client * as well as the order in which the packets should have been received. @@ -66,14 +66,13 @@ import net.psforever.util.{Config, DiffieHellman, Md5Mac} * Should the expected packets show up out of order before the buffered is cleared, * everything sorts itself out. * Unresolved missing sequence entries will often lead to requests for missing packets with anticipated subslots. - * If these requests do not resolve, there is unfortuantely not much that can be done except grin and bear with it. + * If these requests do not resolve, there is unfortunately not much that can be done except grin and bear with it. */ object MiddlewareActor { Security.addProvider(new BouncyCastleProvider) /** Maximum packet size in bytes */ - //final val MTU: Int = 467 - final val MTU: Int = 440 + final val MTU: Int = 440 //467 def apply( socket: ActorRef[Udp.Command], @@ -98,7 +97,7 @@ object MiddlewareActor { def apply(msg: ByteVector): Raw = Raw(msg.toBitVector, exclusive = false) - def apply(msg: ByteVector, exclusive: Boolean): Raw = Raw(msg, exclusive) + def apply(msg: ByteVector, exclusive: Boolean): Raw = Raw(msg.toBitVector, exclusive) } /** Teardown connection */ @@ -120,22 +119,23 @@ object MiddlewareActor { * All packets are bundled by themselves. * May as well just waste all of the cycles on your CPU, eh? */ - def allPacketGuard(packet: PlanetSidePacket): Boolean = true + //noinspection ScalaUnusedSymbol + private def allPacketGuard(packet: PlanetSidePacket): Boolean = true /** * `CharacterInfoMessage` packets are bundled by themselves.
*
* Super awkward special case. - * Bundling `CharacterInfoMessage` with its corresponding `ObjectCreateDetailedMesssage`, + * Bundling `CharacterInfoMessage` with its corresponding `ObjectCreateDetailedMessage`, * which can occur during otherwise careless execution of the character select screen, * causes the character options to show blank slots and be unusable. */ - def characterInfoMessageGuard(packet: PlanetSidePacket): Boolean = { + private def characterInfoMessageGuard(packet: PlanetSidePacket): Boolean = { packet.isInstanceOf[CharacterInfoMessage] } /** `KeepAliveMessage` packets are bundled by themselves. They're special. */ - def keepAliveMessageGuard(packet: PlanetSidePacket): Boolean = { + private def keepAliveMessageGuard(packet: PlanetSidePacket): Boolean = { packet.isInstanceOf[KeepAliveMessage] } @@ -162,42 +162,42 @@ class MiddlewareActor( import MiddlewareActor._ - implicit val ec: ExecutionContextExecutor = context.executionContext - implicit val executor: ExecutionContext = context.executionContext + private implicit val ec: ExecutionContextExecutor = context.executionContext + private implicit val executor: ExecutionContext = context.executionContext private[this] val log = org.log4s.getLogger - var clientNonce: Long = 0 + private var clientNonce: Long = 0 - var serverMACBuffer: ByteVector = ByteVector.empty + private var serverMACBuffer: ByteVector = ByteVector.empty - val random = new SecureRandom() + private val random = new SecureRandom() - var crypto: Option[CryptoCoding] = None + private var crypto: Option[CryptoCoding] = None - val nextActor: ActorRef[PlanetSidePacket] = + private val nextActor: ActorRef[PlanetSidePacket] = context.spawnAnonymous(next(context.self, sender, connectionId), ActorTags(s"id=$connectionId")) /** Queue of incoming packets (plus sequence numbers and timestamps) that arrived in the wrong order */ private val inReorderQueue: mutable.Queue[InReorderEntry] = mutable.Queue() /** Latest incoming sequence number */ - var inSequence = -1 + private var inSequence: Int = -1 /** Latest incoming subslot number */ - var inSubslot = -1 + private var inSubslot: Int = -1 /** List of missing subslot numbers and attempts counter */ - val inSubslotsMissing: mutable.Map[Int, Int] = TrieMap() + private val inSubslotsMissing: mutable.Map[Int, Int] = TrieMap() /** Queue of outgoing packets used for bundling and splitting */ - val outQueue: mutable.Queue[(PlanetSidePacket, BitVector)] = mutable.Queue() + private val outQueue: mutable.Queue[(PlanetSidePacket, BitVector)] = mutable.Queue() /** Queue of outgoing packets ready for sending */ - val outQueueBundled: mutable.Queue[PlanetSidePacket] = mutable.Queue() + private val outQueueBundled: mutable.Queue[PlanetSidePacket] = mutable.Queue() /** Latest outbound sequence number */ - var outSequence = -1 + private var outSequence: Int = -1 /** * Increment the outbound sequence number. @@ -205,33 +205,33 @@ class MiddlewareActor( * The fidelity of the sequence field in packets is 16 bits, so wrap back to 0 after 65535. * @return */ - def nextSequence: Int = { + private def nextSequence: Int = { if (outSequence >= 0xffff) { // TODO resetting the sequence to 0 causes a client crash // but that does not happen when we always send the same number // the solution is most likely to send the proper ResetSequence payload // send(ResetSequence(), None, crypto) - // outSequence = -1 // return nextSequence - return outSequence + outSequence + } else { + outSequence += 1 + outSequence } - outSequence += 1 - outSequence } /** Latest outbound subslot number; * the current subslot is one less than this number */ - var outSubslot = 0 + private var outSubslot = 0 /** * Increment the outbound subslot number. * The previous subslot number is returned. - * The fidelity of the subslot field in `SlottedMetapacket`'s is 16 bits, so wrap back to 0 after 65535. + * The fidelity of the subslot field in a `SlottedMetaPacket` is 16 bits, so wrap back to 0 after 65535. * @return */ - def nextSubslot: Int = { + private def nextSubslot: Int = { val r = outSubslot if (outSubslot == 0xffff) { outSubslot = 0 @@ -244,12 +244,12 @@ class MiddlewareActor( /** * Do not bundle these packets together with other packets */ - val packetsBundledByThemselves: List[PlanetSidePacket => Boolean] = List( + private val packetsBundledByThemselves: List[PlanetSidePacket => Boolean] = List( MiddlewareActor.keepAliveMessageGuard, MiddlewareActor.characterInfoMessageGuard ) - val smpHistoryLength: Int = 100 + private val smpHistoryLength: Int = 100 /** History of created `SlottedMetaPacket`s. * In case the client does not register receiving a packet by checking against packet subslot index numbers, @@ -259,35 +259,40 @@ class MiddlewareActor( * All packets with subslots less than that number have been received or will no longer be requested. * The client and server supposedly maintain reciprocating mechanisms. */ - val preparedSlottedMetaPackets: Array[SlottedMetaPacket] = new Array[SlottedMetaPacket](smpHistoryLength) - var nextSmpIndex: Int = 0 - var acceptedSmpSubslot: Int = 0 + private val preparedSlottedMetaPackets: Array[SlottedMetaPacket] = new Array[SlottedMetaPacket](smpHistoryLength) + private var nextSmpIndex: Int = 0 + private var acceptedSmpSubslot: Int = 0 /** end of life stat */ - var timesInReorderQueue: Int = 0 + private var timesInReorderQueue: Int = 0 /** end of life stat */ - var timesSubslotMissing: Int = 0 + private var timesSubslotMissing: Int = 0 + + /** Delay for testing outbound packet evaluation (ms) */ + private val packetOutboundDelay: Long = Config.app.network.middleware.packetBundlingDelay.toMillis /** Delay between runs of the packet bundler/resolver timer (ms); - * 250ms per network update (client upstream), so 10 runs of this bundling code every update - */ - val packetProcessorDelay = Config.app.network.middleware.packetBundlingDelay + * this is the same as `packetOutboundDelay` increased by the multiplier */ + private val packetProcessorDelay: FiniteDuration = Duration.apply( + math.abs(packetOutboundDelay * Config.app.network.middleware.packetBundlingDelayMultiplier).toLong, + "milliseconds" + ) /** Timer that handles the bundling and throttling of outgoing packets and resolves disorganized inbound packets */ - var packetProcessor: Cancellable = Default.Cancellable + private var packetProcessor: Cancellable = Default.Cancellable /** how long packets that are out of sequential order wait for the missing sequence before being expedited (ms) */ - val inReorderTimeout = Config.app.network.middleware.inReorderTimeout + private val inReorderTimeout: FiniteDuration = Config.app.network.middleware.inReorderTimeout /** Timer that handles the bundling and throttling of outgoing packets requesting packets with known subslot numbers */ - var subslotMissingProcessor: Cancellable = Default.Cancellable + private var subslotMissingProcessor: Cancellable = Default.Cancellable /** how long to wait between repeated requests for packets with known missing subslot numbers (ms) */ - val inSubslotMissingDelay = Config.app.network.middleware.inSubslotMissingDelay + private val inSubslotMissingDelay: FiniteDuration = Config.app.network.middleware.inSubslotMissingDelay /** how many time to repeat the request for a packet with a known missing subslot number */ - val inSubslotMissingNumberOfAttempts = Config.app.network.middleware.inSubslotMissingAttempts + private val inSubslotMissingNumberOfAttempts: Int = Config.app.network.middleware.inSubslotMissingAttempts //formerly, CryptoSessionActor @@ -308,7 +313,7 @@ class MiddlewareActor( Unknown30 is used to reuse an existing crypto session when switching from login to world When not handling it, it appears that the client will fall back to using ClientStart Do we need to implement this? - */ + */ connectionClose() case (ConnectionClose(), _) => @@ -318,7 +323,6 @@ class MiddlewareActor( */ Behaviors.same - // TODO ResetSequence case _ => log.warn(s"Unexpected packet type $packet in start (before crypto)") Behaviors.same @@ -357,7 +361,7 @@ class MiddlewareActor( } - def cryptoSetup(): Behavior[Command] = { + private def cryptoSetup(): Behavior[Command] = { Behaviors .receiveMessagePartial[Command] { case Receive(msg) => @@ -369,7 +373,7 @@ class MiddlewareActor( val dh = DiffieHellman(p.toArray, g.toArray) val clientChallenge = ServerChallengeXchg.getCompleteChallenge(time, challenge) val serverTime = System.currentTimeMillis() / 1000L - val randomChallenge = randomBytes(0xc) + val randomChallenge = randomBytes(amount = 0xc) val serverChallenge = ServerChallengeXchg.getCompleteChallenge(serverTime, randomChallenge) serverMACBuffer ++= send( ServerChallengeXchg(serverTime, randomChallenge, ByteVector.view(dh.publicKey)) @@ -391,7 +395,7 @@ class MiddlewareActor( .receiveSignal(onSignal) } - def cryptoFinish(dh: DiffieHellman, clientChallenge: ByteVector, serverChallenge: ByteVector): Behavior[Command] = { + private def cryptoFinish(dh: DiffieHellman, clientChallenge: ByteVector, serverChallenge: ByteVector): Behavior[Command] = { Behaviors .receiveMessagePartial[Command] { case Receive(msg) => @@ -424,13 +428,8 @@ class MiddlewareActor( ) ) send(ServerFinished(serverChallengeResult)) - //start the queue processor loop - packetProcessor = context.system.scheduler.scheduleWithFixedDelay( - packetProcessorDelay, - packetProcessorDelay - )(() => { - context.self ! ProcessQueue - }) + //try the queue processor loop + processQueue() active() case other => @@ -450,7 +449,7 @@ class MiddlewareActor( //formerly, PacketCodingActor - def active(): Behavior[Command] = { + private def active(): Behavior[Command] = { Behaviors .receiveMessage[Command] { case Receive(msg) => @@ -462,7 +461,7 @@ class MiddlewareActor( case Successful((packet, None)) => packet match { case _: PlanetSideResetSequencePacket => - log.info(s"ResetSequence: ${msg.toHex}, inSeq: ${inSequence}, outSeq: ${outSequence}") + log.info(s"ResetSequence: ${msg.toHex}, inSeq: $inSequence, outSeq: $outSequence") case _ => () } in(packet) @@ -475,12 +474,24 @@ class MiddlewareActor( out(packet) Behaviors.same - case Raw(msg, exclusive) => - if (exclusive) { - outQueue.enqueue((KeepAliveMessage(), msg)) //caught by bundling isolation filter - } else { - outQueue.enqueue((ActionResultMessage.Pass, msg)) - } + case Raw(msg, true) + if System.currentTimeMillis() - lastOutboundEventTime > packetOutboundDelay => + outQueue.enqueue((KeepAliveMessage(), msg)) //caught by bundling isolation filter + processOutQueueBundle() + Behaviors.same + + case Raw(msg, true) => + outQueue.enqueue((KeepAliveMessage(), msg)) //caught by bundling isolation filter + Behaviors.same + + case Raw(msg, false) + if System.currentTimeMillis() - lastOutboundEventTime > packetOutboundDelay => + outQueue.enqueue((ActionResultMessage.Pass, msg)) + processOutQueueBundle() + Behaviors.same + + case Raw(msg, false) => + outQueue.enqueue((ActionResultMessage.Pass, msg)) Behaviors.same case ProcessQueue => @@ -493,19 +504,24 @@ class MiddlewareActor( case Teardown() => send(TeardownConnection(clientNonce)) - context.self ! Close() + close() Behaviors.same case Close() => - outQueue - .dequeueAll(_ => true) - .foreach(p => send(smp(slot = 0, p._2.bytes), Some(nextSequence), crypto)) - connectionClose() + close() + Behaviors.same } .receiveSignal(onSignal) } - val onSignal: PartialFunction[(ActorContext[Command], Signal), Behavior[Command]] = { + private def close(): Unit = { + outQueue + .dequeueAll(_ => true) + .foreach(p => send(smp(slot = 0, p._2.bytes), Some(nextSequence), crypto)) + connectionClose() + } + + private val onSignal: PartialFunction[(ActorContext[Command], Signal), Behavior[Command]] = { case (_, PostStop) => context.stop(nextActor) if (timesInReorderQueue > 0 || timesSubslotMissing > 0) { @@ -519,7 +535,7 @@ class MiddlewareActor( } /** Handle incoming packet */ - def in(packet: PlanetSidePacket): Behavior[Command] = { + private def in(packet: PlanetSidePacket): Behavior[Command] = { packet match { case packet: PlanetSideGamePacket => nextActor ! packet @@ -556,25 +572,30 @@ class MiddlewareActor( Behaviors.same case ControlSync(diff, _, _, _, _, _, fa, fb) => - // TODO: figure out what this is what what it does for the PS client - // I believe it has something to do with reliable packet transmission and resending - - // Work around the 2038 problem - // TODO can we just start at 0 again? what is this for? + // something to do with reliable packet transmission and resending + // TODO Work around the 2038 problem; can't we just start at 0 again? val serverTick = math.min(System.currentTimeMillis(), 4294967295L) val nextDiff = if (diff == 65535) 0 else diff + 1 send(ControlSyncResp(nextDiff, serverTick, fa, fb, fb, fa)) Behaviors.same case ConnectionClose() => - Behaviors.stopped + connectionClose() case TeardownConnection(_) => - Behaviors.stopped + connectionClose() case ClientStart(_) => start() + case Unknown30(_) => + /* + Unknown30 is used to reuse an existing crypto session when switching from login to world + When not handling it, it appears that the client will fall back to using ClientStart + Do we need to implement this? + */ + connectionClose() + case other => log.warn(s"Unhandled control packet '$other'") Behaviors.same @@ -584,35 +605,38 @@ class MiddlewareActor( log.error(s"Unexpected crypto packet '$packet'") Behaviors.same - case packet: PlanetSideResetSequencePacket => - // TODO This is wrong - // I suspect ResetSequence is a notification that the remote sequence has been reset - // rather than a request to reset our outgoing sequence number - // Resetting it this way causes a client crash, see nextSequence - - // log.debug(s"Received sequence reset request from client: $packet.}") - // outSequence = 0 + case _: PlanetSideResetSequencePacket => + log.error(s"Unexpected crypto packet: received a PlanetSideResetSequencePacket when it should never happen") Behaviors.same } } - def in(packet: Attempt[PlanetSidePacket]): Unit = { + private def in(packet: Attempt[PlanetSidePacket]): Unit = { packet match { case Successful(_packet) => in(_packet) case Failure(cause) => log.error(s"Could not decode packet: ${cause.message}") } } + private var lastOutboundEventTime: Long = 0L + /** Handle outgoing packet */ - def out(packet: PlanetSidePacket): Unit = { + private def out(packet: PlanetSidePacket): Unit = { packet match { case packet: KeepAliveMessage => send(packet) case _ => PacketCoding.encodePacket(packet) match { - case Successful(payload) => outQueue.enqueue((packet, payload)) - case Failure(cause) => log.error(s"Could not encode $packet: ${cause.message}") + case Successful(payload) + if System.currentTimeMillis() - lastOutboundEventTime > packetOutboundDelay => + outQueue.enqueue((packet, payload)) + processOutQueueBundle() + case Successful(payload) => + outQueue.enqueue((packet, payload)) + retimePacketProcessorIfNotRunning() + case Failure(cause) => + log.error(s"Could not encode $packet: ${cause.message}") } } } @@ -622,9 +646,9 @@ class MiddlewareActor( * the bundling and throttling of outgoing packets * and the reordering of out-of-sequence incoming packets. */ - def processQueue(): Unit = { - processOutQueueBundle() + private def processQueue(): Unit = { inReorderQueueFunc() + processOutQueueBundle() } /** @@ -635,7 +659,9 @@ class MiddlewareActor( * Packets that are too big for the MTU must go on to be split into smaller portions that will be wrapped individually. * Once queued, the first bundle is dispatched to the network. */ - def processOutQueueBundle(): Unit = { + private def processOutQueueBundle(): Unit = { + packetProcessor.cancel() + packetProcessor = Default.Cancellable try { if (outQueueBundled.nonEmpty) { sendFirstBundle() @@ -671,7 +697,8 @@ class MiddlewareActor( if (bundle.length == 1) { splitPacket(bundle.head) match { case Seq() => - //TODO is oversized packet recovery possible? + //TODO is over-sized packet recovery possible? + retimePacketProcessorIfWork() case data => outQueueBundled.enqueueAll(data) sendFirstBundle() @@ -712,18 +739,19 @@ class MiddlewareActor( * @param packet the packet * @param sequence the sequence number obtained from the packet */ - def activeNormal(packet: PlanetSidePacket, sequence: Int): Unit = { + private def activeNormal(packet: PlanetSidePacket, sequence: Int): Unit = { if (sequence == inSequence + 1) { inSequence = sequence in(packet) } else if (sequence < inSequence) { //expedite this packet in(packet) } else if (sequence == inSequence) { - //do nothing? + log.warn("reported packet sequence matches previous packet sequence; nothing will happen?") } else { inReorderQueue.enqueue(InReorderEntry(packet, sequence, System.currentTimeMillis())) //first entry inReorderQueueFunc = processInReorderQueueTimeoutOnly activeSequenceFunc = activeWithReordering + retimePacketProcessorIfNotRunning() log.trace("packet sequence in disorder") } } @@ -741,7 +769,7 @@ class MiddlewareActor( * @param packet the packet * @param sequence the sequence number obtained from the packet */ - def activeWithReordering(packet: PlanetSidePacket, sequence: Int): Unit = { + private def activeWithReordering(packet: PlanetSidePacket, sequence: Int): Unit = { if (sequence == inSequence + 1) { inSequence = sequence in(packet) @@ -778,7 +806,7 @@ class MiddlewareActor( * Set the recorded inbound sequence number to belong to the greatest packet removed from the queue. * @see `processInReorderQueueTimeoutOnly` */ - def processInReorderQueue(): Unit = { + private def processInReorderQueue(): Unit = { timesInReorderQueue += 1 var currentSequence = inSequence val currentTime = System.currentTimeMillis() @@ -806,6 +834,7 @@ class MiddlewareActor( inReorderQueueFunc = processInReorderQueueTimeoutOnly in(p) } + retimePacketProcessorIfNotRunningIfWork() } /** @@ -814,7 +843,7 @@ class MiddlewareActor( * Set the recorded inbound sequence number to belong to the greatest packet removed from the queue. * This may run during the scheduled check on the in-order queue after the outbound bundling process. */ - def processInReorderQueueTimeoutOnly(): Unit = { + private def processInReorderQueueTimeoutOnly(): Unit = { timesInReorderQueue += 1 val currentTime = System.currentTimeMillis() val index = inReorderQueue.indexWhere { currentTime - _.time > inReorderTimeout.toMillis } @@ -832,7 +861,7 @@ class MiddlewareActor( * Otherwise, do work on whatever is at the front of the queue, and do not test again until explicitly requested. * Test the queue for more contents after removing content from it. */ - def inReorderQueueTest(): Unit = { + private def inReorderQueueTest(): Unit = { if (inReorderQueue.isEmpty) { inReorderQueueFunc = doNothing activeSequenceFunc = activeNormal @@ -883,7 +912,7 @@ class MiddlewareActor( * @see `PacketCoding.decodePacket` * @see `RelatedB` */ - def inSubslotNotMissing(slot: Int, subslot: Int, inner: ByteVector): Unit = { + private def inSubslotNotMissing(slot: Int, subslot: Int, inner: ByteVector): Unit = { if (subslot == inSubslot + 1) { in(PacketCoding.decodePacket(inner)) send(RelatedB(slot, subslot)) @@ -933,7 +962,7 @@ class MiddlewareActor( * @see `inSubslotsMissingRequestsFinished` * @see `PacketCoding.decodePacket` */ - def inSubslotMissingRequests(slot: Int, subslot: Int, inner: ByteVector): Unit = { + private def inSubslotMissingRequests(slot: Int, subslot: Int, inner: ByteVector): Unit = { if (subslot < inSubslot) { inSubslotsMissing.remove(subslot) in(PacketCoding.decodePacket(inner)) @@ -955,7 +984,7 @@ class MiddlewareActor( * resume normal operations when acting upon inbound `SlottedMetaPacket` packets. * @param slot the optional slot to report the "first" `RelatedB` in a "while" */ - def inSubslotsMissingRequestsFinished(slot: Int = 0): Unit = { + private def inSubslotsMissingRequestsFinished(slot: Int = 0): Unit = { if (inSubslotsMissing.isEmpty) { subslotMissingProcessor.cancel() activeSubslotsFunc = inSubslotNotMissing @@ -969,7 +998,7 @@ class MiddlewareActor( * Split packets are wrapped in a `HandleGamePacket` and sent as `SlottedMetaPacket4`. * The purpose of `SlottedMetaPacket4` may or may not be to indicate a split packet. */ - def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = { + private def splitPacket(packet: BitVector): Seq[PlanetSideControlPacket] = { if (packet.length > (MTU - 4) * 8) { PacketCoding.encodePacket(HandleGamePacket(packet.bytes)) match { case Successful(data) => @@ -989,7 +1018,7 @@ class MiddlewareActor( * @param data hexadecimal data, the encoded packets to be placed in the SMP * @return the packet */ - def smp(slot: Int, data: ByteVector): SlottedMetaPacket = { + private def smp(slot: Int, data: ByteVector): SlottedMetaPacket = { val packet = SlottedMetaPacket(slot, nextSubslot, data) preparedSlottedMetaPackets.update(nextSmpIndex, packet) nextSmpIndex = (nextSmpIndex + 1) % smpHistoryLength @@ -1003,23 +1032,25 @@ class MiddlewareActor( * @throws NoSuchElementException if there is no packet to dequeue * @see `SlottedMetaPacket` */ - def sendFirstBundle(): Unit = { + private def sendFirstBundle(): Unit = { + lastOutboundEventTime = System.currentTimeMillis() send(outQueueBundled.dequeue(), Some(nextSequence), crypto) + retimePacketProcessorIfNotRunningIfWork() } - def send(packet: PlanetSideControlPacket): ByteVector = { + private def send(packet: PlanetSideControlPacket): ByteVector = { send(packet, if (crypto.isDefined) Some(nextSequence) else None, crypto) } - def send(packet: PlanetSideCryptoPacket): ByteVector = { + private def send(packet: PlanetSideCryptoPacket): ByteVector = { send(packet, Some(nextSequence), crypto) } - def send(packet: PlanetSideGamePacket): ByteVector = { + private def send(packet: PlanetSideGamePacket): ByteVector = { send(packet, Some(nextSequence), crypto) } - def send(packet: PlanetSidePacket, sequence: Option[Int], crypto: Option[CryptoCoding]): ByteVector = { + private def send(packet: PlanetSidePacket, sequence: Option[Int], crypto: Option[CryptoCoding]): ByteVector = { PacketCoding.marshalPacket(packet, sequence, crypto) match { case Successful(bits) => val bytes = bits.toByteVector @@ -1036,17 +1067,61 @@ class MiddlewareActor( * @param amount the number of bytes * @return a random series of bytes */ - def randomBytes(amount: Int): ByteVector = { + //noinspection SameParameterValue + private def randomBytes(amount: Int): ByteVector = { val array = Array.ofDim[Byte](amount) random.nextBytes(array) ByteVector.view(array) } + /** + * If the timer the timer for processing outbound packets is not currently running, + * test if there are any packets waiting in certain messaging queues + * for the chance to cancel, then restart, the timer for processing outbound packets. + */ + private def retimePacketProcessorIfNotRunningIfWork(): Unit = { + if (packetProcessor == Default.Cancellable || packetProcessor.isCancelled) { + retimePacketProcessorIfWork() + } + } + + /** + * If the timer the timer for processing outbound packets is not currently running, + * test if there are any packets waiting in certain messaging queues + * for the chance to cancel, then restart, the timer for processing outbound packets. + */ + private def retimePacketProcessorIfNotRunning(): Unit = { + if (packetProcessor == Default.Cancellable || packetProcessor.isCancelled) { + retimePacketProcessor() + } + } + + /** + * If there are any packets waiting in the following queues, + * cancel, then restart, the timer for processing outbound packets. + */ + private def retimePacketProcessorIfWork(): Unit = { + if (outQueueBundled.nonEmpty || outQueue.nonEmpty || inReorderQueue.nonEmpty) { + retimePacketProcessor() + } + } + + /** + * Cancel, then restart, the timer for processing outbound packets. + * Sends a message to the normal mailbox to signal processing activity. + */ + private def retimePacketProcessor(): Unit = { + packetProcessor.cancel() + packetProcessor = context.system.scheduler.scheduleOnce(packetProcessorDelay, () => { + context.self ! ProcessQueue + }) + } + /** * End client-server ops. * End messaging capabilities. */ - def connectionClose(): Behavior[Command] = { + private def connectionClose(): Behavior[Command] = { send(ConnectionClose()) Behaviors.stopped } diff --git a/src/main/scala/net/psforever/objects/zones/blockmap/BlockMap.scala b/src/main/scala/net/psforever/objects/zones/blockmap/BlockMap.scala index f63a10c7..b6c91ec9 100644 --- a/src/main/scala/net/psforever/objects/zones/blockmap/BlockMap.scala +++ b/src/main/scala/net/psforever/objects/zones/blockmap/BlockMap.scala @@ -516,7 +516,8 @@ object BlockMap { structure: Iterable[Sector] ): Iterable[Sector] = { if (list.max < structure.size) { - list.toSet.map { structure.toSeq } + val structureSeq = structure.toSeq + list.toSet.map { structureSeq } } else { List[Sector]() } diff --git a/src/main/scala/net/psforever/util/Config.scala b/src/main/scala/net/psforever/util/Config.scala index b7097cbb..fa5c742f 100644 --- a/src/main/scala/net/psforever/util/Config.scala +++ b/src/main/scala/net/psforever/util/Config.scala @@ -135,6 +135,7 @@ case class NetworkConfig( case class MiddlewareConfig( packetBundlingDelay: FiniteDuration, + packetBundlingDelayMultiplier: Float, inReorderTimeout: FiniteDuration, inSubslotMissingDelay: FiniteDuration, inSubslotMissingAttempts: Int