From 5ece864bfd89fe760dc8b35bcce6ac320979e8f0 Mon Sep 17 00:00:00 2001 From: Fate-JH Date: Tue, 27 Jan 2026 19:23:37 -0500 Subject: [PATCH] solved issue with redundant support actor messaging; refactored event buses out of event systems --- .../services/base/GenericEventService.scala | 36 +++++++++---------- .../base/GenericEventServiceWithSupport.scala | 28 +++++++++------ .../base/{ => bus}/GenericEventBus.scala | 16 ++++++--- .../base/bus/GenericEventBusWithSupport.scala | 32 +++++++++++++++++ .../base/{ => bus}/GenericGuidEventBus.scala | 23 ++++++------ .../psforever/services/hart/HartTimer.scala | 9 +++-- .../teamwork/SquadServiceResponse.scala | 11 +++--- .../teamwork/SquadSubscriptionEntity.scala | 2 +- .../services/vehicle/VehicleService.scala | 2 +- .../vehicle/VehicleServiceResponse.scala | 11 +++--- 10 files changed, 113 insertions(+), 57 deletions(-) rename src/main/scala/net/psforever/services/base/{ => bus}/GenericEventBus.scala (66%) create mode 100644 src/main/scala/net/psforever/services/base/bus/GenericEventBusWithSupport.scala rename src/main/scala/net/psforever/services/base/{ => bus}/GenericGuidEventBus.scala (86%) diff --git a/src/main/scala/net/psforever/services/base/GenericEventService.scala b/src/main/scala/net/psforever/services/base/GenericEventService.scala index a9c69a4df..0ec9aa690 100644 --- a/src/main/scala/net/psforever/services/base/GenericEventService.scala +++ b/src/main/scala/net/psforever/services/base/GenericEventService.scala @@ -3,21 +3,18 @@ package net.psforever.services.base import akka.actor.Actor import net.psforever.services.Service -import net.psforever.types.PlanetSideGUID +import net.psforever.services.base.bus.{AllGenericBusMsg, GenericEventBus, GenericEventBusResponse} import org.log4s.Logger import scala.annotation.unused trait GenericResponseEnvelope - extends GenericEventBusMsg { - def filter: PlanetSideGUID + extends GenericEventBusResponse { def reply: EventResponse - def inner: EventResponse = reply } -trait GenericMessageEnvelope { - def channel: String - def filter: PlanetSideGUID +trait GenericMessageEnvelope + extends AllGenericBusMsg { def msg: EventMessage } @@ -25,7 +22,7 @@ abstract class GenericEventService[OUT <: GenericResponseEnvelope](busName: Stri extends Actor { protected lazy val log: Logger = org.log4s.getLogger(getClass.getSimpleName) - protected val eventBus = new GenericEventBus[OUT] + protected val eventBus: GenericEventBus[OUT] = setupEventBus() def BusName: String = busName @@ -48,20 +45,23 @@ abstract class GenericEventService[OUT <: GenericResponseEnvelope](busName: Stri eventBus.unsubscribe(sender()) } - def receive: Receive = - commonJoinBehavior.orElse(commonLeaveBehavior) - .orElse { - case msg: GenericMessageEnvelope => - handleMessage(msg) + def receive: Receive = commonJoinBehavior + .orElse(commonLeaveBehavior) + .orElse { + case msg: GenericMessageEnvelope => + handleMessage(msg) + case msg => () + log.warn(s"Unhandled message $msg from ${sender()}") + } - case msg => () - log.warn(s"Unhandled message $msg from ${sender()}") - } - - protected def handleMessage(msg: GenericMessageEnvelope): Unit = { + private def handleMessage(msg: GenericMessageEnvelope): Unit = { eventBus.publish(compose(msg)) } + protected def setupEventBus(): GenericEventBus[OUT] = { + new GenericEventBus[OUT] + } + protected def compose(@unused msg: GenericMessageEnvelope): OUT def formatChannelOnBusName(channel: String): String = GenericEventService.BusOnChannelFormat(busName)(channel) diff --git a/src/main/scala/net/psforever/services/base/GenericEventServiceWithSupport.scala b/src/main/scala/net/psforever/services/base/GenericEventServiceWithSupport.scala index c4a531000..c40368389 100644 --- a/src/main/scala/net/psforever/services/base/GenericEventServiceWithSupport.scala +++ b/src/main/scala/net/psforever/services/base/GenericEventServiceWithSupport.scala @@ -3,6 +3,7 @@ package net.psforever.services.base import akka.actor.{ActorContext, ActorRef} import net.psforever.services.Service +import net.psforever.services.base.bus.{GenericEventBus, GenericEventBusResponseToSupport, GenericEventBusWithSupport} import net.psforever.types.PlanetSideGUID import scala.annotation.unused @@ -36,16 +37,6 @@ abstract class GenericEventServiceWithSupport[OUT <: GenericResponseEnvelope] .map { supportService => (supportService.label, supportService.constructor(context)) } .toMap[String, ActorRef] - private def supportReceive: Receive = { - case msg: GenericMessageToSupportEnvelopeOnly => - forwardToSupport(msg) - case msg: GenericMessageToSupportEnvelope => - forwardToSupport(msg) - handleMessage(msg) - } - - override def receive: Receive = supportReceive.orElse(super.receive) - private def forwardToSupport(msg: GenericMessageToSupportEnvelope): Unit = { supportServices .get(msg.supportLabel) @@ -57,4 +48,21 @@ abstract class GenericEventServiceWithSupport[OUT <: GenericResponseEnvelope] log.error(s"support service ${msg.supportLabel} was not found - check message routing or service params") } } + + override protected def setupEventBus(): GenericEventBus[OUT] = { + new GenericEventBus[OUT] with GenericEventBusWithSupport[OUT] { + override def publish(event: OUT): Unit = publishingWithSupport(event) + + override def forwardToExternalSupport(msg: GenericEventBusResponseToSupport): Unit = { + msg match { + case supportMessage: GenericMessageToSupportEnvelope => forwardToExternal(supportMessage) + case _ => () + } + } + + private def forwardToExternal(msg: GenericMessageToSupportEnvelope): Unit = { + forwardToSupport(msg) + } + } + } } diff --git a/src/main/scala/net/psforever/services/base/GenericEventBus.scala b/src/main/scala/net/psforever/services/base/bus/GenericEventBus.scala similarity index 66% rename from src/main/scala/net/psforever/services/base/GenericEventBus.scala rename to src/main/scala/net/psforever/services/base/bus/GenericEventBus.scala index 74e929a91..faaa81490 100644 --- a/src/main/scala/net/psforever/services/base/GenericEventBus.scala +++ b/src/main/scala/net/psforever/services/base/bus/GenericEventBus.scala @@ -1,15 +1,19 @@ // Copyright (c) 2017 PSForever -package net.psforever.services.base +package net.psforever.services.base.bus import akka.event.{ActorEventBus, SubchannelClassification} import akka.util.Subclassification +import net.psforever.types.PlanetSideGUID -trait GenericEventBusMsg { +trait AllGenericBusMsg { def channel: String - def inner: Any + def filter: PlanetSideGUID } -class GenericEventBus[A <: GenericEventBusMsg] +trait GenericEventBusResponse + extends AllGenericBusMsg + +class GenericEventBus[A <: GenericEventBusResponse] extends ActorEventBus with SubchannelClassification { type Event = A type Classifier = String @@ -26,4 +30,8 @@ class GenericEventBus[A <: GenericEventBusMsg] protected def publish(event: Event, subscriber: Subscriber): Unit = { subscriber ! event } + + def truePublish(event: Event): Unit = { + super[SubchannelClassification].publish(event) + } } diff --git a/src/main/scala/net/psforever/services/base/bus/GenericEventBusWithSupport.scala b/src/main/scala/net/psforever/services/base/bus/GenericEventBusWithSupport.scala new file mode 100644 index 000000000..34381ccb1 --- /dev/null +++ b/src/main/scala/net/psforever/services/base/bus/GenericEventBusWithSupport.scala @@ -0,0 +1,32 @@ +// Copyright (c) 2026 PSForever +package net.psforever.services.base.bus + +import scala.annotation.unused + +trait GenericEventBusResponseToSupport + extends GenericEventBusResponse { + def supportLabel: String + def supportMessage: Any +} + +trait GenericEventBusResponseToSupportOnly + extends GenericEventBusResponseToSupport { + def channel: String = "" +} + +trait GenericEventBusWithSupport[T <: GenericEventBusResponse] { + bus: GenericEventBus[T] => + def publishingWithSupport(event: T): Unit = { + event match { + case msg: GenericEventBusResponseToSupportOnly => + forwardToExternalSupport(msg) + case msg: GenericEventBusResponseToSupport => + forwardToExternalSupport(msg) + bus.truePublish(event) + case _ => + bus.truePublish(event) + } + } + + def forwardToExternalSupport(@unused msg: GenericEventBusResponseToSupport): Unit +} diff --git a/src/main/scala/net/psforever/services/base/GenericGuidEventBus.scala b/src/main/scala/net/psforever/services/base/bus/GenericGuidEventBus.scala similarity index 86% rename from src/main/scala/net/psforever/services/base/GenericGuidEventBus.scala rename to src/main/scala/net/psforever/services/base/bus/GenericGuidEventBus.scala index 16ed9d188..d583194e2 100644 --- a/src/main/scala/net/psforever/services/base/GenericGuidEventBus.scala +++ b/src/main/scala/net/psforever/services/base/bus/GenericGuidEventBus.scala @@ -1,21 +1,24 @@ // Copyright (c) 2026 PSForever -package net.psforever.services.base +package net.psforever.services.base.bus import net.psforever.types.PlanetSideGUID -import scala.collection.concurrent.{Map => CMap} -import scala.jdk.CollectionConverters._ + import java.util.concurrent.ConcurrentHashMap import scala.annotation.unused +import scala.collection.concurrent.{Map => CMap} +import scala.jdk.CollectionConverters._ /* Adapted from the rating limiting code in https://github.com/Pinapse/giant with permission */ -trait GenericGuidEventBusMsg extends GenericEventBusMsg { +trait GenericGuidEventBusResponse + extends GenericEventBusResponse { def guid: PlanetSideGUID + def inner: Any } -class RateLimitScheduler[A <: GenericGuidEventBusMsg](eventBus: GenericGuidEventBus[A], interval: Long) extends Thread { +class RateLimitScheduler[A <: GenericGuidEventBusResponse](eventBus: GenericEventBus[A], interval: Long) extends Thread { private var hasWork: Boolean = false private var working: Boolean = false private var timeOfLastFlush: Long = 0L @@ -50,7 +53,7 @@ class RateLimitScheduler[A <: GenericGuidEventBusMsg](eventBus: GenericGuidEvent buffer.foreachEntry { (_, map) => map.foreachEntry { (_, map) => map.foreachEntry { (_, event) => - eventBus.publishForce(event) + eventBus.truePublish(event) } map.clear() } @@ -75,7 +78,7 @@ class RateLimitScheduler[A <: GenericGuidEventBusMsg](eventBus: GenericGuidEvent } } -abstract class GenericGuidEventBus[A <: GenericGuidEventBusMsg](rateLimit: Double) +abstract class GenericGuidEventBus[A <: GenericGuidEventBusResponse](rateLimit: Double) extends GenericEventBus[A] { private val rateLimitedDispatch = new RateLimitScheduler[A]( eventBus = this, @@ -87,16 +90,12 @@ abstract class GenericGuidEventBus[A <: GenericGuidEventBusMsg](rateLimit: Doubl if (rateLimit > 0 && shouldRateLimit(event) && rateLimitedDispatch.isRunning) { rateLimitedDispatch.push(event) } else { - publishForce(event) + truePublish(event) } } def shouldRateLimit(@unused event: Event): Boolean - def publishForce(event: Event): Unit = { - super.publish(event) - } - // override protected def publish(event: Event, subscriber: Subscriber): Unit = { // val trimmedEventClassName = // event.inner.getClass().getName().stripPrefix(event.inner.getClass.getPackageName() + ".").stripSuffix("$") diff --git a/src/main/scala/net/psforever/services/hart/HartTimer.scala b/src/main/scala/net/psforever/services/hart/HartTimer.scala index 8267e2f76..ab5d2d4d7 100644 --- a/src/main/scala/net/psforever/services/hart/HartTimer.scala +++ b/src/main/scala/net/psforever/services/hart/HartTimer.scala @@ -4,7 +4,9 @@ package net.psforever.services.hart import akka.actor.{Actor, ActorRef, Cancellable} import net.psforever.objects.Default import net.psforever.objects.zones.Zone -import net.psforever.services.base.{EventResponse, GenericEventBus, GenericEventBusMsg} +import net.psforever.services.Service +import net.psforever.services.base.bus.{GenericEventBus, GenericEventBusResponse} +import net.psforever.services.base.EventResponse import net.psforever.services.local.{LocalAction, LocalServiceMessage} import net.psforever.types.{HartSequence, PlanetSideGUID} @@ -259,7 +261,10 @@ object HartTimer { * to relay instructions back to the individual facility amenity portions of this HART system. * The channel is blank because it does not need special designation. */ - trait Command extends EventResponse with GenericEventBusMsg { def channel: String = "" } + trait Command extends EventResponse with GenericEventBusResponse { + def channel: String = "" + def filter: PlanetSideGUID = Service.defaultPlayerGUID + } /** * Forbid entry through the boartding gantry doors. */ diff --git a/src/main/scala/net/psforever/services/teamwork/SquadServiceResponse.scala b/src/main/scala/net/psforever/services/teamwork/SquadServiceResponse.scala index 49600cc48..af6b2f622 100644 --- a/src/main/scala/net/psforever/services/teamwork/SquadServiceResponse.scala +++ b/src/main/scala/net/psforever/services/teamwork/SquadServiceResponse.scala @@ -5,11 +5,14 @@ import akka.actor.ActorRef import net.psforever.objects.avatar.Certification import net.psforever.objects.teamwork.Squad import net.psforever.packet.game.{SquadDetail, SquadInfo, WaypointEventAction, WaypointInfo} +import net.psforever.services.Service import net.psforever.types.{ChatMessageType, PlanetSideGUID, SquadResponseType, SquadWaypoint} -import net.psforever.services.base.{EventResponse, GenericEventBusMsg} +import net.psforever.services.base.{EventResponse, GenericResponseEnvelope} -final case class SquadServiceResponse(channel: String, exclude: Iterable[Long], response: SquadResponse.Response) - extends EventResponse with GenericEventBusMsg +final case class SquadServiceResponse(channel: String, exclude: Iterable[Long], reply: SquadResponse.Response) + extends EventResponse with GenericResponseEnvelope { + override def filter: PlanetSideGUID = Service.defaultPlayerGUID +} object SquadServiceResponse { def apply(toChannel: String, response: SquadResponse.Response): SquadServiceResponse = @@ -20,7 +23,7 @@ object SquadServiceResponse { } object SquadResponse { - sealed trait Response + sealed trait Response extends EventResponse final case class ListSquadFavorite(line: Int, task: String) extends Response diff --git a/src/main/scala/net/psforever/services/teamwork/SquadSubscriptionEntity.scala b/src/main/scala/net/psforever/services/teamwork/SquadSubscriptionEntity.scala index af47196b6..e1aca37d9 100644 --- a/src/main/scala/net/psforever/services/teamwork/SquadSubscriptionEntity.scala +++ b/src/main/scala/net/psforever/services/teamwork/SquadSubscriptionEntity.scala @@ -6,7 +6,7 @@ import akka.actor.ActorRef import scala.collection.mutable import net.psforever.objects.teamwork.{Squad, SquadFeatures} import net.psforever.packet.game.SquadDetail -import net.psforever.services.base.GenericEventBus +import net.psforever.services.base.bus.GenericEventBus import net.psforever.types.{PlanetSideEmpire, PlanetSideGUID} class SquadSubscriptionEntity { diff --git a/src/main/scala/net/psforever/services/vehicle/VehicleService.scala b/src/main/scala/net/psforever/services/vehicle/VehicleService.scala index 4ac69bfcc..0116f278a 100644 --- a/src/main/scala/net/psforever/services/vehicle/VehicleService.scala +++ b/src/main/scala/net/psforever/services/vehicle/VehicleService.scala @@ -6,10 +6,10 @@ import net.psforever.objects.serverobject.pad.VehicleSpawnPad import net.psforever.objects.zones.Zone import net.psforever.packet.game.ObjectCreateMessage import net.psforever.packet.game.objectcreate.ObjectCreateMessageParent -import net.psforever.services.base.GenericEventBus import net.psforever.services.vehicle.support.TurretUpgrader import net.psforever.types.DriveState import net.psforever.services.Service +import net.psforever.services.base.bus.GenericEventBus class VehicleService(zone: Zone) extends Actor { private val turretUpgrade: ActorRef = context.actorOf(Props[TurretUpgrader](), s"${zone.id}-turret-upgrade-agent") diff --git a/src/main/scala/net/psforever/services/vehicle/VehicleServiceResponse.scala b/src/main/scala/net/psforever/services/vehicle/VehicleServiceResponse.scala index 166d12302..dd052809f 100644 --- a/src/main/scala/net/psforever/services/vehicle/VehicleServiceResponse.scala +++ b/src/main/scala/net/psforever/services/vehicle/VehicleServiceResponse.scala @@ -11,13 +11,14 @@ import net.psforever.packet.PlanetSideGamePacket import net.psforever.packet.game.objectcreate.ConstructorData import net.psforever.packet.game.ObjectCreateMessage import net.psforever.types.{BailType, DriveState, PlanetSideGUID, Vector3} -import net.psforever.services.base.{EventResponse, GenericEventBusMsg} +import net.psforever.services.base.EventResponse +import net.psforever.services.base.bus.GenericEventBusResponse final case class VehicleServiceResponse( - channel: String, - avatar_guid: PlanetSideGUID, - replyMessage: VehicleResponse.Response -) extends GenericEventBusMsg + channel: String, + filter: PlanetSideGUID, + reply: VehicleResponse.Response + ) extends GenericEventBusResponse object VehicleResponse { trait Response extends EventResponse