PSF-LoginServer/pslogin/src/main/scala/SessionRouter.scala

150 lines
4.8 KiB
Scala
Raw Normal View History

2016-03-04 15:14:50 -05:00
// Copyright (c) 2016 PSForever.net to present
2016-02-05 03:19:13 -05:00
import java.net.InetSocketAddress
import akka.actor._
2016-05-01 21:12:42 -04:00
import org.log4s.MDC
2016-02-05 03:19:13 -05:00
import scodec.bits._
import scala.collection.mutable
2016-05-01 21:12:42 -04:00
import MDCContextAware.Implicits._
import akka.actor.MDCContextAware.MdcMsg
import akka.actor.SupervisorStrategy.Stop
2016-02-05 03:19:13 -05:00
/**
  * Message wrapping the undecoded bytes of one inbound packet.
  * `SessionRouter` sends it to the first actor of a session's pipeline.
  *
  * @param data the packet payload exactly as it was received
  */
final case class RawPacket(
  data : ByteVector
)
/**
  * Message carrying outbound bytes from a session actor back to `SessionRouter`,
  * which forwards them to the session's remote address as a `SendPacket`.
  *
  * @param data the payload to send to the remote peer
  */
final case class ResponsePacket(
  data : ByteVector
)
/**
  * Bookkeeping record for one connection session tracked by the router.
  *
  * @param id       connection session identifier
  * @param address  remote socket address the session belongs to
  * @param pipeline the session's actors, in processing order
  */
case class SessionState(
  id : Long,
  address : InetSocketAddress,
  pipeline : List[ActorRef]
) {
  /** First actor of the pipeline; throws if the pipeline is empty. */
  def startOfPipe : ActorRef = pipeline.head

  /** Actor that follows [[startOfPipe]]; throws if the pipeline has fewer than two actors. */
  def nextOfStart : ActorRef = pipeline.tail.head
}
2016-02-05 03:19:13 -05:00
2016-05-01 21:12:42 -04:00
/**
  * Maps remote socket addresses to connection sessions and shuttles packets between the
  * network-facing actor and each session's actor pipeline.
  *
  * The router begins in the `initializing` behavior. The sender of the first `Hello()` is
  * remembered as `inputRef`; every outbound `SendPacket` is addressed to that actor. After
  * that the router switches to the `started` behavior.
  *
  * Child actors that fail are stopped (never restarted); their `Terminated` notification
  * tears down the whole session they belonged to.
  */
class SessionRouter extends Actor with MDCContextAware {
  private[this] val log = org.log4s.getLogger

  /** Remote address -> connection session id. */
  val idBySocket = mutable.Map[InetSocketAddress, Long]()
  /** Connection session id -> session record. */
  val sessionById = mutable.Map[Long, SessionState]()
  /** Every pipeline actor -> the session it belongs to. */
  val sessionByActor = mutable.Map[ActorRef, SessionState]()

  var sessionId = 0L // this is a connection session, not an actual logged in session ID
  var inputRef : ActorRef = ActorRef.noSender

  // Any failure in a session actor stops that actor; the resulting Terminated message
  // is what triggers session cleanup in `started`.
  override def supervisorStrategy = OneForOneStrategy() { case _ => Stop }

  override def preStart() : Unit = {
    log.info("SessionRouter started...ready for PlanetSide sessions")
  }

  /*
    Login sessions are divided between two actors. The crypto session actor transparently
    handles all of the cryptographic setup of the connection. Once a correct crypto session
    has been established, all packets, after being decrypted, will be passed on to the login
    session actor. This actor has important state that is used to maintain the login session.

                         > PlanetSide Session Pipeline <

               read()                  route                 decrypt
    UDP Socket -----> [Session Router] -----> [Crypto Actor] -----> [Session Actor]
        ^                 |     ^                 |    ^                  |
        |     write()     |     |     encrypt     |    |     response     |
        +-----------------+     +-----------------+    +------------------+
   */

  def receive = initializing

