From eb3b180fccd4877a5275dc2cbdc35bfd95eef5a2 Mon Sep 17 00:00:00 2001 From: Fate-JH Date: Tue, 25 Jun 2024 21:10:52 -0400 Subject: [PATCH] combining port management into the sector pane --- .../scala/net/psforever/server/Server.scala | 38 ++-- .../net/psforever/actors/net/LoginActor.scala | 132 +++++-------- .../actors/net/NetworkSimulator.scala | 58 ++++++ .../psforever/actors/net/SocketActor.scala | 77 ++------ .../net/psforever/actors/net/SocketPane.scala | 184 ++++++++++++++---- .../account/AccountIntermediaryService.scala | 2 +- 6 files changed, 294 insertions(+), 197 deletions(-) create mode 100644 src/main/scala/net/psforever/actors/net/NetworkSimulator.scala diff --git a/server/src/main/scala/net/psforever/server/Server.scala b/server/src/main/scala/net/psforever/server/Server.scala index c79f47fc6..2ec30a503 100644 --- a/server/src/main/scala/net/psforever/server/Server.scala +++ b/server/src/main/scala/net/psforever/server/Server.scala @@ -14,7 +14,7 @@ import akka.{actor => classic} import ch.qos.logback.classic.LoggerContext import ch.qos.logback.classic.joran.JoranConfigurator import io.sentry.{Sentry, SentryOptions} -import net.psforever.actors.net.{LoginActor, MiddlewareActor, SocketActor, SocketPane} +import net.psforever.actors.net.{LoginActor, MiddlewareActor, SocketSetup, SocketSetupInfo, SocketPane} import net.psforever.actors.session.SessionActor import net.psforever.login.psadmin.PsAdminActor import net.psforever.login._ @@ -98,6 +98,19 @@ object Server { implicit val system: ActorSystem = classic.ActorSystem("PsLogin") Default(system) + val zones = Zones.zones :+ Zone.Nowhere + val serviceManager = ServiceManager.boot + serviceManager ! ServiceManager.Register(classic.Props[AccountIntermediaryService](), "accountIntermediary") + serviceManager ! ServiceManager.Register(classic.Props[GalaxyService](), "galaxy") + serviceManager ! ServiceManager.Register(classic.Props[SquadService](), "squad") + serviceManager ! ServiceManager.Register(classic.Props[AccountPersistenceService](), "accountPersistence") + serviceManager ! ServiceManager.Register(classic.Props[PropertyOverrideManager](), "propertyOverrideManager") + serviceManager ! ServiceManager.Register(classic.Props[HartService](), "hart") + + system.spawn(CavernRotationService(), CavernRotationService.CavernRotationServiceKey.id) + system.spawn(InterstellarClusterService(zones), InterstellarClusterService.InterstellarClusterServiceKey.id) + system.spawn(ChatService(), ChatService.ChatServiceKey.id) + // typed to classic wrappers for login and session actors val loginPlan = (ref: ActorRef[MiddlewareActor.Command], info: InetSocketAddress, connectionId: String) => { import net.psforever.services.account.IPAddress @@ -121,22 +134,13 @@ object Server { }) }) } - - val zones = Zones.zones ++ Seq(Zone.Nowhere) - val serviceManager = ServiceManager.boot - serviceManager ! ServiceManager.Register(classic.Props[AccountIntermediaryService](), "accountIntermediary") - serviceManager ! ServiceManager.Register(classic.Props[GalaxyService](), "galaxy") - serviceManager ! ServiceManager.Register(classic.Props[SquadService](), "squad") - serviceManager ! ServiceManager.Register(classic.Props[AccountPersistenceService](), "accountPersistence") - serviceManager ! ServiceManager.Register(classic.Props[PropertyOverrideManager](), "propertyOverrideManager") - serviceManager ! ServiceManager.Register(classic.Props[HartService](), "hart") - - system.spawn(CavernRotationService(), CavernRotationService.CavernRotationServiceKey.id) - system.spawn(InterstellarClusterService(zones), InterstellarClusterService.InterstellarClusterServiceKey.id) - system.spawn(ChatService(), ChatService.ChatServiceKey.id) - - system.spawn(SocketActor(new InetSocketAddress(bindAddress, Config.app.login.port), loginPlan), "login-socket") - system.spawn(SocketPane(bindAddress, sessionPlan), "world-socket-pane") + system.spawn( + SocketPane(Seq( + SocketSetup("login", SocketSetupInfo(bindAddress, Seq(Config.app.login.port), loginPlan)), + SocketSetup("world", SocketSetupInfo(bindAddress, Config.app.world.port +: Config.app.world.ports, sessionPlan)) + )), + name = SocketPane.SocketPaneKey.id + ) val adminListener = system.actorOf( classic.Props( diff --git a/src/main/scala/net/psforever/actors/net/LoginActor.scala b/src/main/scala/net/psforever/actors/net/LoginActor.scala index af7a66413..e359a2937 100644 --- a/src/main/scala/net/psforever/actors/net/LoginActor.scala +++ b/src/main/scala/net/psforever/actors/net/LoginActor.scala @@ -1,7 +1,10 @@ package net.psforever.actors.net +import akka.actor.typed.receptionist.Receptionist + import java.net.{InetAddress, InetSocketAddress} import akka.actor.{Actor, ActorRef, Cancellable, MDCContextAware, typed} +import akka.actor.typed.scaladsl.adapter._ import com.github.t3hnar.bcrypt._ import net.psforever.objects.{Account, Default} import net.psforever.packet.PlanetSideGamePacket @@ -17,31 +20,18 @@ import net.psforever.util.Database._ import java.security.MessageDigest import org.joda.time.LocalDateTime + +import scala.collection.mutable import scala.concurrent.Future import scala.concurrent.duration._ +import scala.util.matching.Regex import scala.util.{Failure, Success} -/* object LoginActor { - /ef apply( - middlewareActor: typed.ActorRef[MiddlewareActor.Command], - uuid: String - ): Behavior[Command] = - Behaviors.setup(context => new LoginActor(context, middlewareActor, uuid).start()) - sealed trait Command -} -class LoginActor( - middlewareActor: typed.ActorRef[MiddlewareActor.Command], - uuid: String -) { - - def start(): Unit = { - Behaviors.receiveMessagePartial {} - } + final case class ReceptionistListing(listing: Receptionist.Listing) extends Command } - */ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], connectionId: String, sessionId: Long) extends Actor @@ -51,11 +41,12 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne private case class UpdateServerList() - val usernameRegex = """[A-Za-z0-9]{3,}""".r + val usernameRegex: Regex = """[A-Za-z0-9]{3,}""".r - var leftRef: ActorRef = ActorRef.noSender - var rightRef: ActorRef = ActorRef.noSender - var accountIntermediary: ActorRef = ActorRef.noSender + var leftRef: ActorRef = Default.Actor + var rightRef: ActorRef = Default.Actor + var accountIntermediary: ActorRef = Default.Actor + var sockets: typed.ActorRef[SocketPane.Command] = Default.typed.Actor var updateServerListTask: Cancellable = Default.Cancellable @@ -64,14 +55,15 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne var canonicalHostName: String = "" var port: Int = 0 - val serverName = Config.app.world.serverName + val serverName: String = Config.app.world.serverName val publicAddress = new InetSocketAddress(InetAddress.getByName(Config.app.public), Config.app.world.port) private val bcryptRounds = 12 ServiceManager.serviceManager ! Lookup("accountIntermediary") + ServiceManager.receptionist ! Receptionist.Find(SocketPane.SocketPaneKey, context.self) - override def postStop() = { + override def postStop(): Unit = { if (updateServerListTask != null) updateServerListTask.cancel() } @@ -79,42 +71,46 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne def receive: Receive = { case ServiceManager.LookupResult("accountIntermediary", endpoint) => accountIntermediary = endpoint + + case SocketPane.SocketPaneKey.Listing(listings) => + sockets = listings.head + case ReceiveIPAddress(address) => ipAddress = address.Address hostName = address.HostName canonicalHostName = address.CanonicalHostName port = address.Port + case UpdateServerList() => updateServerList() + case packet: PlanetSideGamePacket => handleGamePkt(packet) - case default => failWithError(s"Invalid packet class received: $default") + + case SocketPane.NextPort(_, address, portNum) => + val response = ConnectToWorldMessage(serverName, address.getHostAddress, portNum) + middlewareActor ! MiddlewareActor.Send(response) + middlewareActor ! MiddlewareActor.Close() + + case default => + failWithError(s"Invalid packet class received: $default") } - def handleGamePkt(pkt: PlanetSideGamePacket) = + def handleGamePkt(pkt: PlanetSideGamePacket): Unit = pkt match { case LoginMessage(majorVersion, minorVersion, buildDate, username, password, token, revision) => // TODO: prevent multiple LoginMessages from being processed in a row!! We need a state machine - val clientVersion = s"Client Version: $majorVersion.$minorVersion.$revision, $buildDate" - if (token.isDefined) log.debug(s"New login UN:$username Token:${token.get}. $clientVersion") else { log.debug(s"New login UN:$username. $clientVersion") } - - getAccountLogin(username, password, token) + requestAccountLogin(username, password, token) case ConnectToWorldRequestMessage(name, _, _, _, _, _, _, _) => log.info(s"Connect to world request for '$name'") - val response = ConnectToWorldMessage( - serverName, - publicAddress.getAddress.getHostAddress, - SocketPane.Rotation.NextPort - ) - middlewareActor ! MiddlewareActor.Send(response) - middlewareActor ! MiddlewareActor.Close() + sockets ! SocketPane.GetNextPort("world", context.self) case _ => log.warning(s"Unhandled GamePacket $pkt") @@ -123,23 +119,19 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne // generates a password from username and password combination // mimics the process the launcher follows and hashes the password salted by the username def generateNewPassword(username: String, password: String): String = { - // salt password hash with username (like the launcher does) (username + password) val saltedPassword = username.concat(password) - // https://stackoverflow.com/a/46332228 // hash password (like the launcher sends) val hashedPassword = MessageDigest.getInstance("SHA-256") .digest(saltedPassword.getBytes("UTF-8")) .map("%02x".format(_)).mkString - // bcrypt hash for DB storage val bcryptedPassword = hashedPassword.bcryptBounded(bcryptRounds) - bcryptedPassword } - def getAccountLogin(username: String, passwordOpt: Option[String], tokenOpt: Option[String]): Unit = { + def requestAccountLogin(username: String, passwordOpt: Option[String], tokenOpt: Option[String]): Unit = { tokenOpt match { case Some(token) => accountLoginWithToken(token) case None => accountLogin(username, passwordOpt.getOrElse("")) @@ -147,9 +139,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne } def accountLogin(username: String, password: String): Unit = { - import ctx._ - val newToken = this.generateToken() val result = for { // backwards compatibility: prefer exact match first, then try lowercase @@ -160,21 +150,16 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne case Some(_) => Future.successful(Seq()) } - accountOption <- accountsExact.headOption orElse accountsLower.headOption match { - // account found case Some(account) => Future.successful(Some(account)) - // create new account case None => if (Config.app.login.createMissingAccounts) { - // generate bcrypted passwords val bcryptedPassword = generateNewPassword(username, password) val passhash = password.bcryptBounded(bcryptRounds) - // save bcrypted password hash to DB ctx.run( query[persistence.Account] @@ -187,7 +172,6 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) flatMap { id => ctx.run(query[persistence.Account].filter(_.id == lift(id))) } map { accounts => Some(accounts.head) } - } else { loginFailureResponse(username, newToken) Future.successful(None) @@ -196,14 +180,11 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne login <- accountOption match { case Some(account) => - // remember: this is the in client "StagingTest" login handling // the password is send in clear and needs to be checked against the "old" (only bcrypted) passhash // if there ever is a way to update the password in the future passhash and password need be updated (account.inactive, password.isBcryptedBounded(account.passhash)) match { - case (false, true) => - accountIntermediary ! StoreAccountData(newToken, Account(account.id, account.username, account.gm)) val future = ctx.run( query[persistence.Login].insert( @@ -214,14 +195,11 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne _.port -> lift(port) ) ) - // handle new password if (account.password == "") { - // generate bcrypted password // use username as provided by the user (db entry could be wrong), that is the way the launcher does it val bcryptedPassword = generateNewPassword(username, password) - // update account, set password ctx.run( query[persistence.Account] @@ -229,21 +207,21 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne .update(_.password -> lift(bcryptedPassword)) ) } - loginSuccessfulResponse(username, newToken) updateServerListTask = context.system.scheduler.scheduleWithFixedDelay(0 seconds, 5 seconds, self, UpdateServerList()) future + case (_, false) => loginPwdFailureResponse(username, newToken) Future.successful(None) + case (true, _) => loginAccountFailureResponse(username, newToken) Future.successful(None) } case None => Future.successful(None) } - } yield login result.onComplete { @@ -253,18 +231,12 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne } def accountLoginWithToken(token: String): Unit = { - import ctx._ - val newToken = this.generateToken() val result = for { - accountsExact <- ctx.run(query[persistence.Account].filter(_.token.getOrNull == lift(token))) - accountOption <- accountsExact.headOption match { - case Some(account) => - // token expires after 2 hours // new connections and players leaving a world server will return to desktop if (LocalDateTime.now().isAfter(account.tokenCreated.get.plusHours(2))) { @@ -282,9 +254,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne login <- accountOption match { case Some(account) => (account.inactive, account.token.getOrElse("") == token) match { - case (false, true) => - accountIntermediary ! StoreAccountData(newToken, Account(account.id, account.username, account.gm)) val future = ctx.run( query[persistence.Login].insert( @@ -295,7 +265,6 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne _.port -> lift(port) ) ) - loginSuccessfulResponseToken(account.username, token, newToken) updateServerListTask = context.system.scheduler.scheduleWithFixedDelay(0 seconds, 5 seconds, self, UpdateServerList()) @@ -309,10 +278,8 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne loginAccountFailureResponseToken(account.username, token, newToken) Future.successful(None) } - case None => Future.successful(None) } - } yield login result.onComplete { @@ -321,7 +288,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne } } - def loginSuccessfulResponse(username: String, newToken: String) = { + def loginSuccessfulResponse(username: String, newToken: String): Unit = { middlewareActor ! MiddlewareActor.Send( LoginRespMessage( newToken, @@ -335,10 +302,8 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginSuccessfulResponseToken(username: String, token: String, newToken: String) = { - + def loginSuccessfulResponseToken(username: String, token: String, newToken: String): Unit = { log.info(s"User $username logged in unsing token $token") - middlewareActor ! MiddlewareActor.Send( LoginRespMessage( newToken, @@ -352,7 +317,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginPwdFailureResponse(username: String, newToken: String) = { + def loginPwdFailureResponse(username: String, newToken: String): Unit = { log.warning(s"Failed login to account $username") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -367,7 +332,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginFailureResponseToken(token: String, newToken: String) = { + def loginFailureResponseToken(token: String, newToken: String): Unit = { log.warning(s"Failed login using unknown token $token") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -382,7 +347,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginFailureResponseTokenExpired(token: String, newToken: String) = { + def loginFailureResponseTokenExpired(token: String, newToken: String): Unit = { log.warning(s"Failed login using expired token $token") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -397,7 +362,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginFailureResponse(username: String, newToken: String) = { + def loginFailureResponse(username: String, newToken: String): Unit = { log.warning(s"DB problem username: $username") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -412,7 +377,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginFailureResponseToken(username: String, token: String, newToken: String) = { + def loginFailureResponseToken(username: String, token: String, newToken: String): Unit = { log.warning(s"DB problem username $username token: $token") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -427,7 +392,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginAccountFailureResponse(username: String, newToken: String) = { + def loginAccountFailureResponse(username: String, newToken: String): Unit = { log.warning(s"Account $username inactive") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -442,7 +407,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def loginAccountFailureResponseToken(username: String, token: String, newToken: String) = { + def loginAccountFailureResponseToken(username: String, token: String, newToken: String): Unit = { log.warning(s"Account $username inactive token: $token ") middlewareActor ! MiddlewareActor.Send( LoginRespMessage( @@ -457,16 +422,16 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne ) } - def generateToken() = { + def generateToken(): String = { val r = new scala.util.Random - val sb = new StringBuilder + val sb = new mutable.StringBuilder for (_ <- 1 to 31) { sb.append(r.nextPrintableChar()) } sb.toString } - def updateServerList() = { + def updateServerList(): Unit = { middlewareActor ! MiddlewareActor.Send( VNLWorldStatusMessage( "Welcome to PlanetSide! ", @@ -475,7 +440,7 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne serverName, WorldStatus.Up, Config.app.world.serverType, - Vector(WorldConnectionInfo(publicAddress)), + Vector(WorldConnectionInfo(publicAddress)), //todo ideally, ask for info from SocketPane PlanetSideEmpire.VS ) ) @@ -487,5 +452,4 @@ class LoginActor(middlewareActor: typed.ActorRef[MiddlewareActor.Command], conne log.error(error) middlewareActor ! MiddlewareActor.Close() } - } diff --git a/src/main/scala/net/psforever/actors/net/NetworkSimulator.scala b/src/main/scala/net/psforever/actors/net/NetworkSimulator.scala new file mode 100644 index 000000000..3f2e35104 --- /dev/null +++ b/src/main/scala/net/psforever/actors/net/NetworkSimulator.scala @@ -0,0 +1,58 @@ +// Copyright (c) 2024 PSForever +package net.psforever.actors.net + +import akka.actor.typed.{ActorRef, Behavior} +import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import akka.io.Udp +import net.psforever.util.Config + +import java.util.concurrent.ThreadLocalRandom +import scala.util.Random +import scala.concurrent.duration._ + +// TODO? This doesn't quite support all parameters of the old network simulator +// Need to decide wheter they are necessary or not +// https://github.com/psforever/PSF-LoginServer/blob/07f447c2344ab55d581317316c41571772ac2242/src/main/scala/net/psforever/login/UdpNetworkSimulator.scala +private object NetworkSimulator { + def apply(socketActor: ActorRef[SocketActor.Command]): Behavior[Udp.Message] = + Behaviors.setup(context => new NetworkSimulator(context, socketActor)) +} + +private class NetworkSimulator(context: ActorContext[Udp.Message], socketActor: ActorRef[SocketActor.Command]) + extends AbstractBehavior[Udp.Message](context) { + + private[this] val log = org.log4s.getLogger + + override def onMessage(message: Udp.Message): Behavior[Udp.Message] = { + message match { + case _: Udp.Received | _: Udp.Send => + simulate(message) + Behaviors.same + case _ => + socketActor ! toSocket(message) + Behaviors.same + } + } + + def simulate(message: Udp.Message): Unit = { + if (Random.nextDouble() > Config.app.development.netSim.loss) { + if (Random.nextDouble() <= Config.app.development.netSim.reorderChance) { + context.scheduleOnce( + ThreadLocalRandom.current().nextDouble(0.01, 0.2).seconds, + socketActor, + toSocket(message) + ) + } else { + socketActor ! toSocket(message) + } + } else { + log.trace("Network simulator dropped packet") + } + } + + def toSocket(message: Udp.Message): SocketActor.Command = + message match { + case message: Udp.Command => SocketActor.UdpCommandMessage(message) + case message: Udp.Event => SocketActor.UdpEventMessage(message) + } +} diff --git a/src/main/scala/net/psforever/actors/net/SocketActor.scala b/src/main/scala/net/psforever/actors/net/SocketActor.scala index 88150a4ba..09ca2e26d 100644 --- a/src/main/scala/net/psforever/actors/net/SocketActor.scala +++ b/src/main/scala/net/psforever/actors/net/SocketActor.scala @@ -3,11 +3,10 @@ package net.psforever.actors.net import java.net.InetSocketAddress import java.security.SecureRandom import java.util.UUID.randomUUID -import java.util.concurrent.ThreadLocalRandom import akka.actor.Cancellable import akka.{actor => classic} import akka.actor.typed.{ActorRef, ActorTags, Behavior, PostStop, Terminated} -import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors} +import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import akka.io.{IO, Udp} import akka.actor.typed.scaladsl.adapter._ import net.psforever.packet.PlanetSidePacket @@ -16,12 +15,16 @@ import scodec.interop.akka.EnrichedByteString import scala.collection.mutable import scala.concurrent.ExecutionContextExecutor -import scala.concurrent.duration.{DurationDouble, DurationInt} -import scala.util.Random +import scala.concurrent.duration.DurationInt -/** SocketActor creates a UDP socket, receives packets and forwards them to MiddlewareActor - * There is only one SocketActor, but each connected client gets its own MiddlewareActor - */ +/** + * Create a networking port attachment that accepts user datagram protocol (UDP) packets + * and forwards those packets to a business logic unit referred to herein as a "plan". + * The landing site of a plan is a processing entity of middleware logic that dissects composed packet data + * and pushes that further down the chain to the business logic. + * Each instance of middleware support and then business logic + * is associated with a unique clients that attempt to connect to the server though this socket port. + */ object SocketActor { def apply( address: InetSocketAddress, @@ -36,6 +39,8 @@ object SocketActor { private final case class UdpUnboundMessage(message: Udp.Unbound) extends Command private final case class Bound(socket: classic.ActorRef) extends Command private final case class StopChild(ref: ActorRef[MiddlewareActor.Command]) extends Command + final case class AskSocketLoad(replyTo: ActorRef[Any]) extends Command + final case class SocketLoad(sessions: Int) // Typed actors cannot access sender but you can only get the socket that way private class SenderHack(ref: ActorRef[SocketActor.Command]) extends classic.Actor { @@ -49,58 +54,11 @@ object SocketActor { } } -// TODO? This doesn't quite support all parameters of the old network simulator -// Need to decide wheter they are necessary or not -// https://github.com/psforever/PSF-LoginServer/blob/07f447c2344ab55d581317316c41571772ac2242/src/main/scala/net/psforever/login/UdpNetworkSimulator.scala -private object NetworkSimulator { - def apply(socketActor: ActorRef[SocketActor.Command]): Behavior[Udp.Message] = - Behaviors.setup(context => new NetworkSimulator(context, socketActor)) -} - -private class NetworkSimulator(context: ActorContext[Udp.Message], socketActor: ActorRef[SocketActor.Command]) - extends AbstractBehavior[Udp.Message](context) { - - private[this] val log = org.log4s.getLogger - - override def onMessage(message: Udp.Message): Behavior[Udp.Message] = { - message match { - case _: Udp.Received | _: Udp.Send => - simulate(message) - Behaviors.same - case _ => - socketActor ! toSocket(message) - Behaviors.same - } - } - - def simulate(message: Udp.Message): Unit = { - if (Random.nextDouble() > Config.app.development.netSim.loss) { - if (Random.nextDouble() <= Config.app.development.netSim.reorderChance) { - context.scheduleOnce( - ThreadLocalRandom.current().nextDouble(0.01, 0.2).seconds, - socketActor, - toSocket(message) - ) - } else { - socketActor ! toSocket(message) - } - } else { - log.trace("Network simulator dropped packet") - } - } - - def toSocket(message: Udp.Message): SocketActor.Command = - message match { - case message: Udp.Command => SocketActor.UdpCommandMessage(message) - case message: Udp.Event => SocketActor.UdpEventMessage(message) - } -} - class SocketActor( context: ActorContext[SocketActor.Command], address: InetSocketAddress, nextPlan: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket] -) { + ) { import SocketActor._ import SocketActor.Command @@ -122,8 +80,8 @@ class SocketActor( context.spawnAnonymous(NetworkSimulator(context.self)) } - val updUnboundAdapter: ActorRef[Udp.Unbound] = context.messageAdapter[Udp.Unbound](UdpUnboundMessage) - val senderHack: classic.ActorRef = context.actorOf(classic.Props(new SenderHack(context.self))) + //val updUnboundAdapter: ActorRef[Udp.Unbound] = context.messageAdapter[Udp.Unbound](UdpUnboundMessage) + val senderHack: classic.ActorRef = context.actorOf(classic.Props(new SenderHack(context.self))) IO(Udp)(context.system.classicSystem).tell(Udp.Bind(updEventAdapter.toClassic, address), senderHack) @@ -207,13 +165,16 @@ class SocketActor( socket ! message Behaviors.same + case AskSocketLoad(replyTo) => + replyTo ! SocketLoad(packetActors.size) + Behaviors.same + case UdpUnboundMessage(_) => Behaviors.stopped case StopChild(ref) => context.stop(ref) Behaviors.same - } .receiveSignal { case (_, PostStop) => diff --git a/src/main/scala/net/psforever/actors/net/SocketPane.scala b/src/main/scala/net/psforever/actors/net/SocketPane.scala index f287ec205..aa163d926 100644 --- a/src/main/scala/net/psforever/actors/net/SocketPane.scala +++ b/src/main/scala/net/psforever/actors/net/SocketPane.scala @@ -1,22 +1,34 @@ // Copyright (c) 2024 PSForever package net.psforever.actors.net -import net.psforever.util.Config +import akka.actor.typed.receptionist.{Receptionist, ServiceKey} import java.net.{InetAddress, InetSocketAddress} -//import akka.{actor => classic} +import akka.{actor => classic} import akka.actor.typed.{ActorRef, Behavior, PostStop} import akka.actor.typed.scaladsl.{ActorContext, Behaviors} import net.psforever.packet.PlanetSidePacket -final case class SocketPanePortRotation( - portNumbers: Seq[Int], - index: Int = 0 - ) { +/** + * For a given sequence of ports, + * cycle through port numbers until the last port number has been produced + * then start again from the first port number again. + * @param portNumbers the sequence of ports to be cycled between + * @param index optional; + * the starting index in the sequence of ports + */ +private[net] case class SocketPanePortRotation( + portNumbers: Seq[Int], + index: Int = 0 + ) { private var currentIndex: Int = index - def NextPort: Int = this.synchronized { - val out = portNumbers.lift(currentIndex).getOrElse(Config.app.world.port) + /** + * Retrieve the sequentially next port number. + * @return the next port number + */ + def NextPort: Int = { + val out = portNumbers.lift(currentIndex).orElse(portNumbers.headOption).getOrElse(0) currentIndex += 1 currentIndex %= portNumbers.size out @@ -24,73 +36,171 @@ final case class SocketPanePortRotation( } object SocketPanePortRotation { + /** + * Overwritten constructor for `SocketPanePortRotation` entities. + * Copy constructor, essentially, that retains the internal current rotation index. + * @param rotation the previous rotation entity + * @return a copy of the previous rotation entity + */ def apply(rotation: SocketPanePortRotation): SocketPanePortRotation = { SocketPanePortRotation(rotation.portNumbers, rotation.currentIndex) } + /** + * Overwritten constructor for `SocketPanePortRotation` entities. + * Adda new port to the list of ports but retain the internal current rotation index. + * @param rotation the previous rotation entity + * @param newPort the new port number + * @return a copy of the previous rotation entity with an additional port that can be selected + */ def apply(rotation: SocketPanePortRotation, newPort: Int): SocketPanePortRotation = { SocketPanePortRotation(rotation.portNumbers :+ newPort, rotation.currentIndex) } } -/** SocketActor creates a UDP socket, receives packets and forwards them to MiddlewareActor - * There is only one SocketActor, but each connected client gets its own MiddlewareActor +/** + * Information explaining how to manage a socket group. + * @param groupId moniker for the group + * @param info how the sockets in this group operate + * @param initial should this socket group be protected as "original"; + * defaults to `false` */ +final case class SocketSetup(groupId: String, info: SocketSetupInfo, initial: Boolean = false) { + assert(info.ports.nonEmpty, s"port group $groupId should define port numbers") + /* the same port can belong to multiple groups, but the same port may not be repeated in the same group */ + assert(info.ports.size == info.ports.distinct.size, s"port group $groupId should not contain duplicate port numbers") +} + +/** + * Information explaining the details of a socket group. + * @param address Internet protocol location of the host + * @param ports network ports that are used as endpoints of transmission; + * corresponds to a number of sockets + * @param planOfAction whenever a new connection across a socket is made, the method of consuming network packets + */ +final case class SocketSetupInfo( + address: InetAddress, + ports: Seq[Int], + planOfAction: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket] + ) + object SocketPane { - def apply( - address: InetAddress, - nextPlan: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket] - ): Behavior[Command] = - Behaviors.setup(context => new SocketPane(context, address, nextPlan).start()) + val SocketPaneKey: ServiceKey[Command] = ServiceKey[SocketPane.Command](id = "socketPane") + + /** + * Overwritten constructor for `SocketPane` entities. + * Registers the entity with the actor receptionist. + * @param setupInfo the details of the socket groups + * @return a `SocketPane` entity + */ + def apply(setupInfo: Seq[SocketSetup]): Behavior[Command] = { + Behaviors.setup { context => + context.system.receptionist ! Receptionist.Register(SocketPaneKey, context.self) + new SocketPane(context, setupInfo).start() + } + } sealed trait Command - final case class CreateNewSocket(port: Int) extends Command + final case class CreateNewSocket(groupId: String, port: Int) extends Command - private var rotation: SocketPanePortRotation = SocketPanePortRotation(Array(Config.app.world.port)) + final case class CreateNewSocketGroup(groupId: String, info: SocketSetupInfo) extends Command - def Rotation: SocketPanePortRotation = SocketPanePortRotation(rotation) + final case class GetNextPort(groupId: String, replyTo: classic.ActorRef) extends Command - final def getDefaultPorts: Seq[Int] = { - val config = Config.app.world - (config.port +: config.ports).distinct - } + final case class NextPort(groupId: String, address: InetAddress, port: Int) } +/** + * Management of sockets connecting to the network ports. + *

+ * Connections to the networking ports are handled by the logic imposed by this class + * and are handled by sockets that bind to those ports and accept or pass packets to game logic. + * This game logic is encapsulated by an anonymous function + * that is automatically implemented into a game logic machine (consumption and production) + * upon unique connections detected / attempted across those sockets. + * Multiple sockets can connect to the same port so no compensation is required. + *

+ * New sockets to ports can be added to existing groups after the initial socket groups. + * New socket groups can be created after the initial information. + * @param context hook for setting up the sockets and, eventually, their packet logic + * @param initialPortSetup the details of the socket groups + */ class SocketPane( context: ActorContext[SocketPane.Command], - address: InetAddress, - next: (ActorRef[MiddlewareActor.Command], InetSocketAddress, String) => Behavior[PlanetSidePacket] + private val initialPortSetup: Seq[SocketSetup] ) { - private[this] val log = org.log4s.getLogger + private[this] val log = org.log4s.getLogger("SocketPane") - private var socketActors: Array[ActorRef[SocketActor.Command]] = SocketPane.getDefaultPorts.map { i => - context.spawn(SocketActor(new InetSocketAddress(address, i), next), name=s"world-socket-$i") + /** original socket group information, now properly flagged as "original" */ + private var socketConfigs: Seq[SocketSetup] = initialPortSetup.map { setup => setup.copy(initial = true) } + /** all sockets produced by the socket group information and any later socket creation commands */ + private var socketActors: Array[ActorRef[SocketActor.Command]] = initialPortSetup.flatMap { + case SocketSetup(_, SocketSetupInfo(address, ports, plan), _) => + ports.map { portNum => context.spawn(SocketActor(new InetSocketAddress(address, portNum), plan), name=s"world-socket-$portNum") } + }.toArray + /** load balancing for redirecting newly discovered packet input to different sockets (ports); + * should be referenced externally to switch sockets; + * see SocketActor.GGetNextPort */ + private var socketRotations: Array[SocketPanePortRotation] = initialPortSetup.map { + case SocketSetup(_, SocketSetupInfo(_, ports, _), _) => SocketPanePortRotation(ports) }.toArray - SocketPane.rotation = SocketPanePortRotation(SocketPane.getDefaultPorts) - log.info(s"Configured ${SocketPane.getDefaultPorts.size} game world instance ports") + log.info(s"configured ${socketActors.length} ports initially") def start(): Behavior[SocketPane.Command] = { Behaviors .receiveMessagePartial[SocketPane.Command] { - case SocketPane.CreateNewSocket(port) - if SocketPane.Rotation.portNumbers.contains(port) => + case SocketPane.CreateNewSocket(key, _) + if !socketConfigs.exists { setup => setup.groupId == key } => + log.warn(s"new port group $key does not exist and can not be appended to") Behaviors.same - case SocketPane.CreateNewSocket(port) => - socketActors = socketActors :+ context.spawn(SocketActor(new InetSocketAddress(address, port), next), name=s"world-socket-$port") - SocketPane.rotation = SocketPanePortRotation(SocketPane.rotation, port) - log.info(s"Requested new socket to port $port has been created") + case SocketPane.CreateNewSocket(groupId, port) + if socketConfigs + .find { setup => setup.groupId == groupId } + .exists { case SocketSetup(_, SocketSetupInfo(_, ports, _), _) => ports.contains(port) } => + log.info(s"new port $port for group $groupId already supported") + Behaviors.same + + case SocketPane.CreateNewSocket(groupId, port) => + log.info(s"new socket to port $port created in $groupId") + val index = socketConfigs.indexWhere { setup => setup.groupId == groupId } + val SocketSetup(_, SocketSetupInfo(address, ports, plan), _) = socketConfigs(index) + socketActors = socketActors :+ + context.spawn(SocketActor(new InetSocketAddress(address, port), plan), name=s"world-socket-$port") + socketConfigs = (socketConfigs.take(index) :+ SocketSetup(groupId, SocketSetupInfo(address, ports :+ port, plan))) ++ + socketConfigs.drop(index + 1) + socketRotations = (socketRotations.take(index) :+ SocketPanePortRotation(socketRotations(index), port)) ++ + socketRotations.drop(index + 1) Behaviors.same - case _ => + case SocketPane.CreateNewSocketGroup(groupId, _) + if socketConfigs.exists { case SocketSetup(oldKey, _, _) => oldKey == groupId } => + log.warn(s"port group $groupId already exists and can not be created twice") + Behaviors.same + + case SocketPane.CreateNewSocketGroup(groupId, info @ SocketSetupInfo(address, ports, plan)) => + socketActors = socketActors ++ + ports.map { portNum => context.spawn(SocketActor(new InetSocketAddress(address, portNum), plan), name=s"world-socket-$portNum") } + socketConfigs = socketConfigs :+ + SocketSetup(groupId, info) + socketRotations = socketRotations :+ + SocketPanePortRotation(ports) + Behaviors.same + + case SocketPane.GetNextPort(groupId, replyTo) => + socketConfigs.indexWhere { setup => setup.groupId == groupId } match { + case -1 => + log.warn(s"port group $groupId does not exist") + case index => + replyTo ! SocketPane.NextPort(groupId, socketConfigs(index).info.address, socketRotations(index).NextPort) + } Behaviors.same } .receiveSignal { case (_, PostStop) => socketActors.foreach(context.stop) - SocketPane.rotation = SocketPanePortRotation(Array(Config.app.world.port)) Behaviors.same } } diff --git a/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala b/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala index 8db0d20a2..3f884bfa6 100644 --- a/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala +++ b/src/main/scala/net/psforever/services/account/AccountIntermediaryService.scala @@ -25,7 +25,7 @@ class AccountIntermediaryService extends Actor { private val IPAddressBySessionID = mutable.Map[Long, IPAddress]() private[this] val log = org.log4s.getLogger - def receive = { + def receive: Receive = { // Called by the LoginSessionActor case StoreAccountData(token, account) => accountsByToken += (token -> account)