mostly, comments for the event system files; the event system stamp now determines how the out channel forms; some case class entities have become case object entities

This commit is contained in:
Fate-JH 2026-03-02 20:35:08 -05:00
parent de84e31547
commit 40d5721914
29 changed files with 331 additions and 116 deletions

View file

@ -46,7 +46,7 @@ class AvatarService3Test extends ActorTest {
"subscribe to a specific channel" in {
val service = system.actorOf(Props(classOf[AvatarService]), AvatarServiceTest.TestName)
service ! Service.Join("test")
service ! Service.Leave()
service ! Service.LeaveAll
assert(true)
}
}
@ -58,7 +58,7 @@ class AvatarService4Test extends ActorTest {
ServiceManager.boot(system)
val service = system.actorOf(Props(classOf[AvatarService]), AvatarServiceTest.TestName)
service ! Service.Join("test")
service ! Service.LeaveAll()
service ! Service.LeaveAll
assert(true)
}
}

View file

@ -332,7 +332,7 @@ class ChatLogic(val ops: ChatOperations, implicit val context: ActorContext) ext
val channel = player.Name
val events = continent.AvatarEvents
seeSpectatorsIn = None
events ! Service.Leave(Some("spectator"))
events ! Service.Leave("spectator")
continent
.AllPlayers
.filter(_.spectator)

View file