  /**
    * Behavior before the router knows where to send outbound packets.
    * Only `Hello()` is accepted; any other message stops the router.
    */
  def initializing : Receive = {
    case Hello() =>
      inputRef = sender()
      // NOTE(review): this sends a 4-byte probe to a hard-coded public address as soon as the
      // router is greeted. It looks like leftover debugging or a socket warm-up -- confirm it
      // is still wanted before shipping.
      inputRef ! SendPacket(hex"41414141", new InetSocketAddress("8.8.8.8", 51000))
      context.become(started)
    case default =>
      log.error(s"Unknown message $default. Stopping...")
      context.stop(self)
  }

  /**
    * Normal operating behavior: route inbound packets to sessions (creating them on first
    * contact), forward session responses to `inputRef`, and clean up sessions whose actors
    * have terminated.
    */
  def started : Receive = {
    case ReceivedPacket(msg, from) =>
      idBySocket.get(from) match {
        case Some(id) =>
          MDC("sessionId") = id.toString
          log.trace(s"Handling recieved packet")
          sessionById(id).startOfPipe !> RawPacket(msg)
          MDC.clear()

        case None =>
          val session = createNewSession(from)
          idBySocket(from) = session.id
          sessionById(session.id) = session
          sessionByActor(session.startOfPipe) = session
          sessionByActor(session.nextOfStart) = session

          MDC("sessionId") = session.id.toString
          log.info("New session from " + from.toString)
          // send the initial message with MDC context (give the session ID to the lower layers)
          session.startOfPipe !> HelloFriend(session.nextOfStart)
          session.startOfPipe !> RawPacket(msg)
          MDC.clear()
      }

    case ResponsePacket(msg) =>
      sessionByActor.get(sender()) match {
        case Some(session) =>
          log.trace(s"Sending response ${msg}")
          inputRef ! SendPacket(msg, session.address)
        case None =>
          // drop any old queued messages from old actors: the session was already removed,
          // so there is no address to send to and failing here would take the router down
          log.trace(s"Dropping response from unknown actor ${sender().path.name}")
      }

    case Terminated(actor) =>
      // Terminated also arrives for actors of sessions we already removed; those are ignored
      sessionByActor.get(actor).foreach { session =>
        removeSessionById(session.id, s"${actor.path.name} died")
      }

    case default =>
      log.error(s"Unknown message $default")
  }

  /**
    * Spawns and watches the crypto and login actors for a new connection.
    * The returned state is NOT registered in any of the lookup maps; the caller does that.
    *
    * @param address remote socket address of the new connection
    * @return the session record, with the pipeline ordered crypto actor then login actor
    */
  def createNewSession(address : InetSocketAddress) : SessionState = {
    val id = newSessionId
    val cryptoSession = context.actorOf(Props[CryptoSessionActor], "crypto-session-" + id.toString)
    val loginSession = context.actorOf(Props[LoginSessionActor], "login-session-" + id.toString)

    context.watch(cryptoSession)
    context.watch(loginSession)

    SessionState(id, address, List(cryptoSession, loginSession))
  }

  /**
    * Stops every actor of the given session and forgets the session in all lookup maps.
    * Does nothing when no session with that id is registered.
    *
    * @param id     connection session id to remove
    * @param reason human-readable cause, only used in the log line
    */
  def removeSessionById(id : Long, reason : String) : Unit = {
    sessionById.remove(id).foreach { session =>
      // TODO: add some sort of delay to prevent old session packets from coming through
      // kill all session specific actors
      session.pipeline.foreach(_ ! PoisonPill)
      session.pipeline.foreach(sessionByActor.remove)
      idBySocket.remove(session.address)

      log.info(s"Stopping session ${id} (reason: $reason)")
    }
  }

  /**
    * Hands out the current value of the session counter and advances it by one.
    *
    * @return the id to use for the next connection session
    */
  def newSessionId : Long = {
    val oldId = sessionId
    sessionId += 1
    oldId
  }
}