diff --git a/server/src/main/scala/net/psforever/server/Server.scala b/server/src/main/scala/net/psforever/server/Server.scala
index c79f47fc6..2ec30a503 100644
--- a/server/src/main/scala/net/psforever/server/Server.scala
+++ b/server/src/main/scala/net/psforever/server/Server.scala
@@ -14,7 +14,7 @@ import akka.{actor => classic}
import ch.qos.logback.classic.LoggerContext
import ch.qos.logback.classic.joran.JoranConfigurator
import io.sentry.{Sentry, SentryOptions}
-import net.psforever.actors.net.{LoginActor, MiddlewareActor, SocketActor, SocketPane}
+import net.psforever.actors.net.{LoginActor, MiddlewareActor, SocketSetup, SocketSetupInfo, SocketPane}
import net.psforever.actors.session.SessionActor
import net.psforever.login.psadmin.PsAdminActor
import net.psforever.login._
@@ -98,6 +98,19 @@ object Server {
implicit val system: ActorSystem = classic.ActorSystem("PsLogin")
Default(system)
+ val zones = Zones.zones :+ Zone.Nowhere
+ val serviceManager = ServiceManager.boot
+ serviceManager ! ServiceManager.Register(classic.Props[AccountIntermediaryService](), "accountIntermediary")
+ serviceManager ! ServiceManager.Register(classic.Props[GalaxyService](), "galaxy")
+ serviceManager ! ServiceManager.Register(classic.Props[SquadService](), "squad")
+ serviceManager ! ServiceManager.Register(classic.Props[AccountPersistenceService](), "accountPersistence")
+ serviceManager ! ServiceManager.Register(classic.Props[PropertyOverrideManager](), "propertyOverrideManager")
+ serviceManager ! ServiceManager.Register(classic.Props[HartService](), "hart")
+
+ system.spawn(CavernRotationService(), CavernRotationService.CavernRotationServiceKey.id)
+ system.spawn(InterstellarClusterService(zones), InterstellarClusterService.InterstellarClusterServiceKey.id)
+ system.spawn(ChatService(), ChatService.ChatServiceKey.id)
+
// typed to classic wrappers for login and session actors
val loginPlan = (ref: ActorRef[MiddlewareActor.Command], info: InetSocketAddress, connectionId: String) => {
import net.psforever.services.account.IPAddress
@@ -121,22 +134,13 @@ object Server {
})
})
}
-
- val zones = Zones.zones ++ Seq(Zone.Nowhere)
- val serviceManager = ServiceManager.boot
- serviceManager ! ServiceManager.Register(classic.Props[AccountIntermediaryService](), "accountIntermediary")
- serviceManager ! ServiceManager.Register(classic.Props[GalaxyService](), "galaxy")
- serviceManager ! ServiceManager.Register(classic.Props[SquadService](), "squad")
- serviceManager ! ServiceManager.Register(classic.Props[AccountPersistenceService](), "accountPersistence")
- serviceManager ! ServiceManager.Register(classic.Props[PropertyOverrideManager](), "propertyOverrideManager")
- serviceManager ! ServiceManager.Register(classic.Props[HartService](), "hart")
-
- system.spawn(CavernRotationService(), CavernRotationService.CavernRotationServiceKey.id)
- system.spawn(InterstellarClusterService(zones), InterstellarClusterService.InterstellarClusterServiceKey.id)
- system.spawn(ChatService(), ChatService.ChatServiceKey.id)
-
- system.spawn(SocketActor(new InetSocketAddress(bindAddress, Config.app.login.port), loginPlan), "login-socket")
- system.spawn(SocketPane(bindAddress, sessionPlan), "world-socket-pane")
+ system.spawn(
+ SocketPane(Seq(
+ SocketSetup("login", SocketSetupInfo(bindAddress, Seq(Config.app.login.port), loginPlan)),
+ SocketSetup("world", SocketSetupInfo(bindAddress, Config.app.world.port +: Config.app.world.ports, sessionPlan))
+ )),
+ name = SocketPane.SocketPaneKey.id
+ )
val adminListener = system.actorOf(
classic.Props(
diff --git a/src/main/scala/net/psforever/actors/net/LoginActor.scala b/src/main/scala/net/psforever/actors/net/LoginActor.scala
index af7a66413..e359a2937 100644
--- a/src/main/scala/net/psforever/actors/net/LoginActor.scala
+++ b/src/main/scala/net/psforever/actors/net/LoginActor.scala
@@ -1,7 +1,10 @@
package net.psforever.actors.net
+import akka.actor.typed.receptionist.Receptionist
+
import java.net.{InetAddress, InetSocketAddress}
import akka.actor.{Actor, ActorRef, Cancellable, MDCContextAware, typed}
+import akka.actor.typed.scaladsl.adapter._
import com.github.t3hnar.bcrypt._
import net.psforever.objects.{Account, Default}
import net.psforever.packet.PlanetSideGamePacket
@@ -17,31 +20,18 @@ import net.psforever.util.Database._
import java.security.MessageDigest
import org.joda.time.LocalDateTime
+
+import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.duration._
+import scala.util.matching.Regex
import scala.util.{Failure, Success}
-/*
object LoginActor {
- /ef apply(
- middlewareActor: typed.ActorRef[MiddlewareActor.Command],
- uuid: String
- ): Behavior[Command] =
- Behaviors.setup(context => new LoginActor(context, middlewareActor, uuid).start())
-
sealed trait Command
-}
-class LoginActor(
- middlewareActor: typed.ActorRef[MiddlewareActor.Command],
- uuid: String
-) {
-
- def start(): Unit = {
- Behaviors.receiveMessagePartial {}
- }
+ final case class ReceptionistListing(listing: Receptionist.Listing) extends Command
}
- */
class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], connectionId: String, sessionId: Long)
extends Actor
@@ -51,11 +41,12 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
private case class UpdateServerList()
- val usernameRegex = """[A-Za-z0-9]{3,}""".r
+ val usernameRegex: Regex = """[A-Za-z0-9]{3,}""".r
- var leftRef: ActorRef = ActorRef.noSender
- var rightRef: ActorRef = ActorRef.noSender
- var accountIntermediary: ActorRef = ActorRef.noSender
+ var leftRef: ActorRef = Default.Actor
+ var rightRef: ActorRef = Default.Actor
+ var accountIntermediary: ActorRef = Default.Actor
+ var sockets: typed.ActorRef[SocketPane.Command] = Default.typed.Actor
var updateServerListTask: Cancellable = Default.Cancellable
@@ -64,14 +55,15 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
var canonicalHostName: String = ""
var port: Int = 0
- val serverName = Config.app.world.serverName
+ val serverName: String = Config.app.world.serverName
val publicAddress = new InetSocketAddress(InetAddress.getByName(Config.app.public), Config.app.world.port)
private val bcryptRounds = 12
ServiceManager.serviceManager ! Lookup("accountIntermediary")
+ ServiceManager.receptionist ! Receptionist.Find(SocketPane.SocketPaneKey, context.self)
- override def postStop() = {
+ override def postStop(): Unit = {
if (updateServerListTask != null)
updateServerListTask.cancel()
}
@@ -79,42 +71,46 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
def receive: Receive = {
case ServiceManager.LookupResult("accountIntermediary", endpoint) =>
accountIntermediary = endpoint
+
+ case SocketPane.SocketPaneKey.Listing(listings) =>
+ sockets = listings.head
+
case ReceiveIPAddress(address) =>
ipAddress = address.Address
hostName = address.HostName
canonicalHostName = address.CanonicalHostName
port = address.Port
+
case UpdateServerList() =>
updateServerList()
+
case packet: PlanetSideGamePacket =>
handleGamePkt(packet)
- case default => failWithError(s"Invalid packet class received: $default")
+
+ case SocketPane.NextPort(_, address, portNum) =>
+ val response = ConnectToWorldMessage(serverName, address.getHostAddress, portNum)
+ middlewareActor ! MiddlewareActor.Send(response)
+ middlewareActor ! MiddlewareActor.Close()
+
+ case default =>
+ failWithError(s"Invalid packet class received: $default")
}
- def handleGamePkt(pkt: PlanetSideGamePacket) =
+ def handleGamePkt(pkt: PlanetSideGamePacket): Unit =
pkt match {
case LoginMessage(majorVersion, minorVersion, buildDate, username, password, token, revision) =>
// TODO: prevent multiple LoginMessages from being processed in a row!! We need a state machine
-
val clientVersion = s"Client Version: $majorVersion.$minorVersion.$revision, $buildDate"
-
if (token.isDefined)
log.debug(s"New login UN:$username Token:${token.get}. $clientVersion")
else {
log.debug(s"New login UN:$username. $clientVersion")
}
-
- getAccountLogin(username, password, token)
+ requestAccountLogin(username, password, token)
case ConnectToWorldRequestMessage(name, _, _, _, _, _, _, _) =>
log.info(s"Connect to world request for '$name'")
- val response = ConnectToWorldMessage(
- serverName,
- publicAddress.getAddress.getHostAddress,
- SocketPane.Rotation.NextPort
- )
- middlewareActor ! MiddlewareActor.Send(response)
- middlewareActor ! MiddlewareActor.Close()
+ sockets ! SocketPane.GetNextPort("world", context.self)
case _ =>
log.warning(s"Unhandled GamePacket $pkt")
@@ -123,23 +119,19 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
// generates a password from username and password combination
// mimics the process the launcher follows and hashes the password salted by the username
def generateNewPassword(username: String, password: String): String = {
-
// salt password hash with username (like the launcher does) (username + password)
val saltedPassword = username.concat(password)
-
// https://stackoverflow.com/a/46332228
// hash password (like the launcher sends)
val hashedPassword = MessageDigest.getInstance("SHA-256")
.digest(saltedPassword.getBytes("UTF-8"))
.map("%02x".format(_)).mkString
-
// bcrypt hash for DB storage
val bcryptedPassword = hashedPassword.bcryptBounded(bcryptRounds)
-
bcryptedPassword
}
- def getAccountLogin(username: String, passwordOpt: Option[String], tokenOpt: Option[String]): Unit = {
+ def requestAccountLogin(username: String, passwordOpt: Option[String], tokenOpt: Option[String]): Unit = {
tokenOpt match {
case Some(token) => accountLoginWithToken(token)
case None => accountLogin(username, passwordOpt.getOrElse(""))
@@ -147,9 +139,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
}
def accountLogin(username: String, password: String): Unit = {
-
import ctx._
-
val newToken = this.generateToken()
val result = for {
// backwards compatibility: prefer exact match first, then try lowercase
@@ -160,21 +150,16 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
case Some(_) =>
Future.successful(Seq())
}
-
accountOption <- accountsExact.headOption orElse accountsLower.headOption match {
-
// account found
case Some(account) =>
Future.successful(Some(account))
-
// create new account
case None =>
if (Config.app.login.createMissingAccounts) {
-
// generate bcrypted passwords
val bcryptedPassword = generateNewPassword(username, password)
val passhash = password.bcryptBounded(bcryptRounds)
-
// save bcrypted password hash to DB
ctx.run(
query[persistence.Account]
@@ -187,7 +172,6 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
) flatMap { id => ctx.run(query[persistence.Account].filter(_.id == lift(id))) } map { accounts =>
Some(accounts.head)
}
-
} else {
loginFailureResponse(username, newToken)
Future.successful(None)
@@ -196,14 +180,11 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
login <- accountOption match {
case Some(account) =>
-
// remember: this is the in client "StagingTest" login handling
// the password is send in clear and needs to be checked against the "old" (only bcrypted) passhash
// if there ever is a way to update the password in the future passhash and password need be updated
(account.inactive, password.isBcryptedBounded(account.passhash)) match {
-
case (false, true) =>
-
accountIntermediary ! StoreAccountData(newToken, Account(account.id, account.username, account.gm))
val future = ctx.run(
query[persistence.Login].insert(
@@ -214,14 +195,11 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
_.port -> lift(port)
)
)
-
// handle new password
if (account.password == "") {
-
// generate bcrypted password
// use username as provided by the user (db entry could be wrong), that is the way the launcher does it
val bcryptedPassword = generateNewPassword(username, password)
-
// update account, set password
ctx.run(
query[persistence.Account]
@@ -229,21 +207,21 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
.update(_.password -> lift(bcryptedPassword))
)
}
-
loginSuccessfulResponse(username, newToken)
updateServerListTask =
context.system.scheduler.scheduleWithFixedDelay(0 seconds, 5 seconds, self, UpdateServerList())
future
+
case (_, false) =>
loginPwdFailureResponse(username, newToken)
Future.successful(None)
+
case (true, _) =>
loginAccountFailureResponse(username, newToken)
Future.successful(None)
}
case None => Future.successful(None)
}
-
} yield login
result.onComplete {
@@ -253,18 +231,12 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
}
def accountLoginWithToken(token: String): Unit = {
-
import ctx._
-
val newToken = this.generateToken()
val result = for {
-
accountsExact <- ctx.run(query[persistence.Account].filter(_.token.getOrNull == lift(token)))
-
accountOption <- accountsExact.headOption match {
-
case Some(account) =>
-
// token expires after 2 hours
// new connections and players leaving a world server will return to desktop
if (LocalDateTime.now().isAfter(account.tokenCreated.get.plusHours(2))) {
@@ -282,9 +254,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
login <- accountOption match {
case Some(account) =>
(account.inactive, account.token.getOrElse("") == token) match {
-
case (false, true) =>
-
accountIntermediary ! StoreAccountData(newToken, Account(account.id, account.username, account.gm))
val future = ctx.run(
query[persistence.Login].insert(
@@ -295,7 +265,6 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
_.port -> lift(port)
)
)
-
loginSuccessfulResponseToken(account.username, token, newToken)
updateServerListTask =
context.system.scheduler.scheduleWithFixedDelay(0 seconds, 5 seconds, self, UpdateServerList())
@@ -309,10 +278,8 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
loginAccountFailureResponseToken(account.username, token, newToken)
Future.successful(None)
}
-
case None => Future.successful(None)
}
-
} yield login
result.onComplete {
@@ -321,7 +288,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
}
}
- def loginSuccessfulResponse(username: String, newToken: String) = {
+ def loginSuccessfulResponse(username: String, newToken: String): Unit = {
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
newToken,
@@ -335,10 +302,8 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginSuccessfulResponseToken(username: String, token: String, newToken: String) = {
-
+ def loginSuccessfulResponseToken(username: String, token: String, newToken: String): Unit = {
log.info(s"User $username logged in unsing token $token")
-
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
newToken,
@@ -352,7 +317,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginPwdFailureResponse(username: String, newToken: String) = {
+ def loginPwdFailureResponse(username: String, newToken: String): Unit = {
log.warning(s"Failed login to account $username")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -367,7 +332,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginFailureResponseToken(token: String, newToken: String) = {
+ def loginFailureResponseToken(token: String, newToken: String): Unit = {
log.warning(s"Failed login using unknown token $token")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -382,7 +347,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginFailureResponseTokenExpired(token: String, newToken: String) = {
+ def loginFailureResponseTokenExpired(token: String, newToken: String): Unit = {
log.warning(s"Failed login using expired token $token")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -397,7 +362,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginFailureResponse(username: String, newToken: String) = {
+ def loginFailureResponse(username: String, newToken: String): Unit = {
log.warning(s"DB problem username: $username")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -412,7 +377,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginFailureResponseToken(username: String, token: String, newToken: String) = {
+ def loginFailureResponseToken(username: String, token: String, newToken: String): Unit = {
log.warning(s"DB problem username $username token: $token")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -427,7 +392,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginAccountFailureResponse(username: String, newToken: String) = {
+ def loginAccountFailureResponse(username: String, newToken: String): Unit = {
log.warning(s"Account $username inactive")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -442,7 +407,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def loginAccountFailureResponseToken(username: String, token: String, newToken: String) = {
+ def loginAccountFailureResponseToken(username: String, token: String, newToken: String): Unit = {
log.warning(s"Account $username inactive token: $token ")
middlewareActor ! MiddlewareActor.Send(
LoginRespMessage(
@@ -457,16 +422,16 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
)
}
- def generateToken() = {
+ def generateToken(): String = {
val r = new scala.util.Random
- val sb = new StringBuilder
+ val sb = new mutable.StringBuilder
for (_ <- 1 to 31) {
sb.append(r.nextPrintableChar())
}
sb.toString
}
- def updateServerList() = {
+ def updateServerList(): Unit = {
middlewareActor ! MiddlewareActor.Send(
VNLWorldStatusMessage(
"Welcome to PlanetSide! ",
@@ -475,7 +440,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
serverName,
WorldStatus.Up,
Config.app.world.serverType,
- Vector(WorldConnectionInfo(publicAddress)),
+ Vector(WorldConnectionInfo(publicAddress)), //todo ideally, ask for info from SocketPane
PlanetSideEmpire.VS
)
)
@@ -487,5 +452,4 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne
log.error(error)
middlewareActor ! MiddlewareActor.Close()
}
-
}
diff --git a/src/main/scala/net/psforever/actors/net/NetworkSimulator.scala b/src/main/scala/net/psforever/actors/net/NetworkSimulator.scala
new file mode 100644
index 000000000..3f2e35104
--- /dev/null
+++ b/src/main/scala/net/psforever/actors/net/NetworkSimulator.scala
@@ -0,0 +1,58 @@
+// Copyright (c) 2024 PSForever
+package net.psforever.actors.net
+
+import akka.actor.typed.{ActorRef, Behavior}
+import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
+import akka.io.Udp
+import net.psforever.util.Config
+
+import java.util.concurrent.ThreadLocalRandom
+import scala.util.Random
+import scala.concurrent.duration._
+
+// TODO? This doesn't quite support all parameters of the old network simulator
+// Need to decide wheter they are necessary or not
+// https://github.com/psforever/PSF-LoginServer/blob/07f447c2344ab55d581317316c41571772ac2242/src/main/scala/net/psforever/login/UdpNetworkSimulator.scala
+private object NetworkSimulator {
+ def apply(socketActor: ActorRef[SocketActor.Command]): Behavior[Udp.Message] =
+ Behaviors.setup(context => new NetworkSimulator(context, socketActor))
+}
+
+private class NetworkSimulator(context: ActorContext[Udp.Message], socketActor: ActorRef[SocketActor.Command])
+ extends AbstractBehavior[Udp.Message](context) {
+
+ private[this] val log = org.log4s.getLogger
+
+ override def onMessage(message: Udp.Message): Behavior[Udp.Message] = {
+ message match {
+ case _: Udp.Received | _: Udp.Send =>
+ simulate(message)
+ Behaviors.same
+ case _ =>
+ socketActor ! toSocket(message)
+ Behaviors.same
+ }
+ }
+
+ def simulate(message: Udp.Message): Unit = {
+ if (Random.nextDouble() > Config.app.development.netSim.loss) {
+ if (Random.nextDouble() <= Config.app.development.netSim.reorderChance) {
+ context.scheduleOnce(
+ ThreadLocalRandom.current().nextDouble(0.01, 0.2).seconds,
+ socketActor,
+ toSocket(message)
+ )
+ } else {
+ socketActor ! toSocket(message)
+ }
+ } else {
+ log.trace("Network simulator dropped packet")
+ }
+ }
+
+ def toSocket(message: Udp.Message): SocketActor.Command =
+ message match {
+ case message: Udp.Command => SocketActor.UdpCommandMessage(message)
+ case message: Udp.Event => SocketActor.UdpEventMessage(message)
+ }
+}
diff --git a/src/main/scala/net/psforever/actors/net/SocketActor.scala b/src/main/scala/net/psforever/actors/net/SocketActor.scala
index 88150a4ba..09ca2e26d 100644
--- a/src/main/scala/net/psforever/actors/net/SocketActor.scala
+++ b/src/main/scala/net/psforever/actors/net/SocketActor.scala
@@ -3,11 +3,10 @@ package net.psforever.actors.net
import java.net.InetSocketAddress
import java.security.SecureRandom
import java.util.UUID.randomUUID
-import java.util.concurrent.ThreadLocalRandom
import akka.actor.Cancellable
import akka.{actor => classic}
import akka.actor.typed.{ActorRef, ActorTags, Behavior, PostStop, Terminated}
-import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
+import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.io.{IO, Udp}
import akka.actor.typed.scaladsl.adapter._
import net.psforever.packet.PlanetSidePacket
@@ -16,12 +15,16 @@ import scodec.interop.akka.EnrichedByteString
import scala.collection.mutable
import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.duration.{DurationDouble, DurationInt}
-import scala.util.Random
+import scala.concurrent.duration.DurationInt
-/** SocketActor creates a UDP socket, receives packets and forwards them to MiddlewareActor
- * There is only one SocketActor, but each connected client gets its own MiddlewareActor
- */
+/**
+ * Create a networking port attachment that accepts user datagram protocol (UDP) packets
+ * and forwards those packets to a business logic unit referred to herein as a "plan".
+ * The landing site of a plan is a processing entity of middleware logic that dissects composed packet data
+ * and pushes that further down the chain to the business logic.
+ * Each instance of middleware support and then business logic
+ * is associated with a unique clients that attempt to connect to the server though this socket port.
+ */
object SocketActor {
def apply(
address: InetSocketAddress,
@@ -36,6 +39,8 @@ object SocketActor {
private final case class UdpUnboundMessage(message: Udp.Unbound) extends Command
private final case class Bound(socket: classic.ActorRef) extends Command
private final case class StopChild(ref: ActorRef[MiddlewareActor.Command]) extends Command
+ final case class AskSocketLoad(replyTo: ActorRef[Any]) extends Command
+ final case class SocketLoad(sessions: Int)
// Typed actors cannot access sender but you can only get the socket that way
private class SenderHack(ref: ActorRef[SocketActor.Command]) extends classic.Actor {
@@ -49,58 +54,11 @@ object SocketActor {
}
}
-// TODO? This doesn't quite support all parameters of the old network simulator
-// Need to decide wheter they are necessary or not
-// https://github.com/psforever/PSF-LoginServer/blob/07f447c2344ab55d581317316c41571772ac2242/src/main/scala/net/psforever/login/UdpNetworkSimulator.scala
-private object NetworkSimulator {
- def apply(socketActor: ActorRef[SocketActor.Command]): Behavior[Udp.Message] =
- Behaviors.setup(context => new NetworkSimulator(context, socketActor))
-}
-
-private class NetworkSimulator(context: ActorContext[Udp.Message], socketActor: ActorRef[SocketActor.Command])
- extends AbstractBehavior[Udp.Message](context) {
-
- private[this] val log = org.log4s.getLogger
-
- override def onMessage(message: Udp.Message): Behavior[Udp.Message] = {
- message match {
- case _: Udp.Received | _: Udp.Send =>
- simulate(message)
- Behaviors.same
- case _ =>
- socketActor ! toSocket(message)
- Behaviors.same
- }
- }
-
- def simulate(message: Udp.Message): Unit = {
- if (Random.nextDouble() > Config.app.development.netSim.loss) {
- if (Random.nextDouble() <= Config.app.development.netSim.reorderChance) {
- context.scheduleOnce(
- ThreadLocalRandom.current().nextDouble(0.01, 0.2).seconds,
- socketActor,
- toSocket(message)
- )
- } else {
- socketActor ! toSocket(message)
- }
- } else {
- log.trace("Network simulator dropped packet")
- }
- }
-
- def toSocket(message: Udp.Message): SocketActor.Command =
- message match {
- case message: Udp.Command => SocketActor.UdpCommandMessage(message)
- case message: Udp.Event => SocketActor.UdpEventMessage(message)
- }
-}
-
class SocketActor(
context: ActorContext[SocketActor.Command],
address: InetSocketAddress,
nextPlan: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket]
-) {
+ ) {
import SocketActor._
import SocketActor.Command
@@ -122,8 +80,8 @@ class SocketActor(
context.spawnAnonymous(NetworkSimulator(context.self))
}
- val updUnboundAdapter: ActorRef[Udp.Unbound] = context.messageAdapter[Udp.Unbound](UdpUnboundMessage)
- val senderHack: classic.ActorRef = context.actorOf(classic.Props(new SenderHack(context.self)))
+ //val updUnboundAdapter: ActorRef[Udp.Unbound] = context.messageAdapter[Udp.Unbound](UdpUnboundMessage)
+ val senderHack: classic.ActorRef = context.actorOf(classic.Props(new SenderHack(context.self)))
IO(Udp)(context.system.classicSystem).tell(Udp.Bind(updEventAdapter.toClassic, address), senderHack)
@@ -207,13 +165,16 @@ class SocketActor(
socket ! message
Behaviors.same
+ case AskSocketLoad(replyTo) =>
+ replyTo ! SocketLoad(packetActors.size)
+ Behaviors.same
+
case UdpUnboundMessage(_) =>
Behaviors.stopped
case StopChild(ref) =>
context.stop(ref)
Behaviors.same
-
}
.receiveSignal {
case (_, PostStop) =>
diff --git a/src/main/scala/net/psforever/actors/net/SocketPane.scala b/src/main/scala/net/psforever/actors/net/SocketPane.scala
index f287ec205..aa163d926 100644
--- a/src/main/scala/net/psforever/actors/net/SocketPane.scala
+++ b/src/main/scala/net/psforever/actors/net/SocketPane.scala
@@ -1,22 +1,34 @@
// Copyright (c) 2024 PSForever
package net.psforever.actors.net
-import net.psforever.util.Config
+import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import java.net.{InetAddress, InetSocketAddress}
-//import akka.{actor => classic}
+import akka.{actor => classic}
import akka.actor.typed.{ActorRef, Behavior, PostStop}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import net.psforever.packet.PlanetSidePacket
-final case class SocketPanePortRotation(
- portNumbers: Seq[Int],
- index: Int = 0
- ) {
+/**
+ * For a given sequence of ports,
+ * cycle through port numbers until the last port number has been produced
+ * then start again from the first port number again.
+ * @param portNumbers the sequence of ports to be cycled between
+ * @param index optional;
+ * the starting index in the sequence of ports
+ */
+private[net] case class SocketPanePortRotation(
+ portNumbers: Seq[Int],
+ index: Int = 0
+ ) {
private var currentIndex: Int = index
- def NextPort: Int = this.synchronized {
- val out = portNumbers.lift(currentIndex).getOrElse(Config.app.world.port)
+ /**
+ * Retrieve the sequentially next port number.
+ * @return the next port number
+ */
+ def NextPort: Int = {
+ val out = portNumbers.lift(currentIndex).orElse(portNumbers.headOption).getOrElse(0)
currentIndex += 1
currentIndex %= portNumbers.size
out
@@ -24,73 +36,171 @@ final case class SocketPanePortRotation(
}
object SocketPanePortRotation {
+ /**
+ * Overwritten constructor for `SocketPanePortRotation` entities.
+ * Copy constructor, essentially, that retains the internal current rotation index.
+ * @param rotation the previous rotation entity
+ * @return a copy of the previous rotation entity
+ */
def apply(rotation: SocketPanePortRotation): SocketPanePortRotation = {
SocketPanePortRotation(rotation.portNumbers, rotation.currentIndex)
}
+ /**
+ * Overwritten constructor for `SocketPanePortRotation` entities.
+ * Adda new port to the list of ports but retain the internal current rotation index.
+ * @param rotation the previous rotation entity
+ * @param newPort the new port number
+ * @return a copy of the previous rotation entity with an additional port that can be selected
+ */
def apply(rotation: SocketPanePortRotation, newPort: Int): SocketPanePortRotation = {
SocketPanePortRotation(rotation.portNumbers :+ newPort, rotation.currentIndex)
}
}
-/** SocketActor creates a UDP socket, receives packets and forwards them to MiddlewareActor
- * There is only one SocketActor, but each connected client gets its own MiddlewareActor
+/**
+ * Information explaining how to manage a socket group.
+ * @param groupId moniker for the group
+ * @param info how the sockets in this group operate
+ * @param initial should this socket group be protected as "original";
+ * defaults to `false`
*/
+final case class SocketSetup(groupId: String, info: SocketSetupInfo, initial: Boolean = false) {
+ assert(info.ports.nonEmpty, s"port group $groupId should define port numbers")
+ /* the same port can belong to multiple groups, but the same port may not be repeated in the same group */
+ assert(info.ports.size == info.ports.distinct.size, s"port group $groupId should not contain duplicate port numbers")
+}
+
+/**
+ * Information explaining the details of a socket group.
+ * @param address Internet protocol location of the host
+ * @param ports network ports that are used as endpoints of transmission;
+ * corresponds to a number of sockets
+ * @param planOfAction whenever a new connection across a socket is made, the method of consuming network packets
+ */
+final case class SocketSetupInfo(
+ address: InetAddress,
+ ports: Seq[Int],
+ planOfAction: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket]
+ )
+
object SocketPane {
- def apply(
- address: InetAddress,
- nextPlan: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket]
- ): Behavior[Command] =
- Behaviors.setup(context => new SocketPane(context, address, nextPlan).start())
+ val SocketPaneKey: ServiceKey[Command] = ServiceKey[SocketPane.Command](id = "socketPane")
+
+ /**
+ * Overwritten constructor for `SocketPane` entities.
+ * Registers the entity with the actor receptionist.
+ * @param setupInfo the details of the socket groups
+ * @return a `SocketPane` entity
+ */
+ def apply(setupInfo: Seq[SocketSetup]): Behavior[Command] = {
+ Behaviors.setup { context =>
+ context.system.receptionist ! Receptionist.Register(SocketPaneKey, context.self)
+ new SocketPane(context, setupInfo).start()
+ }
+ }
sealed trait Command
- final case class CreateNewSocket(port: Int) extends Command
+ final case class CreateNewSocket(groupId: String, port: Int) extends Command
- private var rotation: SocketPanePortRotation = SocketPanePortRotation(Array(Config.app.world.port))
+ final case class CreateNewSocketGroup(groupId: String, info: SocketSetupInfo) extends Command
- def Rotation: SocketPanePortRotation = SocketPanePortRotation(rotation)
+ final case class GetNextPort(groupId: String, replyTo: classic.ActorRef) extends Command
- final def getDefaultPorts: Seq[Int] = {
- val config = Config.app.world
- (config.port +: config.ports).distinct
- }
+ final case class NextPort(groupId: String, address: InetAddress, port: Int)
}
+/**
+ * Management of sockets connecting to the network ports.
+ *
+ * Connections to the networking ports are handled by the logic imposed by this class
+ * and are handled by sockets that bind to those ports and accept or pass packets to game logic.
+ * This game logic is encapsulated by an anonymous function
+ * that is automatically implemented into a game logic machine (consumption and production)
+ * upon unique connections detected / attempted across those sockets.
+ * Multiple sockets can connect to the same port so no compensation is required.
+ *
+ * New sockets to ports can be added to existing groups after the initial socket groups.
+ * New socket groups can be created after the initial information.
+ * @param context hook for setting up the sockets and, eventually, their packet logic
+ * @param initialPortSetup the details of the socket groups
+ */
class SocketPane(
context: ActorContext[SocketPane.Command],
- address: InetAddress,
- next: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket]
+ private val initialPortSetup: Seq[SocketSetup]
) {
- private[this] val log = org.log4s.getLogger
+ private[this] val log = org.log4s.getLogger("SocketPane")
- private var socketActors: Array[ActorRef[SocketActor.Command]] = SocketPane.getDefaultPorts.map { i =>
- context.spawn(SocketActor(new InetSocketAddress(address, i), next), name=s"world-socket-$i")
+ /** original socket group information, now properly flagged as "original" */
+ private var socketConfigs: Seq[SocketSetup] = initialPortSetup.map { setup => setup.copy(initial = true) }
+ /** all sockets produced by the socket group information and any later socket creation commands */
+ private var socketActors: Array[ActorRef[SocketActor.Command]] = initialPortSetup.flatMap {
+ case SocketSetup(_, SocketSetupInfo(address, ports, plan), _) =>
+ ports.map { portNum => context.spawn(SocketActor(new InetSocketAddress(address, portNum), plan), name=s"world-socket-$portNum") }
+ }.toArray
+ /** load balancing for redirecting newly discovered packet input to different sockets (ports);
+ * should be referenced externally to switch sockets;
+ * see SocketActor.GGetNextPort */
+ private var socketRotations: Array[SocketPanePortRotation] = initialPortSetup.map {
+ case SocketSetup(_, SocketSetupInfo(_, ports, _), _) => SocketPanePortRotation(ports)
}.toArray
- SocketPane.rotation = SocketPanePortRotation(SocketPane.getDefaultPorts)
- log.info(s"Configured ${SocketPane.getDefaultPorts.size} game world instance ports")
+ log.info(s"configured ${socketActors.length} ports initially")
def start(): Behavior[SocketPane.Command] = {
Behaviors
.receiveMessagePartial[SocketPane.Command] {
- case SocketPane.CreateNewSocket(port)
- if SocketPane.Rotation.portNumbers.contains(port) =>
+ case SocketPane.CreateNewSocket(key, _)
+ if !socketConfigs.exists { setup => setup.groupId == key } =>
+ log.warn(s"new port group $key does not exist and can not be appended to")
Behaviors.same
- case SocketPane.CreateNewSocket(port) =>
- socketActors = socketActors :+ context.spawn(SocketActor(new InetSocketAddress(address, port), next), name=s"world-socket-$port")
- SocketPane.rotation = SocketPanePortRotation(SocketPane.rotation, port)
- log.info(s"Requested new socket to port $port has been created")
+ case SocketPane.CreateNewSocket(groupId, port)
+ if socketConfigs
+ .find { setup => setup.groupId == groupId }
+ .exists { case SocketSetup(_, SocketSetupInfo(_, ports, _), _) => ports.contains(port) } =>
+ log.info(s"new port $port for group $groupId already supported")
+ Behaviors.same
+
+ case SocketPane.CreateNewSocket(groupId, port) =>
+ log.info(s"new socket to port $port created in $groupId")
+ val index = socketConfigs.indexWhere { setup => setup.groupId == groupId }
+ val SocketSetup(_, SocketSetupInfo(address, ports, plan), _) = socketConfigs(index)
+ socketActors = socketActors :+
+ context.spawn(SocketActor(new InetSocketAddress(address, port), plan), name=s"world-socket-$port")
+ socketConfigs = (socketConfigs.take(index) :+ SocketSetup(groupId, SocketSetupInfo(address, ports :+ port, plan))) ++
+ socketConfigs.drop(index + 1)
+ socketRotations = (socketRotations.take(index) :+ SocketPanePortRotation(socketRotations(index), port)) ++
+ socketRotations.drop(index + 1)
Behaviors.same
- case _ =>
+ case SocketPane.CreateNewSocketGroup(groupId, _)
+ if socketConfigs.exists { case SocketSetup(oldKey, _, _) => oldKey == groupId } =>
+ log.warn(s"port group $groupId already exists and can not be created twice")
+ Behaviors.same
+
+ case SocketPane.CreateNewSocketGroup(groupId, info @ SocketSetupInfo(address, ports, plan)) =>
+ socketActors = socketActors ++
+ ports.map { portNum => context.spawn(SocketActor(new InetSocketAddress(address, portNum), plan), name=s"world-socket-$portNum") }
+ socketConfigs = socketConfigs :+
+ SocketSetup(groupId, info)
+ socketRotations = socketRotations :+
+ SocketPanePortRotation(ports)
+ Behaviors.same
+
+ case SocketPane.GetNextPort(groupId, replyTo) =>
+ socketConfigs.indexWhere { setup => setup.groupId == groupId } match {
+ case -1 =>
+ log.warn(s"port group $groupId does not exist")
+ case index =>
+ replyTo ! SocketPane.NextPort(groupId, socketConfigs(index).info.address, socketRotations(index).NextPort)
+ }
Behaviors.same
}
.receiveSignal {
case (_, PostStop) =>
socketActors.foreach(context.stop)
- SocketPane.rotation = SocketPanePortRotation(Array(Config.app.world.port))
Behaviors.same
}
}
diff --git a/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala b/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala
index 8db0d20a2..3f884bfa6 100644
--- a/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala
+++ b/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala
@@ -25,7 +25,7 @@ class AccountIntermediaryService extends Actor {
private val IPAddressBySessionID = mutable.Map[Long, IPAddress]()
private[this] val log = org.log4s.getLogger
- def receive = {
+ def receive: Receive = {
// Called by the LoginSessionActor
case StoreAccountData(token, account) =>
accountsByToken += (token -> account)