@ -250,9 +250,9 @@ class LocalHandlerLogic(val ops: SessionLocalHandlers, implicit val context: Act
//todo we might be able to piggyback this for squad recalls later
if(session.zone eq zone) {
sessionLogic.zoning.zoneReload = true
zone.AvatarEvents ! Service.Leave()
zone.LocalEvents ! Service.Leave()
zone.VehicleEvents ! Service.Leave()
zone.AvatarEvents ! Service.LeaveAll
zone.LocalEvents ! Service.LeaveAll
zone.VehicleEvents ! Service.LeaveAll
zone.AvatarEvents ! Service.Join(player.Name) //must manually restore this subscriptions
sessionLogic.zoning.spawn.handleNewPlayerLoaded(player) //will restart subscriptions and dispatch a LoadMapMessage
} else {

View file

@ -232,7 +232,7 @@ class VehicleHandlerLogic(val ops: SessionVehicleHandlers, implicit val context:
case VehicleAction.TransferPassengerChannel(oldChannel, tempChannel, vehicle, vehicleToDelete) if isNotSameTarget =>
sessionLogic.zoning.interstellarFerry = Some(vehicle)
sessionLogic.zoning.interstellarFerryTopLevelGUID = Some(vehicleToDelete)
continent.VehicleEvents ! Service.Leave(Some(oldChannel)) //old vehicle-specific channel (was s"${vehicle.Actor}")
continent.VehicleEvents ! Service.Leave(oldChannel) //old vehicle-specific channel (was s"${vehicle.Actor}")
galaxyService ! Service.Join(tempChannel) //temporary vehicle-specific channel
log.debug(s"TransferPassengerChannel: ${player.Name} now subscribed to $tempChannel for vehicle gating")

View file

@ -743,7 +743,7 @@ class GeneralOperations(
* @param channel the channel name
*/
private def unaccessContainerChannel(events: ActorRef, channel: String): Unit = {
events ! Service.Leave(Some(channel))
events ! Service.Leave(channel)
}
/**

View file

@ -616,12 +616,12 @@ class SessionData(
squadResponseOpt.foreach(_.stop())
zoningOpt.foreach(_.stop())
chatOpt.foreach(_.stop())
continent.AvatarEvents ! Service.Leave()
continent.LocalEvents ! Service.Leave()
continent.VehicleEvents ! Service.Leave()
galaxyService ! Service.Leave()
continent.AvatarEvents ! Service.LeaveAll
continent.LocalEvents ! Service.LeaveAll
continent.VehicleEvents ! Service.LeaveAll
galaxyService ! Service.LeaveAll
if (avatar != null && squadService != Default.Actor) {
squadService ! Service.Leave()
squadService ! Service.LeaveAll
}
}
}

View file

@ -620,9 +620,9 @@ class ZoningOperations(
//the only zone-level event system subscription necessary before BeginZoningMessage (for persistence purposes)
zone.AvatarEvents ! Service.Join(player.Name)
sessionLogic.persist()
oldZone.AvatarEvents ! Service.Leave()
oldZone.LocalEvents ! Service.Leave()
oldZone.VehicleEvents ! Service.Leave()
oldZone.AvatarEvents ! Service.LeaveAll
oldZone.LocalEvents ! Service.LeaveAll
oldZone.VehicleEvents ! Service.LeaveAll
if (player.isAlive && zoningType != Zoning.Method.Reset) {
if (player.HasGUID) {
@ -651,9 +651,9 @@ class ZoningOperations(
val oldZone = session.zone
session = session.copy(zone = foundZone)
sessionLogic.persist()
oldZone.AvatarEvents ! Service.Leave()
oldZone.LocalEvents ! Service.Leave()
oldZone.VehicleEvents ! Service.Leave()
oldZone.AvatarEvents ! Service.LeaveAll
oldZone.LocalEvents ! Service.LeaveAll
oldZone.VehicleEvents ! Service.LeaveAll
//the only zone-level event system subscription necessary before BeginZoningMessage (for persistence purposes)
foundZone.AvatarEvents ! Service.Join(player.Name)
foundZone.Population ! Zone.Population.Join(avatar)
@ -762,7 +762,7 @@ class ZoningOperations(
case _ =>
interstellarFerry match {
case None =>
galaxyService ! Service.Leave(Some(temp_channel)) //no longer being transferred between zones
galaxyService ! Service.Leave(temp_channel) //no longer being transferred between zones
interstellarFerryTopLevelGUID = None
case Some(_) => () //wait patiently
}
@ -770,7 +770,7 @@ class ZoningOperations(
}
private def handleTransferPassengerVehicle(vehicle: Vehicle, temporaryChannel: String): Unit = {
galaxyService ! Service.Leave(Some(temporaryChannel)) //temporary vehicle-specific channel (see above)
galaxyService ! Service.Leave(temporaryChannel) //temporary vehicle-specific channel (see above)
spawn.deadState = DeadState.Release
sendResponse(AvatarDeadStateMessage(DeadState.Release, 0, 0, player.Position, player.Faction, unk5=true))
interstellarFerry = Some(vehicle) //on the other continent and registered to that continent's GUID system

View file

@ -59,7 +59,7 @@ class PainboxControl(painbox: Painbox) extends PoweredAmenityControl {
}
var commonBehavior: Receive = {
case Service.Startup() =>
case Service.Startup =>
if (!disabled && domain.midpoint == Vector3.Zero) {
initialStartup()
}

View file

@ -40,7 +40,7 @@ class ResourceSiloControl(resourceSilo: ResourceSilo)
private var drainMultiplier: Float = 1.0f
def receive: Receive = {
case Service.Startup() =>
case Service.Startup =>
resourceSilo.Owner match {
case building: Building =>
UpdateChargeLevel(amount = 0)

View file

@ -106,7 +106,7 @@ class OrbitalShuttlePadControl(pad: OrbitalShuttlePad) extends Actor {
* register and add the shuttle as a common vehicle of the said zone
*/
val startUp: Receive = {
case Service.Startup() =>
case Service.Startup =>
import net.psforever.types.Vector3
import net.psforever.types.Vector3.DistanceSquared
import net.psforever.objects.GlobalDefinitions._

View file

@ -83,7 +83,7 @@ object ProximityTerminal {
if (obj.Actor == Default.Actor) {
obj.Actor =
context.actorOf(Props(classOf[ProximityTerminalControl], obj), PlanetSideServerObject.UniqueActorName(obj))
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
}
}
}

View file

@ -1718,14 +1718,14 @@ object Zone {
.flatMap(_.Amenities.filter(_.Definition == GlobalDefinitions.resource_silo))
.collect {
case silo: ResourceSilo =>
silo.Actor ! Service.Startup()
silo.Actor ! Service.Startup
}
//some painfields need to look for their closest door
buildings.values
.flatMap(_.Amenities.filter(_.Definition.isInstanceOf[PainboxDefinition]))
.collect {
case painbox: Painbox =>
painbox.Actor ! Service.Startup()
painbox.Actor ! Service.Startup
}
//the orbital_buildings in sanctuary zones have to establish their shuttle routes
map.shuttleBays
@ -1733,7 +1733,7 @@ object Zone {
guid(_)
}
.collect { case Some(obj: OrbitalShuttlePad) =>
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
}
//allocate soi information
zone.soi ! SOI.Build()

View file

@ -7,7 +7,7 @@ import net.psforever.types.PlanetSideGUID
object Service {
final val defaultPlayerGUID: PlanetSideGUID = PlanetSideGUID(0)
final case class Startup()
case object Startup
final case class Join(channel: String, sendJoinConfirmation: Boolean)
@ -17,7 +17,7 @@ object Service {
final case class JoinConfirmation(eventSystem: ActorRef, channel: String)
final case class Leave(channel: Option[String] = None)
final case class Leave(channel: String)
final case class LeaveAll()
case object LeaveAll
}

View file

@ -453,7 +453,7 @@ class PersistenceMonitor(
*/
def AvatarLogout(avatar: Avatar): Unit = {
LivePlayerList.Remove(avatar.id)
squadService.tell(Service.Leave(Some(avatar.id.toString)), context.parent)
squadService.tell(Service.Leave(avatar.id.toString), context.parent)
galaxyService.tell(GalaxyServiceMessage(GalaxyAction.LogStatusChange(avatar.name)), context.parent)
Deployables.Disown(inZone, avatar, context.parent)
inZone.Population.tell(Zone.Population.Leave(avatar), context.parent)

View file

@ -4,42 +4,78 @@ package net.psforever.services.base
import akka.actor.Actor
import net.psforever.services.Service
import net.psforever.services.base.bus.GenericEventBus
import net.psforever.services.base.envelope.{GenericMessageEnvelope, GenericResponseEnvelope}
import net.psforever.services.base.envelope.GenericMessageEnvelope
import org.log4s.Logger
trait EventSystemStamp
/**
* A distinct tag associated with an event system.
* The stamp is intended to demonstrate that the input message has been interpreted into an output response
* through actual use of an event system
* and that the response has not been fabricated and is not fraudulent.
* While the word "stamp" more probably calls to mind the concept of a postage stamp,
* the purpose of this artifact is closer to that of the routing information
* stamped onto an envelope over the postage stamp area of a letter.
*/
trait EventSystemStamp {
/*
Example Classifiers: "foo", "foo.fizz", and "foo.buzz"
In general, Classifier channels will perform left-pattern matching.
"foo" will publish to "foo", "foo.fizz", and "foo.buzz"
To isolate "foo", one must distinguish it with a right-pattern.
"foo" is appended as "foo.bar" and no longer publishes to "foo.fizz.bar" or to "foo.buzz.bar"
*/
/**
* Take an input channel and produce the publishing output channel.
* @param channel publishing channel
* @return appended publishing channel
*/
def routing(channel: String): String = s"/$channel/out"
}
/**
* Basic opt-in event response relay system.
* Remembers "subscribers" (`ActorRef`) to "channels" (`String`);
* accepts "messages" (`GenericMessageEnvelope`) and interprets the message as a response (`GenericResponseEnvelope`);
* and, dispatches the response to all subscribers associated with the channel provided in the message.
* @param stamp distinct tag associated with an event system
*/
abstract class GenericEventService(stamp: EventSystemStamp)
extends Actor {
protected lazy val log: Logger = org.log4s.getLogger(getClass.getSimpleName)
protected val eventBus: GenericEventBus = new GenericEventBus
/**
* Add subscription handling.
*/
private def commonJoinBehavior: Receive = {
case Service.Join(channel, true) =>
val path = formatChannel(channel)
val path = stamp.routing(channel)
val who = sender()
eventBus.subscribe(who, path)
who ! Service.JoinConfirmation(self, channel)
case Service.Join(channel, _) =>
val path = formatChannel(channel)
val path = stamp.routing(channel)
val who = sender()
eventBus.subscribe(who, path)
}
/**
* Remove subscription handling.
*/
private def commonLeaveBehavior: Receive = {
case Service.Leave(None) =>
case Service.LeaveAll =>
eventBus.unsubscribe(sender())
case Service.Leave(Some(channel)) =>
val path = formatChannel(channel)
case Service.Leave(channel) =>
val path = stamp.routing(channel)
eventBus.unsubscribe(sender(), path)
case Service.LeaveAll() =>
eventBus.unsubscribe(sender())
}
/**
* Accept and handle designated messages.
*/
protected def commonBehavior: Receive = {
case msg: GenericMessageEnvelope =>
handleMessage(msg)
@ -53,13 +89,12 @@ abstract class GenericEventService(stamp: EventSystemStamp)
log.warn(s"Unhandled message $msg from ${sender()}")
}
protected def handleMessage(msg: GenericMessageEnvelope): Unit = {
eventBus.publish(composeResponseEnvelope(msg))
/**
* Handle designated messages.
* Interpret the input message as an output response and publish that response.
* @param event event system message
*/
protected def handleMessage(event: GenericMessageEnvelope): Unit = {
eventBus.publish(event.response(stamp))
}
protected def composeResponseEnvelope(msg: GenericMessageEnvelope): GenericResponseEnvelope = {
msg.response(stamp, formatChannel)
}
protected def formatChannel(channel: String): String = s"/$channel"
}

View file

@ -18,21 +18,36 @@ import scala.concurrent.duration.DurationInt
Adapted from the rating limiting code in PSForever fork https://github.com/Pinapse/giant with permission
*/
trait CachedGenericEventMessageEnvelope
/**
* A framework for flagged messages to be cached
* based on designation of a target globally unique identifier.
* Suggestion: this identifier be associated with the source.
*/
trait CachedGenericEventEnvelope
extends MessageTransformationBehavior {
/** cache designator */
def guid: PlanetSideGUID
}
final case class CachedEnvelope(
guid: PlanetSideGUID,
originalChannel: String,
override val filter: PlanetSideGUID,
override val msg: EventMessage
) extends CachedGenericEventMessageEnvelope {
filter: PlanetSideGUID,
msg: EventMessage
) extends CachedGenericEventEnvelope {
assert(guid != Service.defaultPlayerGUID, "can not cache message under default GUID")
}
object CachedEnvelope {
/**
* If the filter is specified, but not the cache target, treat the filter like the cache target.
* Treat invalid values as a reason to downgrade from a cached message envelope to a traditional message envelope.
* Warn of this downgrade.e
* @param channel set of subscribers on an event system bus the envelope should reach
* @param filter specific subscriber endpoint to be excluded (the subscriber should filter themselves)
* @param msg input payload transported by this envelope
* @return either a `CacheEnvelope` or a `MessageEnvelope`, depending on cache-readiness of the `filter` value
*/
def apply(channel: String, filter: PlanetSideGUID, msg: EventMessage): GenericMessageEnvelope = {
if (filter == Service.defaultPlayerGUID) {
org.log4s.getLogger("CachedEnvelope").warn("(1) cached message envelope downgraded to normal message envelope")
@ -42,20 +57,37 @@ object CachedEnvelope {
}
}
/**
* If the cache target is specified, but is not valid,
* downgrade from a cached message envelope to a traditional message envelope.
* Warn of this downgrade.
* If valid, the filter becomes its defaulted value in the corresponding message
* @param guid cache designator
* @param channel set of subscribers on an event system bus the envelope should reach
* @param msg input payload transported by this envelope
* @return either a `CacheEnvelope` or a `MessageEnvelope`, depending on cache-readiness of the target
*/
def apply(guid: PlanetSideGUID, channel: String, msg: EventMessage): GenericMessageEnvelope = {
if (guid == Service.defaultPlayerGUID) {
org.log4s.getLogger("CachedEnvelope").warn("(2) cached message envelope downgraded to normal message envelope")
MessageEnvelope(channel, guid, msg)
} else {
CachedEnvelope(guid, channel, guid, msg)
CachedEnvelope(guid, channel, Service.defaultPlayerGUID, msg)
}
}
}
/**
* An internal message for the purpose of forcing cached messages to be flushed.
* All of it's fields default to harmless values because it is not intended to be processed by an event system
* but it must maintain the trappings of a message envelope to be processed.
* @see `NoMessage`
* @see `NoResponseEnvelope`
*/
private case object FlushCachedMessages extends GenericMessageEnvelope {
def originalChannel: String = ""
def msg: EventMessage = NoMessage
def response(stamp: EventSystemStamp, sendToChannel: String => String): GenericResponseEnvelope = NoResponseEnvelope
def response(stamp: EventSystemStamp): GenericResponseEnvelope = NoResponseEnvelope
def channel: String = ""
def filter: PlanetSideGUID = Service.defaultPlayerGUID
}
@ -65,7 +97,7 @@ abstract class GenericEventServiceWithCacheAndSupport
stamp: EventSystemStamp,
eventSupportServices: List[EventServiceSupport]
) extends GenericEventServiceWithSupport(stamp, eventSupportServices) {
private val flushCacheWait: Long = 50 //milliseconds
private val flushCacheWait: Long = 50L //milliseconds
private var hasCachedMessages: Boolean = false
private var nextTimeToFlushCache: Long = 0L
private var emergencyFlush: Cancellable = Default.Cancellable
@ -78,6 +110,12 @@ abstract class GenericEventServiceWithCacheAndSupport
super.postStop()
}
/**
* If there were previously no messages in the cache,
* prepare to flush the cache after the intended interval passes,
* whether the passing of that interval is detected by a new incoming message and the cache gets flushed naturally,
* or if a safety timer expires and the cache is flushed in precaution.
*/
private def tryRetimeFlushCache(): Unit = {
if (!hasCachedMessages) {
hasCachedMessages = true
@ -86,7 +124,14 @@ abstract class GenericEventServiceWithCacheAndSupport
}
}
private def pushToCache(event: CachedGenericEventMessageEnvelope): Unit = {
/**
* Add messages to the cache based on their channel, then their type, then their cache target identifier.
* Messages that arrive with the same cache profile as a previous message,
* but before that previous message was dispatched,
* will overwrite the previous message without fanfare or warning.
* @param event event system message
*/
private def pushToCache(event: CachedGenericEventEnvelope): Unit = {
val eventClassName = event.msg.getClass.getName
val updateBranch = cache
.getOrElseUpdate(event.channel, new ConcurrentHashMap[String, CMap[PlanetSideGUID, GenericMessageEnvelope]]().asScala)
@ -95,12 +140,22 @@ abstract class GenericEventServiceWithCacheAndSupport
tryRetimeFlushCache()
}
private def tryFlushCache(): Unit = {
if (hasCachedMessages && nextTimeToFlushCache < System.currentTimeMillis()) {
/**
* If the cache has messages and the current time exceeds the anticipated flush time,
* flush the cache messages to the normal event system bus.
*/
private def tryFlushCache(): Boolean = {
val willFLush = hasCachedMessages && nextTimeToFlushCache < System.currentTimeMillis()
if (willFLush) {
flushCache()
}
willFLush
}
/**
* Flush the cache messages to the normal event system bus.
* Clear old messages then reset all flags that would force the messages to be flushed.
*/
private def flushCache(): Unit = {
cache.foreachEntry { (_, map) =>
map.foreachEntry { (_, map) =>
@ -115,15 +170,24 @@ abstract class GenericEventServiceWithCacheAndSupport
emergencyFlush = Default.Cancellable
}
/**
* If the cache will be flushed after this message, flush the cache and then pass the message for processing.
* If the cache will not be flushed, add the message to the cache.
* In any case, procedure should always be ready to flush the cache when a message is received.
* If there is a support actor involved with a cached message, it is resolved when the message is flushed.
* @param event event system message that may be cached
*/
override protected def handleMessage(event: GenericMessageEnvelope): Unit = {
event match {
case envelope: CachedGenericEventMessageEnvelope =>
pushToCache(envelope)
case FlushCachedMessages =>
tryFlushCache()
flushCache()
case _: CachedGenericEventEnvelope if tryFlushCache() =>
super.handleMessage(event)
case envelope: CachedGenericEventEnvelope =>
pushToCache(envelope)
case _ =>
tryFlushCache()
super.handleMessage(event)
}
tryFlushCache()
}
}

View file

@ -9,8 +9,18 @@ import net.psforever.types.PlanetSideGUID
import scala.annotation.unused
/**
* A message when there should be none, but a message is required to be defined anyway.
* @see `GenericSupportEnvelopeOnly`
*/
case object NoMessage extends SelfRespondingEvent
/**
* A response when there should be none, but a response is required to be defined anyway.
* @see `GenericSupportEnvelopeOnly`
* @see `NoReply`
* @see `Undelivered`
*/
case object NoResponseEnvelope extends GenericResponseEnvelope {
def reply: EventResponse = NoReply
def stamp: EventSystemStamp = Undelivered
@ -18,20 +28,39 @@ case object NoResponseEnvelope extends GenericResponseEnvelope {
def filter: PlanetSideGUID = Service.defaultPlayerGUID
}
/**
* A framework for how support actors are to be submitted to an event system.
* The bare bones are a label by which the support actor is identified for message routing,
* and a function that constructs the support actor within the context of the event system.
* @see `ActorContext`
*/
trait EventServiceSupport {
def label: String
def constructor(@unused context: ActorContext): ActorRef
}
/**
* A framework for communicating messages to support actors within an event system.
* The bare bones are a label by which the support actor is identified for message routing,
* and the message payload for the support actor to process.
* @see `ActorContext`
*/
trait GenericMessageToSupport {
def supportLabel: String
def supportMessage: Any
}
/**
* An envelope framework for communicating messages to support actors within an event system
* and also interacting with the event system directly.
*/
trait GenericSupportEnvelope
extends GenericMessageToSupport
with MessageTransformationBehavior
/**
* An envelope framework for communicating messages to support actors within an event system only.
*/
trait GenericSupportEnvelopeOnly
extends GenericMessageToSupport
with GenericMessageEnvelope {
@ -40,9 +69,17 @@ trait GenericSupportEnvelopeOnly
def filter: PlanetSideGUID = Service.defaultPlayerGUID
def msg: EventMessage = NoMessage
def response(@unused stamp: EventSystemStamp, @unused sendToChannel: String => String): GenericResponseEnvelope = NoResponseEnvelope
def response(@unused stamp: EventSystemStamp): GenericResponseEnvelope = NoResponseEnvelope
}
/**
* Advanced opt-in event response relay system.
* Includes a system of specialized child actors that serve specific repeatable operations
* whose behaviors are intentionally synchronized on this event pipeline
* or whose collective operational overhead can be streamlined by reliance on a singular pipeline.
* @param stamp distinct tag associated with an event system
* @param eventSupportServices list of support actors to initialize
*/
abstract class GenericEventServiceWithSupport
(
stamp: EventSystemStamp,
@ -54,6 +91,12 @@ abstract class GenericEventServiceWithSupport
.map { supportService => (supportService.label, supportService.constructor(context)) }
.toMap[String, ActorRef]
/**
* Use the label assigned to a support actor
* to locate that support actor on this event system
* and forward to it a payload message.
* @param msg event system message carrying an additional message for a support actor
*/
private def forwardToSupport(msg: GenericMessageToSupport): Unit = {
supportServices
.get(msg.supportLabel)
@ -66,15 +109,20 @@ abstract class GenericEventServiceWithSupport
}
}
/**
* If the message involves the support actor subsystem, forward the message to it for additional processing.
* Afterwards, or if not, merely call up to previously-established message handling if warranted.
* @param event event system message that may be carrying an additional message for a support actor
*/
override protected def handleMessage(event: GenericMessageEnvelope): Unit = {
event match {
case msg: GenericSupportEnvelopeOnly =>
forwardToSupport(msg)
case msg: GenericSupportEnvelope =>
forwardToSupport(msg)
eventBus.publish(composeResponseEnvelope(event))
super.handleMessage(event)
case event =>
eventBus.publish(composeResponseEnvelope(event))
super.handleMessage(event)
}
}
}

View file

@ -5,6 +5,16 @@ import akka.event.{ActorEventBus, SubchannelClassification}
import akka.util.Subclassification
import net.psforever.services.base.envelope.GenericResponseEnvelope
/**
* Maintain a list of endpoints that have subscribed to a specific perspective
* internally called a `Classifier` and externally referred to as a "channel".
* When an `Event` - externally referred colloquially as a "message" - is received,
* to be published,
* match the "channel" to a list of known `Classifier` endpoints.
* Each of these `Subscribers` should receive the message.
* @see `GenericResponseEnvelope`
* @see `ActorEventBus.Subscriber`
*/
class GenericEventBus
extends ActorEventBus with SubchannelClassification {
type Event = GenericResponseEnvelope
@ -12,6 +22,11 @@ class GenericEventBus
protected def classify(event: Event): Classifier = event.channel
/*
Example Classifiers: "foo", "foo.fizz", and "foo.buzz"
In general, Classifier channels will perform left-pattern matching.
"foo" will publish to "foo", "foo.fizz", and "foo.buzz"
*/
protected def subclassification: Subclassification[String] =
new Subclassification[Classifier] {
def isEqual(x: Classifier, y: Classifier): Boolean = x == y
@ -19,15 +34,11 @@ class GenericEventBus
def isSubclass(x: Classifier, y: Classifier): Boolean = x.startsWith(y)
}
override def publish(event: Event): Unit = {
super[SubchannelClassification].publish(event)
}
protected def publish(event: Event, subscriber: Subscriber): Unit = {
subscriber ! event
}
override def publish(event: Event): Unit = {
truePublish(event)
}
def truePublish(event: Event): Unit = {
super[SubchannelClassification].publish(event)
}
}

View file

@ -3,7 +3,13 @@ package net.psforever.services.base.envelope
import net.psforever.types.PlanetSideGUID
/**
* Base of all envelope classes.
* Defines a channel and a filter, both of which server the purpose of routing the message to its destination.
*/
trait AllEnvelopes {
/** set of subscribers on an event system bus the envelope should reach */
def channel: String
/** specific subscriber endpoint to be excluded (the subscriber should filter themselves) */
def filter: PlanetSideGUID
}

View file

@ -7,12 +7,23 @@ import net.psforever.types.PlanetSideGUID
trait GenericMessageEnvelope
extends AllEnvelopes {
/** channel information provided with the message */
def originalChannel: String
/** input payload transported by this envelope */
def msg: EventMessage
def response(stamp: EventSystemStamp, sendToChannel: String => String): GenericResponseEnvelope
/** method that counts as "processing" the envelope by an event system;
* the event system supplies their stamp and converts the message envelope into a response envelope;
* the input message is converted into a response message */
def response(stamp: EventSystemStamp): GenericResponseEnvelope
}
object GenericMessageEnvelope {
/**
* The `unapply`ed data from a message envelope resembles the data from includes the filter and the channel information.
* The original channel information.
* @param obj response envelope
* @return a tuple containing the channel, filter, and reply message
*/
def unapply(obj: GenericMessageEnvelope): Option[(String, PlanetSideGUID, EventMessage)] = {
Some((obj.originalChannel, obj.filter, obj.msg))
}

View file

@ -5,18 +5,37 @@ import net.psforever.services.base.EventSystemStamp
import net.psforever.services.base.message.{EventMessage, EventResponse}
import net.psforever.types.PlanetSideGUID
/**
* The framework of an event system envelope that is treated as the processed complement of a message envelope.
*/
trait GenericResponseEnvelope
extends AllEnvelopes {
/** result of converting the message envelope input payload into output payload */
def reply: EventResponse
/** marker indicating the routing through which the original message was processed */
def stamp: EventSystemStamp
}
object GenericResponseEnvelope {
def apply(stamp: EventSystemStamp, channel: String, filter: PlanetSideGUID, msg: EventMessage): GenericResponseEnvelope ={
/**
* Fake a response envelope, as if processed by a specific event system.
* @param stamp marker indicating the routing through which the original message was processed
* @param channel set of subscribers on an event system bus the envelope should reach
* @param filter a specific subscriber endpoint to be excluded
* @param msg input payload transported by this envelope
* @return a faked but typically acceptable response envelope
*/
def apply(stamp: EventSystemStamp, channel: String, filter: PlanetSideGUID, msg: EventMessage): GenericResponseEnvelope = {
val envelope = MessageEnvelope(channel, filter, msg)
envelope.response(stamp, s => s)
envelope.response(stamp)
}
def unapply(obj: GenericResponseEnvelope): Option[(String, PlanetSideGUID, EventResponse)] =
/**
* The `unapply`ed data from a response envelope resembles the data from includes the filter and the channel information.
* @param obj response envelope
* @return a tuple containing the channel, filter, and reply message
*/
def unapply(obj: GenericResponseEnvelope): Option[(String, PlanetSideGUID, EventResponse)] = {
Some((obj.channel, obj.filter, obj.reply))
}
}

View file

@ -5,10 +5,29 @@ import net.psforever.services.base.EventSystemStamp
import net.psforever.services.base.message.{EventMessage, EventResponse}
import net.psforever.types.PlanetSideGUID
/**
* A response when there should be none, but a reply is required to be defined anyway.
*/
case object NoReply extends EventResponse
/**
* A stamp that represents not having been processed by an event system.
* Should never been given out to an event system.
*/
case object Undelivered extends EventSystemStamp
/**
* The mechanics of a proper event system envelope.
* The envelope has two forms: the input message envelope and the output response envelope.
* The event system uses its stamp to mark that the message form converts into the response form when being processed.
* In terms of proper metaphor,
* it makes no sense for a post office take a letter out of its original envelope,
* write a completely different envelope,
* write a completely different letter,
* package the new letter in the new envelope,
* and deliver it is place of the originals.
* That would be considered fraud.
*/
trait MessageTransformationBehavior
extends GenericMessageEnvelope
with GenericResponseEnvelope {
@ -16,22 +35,24 @@ trait MessageTransformationBehavior
private var outputChannel: String = originalChannel
private var outputReply: EventResponse = NoReply
// satisfies GenericMessageEnvelope2 (and GenericResponseEnvelope2)
// satisfies GenericMessageEnvelope (and GenericResponseEnvelope)
def channel: String = outputChannel
// satisfies GenericMessageEnvelope2
def response(stamp: EventSystemStamp, sendToChannel: String => String): GenericResponseEnvelope = {
// satisfies GenericMessageEnvelope
def response(stamp: EventSystemStamp): GenericResponseEnvelope = {
outputStamp = stamp
outputChannel = sendToChannel(originalChannel)
outputChannel = stamp.routing(originalChannel)
outputReply = msg.response()
this
}
// satisfies GenericResponseEnvelope2
// satisfies GenericResponseEnvelope
def stamp: EventSystemStamp = outputStamp
def reply: EventResponse = outputReply
}
/**
* A proper event system envelope.
*/
case class MessageEnvelope(originalChannel: String, filter: PlanetSideGUID, msg: EventMessage)
extends MessageTransformationBehavior
extends MessageTransformationBehavior

View file

@ -3,15 +3,15 @@ package net.psforever.services.galaxy
import net.psforever.services.base.{EventSystemStamp, GenericEventService}
case object GalaxyStamp extends EventSystemStamp
class GalaxyService
extends GenericEventService(stamp = GalaxyStamp) {
override protected def formatChannel(channel: String): String = {
case object GalaxyStamp extends EventSystemStamp {
override def routing(channel: String): String = {
if (channel.trim.isEmpty) {
"/all"
"/out"
} else {
s"/$channel"
s"/$channel/out"
}
}
}
class GalaxyService
extends GenericEventService(stamp = GalaxyStamp)

View file

@ -176,20 +176,20 @@ class SquadService extends Actor {
def receive: Receive = {
//subscribe to a faction's channel - necessary to receive updates about listed squads
case Service.Join(faction) if SquadService.FactionWordSalad.indexOf(faction) > -1 =>
case Service.Join(faction, _) if SquadService.FactionWordSalad.indexOf(faction) > -1 =>
JoinByFaction(faction, sender())
//subscribe to the player's personal channel - necessary for future and previous squad information
case Service.Join(char_id) =>
case Service.Join(char_id, _) =>
JoinByCharacterId(char_id, sender())
case Service.Leave(Some(faction)) if SquadService.FactionWordSalad.indexOf(faction) > -1 =>
case Service.Leave(faction) if SquadService.FactionWordSalad.indexOf(faction) > -1 =>
LeaveByFaction(faction, sender())
case Service.Leave(Some(char_id)) =>
case Service.Leave(char_id) =>
LeaveByCharacterId(char_id, sender())
case Service.Leave(None) | Service.LeaveAll() =>
case Service.LeaveAll =>
LeaveInGeneral(sender())
case Terminated(actorRef) =>

View file

@ -82,7 +82,7 @@ class OrbitalShuttlePadControlTest extends FreedContextActorTest {
"startup and create the shuttle" in {
assert(building.Amenities.size == 9)
assert(vehicles.isEmpty)
pad.Actor ! Service.Startup()
pad.Actor ! Service.Startup
expectNoMessage(max = 5 seconds)
assert(building.Amenities.size == 10)
assert(vehicles.size == 1)

View file

@ -91,7 +91,7 @@ class ResourceSiloControlStartupTest extends ActorTest {
"Resource silo" should {
"startup properly" in {
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
expectNoMessage(max = 1000 milliseconds)
}
}
@ -113,7 +113,7 @@ class ResourceSiloControlStartupMessageNoneTest extends ActorTest {
"report if it has no NTU on startup" in {
obj.NtuCapacitor = 0
assert(obj.NtuCapacitor == 0)
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
val ownerMsg = buildingEvents.receiveOne(200 milliseconds)
assert(ownerMsg match {
case BuildingActor.NtuDepleted() => true
@ -139,7 +139,7 @@ class ResourceSiloControlStartupMessageSomeTest extends ActorTest {
"report if it has any NTU on startup" in {
obj.NtuCapacitor = 1
assert(obj.NtuCapacitor == 1)
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
val ownerMsg = buildingEvents.receiveOne(200 milliseconds)
assert(ownerMsg match {
case BuildingActor.SuppliedWithNtu() => true
@ -197,7 +197,7 @@ class ResourceSiloControlUseTest extends FreedContextActorTest {
ant.DeploymentState = DriveState.Deployed
building.Amenities = silo
silo.Actor = system.actorOf(Props(classOf[ResourceSiloControl], silo), "test-silo")
silo.Actor ! Service.Startup()
silo.Actor ! Service.Startup
"Resource silo" should {
"respond when being used" in {
@ -225,7 +225,7 @@ class ResourceSiloControlNtuWarningTest extends ActorTest {
val zoneEvents: TestProbe = TestProbe("zone-events")
zone.AvatarEvents = zoneEvents.ref
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
obj.Actor ! ResourceSilo.UpdateChargeLevel(-obj.NtuCapacitor)
zoneEvents.receiveN(3, 500.milliseconds) //events from setup
@ -257,7 +257,7 @@ class ResourceSiloControlUpdate1Test extends ActorTest {
val buildingEvents: TestProbe = TestProbe("building-events")
zone.AvatarEvents = zoneEvents.ref
bldg.Actor = buildingEvents.ref
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
buildingEvents.receiveOne(500 milliseconds) //message caused by "startup"
obj.Actor ! ResourceSilo.UpdateChargeLevel(-obj.NtuCapacitor)
zoneEvents.receiveN(3, 500.milliseconds) //events from setup
@ -301,7 +301,7 @@ class ResourceSiloControlUpdate2Test extends ActorTest {
val buildingEvents: TestProbe = TestProbe("building-events")
zone.AvatarEvents = zoneEvents.ref
bldg.Actor = buildingEvents.ref
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
buildingEvents.receiveOne(500 milliseconds) //message caused by "startup"
obj.Actor ! ResourceSilo.UpdateChargeLevel(-obj.NtuCapacitor + 100)
zoneEvents.receiveN(3, 500.milliseconds) //events from setup
@ -345,7 +345,7 @@ class ResourceSiloControlNoUpdateTest extends ActorTest {
val buildingEvents: TestProbe = TestProbe("building-events")
zone.AvatarEvents = zoneEvents.ref
bldg.Actor = buildingEvents.ref
obj.Actor ! Service.Startup()
obj.Actor ! Service.Startup
obj.NtuCapacitor = 0
"Resource silo" should {

View file

@ -190,7 +190,7 @@ class ProximityTerminalControlStartTest extends ActorTest {
avatar.GUID = PlanetSideGUID(1)
terminal.GUID = PlanetSideGUID(2)
terminal.Actor ! Service.Startup()
terminal.Actor ! Service.Startup
expectNoMessage(500 milliseconds) //spacer
val probe1 = new TestProbe(system, "local-events")
val probe2 = new TestProbe(system, "target-callback")
@ -258,7 +258,7 @@ class ProximityTerminalControlTwoUsersTest extends ActorTest {
avatar.GUID = PlanetSideGUID(1)
avatar2.GUID = PlanetSideGUID(2)
terminal.GUID = PlanetSideGUID(3)
terminal.Actor ! Service.Startup()
terminal.Actor ! Service.Startup
expectNoMessage(500 milliseconds) //spacer
val probe1 = new TestProbe(system, "local-events")
val probe2 = new TestProbe(system, "target-callback-1")
@ -317,7 +317,7 @@ class ProximityTerminalControlStopTest extends ActorTest {
avatar.GUID = PlanetSideGUID(1)
terminal.GUID = PlanetSideGUID(2)
terminal.Actor ! Service.Startup()
terminal.Actor ! Service.Startup
expectNoMessage(500 milliseconds) //spacer
val probe1 = new TestProbe(system, "local-events")
val probe2 = new TestProbe(system, "target-callback")
@ -395,7 +395,7 @@ class ProximityTerminalControlNotStopTest extends ActorTest {
avatar.GUID = PlanetSideGUID(1)
avatar2.GUID = PlanetSideGUID(2)
terminal.GUID = PlanetSideGUID(3)
terminal.Actor ! Service.Startup()
terminal.Actor ! Service.Startup
expectNoMessage(500 milliseconds) //spacer
val probe1 = new TestProbe(system, "local-events")
val probe2 = new TestProbe(system, "target-callback-1")

View file

@ -6,7 +6,7 @@ import akka.testkit.TestProbe
import base.ActorTest
import net.psforever.services.Service
import net.psforever.services.base.message.EventMessage
import net.psforever.services.base.{CachedGenericEventMessageEnvelope, EventServiceSupport, GenericEventServiceWithCacheAndSupport, GenericSupportEnvelope}
import net.psforever.services.base.{CachedGenericEventEnvelope, EventServiceSupport, GenericEventServiceWithCacheAndSupport, GenericSupportEnvelope}
import net.psforever.types.PlanetSideGUID
import scala.concurrent.duration._
@ -20,7 +20,7 @@ object EventServiceCacheSupportTest {
originalChannel: String,
override val msg: EventMessage,
supportMessage: Any
) extends CachedGenericEventMessageEnvelope with GenericSupportEnvelope {
) extends CachedGenericEventEnvelope with GenericSupportEnvelope {
assert(guid != Service.defaultPlayerGUID, "can not cache message under default GUID")
def filter: PlanetSideGUID = Service.defaultPlayerGUID
def supportLabel: String = "supportActor"

View file

@ -6,7 +6,7 @@ import akka.testkit.TestProbe
import base.ActorTest
import net.psforever.services.Service
import net.psforever.services.base.GenericEventService
import net.psforever.services.base.envelope.GenericResponseEnvelope
import net.psforever.services.base.envelope.{GenericResponseEnvelope, MessageEnvelope}
import scala.concurrent.duration._
@ -160,7 +160,7 @@ class EventServiceTest7 extends ActorTest {
case _ => assert(false, "(7) message expected but not received")
}
events.tell(Service.Leave(), probe.ref)
events.tell(Service.LeaveAll, probe.ref)
events ! MessageEnvelope("test", Service.defaultPlayerGUID, TestMessage(5))
events ! MessageEnvelope("anotherTest", Service.defaultPlayerGUID, TestMessage(6))
probe.expectNoMessage(250 milliseconds)
@ -190,7 +190,7 @@ class EventServiceTest8 extends ActorTest {
case _ => assert(false, "(7) message expected but not received")
}
events.tell(Service.LeaveAll(), probe.ref)
events.tell(Service.LeaveAll, probe.ref)
events ! MessageEnvelope("test", Service.defaultPlayerGUID, TestMessage(5))
events ! MessageEnvelope("anotherTest", Service.defaultPlayerGUID, TestMessage(6))
probe.expectNoMessage(250 milliseconds)