From 775d9dca5e9241520ca8a516cac0bc2670dcb802 Mon Sep 17 00:00:00 2001 From: Fate-JH Date: Mon, 9 Feb 2026 14:18:15 -0500 Subject: [PATCH] cached messages on avatar and vehicle message systems --- .../actors/session/csr/VehicleLogic.scala | 9 +- .../actors/session/normal/GeneralLogic.scala | 3 +- .../actors/session/normal/VehicleLogic.scala | 9 +- .../session/spectator/GeneralLogic.scala | 3 +- .../session/spectator/VehicleLogic.scala | 3 +- .../support/SessionMountHandlers.scala | 3 +- .../WeaponAndProjectileOperations.scala | 3 +- .../services/avatar/AvatarService.scala | 10 +- .../services/base/GenericEventService.scala | 10 +- ...nericEventServiceWithCacheAndSupport.scala | 113 ++++++++++++++++++ .../services/vehicle/VehicleService.scala | 4 +- 11 files changed, 147 insertions(+), 23 deletions(-) create mode 100644 src/main/scala/net/psforever/services/base/GenericEventServiceWithCacheAndSupport.scala diff --git a/src/main/scala/net/psforever/actors/session/csr/VehicleLogic.scala b/src/main/scala/net/psforever/actors/session/csr/VehicleLogic.scala index d544f6dcf..2c7bb30dc 100644 --- a/src/main/scala/net/psforever/actors/session/csr/VehicleLogic.scala +++ b/src/main/scala/net/psforever/actors/session/csr/VehicleLogic.scala @@ -13,6 +13,7 @@ import net.psforever.objects.zones.Zone import net.psforever.objects.zones.interaction.InteractsWithZone import net.psforever.packet.game.{ChildObjectStateMessage, DeployRequestMessage, FrameVehicleStateMessage, PlanetsideAttributeMessage, VehicleStateMessage, VehicleSubStateMessage} import net.psforever.services.avatar.AvatarServiceMessage +import net.psforever.services.base.CachedMessage import net.psforever.services.base.messages.PlanetsideAttribute import net.psforever.services.vehicle.{VehicleAction, VehicleServiceMessage} import net.psforever.types.{DriveState, Vector3} @@ -94,7 +95,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex is_decelerating, obj.Cloaked ) - ) + ) //todo CachedMessage sessionLogic.squad.updateSquad() case (None, _) => //log.error(s"VehicleState: no vehicle $vehicle_guid found in zone") @@ -170,7 +171,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex continent.id, player.GUID, VehicleAction.FrameVehicleState(vehicle_guid, unk1, position, angle, velocity, unk2, unk3, unk4, is_crouched, is_airborne, ascending_flight, flight_time, unk9, unkA) - ) + ) //todo CachedMessage sessionLogic.squad.updateSquad() case (None, _) => //log.error(s"VehicleState: no vehicle $vehicle_guid found in zone") @@ -225,7 +226,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex continent.id, player.GUID, VehicleAction.ChildObjectState(object_guid, pitch, yaw) - ) + ) //todo CachedMessage } //TODO status condition of "playing getting out of vehicle to allow for late packets without warning if (player.death_by == -1) { @@ -244,7 +245,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex obj.Velocity = vel sessionLogic.updateBlockMap(obj, pos) obj.zoneInteractions() - continent.VehicleEvents ! VehicleServiceMessage( + continent.VehicleEvents ! CachedMessage( continent.id, player.GUID, VehicleAction.VehicleState(vehicle_guid, unk1, pos, ang, obj.Velocity, obj.Flying, 0, 0, 15, unk5 = false, obj.Cloaked) diff --git a/src/main/scala/net/psforever/actors/session/normal/GeneralLogic.scala b/src/main/scala/net/psforever/actors/session/normal/GeneralLogic.scala index 14ac88743..e7004f6cc 100644 --- a/src/main/scala/net/psforever/actors/session/normal/GeneralLogic.scala +++ b/src/main/scala/net/psforever/actors/session/normal/GeneralLogic.scala @@ -43,6 +43,7 @@ import net.psforever.packet.PlanetSideGamePacket import net.psforever.packet.game.{ActionCancelMessage, ActionResultMessage, AvatarFirstTimeEventMessage, AvatarImplantMessage, AvatarJumpMessage, BattleplanMessage, BindPlayerMessage, BugReportMessage, ChangeFireModeMessage, ChangeShortcutBankMessage, CharacterCreateRequestMessage, CharacterRequestAction, CharacterRequestMessage, ChatMsg, CollisionIs, ConnectToWorldRequestMessage, CreateShortcutMessage, DeadState, DeployObjectMessage, DisplayedAwardMessage, DropItemMessage, EmoteMsg, FacilityBenefitShieldChargeRequestMessage, FriendsRequest, GenericAction, GenericActionMessage, GenericCollisionMsg, GenericObjectActionAtPositionMessage, GenericObjectActionMessage, GenericObjectStateMsg, HitHint, InvalidTerrainMessage, LootItemMessage, MoveItemMessage, ObjectDetectedMessage, ObjectHeldMessage, OutfitMembershipRequest, OutfitMembershipRequestAction, OutfitMembershipResponse, OutfitRequest, OutfitRequestAction, PickupItemMessage, PlanetsideAttributeMessage, PlayerStateMessageUpstream, RequestDestroyMessage, TargetingImplantRequest, TerrainCondition, TradeMessage, UnuseItemMessage, UseItemMessage, VoiceHostInfo, VoiceHostRequest, ZipLineMessage} import net.psforever.services.account.{AccountPersistenceService, RetrieveAccountData} import net.psforever.services.avatar.{AvatarAction, AvatarServiceMessage} +import net.psforever.services.base.CachedMessage import net.psforever.services.base.messages.PlanetsideAttribute import net.psforever.services.local.support.CaptureFlagManager import net.psforever.types.{CapacitorStateType, ChatMessageType, Cosmetic, ExoSuitType, ImplantType, PlanetSideEmpire, PlanetSideGUID, Vector3} @@ -196,7 +197,7 @@ class GeneralLogic(val ops: GeneralOperations, implicit val context: ActorContex isNotVisible, eagleEye ) - ) + ) //todo CachedMessage sessionLogic.squad.updateSquad() if (player.death_by == -1) { sessionLogic.kickedByAdministration() diff --git a/src/main/scala/net/psforever/actors/session/normal/VehicleLogic.scala b/src/main/scala/net/psforever/actors/session/normal/VehicleLogic.scala index ed6e4171e..59fd325c9 100644 --- a/src/main/scala/net/psforever/actors/session/normal/VehicleLogic.scala +++ b/src/main/scala/net/psforever/actors/session/normal/VehicleLogic.scala @@ -12,6 +12,7 @@ import net.psforever.objects.vehicles.control.BfrFlight import net.psforever.objects.zones.Zone import net.psforever.objects.zones.interaction.InteractsWithZone import net.psforever.packet.game.{ChatMsg, ChildObjectStateMessage, DeployRequestMessage, FrameVehicleStateMessage, VehicleStateMessage, VehicleSubStateMessage} +import net.psforever.services.base.CachedMessage import net.psforever.services.vehicle.{VehicleAction, VehicleServiceMessage} import net.psforever.types.{ChatMessageType, DriveState, Vector3} @@ -75,7 +76,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex obj.Position = position obj.Orientation = angle // - continent.VehicleEvents ! VehicleServiceMessage( + continent.VehicleEvents ! CachedMessage( continent.id, player.GUID, VehicleAction.VehicleState( @@ -166,7 +167,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex continent.id, player.GUID, VehicleAction.FrameVehicleState(vehicle_guid, unk1, position, angle, velocity, unk2, unk3, unk4, is_crouched, is_airborne, ascending_flight, flight_time, unk9, unkA) - ) + ) //todo CachedMessage sessionLogic.squad.updateSquad() case (None, _) => //log.error(s"VehicleState: no vehicle $vehicle_guid found in zone") @@ -219,7 +220,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex continent.id, player.GUID, VehicleAction.ChildObjectState(object_guid, pitch, yaw) - ) + ) //todo CachedMessage } //TODO status condition of "playing getting out of vehicle to allow for late packets without warning if (player.death_by == -1) { @@ -238,7 +239,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex obj.Velocity = vel sessionLogic.updateBlockMap(obj, pos) obj.zoneInteractions() - continent.VehicleEvents ! VehicleServiceMessage( + continent.VehicleEvents ! CachedMessage( continent.id, player.GUID, VehicleAction.VehicleState(vehicle_guid, unk1, pos, ang, obj.Velocity, obj.Flying, 0, 0, 15, unk5 = false, obj.Cloaked) diff --git a/src/main/scala/net/psforever/actors/session/spectator/GeneralLogic.scala b/src/main/scala/net/psforever/actors/session/spectator/GeneralLogic.scala index 836538004..7755718c4 100644 --- a/src/main/scala/net/psforever/actors/session/spectator/GeneralLogic.scala +++ b/src/main/scala/net/psforever/actors/session/spectator/GeneralLogic.scala @@ -17,6 +17,7 @@ import net.psforever.packet.PlanetSideGamePacket import net.psforever.packet.game.{ActionCancelMessage, AvatarFirstTimeEventMessage, AvatarImplantMessage, AvatarJumpMessage, BattleplanMessage, BindPlayerMessage, BugReportMessage, ChangeFireModeMessage, ChangeShortcutBankMessage, CharacterCreateRequestMessage, CharacterRequestMessage, CollisionIs, ConnectToWorldRequestMessage, CreateShortcutMessage, DeployObjectMessage, DisplayedAwardMessage, DropItemMessage, EmoteMsg, FacilityBenefitShieldChargeRequestMessage, FriendsRequest, GenericAction, GenericActionMessage, GenericCollisionMsg, GenericObjectActionAtPositionMessage, GenericObjectActionMessage, GenericObjectStateMsg, HitHint, ImplantAction, InvalidTerrainMessage, LootItemMessage, MoveItemMessage, ObjectDetectedMessage, ObjectHeldMessage, OutfitMembershipRequest, OutfitMembershipResponse, OutfitRequest, PickupItemMessage, PlanetsideAttributeMessage, PlayerStateMessageUpstream, RequestDestroyMessage, TargetingImplantRequest, TradeMessage, UnuseItemMessage, UseItemMessage, VoiceHostInfo, VoiceHostRequest, ZipLineMessage} import net.psforever.services.account.AccountPersistenceService import net.psforever.services.avatar.{AvatarAction, AvatarServiceMessage} +import net.psforever.services.base.CachedMessage import net.psforever.services.base.messages.PlanetsideAttribute import net.psforever.types.{ExoSuitType, Vector3} @@ -75,7 +76,7 @@ class GeneralLogic(val ops: GeneralOperations, implicit val context: ActorContex player.Crouching = isCrouching player.Jumping = isJumping player.Cloaked = player.ExoSuit == ExoSuitType.Infiltration && isCloaking - continent.AvatarEvents ! AvatarServiceMessage( + continent.AvatarEvents ! CachedMessage( "spectator", avatarGuid, AvatarAction.PlayerState( diff --git a/src/main/scala/net/psforever/actors/session/spectator/VehicleLogic.scala b/src/main/scala/net/psforever/actors/session/spectator/VehicleLogic.scala index ec7b4d4b4..af51a9628 100644 --- a/src/main/scala/net/psforever/actors/session/spectator/VehicleLogic.scala +++ b/src/main/scala/net/psforever/actors/session/spectator/VehicleLogic.scala @@ -7,6 +7,7 @@ import net.psforever.objects.serverobject.PlanetSideServerObject import net.psforever.objects.Vehicle import net.psforever.objects.serverobject.deploy.Deployment import net.psforever.packet.game.{ChildObjectStateMessage, DeployRequestMessage, FrameVehicleStateMessage, VehicleStateMessage, VehicleSubStateMessage} +import net.psforever.services.base.CachedMessage import net.psforever.services.vehicle.{VehicleAction, VehicleServiceMessage} import net.psforever.types.{DriveState, Vector3} @@ -39,7 +40,7 @@ class VehicleLogic(val ops: VehicleOperations, implicit val context: ActorContex obj.Velocity = vel sessionLogic.updateBlockMap(obj, pos) obj.zoneInteractions() - continent.VehicleEvents ! VehicleServiceMessage( + continent.VehicleEvents ! CachedMessage( continent.id, player.GUID, VehicleAction.VehicleState(vehicle_guid, unk1, pos, ang, obj.Velocity, obj.Flying, 0, 0, 15, unk5 = false, obj.Cloaked) diff --git a/src/main/scala/net/psforever/actors/session/support/SessionMountHandlers.scala b/src/main/scala/net/psforever/actors/session/support/SessionMountHandlers.scala index 443ac5f85..12c448913 100644 --- a/src/main/scala/net/psforever/actors/session/support/SessionMountHandlers.scala +++ b/src/main/scala/net/psforever/actors/session/support/SessionMountHandlers.scala @@ -8,6 +8,7 @@ import net.psforever.objects.{PlanetSideGameObject, Tool, Vehicle} import net.psforever.objects.vehicles.{CargoBehavior, MountableWeapons} import net.psforever.objects.vital.InGameHistory import net.psforever.packet.game.{DismountVehicleCargoMsg, GenericObjectActionMessage, InventoryStateMessage, MountVehicleCargoMsg, MountVehicleMsg, ObjectAttachMessage, ObjectDetachMessage, PlanetsideAttributeMessage} +import net.psforever.services.base.CachedMessage import net.psforever.services.base.messages.SendResponse import net.psforever.services.vehicle.{VehicleAction, VehicleServiceMessage} import net.psforever.types.{BailType, PlanetSideGUID, Vector3} @@ -224,7 +225,7 @@ class SessionMountHandlers( sessionLogic.vehicles.ServerVehicleOverrideStop(v) }*/ v.Velocity = Vector3.Zero - continent.VehicleEvents ! VehicleServiceMessage( + continent.VehicleEvents ! CachedMessage( continent.id, tplayer.GUID, VehicleAction.VehicleState(v.GUID, unk1 = 0, tplayer.Position, v.Orientation, v.Velocity, v.Flying, unk3 = 0, unk4 = 0, wheel_direction = 15, unk5 = false, unk6 = v.Cloaked) diff --git a/src/main/scala/net/psforever/actors/session/support/WeaponAndProjectileOperations.scala b/src/main/scala/net/psforever/actors/session/support/WeaponAndProjectileOperations.scala index ca6fb0509..c9ac61149 100644 --- a/src/main/scala/net/psforever/actors/session/support/WeaponAndProjectileOperations.scala +++ b/src/main/scala/net/psforever/actors/session/support/WeaponAndProjectileOperations.scala @@ -28,6 +28,7 @@ import net.psforever.objects.vital.projectile.ProjectileReason import net.psforever.objects.zones.exp.ToDatabase import net.psforever.packet.game.UplinkRequest import net.psforever.services.Service +import net.psforever.services.base.CachedMessage import net.psforever.services.base.messages.{ChangeAmmo, ChangeFireState_Start, ChangeFireState_Stop, ReloadTool, SendResponse, WeaponDryFire} import net.psforever.services.local.LocalServiceMessage import net.psforever.types.{ChatMessageType, PlanetSideEmpire, ValidPlanetSideGUID, Vector3} @@ -493,7 +494,7 @@ class WeaponAndProjectileOperations( projectile.Position = shot_pos projectile.Orientation = shot_orient projectile.Velocity = shot_vel - continent.AvatarEvents ! AvatarServiceMessage( + continent.AvatarEvents ! CachedMessage( continent.id, player.GUID, AvatarAction.ProjectileState( diff --git a/src/main/scala/net/psforever/services/avatar/AvatarService.scala b/src/main/scala/net/psforever/services/avatar/AvatarService.scala index dad3e9070..5056236cb 100644 --- a/src/main/scala/net/psforever/services/avatar/AvatarService.scala +++ b/src/main/scala/net/psforever/services/avatar/AvatarService.scala @@ -4,7 +4,7 @@ package net.psforever.services.avatar import akka.actor.{ActorContext, ActorRef, Props} import net.psforever.objects.zones.Zone import net.psforever.services.avatar.support.{CorpseRemovalActor, DroppedItemRemover} -import net.psforever.services.base.{EventServiceSupport, GenericEventServiceWithSupport, GenericMessageEnvelope} +import net.psforever.services.base.{EventServiceSupport, GenericEventServiceWithCacheAndSupport, GenericMessageEnvelope} case object CorpseRemovalSupport extends EventServiceSupport { @@ -14,18 +14,18 @@ case object CorpseRemovalSupport } } -case object ItemRemoverSupport +case object LitterRemovalSupport extends EventServiceSupport { def label: String = "janitor" def constructor(context: ActorContext): ActorRef = { - context.actorOf(Props[DroppedItemRemover](), name = "ItemRemover") + context.actorOf(Props[DroppedItemRemover](), name = "DroppedItemRemover") } } class AvatarService(zone: Zone) - extends GenericEventServiceWithSupport[AvatarServiceResponse]( + extends GenericEventServiceWithCacheAndSupport[AvatarServiceResponse]( busName = "Avatar", - eventSupportServices = List(CorpseRemovalSupport, ItemRemoverSupport) + eventSupportServices = List(CorpseRemovalSupport, LitterRemovalSupport) ) { protected def composeResponseEnvelope(msg: GenericMessageEnvelope): AvatarServiceResponse = { AvatarServiceResponse(formatChannelOnBusName(msg.channel), msg.filter, msg.msg.response()) diff --git a/src/main/scala/net/psforever/services/base/GenericEventService.scala b/src/main/scala/net/psforever/services/base/GenericEventService.scala index 913914b2c..9274b8ad9 100644 --- a/src/main/scala/net/psforever/services/base/GenericEventService.scala +++ b/src/main/scala/net/psforever/services/base/GenericEventService.scala @@ -45,12 +45,16 @@ abstract class GenericEventService[OUT <: GenericResponseEnvelope](busName: Stri eventBus.unsubscribe(sender()) } + protected def commonBehavior: Receive = { + case msg: GenericMessageEnvelope => + handleMessage(msg) + } + def receive: Receive = commonJoinBehavior .orElse(commonLeaveBehavior) + .orElse(commonBehavior) .orElse { - case msg: GenericMessageEnvelope => - handleMessage(msg) - case msg => () + case msg => log.warn(s"Unhandled message $msg from ${sender()}") } diff --git a/src/main/scala/net/psforever/services/base/GenericEventServiceWithCacheAndSupport.scala b/src/main/scala/net/psforever/services/base/GenericEventServiceWithCacheAndSupport.scala new file mode 100644 index 000000000..3c563ab39 --- /dev/null +++ b/src/main/scala/net/psforever/services/base/GenericEventServiceWithCacheAndSupport.scala @@ -0,0 +1,113 @@ +// Copyright (c) 2026 PSForever +package net.psforever.services.base + +import net.psforever.services.Service +import net.psforever.types.PlanetSideGUID + +import scala.collection.concurrent.{Map => CMap} +import scala.jdk.CollectionConverters._ +import java.util.concurrent.ConcurrentHashMap +import scala.concurrent.ExecutionContext.Implicits.global + +/* +Adapted from the rating limiting code in https://github.com/Pinapse/giant with permission + */ + +trait CachedGenericEventMessageEnvelope + extends GenericMessageEnvelope { + def guid: PlanetSideGUID +} + +final case class CachedMessage(guid: PlanetSideGUID, channel: String, filter: PlanetSideGUID, msg: EventMessage) + extends CachedGenericEventMessageEnvelope + +object CachedMessage { + def apply(channel: String, filter: PlanetSideGUID, msg: EventMessage): GenericMessageEnvelope = { + if (filter == Service.defaultPlayerGUID) { + MessageEnvelope(channel, filter, msg) + } else { + CachedMessage(filter, channel, filter, msg) + } + } +} + +object CachedGenericEventMessageEnvelope { + def apply(channel: String, filter: PlanetSideGUID, msg: EventMessage): GenericMessageEnvelope = { + if (filter == Service.defaultPlayerGUID) { + MessageEnvelope(channel, filter, msg) + } else { + CachedMessage(filter, channel, filter, msg) + } + } + + def apply(guid: PlanetSideGUID, channel: String, filter: PlanetSideGUID, msg: EventMessage): GenericMessageEnvelope = { + if (guid == Service.defaultPlayerGUID) { + MessageEnvelope(channel, filter, msg) + } else { + CachedMessage(guid, channel, filter, msg) + } + } +} + +private case object FlushCachedMessages + +abstract class GenericEventServiceWithCacheAndSupport[OUT <: GenericResponseEnvelope] +( + busName: String, + eventSupportServices: List[EventServiceSupport] +) extends GenericEventServiceWithSupport[OUT](busName, eventSupportServices) { + private val flushCacheWait: Long = 50 //milliseconds + private var hasCachedMessages: Boolean = false + private var nextTimeToFlushCache: Long = 0L + private val cache: CMap[String, CMap[String, CMap[PlanetSideGUID, GenericMessageEnvelope]]] = + new ConcurrentHashMap[String, CMap[String, CMap[PlanetSideGUID, GenericMessageEnvelope]]]().asScala + + override def postStop(): Unit = { + flushCache() + super.postStop() + } + + private def tryRetimeFlushCache(): Unit = { + if (!hasCachedMessages) { + hasCachedMessages = true + nextTimeToFlushCache = System.currentTimeMillis() + flushCacheWait + } + } + + private def pushToCache(event: CachedGenericEventMessageEnvelope): Unit = { + val eventClassName = event.msg.getClass.getName + val updateBranch = cache + .getOrElseUpdate(event.channel, new ConcurrentHashMap[String, CMap[PlanetSideGUID, GenericMessageEnvelope]]().asScala) + .getOrElseUpdate(eventClassName, new ConcurrentHashMap[PlanetSideGUID, GenericMessageEnvelope]().asScala) + updateBranch.updateWith(event.guid) { _ => Some(event) } + tryRetimeFlushCache() + } + + private def tryFlushCache(): Unit = { + if (hasCachedMessages && nextTimeToFlushCache < System.currentTimeMillis()) { + flushCache() + } + } + + private def flushCache(): Unit = { + cache.foreachEntry { (_, map) => + map.foreachEntry { (_, map) => + map.foreachEntry { (_, event) => + super.handleMessage(event) + } + map.clear() + } + } + hasCachedMessages = false + } + + override protected def handleMessage(event: GenericMessageEnvelope): Unit = { + event match { + case envelope: CachedGenericEventMessageEnvelope => + pushToCache(envelope) + case _ => + super.handleMessage(event) + } + tryFlushCache() + } +} diff --git a/src/main/scala/net/psforever/services/vehicle/VehicleService.scala b/src/main/scala/net/psforever/services/vehicle/VehicleService.scala index 47af9487a..213c7ac55 100644 --- a/src/main/scala/net/psforever/services/vehicle/VehicleService.scala +++ b/src/main/scala/net/psforever/services/vehicle/VehicleService.scala @@ -3,7 +3,7 @@ package net.psforever.services.vehicle import akka.actor.{ActorContext, ActorRef, Props} import net.psforever.objects.zones.Zone -import net.psforever.services.base.{EventServiceSupport, GenericEventServiceWithSupport, GenericMessageEnvelope} +import net.psforever.services.base.{EventServiceSupport, GenericEventServiceWithCacheAndSupport, GenericMessageEnvelope} import net.psforever.services.vehicle.support.TurretUpgrader case object TurretUpgradeSupport @@ -15,7 +15,7 @@ case object TurretUpgradeSupport } class VehicleService(zone: Zone) - extends GenericEventServiceWithSupport[VehicleServiceResponse]( + extends GenericEventServiceWithCacheAndSupport[VehicleServiceResponse]( busName = "Vehicle", eventSupportServices = List(TurretUpgradeSupport) ) {