Create advance session management and reaping

This commit is contained in:
Chord 2016-07-30 21:08:40 -04:00
parent c1257cb1ec
commit ae6687c38f
10 changed files with 301 additions and 84 deletions

View file

@ -9,23 +9,43 @@ import scala.collection.mutable
import MDCContextAware.Implicits._
import akka.actor.MDCContextAware.MdcMsg
import akka.actor.SupervisorStrategy.Stop
import net.psforever.packet.PacketCoding
import net.psforever.packet.control.ConnectionClose
final case class RawPacket(data : ByteVector)
final case class ResponsePacket(data : ByteVector)
import scala.concurrent.duration._
case class SessionState(id : Long, address : InetSocketAddress, pipeline : List[ActorRef]) {
def startOfPipe = pipeline.head
def nextOfStart = pipeline.tail.head
}
sealed trait SessionRouterAPI
final case class RawPacket(data : ByteVector) extends SessionRouterAPI
final case class ResponsePacket(data : ByteVector) extends SessionRouterAPI
final case class DropSession(id : Long, reason : String) extends SessionRouterAPI
final case class SessionReaper() extends SessionRouterAPI
case class SessionPipeline(nameTemplate : String, props : Props)
class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCContextAware {
/**
* Login sessions are divided between two actors. The crypto session actor transparently handles all of the cryptographic
* setup of the connection. Once a correct crypto session has been established, all packets, after being decrypted
* will be passed on to the login session actor. This actor has important state that is used to maintain the login
* session.
*
* > PlanetSide Session Pipeline <
*
* read() route decrypt
* UDP Socket -----> [Session Router] -----> [Crypto Actor] -----> [Session Actor]
* ^ | ^ | ^ |
* | write() | | encrypt | | response |
* +--------------+ +-----------+ +-----------------+
**/
class SessionRouter(role : String, pipeline : List[SessionPipeline]) extends Actor with MDCContextAware {
private[this] val log = org.log4s.getLogger(self.path.name)
import scala.concurrent.ExecutionContext.Implicits.global
val sessionReaper = context.system.scheduler.schedule(10 seconds, 5 seconds, self, SessionReaper())
val idBySocket = mutable.Map[InetSocketAddress, Long]()
val sessionById = mutable.Map[Long, SessionState]()
val sessionByActor = mutable.Map[ActorRef, SessionState]()
val sessionById = mutable.Map[Long, Session]()
val sessionByActor = mutable.Map[ActorRef, Session]()
val closePacket = PacketCoding.EncodePacket(ConnectionClose()).require.bytes
var sessionId = 0L // this is a connection session, not an actual logged in session ID
var inputRef : ActorRef = ActorRef.noSender
@ -33,24 +53,9 @@ class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCCont
override def supervisorStrategy = OneForOneStrategy() { case _ => Stop }
override def preStart = {
log.info("SessionRouter started...ready for PlanetSide sessions")
log.info(s"SessionRouter started...ready for ${role} sessions")
}
/**
* Login sessions are divided between two actors. The crypto session actor transparently handles all of the cryptographic
* setup of the connection. Once a correct crypto session has been established, all packets, after being decrypted
* will be passed on to the login session actor. This actor has important state that is used to maintain the login
* session.
*
* > PlanetSide Session Pipeline <
*
* read() route decrypt
* UDP Socket -----> [Session Router] -----> [Crypto Actor] -----> [Session Actor]
* ^ | ^ | ^ |
* | write() | | encrypt | | response |
* +--------------+ +-----------+ +-----------------+
**/
def receive = initializing
def initializing : Receive = {
@ -62,47 +67,70 @@ class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCCont
context.stop(self)
}
override def postStop() = {
sessionReaper.cancel()
}
def started : Receive = {
case ReceivedPacket(msg, from) =>
if(idBySocket.contains(from)) {
MDC("sessionId") = idBySocket{from}.toString
case recv @ ReceivedPacket(msg, from) =>
var session : Session = null
log.trace(s"Handling received packet ${msg} -> ${sessionById{idBySocket{from}}.startOfPipe.path.name}")
sessionById{idBySocket{from}}.startOfPipe !> RawPacket(msg)
MDC.clear()
} else {
val session = createNewSession(from)
idBySocket{from} = session.id
sessionById{session.id} = session
sessionByActor{session.startOfPipe} = session
sessionByActor{session.nextOfStart} = session
MDC("sessionId") = session.id.toString
log.info("New session from " + from.toString)
// send the initial message with MDC context (give the session ID to the lower layers)
sessionById{session.id}.startOfPipe !> HelloFriend(session.id, sessionById{session.id}.nextOfStart)
sessionById{session.id}.startOfPipe !> RawPacket(msg)
if(!idBySocket.contains(from)) {
session = createNewSession(from)
}
else {
val id = idBySocket{from}
session = sessionById{id}
}
if(session.state != Closed()) {
MDC("sessionId") = session.sessionId.toString
log.trace(s"RECV: ${msg} -> ${session.getPipeline.head.path.name}")
session.receive(RawPacket(msg))
MDC.clear()
}
case ResponsePacket(msg) =>
val session = sessionByActor.get(sender())
if(session.isDefined) {
log.trace(s"Sending response ${msg}")
inputRef ! SendPacket(msg, session.get.address)
if(session.isDefined) {
if(session.get.state != Closed()) {
MDC("sessionId") = session.get.sessionId.toString
log.trace(s"SEND: ${msg} -> ${inputRef.path.name}")
session.get.send(msg)
MDC.clear()
}
} else {
log.error("Dropped old response packet from session actor " + sender().path.name)
log.error("Dropped old response packet from actor " + sender().path.name)
}
case DropSession(id, reason) =>
val session = sessionById.get(id)
if(session.isDefined) {
removeSessionById(id, reason, graceful = true)
} else {
log.error(s"Requested to drop non-existent session ID=$id from ${sender()}")
}
case SessionReaper() =>
sessionById.foreach { case (id, session) =>
log.debug(session.toString)
if(session.getState == Closed()) {
// clear mappings
session.getPipeline.foreach(sessionByActor remove)
sessionById.remove(id)
idBySocket.remove(session.socketAddress)
log.debug(s"Reaped session ID=$id")
} else if(session.timeSinceLastInboundEvent > 10000) {
removeSessionById(id, "session timed out (inbound)", graceful = false)
} else if(session.timeSinceLastOutboundEvent > 4000) {
removeSessionById(id, "session timed out (outbound)", graceful = true) // tell client to STFU
}
}
case Terminated(actor) =>
val terminatedSession = sessionByActor.get(actor)
if(terminatedSession.isDefined) {
removeSessionById(terminatedSession.get.id, s"${actor.path.name} died")
removeSessionById(terminatedSession.get.sessionId, s"${actor.path.name} died",
graceful = true)
} else {
log.error("Received an invalid actor Termination from " + actor.path.name)
}
@ -112,33 +140,38 @@ class SessionRouter(pipeline : List[SessionPipeline]) extends Actor with MDCCont
def createNewSession(address : InetSocketAddress) = {
val id = newSessionId
val session = new Session(id, address, inputRef, pipeline)
// inflate the pipeline
val actors = pipeline.map { actor =>
val a = context.actorOf(actor.props, actor.nameTemplate + id.toString)
context.watch(a)
a
// establish mappings for easy lookup
idBySocket{address} = id
sessionById{id} = session
session.getPipeline.foreach { actor =>
sessionByActor{actor} = session
}
SessionState(id, address, actors)
log.info(s"New session ID=${id} from " + address.toString)
session
}
def removeSessionById(id : Long, reason : String) : Unit = {
def removeSessionById(id : Long, reason : String, graceful : Boolean) : Unit = {
val sessionOption = sessionById.get(id)
if(sessionOption.isEmpty)
return
val session = sessionOption.get
val session : Session = sessionOption.get
if(graceful) {
for(i <- 0 to 5) {
session.send(closePacket)
}
}
// TODO: add some sort of delay to prevent old session packets from coming through
// kill all session specific actors
session.pipeline.foreach(_ ! PoisonPill)
session.pipeline.foreach(sessionByActor remove)
sessionById.remove(id)
idBySocket.remove(session.address)
log.info(s"Stopping session ${id} (reason: $reason)")
session.dropSession(graceful)
log.info(s"Dropping session ID=${id} (reason: $reason)")
}
def newSessionId = {