solved issue with redundant support actor messaging; refactored event buses out of event systems

This commit is contained in:
Fate-JH 2026-01-27 19:23:37 -05:00
parent 9ba1b3048e
commit 5ece864bfd
10 changed files with 113 additions and 57 deletions

View file

@ -3,21 +3,18 @@ package net.psforever.services.base
import akka.actor.Actor
import net.psforever.services.Service
import net.psforever.types.PlanetSideGUID
import net.psforever.services.base.bus.{AllGenericBusMsg, GenericEventBus, GenericEventBusResponse}
import org.log4s.Logger
import scala.annotation.unused
trait GenericResponseEnvelope
extends GenericEventBusMsg {
def filter: PlanetSideGUID
extends GenericEventBusResponse {
def reply: EventResponse
def inner: EventResponse = reply
}
trait GenericMessageEnvelope {
def channel: String
def filter: PlanetSideGUID
trait GenericMessageEnvelope
extends AllGenericBusMsg {
def msg: EventMessage
}
@ -25,7 +22,7 @@ abstract class GenericEventService[OUT <: GenericResponseEnvelope](busName: Stri
extends Actor {
protected lazy val log: Logger = org.log4s.getLogger(getClass.getSimpleName)
protected val eventBus = new GenericEventBus[OUT]
protected val eventBus: GenericEventBus[OUT] = setupEventBus()
def BusName: String = busName
@ -48,20 +45,23 @@ abstract class GenericEventService[OUT <: GenericResponseEnvelope](busName: Stri
eventBus.unsubscribe(sender())
}
def receive: Receive =
commonJoinBehavior.orElse(commonLeaveBehavior)
.orElse {
case msg: GenericMessageEnvelope =>
handleMessage(msg)
def receive: Receive = commonJoinBehavior
.orElse(commonLeaveBehavior)
.orElse {
case msg: GenericMessageEnvelope =>
handleMessage(msg)
case msg => ()
log.warn(s"Unhandled message $msg from ${sender()}")
}
case msg => ()
log.warn(s"Unhandled message $msg from ${sender()}")
}
protected def handleMessage(msg: GenericMessageEnvelope): Unit = {
private def handleMessage(msg: GenericMessageEnvelope): Unit = {
eventBus.publish(compose(msg))
}
protected def setupEventBus(): GenericEventBus[OUT] = {
new GenericEventBus[OUT]
}
protected def compose(@unused msg: GenericMessageEnvelope): OUT
def formatChannelOnBusName(channel: String): String = GenericEventService.BusOnChannelFormat(busName)(channel)

View file

@ -3,6 +3,7 @@ package net.psforever.services.base
import akka.actor.{ActorContext, ActorRef}
import net.psforever.services.Service
import net.psforever.services.base.bus.{GenericEventBus, GenericEventBusResponseToSupport, GenericEventBusWithSupport}
import net.psforever.types.PlanetSideGUID
import scala.annotation.unused
@ -36,16 +37,6 @@ abstract class GenericEventServiceWithSupport[OUT <: GenericResponseEnvelope]
.map { supportService => (supportService.label, supportService.constructor(context)) }
.toMap[String, ActorRef]
private def supportReceive: Receive = {
case msg: GenericMessageToSupportEnvelopeOnly =>
forwardToSupport(msg)
case msg: GenericMessageToSupportEnvelope =>
forwardToSupport(msg)
handleMessage(msg)
}
override def receive: Receive = supportReceive.orElse(super.receive)
private def forwardToSupport(msg: GenericMessageToSupportEnvelope): Unit = {
supportServices
.get(msg.supportLabel)
@ -57,4 +48,21 @@ abstract class GenericEventServiceWithSupport[OUT <: GenericResponseEnvelope]
log.error(s"support service ${msg.supportLabel} was not found - check message routing or service params")
}
}
override protected def setupEventBus(): GenericEventBus[OUT] = {
new GenericEventBus[OUT] with GenericEventBusWithSupport[OUT] {
override def publish(event: OUT): Unit = publishingWithSupport(event)
override def forwardToExternalSupport(msg: GenericEventBusResponseToSupport): Unit = {
msg match {
case supportMessage: GenericMessageToSupportEnvelope => forwardToExternal(supportMessage)
case _ => ()
}
}
private def forwardToExternal(msg: GenericMessageToSupportEnvelope): Unit = {
forwardToSupport(msg)
}
}
}
}

View file

@ -1,15 +1,19 @@
// Copyright (c) 2017 PSForever
package net.psforever.services.base
package net.psforever.services.base.bus
import akka.event.{ActorEventBus, SubchannelClassification}
import akka.util.Subclassification
import net.psforever.types.PlanetSideGUID
trait GenericEventBusMsg {
trait AllGenericBusMsg {
def channel: String
def inner: Any
def filter: PlanetSideGUID
}
class GenericEventBus[A <: GenericEventBusMsg]
trait GenericEventBusResponse
extends AllGenericBusMsg
class GenericEventBus[A <: GenericEventBusResponse]
extends ActorEventBus with SubchannelClassification {
type Event = A
type Classifier = String
@ -26,4 +30,8 @@ class GenericEventBus[A <: GenericEventBusMsg]
protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event
}
def truePublish(event: Event): Unit = {
super[SubchannelClassification].publish(event)
}
}

View file

@ -0,0 +1,32 @@
// Copyright (c) 2026 PSForever
package net.psforever.services.base.bus
import scala.annotation.unused
trait GenericEventBusResponseToSupport
extends GenericEventBusResponse {
def supportLabel: String
def supportMessage: Any
}
trait GenericEventBusResponseToSupportOnly
extends GenericEventBusResponseToSupport {
def channel: String = ""
}
trait GenericEventBusWithSupport[T <: GenericEventBusResponse] {
bus: GenericEventBus[T] =>
def publishingWithSupport(event: T): Unit = {
event match {
case msg: GenericEventBusResponseToSupportOnly =>
forwardToExternalSupport(msg)
case msg: GenericEventBusResponseToSupport =>
forwardToExternalSupport(msg)
bus.truePublish(event)
case _ =>
bus.truePublish(event)
}
}
def forwardToExternalSupport(@unused msg: GenericEventBusResponseToSupport): Unit
}

View file

@ -1,21 +1,24 @@
// Copyright (c) 2026 PSForever
package net.psforever.services.base
package net.psforever.services.base.bus
import net.psforever.types.PlanetSideGUID
import scala.collection.concurrent.{Map => CMap}
import scala.jdk.CollectionConverters._
import java.util.concurrent.ConcurrentHashMap
import scala.annotation.unused
import scala.collection.concurrent.{Map => CMap}
import scala.jdk.CollectionConverters._
/*
Adapted from the rating limiting code in https://github.com/Pinapse/giant with permission
*/
trait GenericGuidEventBusMsg extends GenericEventBusMsg {
trait GenericGuidEventBusResponse
extends GenericEventBusResponse {
def guid: PlanetSideGUID
def inner: Any
}
class RateLimitScheduler[A <: GenericGuidEventBusMsg](eventBus: GenericGuidEventBus[A], interval: Long) extends Thread {
class RateLimitScheduler[A <: GenericGuidEventBusResponse](eventBus: GenericEventBus[A], interval: Long) extends Thread {
private var hasWork: Boolean = false
private var working: Boolean = false
private var timeOfLastFlush: Long = 0L
@ -50,7 +53,7 @@ class RateLimitScheduler[A <: GenericGuidEventBusMsg](eventBus: GenericGuidEvent
buffer.foreachEntry { (_, map) =>
map.foreachEntry { (_, map) =>
map.foreachEntry { (_, event) =>
eventBus.publishForce(event)
eventBus.truePublish(event)
}
map.clear()
}
@ -75,7 +78,7 @@ class RateLimitScheduler[A <: GenericGuidEventBusMsg](eventBus: GenericGuidEvent
}
}
abstract class GenericGuidEventBus[A <: GenericGuidEventBusMsg](rateLimit: Double)
abstract class GenericGuidEventBus[A <: GenericGuidEventBusResponse](rateLimit: Double)
extends GenericEventBus[A] {
private val rateLimitedDispatch = new RateLimitScheduler[A](
eventBus = this,
@ -87,16 +90,12 @@ abstract class GenericGuidEventBus[A <: GenericGuidEventBusMsg](rateLimit: Doubl
if (rateLimit > 0 && shouldRateLimit(event) && rateLimitedDispatch.isRunning) {
rateLimitedDispatch.push(event)
} else {
publishForce(event)
truePublish(event)
}
}
def shouldRateLimit(@unused event: Event): Boolean
def publishForce(event: Event): Unit = {
super.publish(event)
}
// override protected def publish(event: Event, subscriber: Subscriber): Unit = {
// val trimmedEventClassName =
// event.inner.getClass().getName().stripPrefix(event.inner.getClass.getPackageName() + ".").stripSuffix("$")

View file

@ -4,7 +4,9 @@ package net.psforever.services.hart
import akka.actor.{Actor, ActorRef, Cancellable}
import net.psforever.objects.Default
import net.psforever.objects.zones.Zone
import net.psforever.services.base.{EventResponse, GenericEventBus, GenericEventBusMsg}
import net.psforever.services.Service
import net.psforever.services.base.bus.{GenericEventBus, GenericEventBusResponse}
import net.psforever.services.base.EventResponse
import net.psforever.services.local.{LocalAction, LocalServiceMessage}
import net.psforever.types.{HartSequence, PlanetSideGUID}
@ -259,7 +261,10 @@ object HartTimer {
* to relay instructions back to the individual facility amenity portions of this HART system.
* The channel is blank because it does not need special designation.
*/
trait Command extends EventResponse with GenericEventBusMsg { def channel: String = "" }
trait Command extends EventResponse with GenericEventBusResponse {
def channel: String = ""
def filter: PlanetSideGUID = Service.defaultPlayerGUID
}
/**
* Forbid entry through the boartding gantry doors.
*/

View file

@ -5,11 +5,14 @@ import akka.actor.ActorRef
import net.psforever.objects.avatar.Certification
import net.psforever.objects.teamwork.Squad
import net.psforever.packet.game.{SquadDetail, SquadInfo, WaypointEventAction, WaypointInfo}
import net.psforever.services.Service
import net.psforever.types.{ChatMessageType, PlanetSideGUID, SquadResponseType, SquadWaypoint}
import net.psforever.services.base.{EventResponse, GenericEventBusMsg}
import net.psforever.services.base.{EventResponse, GenericResponseEnvelope}
final case class SquadServiceResponse(channel: String, exclude: Iterable[Long], response: SquadResponse.Response)
extends EventResponse with GenericEventBusMsg
final case class SquadServiceResponse(channel: String, exclude: Iterable[Long], reply: SquadResponse.Response)
extends EventResponse with GenericResponseEnvelope {
override def filter: PlanetSideGUID = Service.defaultPlayerGUID
}
object SquadServiceResponse {
def apply(toChannel: String, response: SquadResponse.Response): SquadServiceResponse =
@ -20,7 +23,7 @@ object SquadServiceResponse {
}
object SquadResponse {
sealed trait Response
sealed trait Response extends EventResponse
final case class ListSquadFavorite(line: Int, task: String) extends Response

View file

@ -6,7 +6,7 @@ import akka.actor.ActorRef
import scala.collection.mutable
import net.psforever.objects.teamwork.{Squad, SquadFeatures}
import net.psforever.packet.game.SquadDetail
import net.psforever.services.base.GenericEventBus
import net.psforever.services.base.bus.GenericEventBus
import net.psforever.types.{PlanetSideEmpire, PlanetSideGUID}
class SquadSubscriptionEntity {

View file

@ -6,10 +6,10 @@ import net.psforever.objects.serverobject.pad.VehicleSpawnPad
import net.psforever.objects.zones.Zone
import net.psforever.packet.game.ObjectCreateMessage
import net.psforever.packet.game.objectcreate.ObjectCreateMessageParent
import net.psforever.services.base.GenericEventBus
import net.psforever.services.vehicle.support.TurretUpgrader
import net.psforever.types.DriveState
import net.psforever.services.Service
import net.psforever.services.base.bus.GenericEventBus
class VehicleService(zone: Zone) extends Actor {
private val turretUpgrade: ActorRef = context.actorOf(Props[TurretUpgrader](), s"${zone.id}-turret-upgrade-agent")

View file

@ -11,13 +11,14 @@ import net.psforever.packet.PlanetSideGamePacket
import net.psforever.packet.game.objectcreate.ConstructorData
import net.psforever.packet.game.ObjectCreateMessage
import net.psforever.types.{BailType, DriveState, PlanetSideGUID, Vector3}
import net.psforever.services.base.{EventResponse, GenericEventBusMsg}
import net.psforever.services.base.EventResponse
import net.psforever.services.base.bus.GenericEventBusResponse
final case class VehicleServiceResponse(
channel: String,
avatar_guid: PlanetSideGUID,
replyMessage: VehicleResponse.Response
) extends GenericEventBusMsg
channel: String,
filter: PlanetSideGUID,
reply: VehicleResponse.Response
) extends GenericEventBusResponse
object VehicleResponse {
trait Response extends EventResponse