diff --git a/build.sbt b/build.sbt index ae991ac28..80d2acaa1 100644 --- a/build.sbt +++ b/build.sbt @@ -15,7 +15,8 @@ lazy val commonSettings = Seq( "ch.qos.logback" % "logback-classic" % "1.1.7", "org.log4s" %% "log4s" % "1.3.0", "org.fusesource.jansi" % "jansi" % "1.12", - "org.scoverage" %% "scalac-scoverage-plugin" % "1.1.1" + "org.scoverage" %% "scalac-scoverage-plugin" % "1.1.1", + "com.github.nscala-time" %% "nscala-time" % "2.12.0" ) ) diff --git a/common/src/main/scala/net/psforever/packet/ControlPacketOpcode.scala b/common/src/main/scala/net/psforever/packet/ControlPacketOpcode.scala index e52a6f45d..d333ba580 100644 --- a/common/src/main/scala/net/psforever/packet/ControlPacketOpcode.scala +++ b/common/src/main/scala/net/psforever/packet/ControlPacketOpcode.scala @@ -17,7 +17,7 @@ object ControlPacketOpcode extends Enumeration { ServerStart, // second packet sent in response to ClientStart MultiPacket, // used to send multiple packets with one UDP message (subpackets limited to <= 255) Unknown4, - Unknown5, + TeardownConnection, Unknown6, ControlSync, // sent to the server from the client ControlSyncResp, // the response generated by the server @@ -61,7 +61,7 @@ object ControlPacketOpcode extends Enumeration { case ClientStart => control.ClientStart.decode case MultiPacket => control.MultiPacket.decode case Unknown4 => noDecoder(opcode) - case Unknown5 => noDecoder(opcode) + case TeardownConnection => control.TeardownConnection.decode case Unknown6 => noDecoder(opcode) case ControlSync => control.ControlSync.decode case ControlSyncResp => control.ControlSyncResp.decode diff --git a/common/src/main/scala/net/psforever/packet/control/TeardownConnection.scala b/common/src/main/scala/net/psforever/packet/control/TeardownConnection.scala new file mode 100644 index 000000000..52bc6e20e --- /dev/null +++ b/common/src/main/scala/net/psforever/packet/control/TeardownConnection.scala @@ -0,0 +1,19 @@ +// Copyright (c) 2016 PSForever.net to present +package net.psforever.packet.control + +import net.psforever.packet.{ControlPacketOpcode, Marshallable, PacketHelpers, PlanetSideControlPacket} +import scodec.Codec +import scodec.codecs._ + +final case class TeardownConnection(targetNonce : Long) extends PlanetSideControlPacket { + type Packet = TeardownConnection + def opcode = ControlPacketOpcode.TeardownConnection + def encode = TeardownConnection.encode(this) +} + +object TeardownConnection extends Marshallable[TeardownConnection] { + implicit val codec : Codec[TeardownConnection] = ( + ("nonce" | uint32L) :: + ("unk" | uint16).unit(6) + ).as[TeardownConnection] +} \ No newline at end of file diff --git a/common/src/test/scala/ControlPacketTest.scala b/common/src/test/scala/ControlPacketTest.scala index 5dc9ca201..b809253b0 100644 --- a/common/src/test/scala/ControlPacketTest.scala +++ b/common/src/test/scala/ControlPacketTest.scala @@ -167,5 +167,24 @@ class ControlPacketTest extends Specification { } } } + + "TeardownConnection" should { + val string = hex"00 05 02 4F 57 17 00 06" + + "decode" in { + PacketCoding.DecodePacket(string).require match { + case TeardownConnection(nonce) => + nonce mustEqual 391597826 + case default => + ko + } + } + + "encode" in { + val encoded = PacketCoding.EncodePacket(TeardownConnection(391597826)).require + + encoded.toByteVector mustEqual string + } + } } } \ No newline at end of file diff --git a/pslogin/src/main/scala/CryptoSessionActor.scala b/pslogin/src/main/scala/CryptoSessionActor.scala index c4a7ec1b4..77a542f12 100644 --- a/pslogin/src/main/scala/CryptoSessionActor.scala +++ b/pslogin/src/main/scala/CryptoSessionActor.scala @@ -11,12 +11,15 @@ import scodec.{Attempt, Codec, Err} import scodec.codecs.{bytes, uint16L, uint8L} import java.security.SecureRandom -import net.psforever.packet.control.{ClientStart, ServerStart} +import net.psforever.packet.control.{ClientStart, ServerStart, TeardownConnection} import net.psforever.packet.crypto._ import net.psforever.packet.game.PingMsg import org.log4s.MDC import MDCContextAware.Implicits._ +sealed trait CryptoSessionAPI +final case class DropCryptoSession() extends CryptoSessionAPI + /** * Actor that stores crypto state for a connection, appropriately encrypts and decrypts packets, * and passes packets along to the next hop once processed. @@ -41,6 +44,9 @@ class CryptoSessionActor extends Actor with MDCContextAware { var clientChallenge = ByteVector.empty var clientChallengeResult = ByteVector.empty + var clientNonce : Long = 0 + var serverNonce : Long = 0 + // Don't leak crypto object memory even on an exception override def postStop() = { cleanupCrypto() @@ -74,7 +80,9 @@ class CryptoSessionActor extends Actor with MDCContextAware { p match { case ControlPacket(_, ClientStart(nonce)) => - sendResponse(PacketCoding.CreateControlPacket(ServerStart(nonce, Math.abs(random.nextInt())))) + clientNonce = nonce + serverNonce = Math.abs(random.nextInt()) + sendResponse(PacketCoding.CreateControlPacket(ServerStart(nonce, serverNonce))) context.become(CryptoExchange) case default => @@ -252,6 +260,14 @@ class CryptoSessionActor extends Actor with MDCContextAware { case Failure(e) => log.error("Could not decode raw packet: " + e) } } + case api : CryptoSessionAPI => + api match { + case DropCryptoSession() => + handleEstablishedPacket( + sender(), + PacketCoding.CreateControlPacket(TeardownConnection(clientNonce)) + ) + } case ctrl @ ControlPacket(_, _) => val from = sender() @@ -260,6 +276,8 @@ class CryptoSessionActor extends Actor with MDCContextAware { val from = sender() handleEstablishedPacket(from, game) + case sessionAPI : SessionRouterAPI => + leftRef !> sessionAPI case default => failWithError(s"Invalid message '$default' received in state Established") } diff --git a/pslogin/src/main/scala/LoginSessionActor.scala b/pslogin/src/main/scala/LoginSessionActor.scala index 29fcc93ec..6f291910d 100644 --- a/pslogin/src/main/scala/LoginSessionActor.scala +++ b/pslogin/src/main/scala/LoginSessionActor.scala @@ -1,7 +1,7 @@ // Copyright (c) 2016 PSForever.net to present import java.net.{InetAddress, InetSocketAddress} -import akka.actor.{Actor, ActorRef, MDCContextAware} +import akka.actor.{Actor, ActorRef, Cancellable, MDCContextAware} import net.psforever.packet.{PlanetSideGamePacket, _} import net.psforever.packet.control._ import net.psforever.packet.game._ @@ -10,17 +10,26 @@ import scodec.Attempt.{Failure, Successful} import scodec.bits._ import MDCContextAware.Implicits._ +import scala.concurrent.duration._ import scala.util.Random class LoginSessionActor extends Actor with MDCContextAware { private[this] val log = org.log4s.getLogger + import scala.concurrent.ExecutionContext.Implicits.global private case class UpdateServerList() var sessionId : Long = 0 var leftRef : ActorRef = ActorRef.noSender var rightRef : ActorRef = ActorRef.noSender + var updateServerListTask : Cancellable = null + + override def postStop() = { + if(updateServerListTask != null) + updateServerListTask.cancel() + } + def receive = Initializing def Initializing : Receive = { @@ -112,12 +121,14 @@ class LoginSessionActor extends Actor with MDCContextAware { 0, 1, 2, 685276011, username, 0, false) sendResponse(PacketCoding.CreateGamePacket(0, response)) - updateServerList() + + updateServerListTask = context.system.scheduler.schedule(0 seconds, 2 seconds, self, UpdateServerList()) case ConnectToWorldRequestMessage(name, _, _, _, _, _, _) => log.info(s"Connect to world request for '${name}'") val response = ConnectToWorldMessage(serverName, serverAddress.getHostString, serverAddress.getPort) sendResponse(PacketCoding.CreateGamePacket(0, response)) + sendResponse(DropSession(sessionId, "user transferring to world")) case default => log.debug(s"Unhandled GamePacket ${pkt}") } @@ -136,7 +147,7 @@ class LoginSessionActor extends Actor with MDCContextAware { //sendResponse(PacketCoding.CreateControlPacket(ConnectionClose())) } - def sendResponse(cont : PlanetSidePacketContainer) = { + def sendResponse(cont : Any) = { log.trace("LOGIN SEND: " + cont) MDC("sessionId") = sessionId.toString diff --git a/pslogin/src/main/scala/PsLogin.scala b/pslogin/src/main/scala/PsLogin.scala index 2614b10bf..b73d79e62 100644 --- a/pslogin/src/main/scala/PsLogin.scala +++ b/pslogin/src/main/scala/PsLogin.scala @@ -174,9 +174,9 @@ object PsLogin { val worldServerPort = 51001 /** Create two actors for handling the login and world server endpoints */ - val listener = system.actorOf(Props(new UdpListener(Props(new SessionRouter(loginTemplate)), "login-session-router", + val listener = system.actorOf(Props(new UdpListener(Props(new SessionRouter("Login", loginTemplate)), "login-session-router", LoginConfig.serverIpAddress, loginServerPort)), "login-udp-endpoint") - val worldListener = system.actorOf(Props(new UdpListener(Props(new SessionRouter(worldTemplate)), "world-session-router", + val worldListener = system.actorOf(Props(new UdpListener(Props(new SessionRouter("World", worldTemplate)), "world-session-router", LoginConfig.serverIpAddress, worldServerPort)), "world-udp-endpoint") logger.info(s"NOTE: Set client.ini to point to ${LoginConfig.serverIpAddress.getHostAddress}:$loginServerPort") diff --git a/pslogin/src/main/scala/Session.scala b/pslogin/src/main/scala/Session.scala new file mode 100644 index 000000000..a4a2c8fda --- /dev/null +++ b/pslogin/src/main/scala/Session.scala @@ -0,0 +1,101 @@ +// Copyright (c) 2016 PSForever.net to present + +import java.net.InetSocketAddress + +import akka.actor._ +import org.log4s.MDC +import scodec.bits._ + +import akka.actor.{ActorContext, ActorRef, PoisonPill} +import com.github.nscala_time.time.Imports._ +import MDCContextAware.Implicits._ + +sealed trait SessionState +final case class New() extends SessionState +final case class Related() extends SessionState +final case class Handshaking() extends SessionState +final case class Established() extends SessionState +final case class Closing() extends SessionState +final case class Closed() extends SessionState + + +class Session(val sessionId : Long, + val socketAddress : InetSocketAddress, + returnActor : ActorRef, + sessionPipeline : List[SessionPipeline]) + (implicit val context: ActorContext, implicit val self : ActorRef) { + + var state : SessionState = New() + val sessionCreatedTime : DateTime = DateTime.now + var sessionEndedTime : DateTime = DateTime.now + + val pipeline = sessionPipeline.map { actor => + val a = context.actorOf(actor.props, actor.nameTemplate + sessionId.toString) + context.watch(a) + a + } + + pipeline.head ! HelloFriend(sessionId, pipeline.tail.head) + + // statistics + var bytesSent : Long = 0 + var bytesReceived : Long = 0 + var inboundPackets : Long = 0 + var outboundPackets : Long = 0 + + var lastInboundEvent : Long = System.nanoTime() + var lastOutboundEvent : Long = System.nanoTime() + + var inboundPacketRate : Double = 0.0 + var outboundPacketRate : Double = 0.0 + var inboundBytesPerSecond : Double = 0.0 + var outboundBytesPerSecond : Double = 0.0 + + def receive(packet : RawPacket) : Unit = { + bytesReceived += packet.data.size + inboundPackets += 1 + lastInboundEvent = System.nanoTime() + + pipeline.head !> packet + } + + def send(packet : ByteVector) : Unit = { + bytesSent += packet.size + outboundPackets += 1 + lastOutboundEvent = System.nanoTime() + + returnActor ! SendPacket(packet, socketAddress) + } + + def dropSession(graceful : Boolean) = { + pipeline.foreach(context.unwatch) + pipeline.foreach(_ ! PoisonPill) + + sessionEndedTime = DateTime.now + setState(Closed()) + } + + def getState = state + + def setState(newState : SessionState) : Unit = { + state = newState + } + def getPipeline : List[ActorRef] = pipeline + + def getTotalBytes = { + bytesSent + bytesReceived + } + + def timeSinceLastInboundEvent = { + (System.nanoTime() - lastInboundEvent)/1000000 + } + + def timeSinceLastOutboundEvent = { + (System.nanoTime() - lastOutboundEvent)/1000000 + } + + + override def toString : String = { + s"Session($sessionId, $getTotalBytes)" + } +} diff --git a/pslogin/src/main/scala/SessionRouter.scala b/pslogin/src/main/scala/SessionRouter.scala index 1df8006e5..3b420e9f0 100644 --- a/pslogin/src/main/scala/SessionRouter.scala +++ b/pslogin/src/main/scala/SessionRouter.scala @@ -9,23 +9,43 @@ import scala.collection.mutable import MDCContextAware.Implicits._ import akka.actor.MDCContextAware.MdcMsg import akka.actor.SupervisorStrategy.Stop +import net.psforever.packet.PacketCoding +import net.psforever.packet.control.ConnectionClose -final case class RawPacket(data : ByteVector) -final case class ResponsePacket(data : ByteVector) +import scala.concurrent.duration._ -case class SessionState(id : Long, address : InetSocketAddress, pipeline : List[ActorRef]) { - def startOfPipe = pipeline.head - def nextOfStart = pipeline.tail.head -} +sealed trait SessionRouterAPI +final case class RawPacket(data : ByteVector) extends SessionRouterAPI +final case class ResponsePacket(data : ByteVector) extends SessionRouterAPI +final case class DropSession(id : Long, reason : String) extends SessionRouterAPI +final case class SessionReaper() extends SessionRouterAPI case class SessionPipeline(nameTemplate : String, props : Props) -class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCContextAware { +/** + * Login sessions are divided between two actors. The crypto session actor transparently handles all of the cryptographic + * setup of the connection. Once a correct crypto session has been established, all packets, after being decrypted + * will be passed on to the login session actor. This actor has important state that is used to maintain the login + * session. + * + * > PlanetSide Session Pipeline < + * + * read() route decrypt + * UDP Socket -----> [Session Router] -----> [Crypto Actor] -----> [Session Actor] + * ^ | ^ | ^ | + * | write() | | encrypt | | response | + * +--------------+ +-----------+ +-----------------+ + **/ +class SessionRouter(role : String, pipeline : List[SessionPipeline]) extends Actor with MDCContextAware { private[this] val log = org.log4s.getLogger(self.path.name) + import scala.concurrent.ExecutionContext.Implicits.global + val sessionReaper = context.system.scheduler.schedule(10 seconds, 5 seconds, self, SessionReaper()) + val idBySocket = mutable.Map[InetSocketAddress, Long]() - val sessionById = mutable.Map[Long, SessionState]() - val sessionByActor = mutable.Map[ActorRef, SessionState]() + val sessionById = mutable.Map[Long, Session]() + val sessionByActor = mutable.Map[ActorRef, Session]() + val closePacket = PacketCoding.EncodePacket(ConnectionClose()).require.bytes var sessionId = 0L // this is a connection session, not an actual logged in session ID var inputRef : ActorRef = ActorRef.noSender @@ -33,24 +53,9 @@ class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCCont override def supervisorStrategy = OneForOneStrategy() { case _ => Stop } override def preStart = { - log.info("SessionRouter started...ready for PlanetSide sessions") + log.info(s"SessionRouter started...ready for ${role} sessions") } - /** - * Login sessions are divided between two actors. The crypto session actor transparently handles all of the cryptographic - * setup of the connection. Once a correct crypto session has been established, all packets, after being decrypted - * will be passed on to the login session actor. This actor has important state that is used to maintain the login - * session. - * - * > PlanetSide Session Pipeline < - * - * read() route decrypt - * UDP Socket -----> [Session Router] -----> [Crypto Actor] -----> [Session Actor] - * ^ | ^ | ^ | - * | write() | | encrypt | | response | - * +--------------+ +-----------+ +-----------------+ - **/ - def receive = initializing def initializing : Receive = { @@ -62,47 +67,70 @@ class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCCont context.stop(self) } + override def postStop() = { + sessionReaper.cancel() + } + def started : Receive = { - case ReceivedPacket(msg, from) => - if(idBySocket.contains(from)) { - MDC("sessionId") = idBySocket{from}.toString + case recv @ ReceivedPacket(msg, from) => + var session : Session = null - log.trace(s"Handling received packet ${msg} -> ${sessionById{idBySocket{from}}.startOfPipe.path.name}") - sessionById{idBySocket{from}}.startOfPipe !> RawPacket(msg) - - MDC.clear() - } else { - val session = createNewSession(from) - idBySocket{from} = session.id - - sessionById{session.id} = session - sessionByActor{session.startOfPipe} = session - sessionByActor{session.nextOfStart} = session - - MDC("sessionId") = session.id.toString - - log.info("New session from " + from.toString) - - // send the initial message with MDC context (give the session ID to the lower layers) - sessionById{session.id}.startOfPipe !> HelloFriend(session.id, sessionById{session.id}.nextOfStart) - sessionById{session.id}.startOfPipe !> RawPacket(msg) + if(!idBySocket.contains(from)) { + session = createNewSession(from) + } + else { + val id = idBySocket{from} + session = sessionById{id} + } + if(session.state != Closed()) { + MDC("sessionId") = session.sessionId.toString + log.trace(s"RECV: ${msg} -> ${session.getPipeline.head.path.name}") + session.receive(RawPacket(msg)) MDC.clear() } case ResponsePacket(msg) => val session = sessionByActor.get(sender()) - if(session.isDefined) { - log.trace(s"Sending response ${msg}") - inputRef ! SendPacket(msg, session.get.address) + if(session.isDefined) { + if(session.get.state != Closed()) { + MDC("sessionId") = session.get.sessionId.toString + log.trace(s"SEND: ${msg} -> ${inputRef.path.name}") + session.get.send(msg) + MDC.clear() + } } else { - log.error("Dropped old response packet from session actor " + sender().path.name) + log.error("Dropped old response packet from actor " + sender().path.name) + } + case DropSession(id, reason) => + val session = sessionById.get(id) + + if(session.isDefined) { + removeSessionById(id, reason, graceful = true) + } else { + log.error(s"Requested to drop non-existent session ID=$id from ${sender()}") + } + case SessionReaper() => + sessionById.foreach { case (id, session) => + log.debug(session.toString) + if(session.getState == Closed()) { + // clear mappings + session.getPipeline.foreach(sessionByActor remove) + sessionById.remove(id) + idBySocket.remove(session.socketAddress) + log.debug(s"Reaped session ID=$id") + } else if(session.timeSinceLastInboundEvent > 10000) { + removeSessionById(id, "session timed out (inbound)", graceful = false) + } else if(session.timeSinceLastOutboundEvent > 4000) { + removeSessionById(id, "session timed out (outbound)", graceful = true) // tell client to STFU + } } case Terminated(actor) => val terminatedSession = sessionByActor.get(actor) if(terminatedSession.isDefined) { - removeSessionById(terminatedSession.get.id, s"${actor.path.name} died") + removeSessionById(terminatedSession.get.sessionId, s"${actor.path.name} died", + graceful = true) } else { log.error("Received an invalid actor Termination from " + actor.path.name) } @@ -112,33 +140,38 @@ class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCCont def createNewSession(address : InetSocketAddress) = { val id = newSessionId + val session = new Session(id, address, inputRef, pipeline) - // inflate the pipeline - val actors = pipeline.map { actor => - val a = context.actorOf(actor.props, actor.nameTemplate + id.toString) - context.watch(a) - a + // establish mappings for easy lookup + idBySocket{address} = id + sessionById{id} = session + + session.getPipeline.foreach { actor => + sessionByActor{actor} = session } - SessionState(id, address, actors) + log.info(s"New session ID=${id} from " + address.toString) + + session } - def removeSessionById(id : Long, reason : String) : Unit = { + def removeSessionById(id : Long, reason : String, graceful : Boolean) : Unit = { val sessionOption = sessionById.get(id) if(sessionOption.isEmpty) return - val session = sessionOption.get + val session : Session = sessionOption.get + + if(graceful) { + for(i <- 0 to 5) { + session.send(closePacket) + } + } - // TODO: add some sort of delay to prevent old session packets from coming through // kill all session specific actors - session.pipeline.foreach(_ ! PoisonPill) - session.pipeline.foreach(sessionByActor remove) - sessionById.remove(id) - idBySocket.remove(session.address) - - log.info(s"Stopping session ${id} (reason: $reason)") + session.dropSession(graceful) + log.info(s"Dropping session ID=${id} (reason: $reason)") } def newSessionId = { diff --git a/pslogin/src/main/scala/WorldSessionActor.scala b/pslogin/src/main/scala/WorldSessionActor.scala index 47c309298..70d9b0ead 100644 --- a/pslogin/src/main/scala/WorldSessionActor.scala +++ b/pslogin/src/main/scala/WorldSessionActor.scala @@ -1,7 +1,7 @@ // Copyright (c) 2016 PSForever.net to present import java.net.{InetAddress, InetSocketAddress} -import akka.actor.{Actor, ActorRef, MDCContextAware} +import akka.actor.{Actor, ActorRef, Cancellable, MDCContextAware} import net.psforever.packet.{PlanetSideGamePacket, _} import net.psforever.packet.control._ import net.psforever.packet.game._ @@ -9,6 +9,7 @@ import scodec.Attempt.{Failure, Successful} import scodec.bits._ import org.log4s.MDC import MDCContextAware.Implicits._ +import net.psforever.types.ChatMessageType class WorldSessionActor extends Actor with MDCContextAware { private[this] val log = org.log4s.getLogger @@ -19,6 +20,13 @@ class WorldSessionActor extends Actor with MDCContextAware { var leftRef : ActorRef = ActorRef.noSender var rightRef : ActorRef = ActorRef.noSender + var clientKeepAlive : Cancellable = null + + override def postStop() = { + if(clientKeepAlive != null) + clientKeepAlive.cancel() + } + def receive = Initializing def Initializing : Receive = { @@ -139,7 +147,7 @@ class WorldSessionActor extends Actor with MDCContextAware { import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global - context.system.scheduler.schedule(0 seconds, 1000 milliseconds, self, PokeClient()) + clientKeepAlive = context.system.scheduler.schedule(0 seconds, 1000 milliseconds, self, PokeClient()) } case default => log.error("Unsupported " + default + " in " + msg) @@ -160,6 +168,12 @@ class WorldSessionActor extends Actor with MDCContextAware { case msg @ ChatMsg(messagetype, has_wide_contents, recipient, contents, note_contents) => log.info("Chat: " + msg) + // TODO: handle this appropriately + if(messagetype == ChatMessageType.PopupQuit) { + sendResponse(DropCryptoSession()) + sendResponse(DropSession(sessionId, "user quit")) + } + // TODO: Depending on messagetype, may need to prepend sender's name to contents with proper spacing // TODO: Just replays the packet straight back to sender; actually needs to be routed to recipients! sendResponse(PacketCoding.CreateGamePacket(0, ChatMsg(messagetype, has_wide_contents, recipient, contents, note_contents))) @@ -235,17 +249,18 @@ class WorldSessionActor extends Actor with MDCContextAware { //sendResponse(PacketCoding.CreateControlPacket(ConnectionClose())) } - def sendResponse(cont : PlanetSidePacketContainer) = { + def sendResponse(cont : PlanetSidePacketContainer) : Unit = { log.trace("WORLD SEND: " + cont) + sendResponse(cont.asInstanceOf[Any]) + } + def sendResponse(msg : Any) : Unit = { MDC("sessionId") = sessionId.toString - rightRef !> cont + rightRef !> msg } def sendRawResponse(pkt : ByteVector) = { log.trace("WORLD SEND RAW: " + pkt) - - MDC("sessionId") = sessionId.toString - rightRef !> RawPacket(pkt) + sendResponse(RawPacket(pkt)) } }