Add SLF4J MDC session logging

This commit is contained in:
Chord 2016-05-01 21:12:42 -04:00
parent 2fc78f49a9
commit e299abb03a
7 changed files with 216 additions and 54 deletions

View file

@ -11,6 +11,7 @@ lazy val commonSettings = Seq(
"org.scodec" %% "scodec-core" % "1.8.3", "org.scodec" %% "scodec-core" % "1.8.3",
"org.scodec" %% "scodec-akka" % "0.1.0-SNAPSHOT", "org.scodec" %% "scodec-akka" % "0.1.0-SNAPSHOT",
"net.java.dev.jna" % "jna" % "4.2.1", "net.java.dev.jna" % "jna" % "4.2.1",
"com.typesafe.akka" %% "akka-slf4j" % "2.4.4",
"ch.qos.logback" % "logback-classic" % "1.1.7", "ch.qos.logback" % "logback-classic" % "1.1.7",
"org.log4s" %% "log4s" % "1.3.0", "org.log4s" %% "log4s" % "1.3.0",
"org.fusesource.jansi" % "jansi" % "1.12" "org.fusesource.jansi" % "jansi" % "1.12"

View file

@ -1,21 +1,21 @@
// Copyright (c) 2016 PSForever.net to present // Copyright (c) 2016 PSForever.net to present
import java.net.{InetAddress, InetSocketAddress} import java.net.{InetAddress, InetSocketAddress}
import akka.actor.{ActorRef, Identify, Actor} import akka.actor.{Actor, ActorLogging, ActorRef, DiagnosticActorLogging, Identify, MDCContextAware}
import psforever.crypto.CryptoInterface.{CryptoStateWithMAC, CryptoState} import psforever.crypto.CryptoInterface.{CryptoState, CryptoStateWithMAC}
import psforever.crypto.CryptoInterface import psforever.crypto.CryptoInterface
import psforever.net._ import psforever.net._
import scodec.Attempt.{Successful, Failure} import scodec.Attempt.{Failure, Successful}
import scodec.bits._ import scodec.bits._
import scodec.{Err, Attempt, Codec} import scodec.{Attempt, Codec, Err}
import scodec.codecs.{uint16L, uint8L, bytes} import scodec.codecs.{bytes, uint16L, uint8L}
import java.security.SecureRandom import java.security.SecureRandom
/** /**
* Actor that stores crypto state for a connection and filters away any packet metadata. * Actor that stores crypto state for a connection and filters away any packet metadata.
*/ */
class CryptoSessionActor extends Actor { class CryptoSessionActor extends Actor with MDCContextAware {
private[this] val logger = org.log4s.getLogger private[this] val log = org.log4s.getLogger
var leftRef : ActorRef = ActorRef.noSender var leftRef : ActorRef = ActorRef.noSender
var rightRef : ActorRef = ActorRef.noSender var rightRef : ActorRef = ActorRef.noSender
@ -37,23 +37,25 @@ class CryptoSessionActor extends Actor {
def Initializing : Receive = { def Initializing : Receive = {
case HelloFriend(right) => case HelloFriend(right) =>
import MDCContextAware.Implicits._
leftRef = sender() leftRef = sender()
rightRef = right.asInstanceOf[ActorRef] rightRef = right.asInstanceOf[ActorRef]
// who ever we send to has to send something back to us // who ever we send to has to send something back to us
rightRef ! HelloFriend(self) rightRef !> HelloFriend(self)
log.trace(s"Left sender ${leftRef.path.name}")
context.become(NewClient) context.become(NewClient)
case _ => case default =>
logger.error("Unknown message") log.error("Unknown message " + default)
context.stop(self) context.stop(self)
} }
def NewClient : Receive = { def NewClient : Receive = {
case RawPacket(msg) => case RawPacket(msg) =>
// PacketCoding.DecodePacket
PacketCoding.UnmarshalPacket(msg) match { PacketCoding.UnmarshalPacket(msg) match {
case Failure(e) => logger.error("Could not decode packet: " + e) case Failure(e) => log.error("Could not decode packet: " + e)
case Successful(p) => case Successful(p) =>
//println("RECV: " + p) //println("RECV: " + p)
@ -63,16 +65,16 @@ class CryptoSessionActor extends Actor {
context.become(CryptoExchange) context.become(CryptoExchange)
case default => case default =>
logger.error("Unexpected packet type " + p) log.error(s"Unexpected packet type ${p} in state NewClient")
} }
} }
case default => logger.error(s"Invalid message received ${default}") case default => log.error(s"Invalid message '$default' received in state NewClient")
} }
def CryptoExchange : Receive = { def CryptoExchange : Receive = {
case RawPacket(msg) => case RawPacket(msg) =>
PacketCoding.UnmarshalPacket(msg, CryptoPacketOpcode.ClientChallengeXchg) match { PacketCoding.UnmarshalPacket(msg, CryptoPacketOpcode.ClientChallengeXchg) match {
case Failure(e) => logger.error("Could not decode packet: " + e) case Failure(e) => log.error("Could not decode packet: " + e)
case Successful(p) => case Successful(p) =>
//println("RECV: " + p) //println("RECV: " + p)
@ -102,16 +104,16 @@ class CryptoSessionActor extends Actor {
serverMACBuffer ++= sentPacket.drop(3) serverMACBuffer ++= sentPacket.drop(3)
context.become(CryptoSetupFinishing) context.become(CryptoSetupFinishing)
case default => logger.error("Unexpected packet type " + p) case default => log.error(s"Unexpected packet type $p in state CryptoExchange")
} }
} }
case default => logger.error(s"Invalid message received ${default}") case default => log.error(s"Invalid message '$default' received in state CryptoExchange")
} }
def CryptoSetupFinishing : Receive = { def CryptoSetupFinishing : Receive = {
case RawPacket(msg) => case RawPacket(msg) =>
PacketCoding.UnmarshalPacket(msg, CryptoPacketOpcode.ClientFinished) match { PacketCoding.UnmarshalPacket(msg, CryptoPacketOpcode.ClientFinished) match {
case Failure(e) => logger.error("Could not decode packet: " + e) case Failure(e) => log.error("Could not decode packet: " + e)
case Successful(p) => case Successful(p) =>
//println("RECV: " + p) //println("RECV: " + p)
@ -186,10 +188,10 @@ class CryptoSessionActor extends Actor {
sendResponse(packet) sendResponse(packet)
context.become(Established) context.become(Established)
case default => failWithError("Unexpected packet type " + default) case default => failWithError(s"Unexpected packet type $default in state CryptoSetupFinished")
} }
} }
case default => failWithError(s"Invalid message received ${default}") case default => failWithError(s"Invalid message '$default' received in state CryptoSetupFinished")
} }
def Established : Receive = { def Established : Receive = {
@ -205,12 +207,12 @@ class CryptoSessionActor extends Actor {
self ! packet self ! packet
case Failure(e) => case Failure(e) =>
logger.error("Failed to decode encrypted packet: " + e) log.error("Failed to decode encrypted packet: " + e)
} }
case default => failWithError("Unexpected packet type " + default) case default => failWithError(s"Unexpected packet type $default in state Established")
} }
case Failure(e) => logger.error("Could not decode raw packet: " + e) case Failure(e) => log.error("Could not decode raw packet: " + e)
} }
case ctrl @ ControlPacket(_, _) => case ctrl @ ControlPacket(_, _) =>
val from = sender() val from = sender()
@ -220,11 +222,11 @@ class CryptoSessionActor extends Actor {
val from = sender() val from = sender()
handleEstablishedPacket(from, game) handleEstablishedPacket(from, game)
case default => failWithError(s"Invalid message received ${default}") case default => failWithError(s"Invalid message '$default' received in state Established")
} }
def failWithError(error : String) = { def failWithError(error : String) = {
logger.error(error) log.error(error)
} }
def resetState() : Unit = { def resetState() : Unit = {
@ -255,7 +257,7 @@ class CryptoSessionActor extends Actor {
val packet = PacketCoding.encryptPacket(cryptoState.get, cont).require val packet = PacketCoding.encryptPacket(cryptoState.get, cont).require
sendResponse(packet) sendResponse(packet)
} else { } else {
logger.error("Invalid sender") log.error("Invalid sender")
} }
} }

View file

@ -1,13 +1,13 @@
// Copyright (c) 2016 PSForever.net to present // Copyright (c) 2016 PSForever.net to present
import java.net.{InetAddress, InetSocketAddress} import java.net.{InetAddress, InetSocketAddress}
import akka.actor.{ActorRef, Identify, Actor, ActorLogging} import akka.actor.{Actor, ActorLogging, ActorRef, Identify, MDCContextAware}
import psforever.net._ import psforever.net._
import scodec.Attempt.{Failure, Successful} import scodec.Attempt.{Failure, Successful}
import scodec.bits._ import scodec.bits._
class LoginSessionActor extends Actor { class LoginSessionActor extends Actor with MDCContextAware {
private[this] val logger = org.log4s.getLogger private[this] val log = org.log4s.getLogger
var leftRef : ActorRef = ActorRef.noSender var leftRef : ActorRef = ActorRef.noSender
var rightRef : ActorRef = ActorRef.noSender var rightRef : ActorRef = ActorRef.noSender
@ -21,7 +21,7 @@ class LoginSessionActor extends Actor {
context.become(Started) context.become(Started)
case _ => case _ =>
logger.error("Unknown message") log.error("Unknown message")
context.stop(self) context.stop(self)
} }
@ -38,7 +38,7 @@ class LoginSessionActor extends Actor {
case SlottedMetaPacket(innerPacket) => case SlottedMetaPacket(innerPacket) =>
PacketCoding.DecodePacket(innerPacket) match { PacketCoding.DecodePacket(innerPacket) match {
case Successful(p) => case Successful(p) =>
logger.trace("RECV[INNER]: " + p) log.trace("RECV[INNER]: " + p)
val packet = LoginRespMessage("AAAABBBBCCCCDDDD", val packet = LoginRespMessage("AAAABBBBCCCCDDDD",
hex"00000000 18FABE0C 00000000 00000000", hex"00000000 18FABE0C 00000000 00000000",
@ -55,7 +55,7 @@ class LoginSessionActor extends Actor {
)) ))
sendResponse(PacketCoding.CreateGamePacket(0, msg)) sendResponse(PacketCoding.CreateGamePacket(0, msg))
case Failure(e) => logger.error("Failed to decode inner packet " + e) case Failure(e) => log.error("Failed to decode inner packet " + e)
} }
} }
} }
@ -65,12 +65,12 @@ class LoginSessionActor extends Actor {
} }
def failWithError(error : String) = { def failWithError(error : String) = {
logger.error(error) log.error(error)
sendResponse(PacketCoding.CreateControlPacket(ConnectionClose())) sendResponse(PacketCoding.CreateControlPacket(ConnectionClose()))
} }
def sendResponse(cont : PlanetSidePacketContainer) = { def sendResponse(cont : PlanetSidePacketContainer) = {
logger.trace("LOGIN SEND: " + cont) log.trace("LOGIN SEND: " + cont)
rightRef ! cont rightRef ! cont
} }
} }

View file

@ -0,0 +1,65 @@
// Taken from http://code.hootsuite.com/logging-contextual-info-in-an-asynchronous-scala-application/
package akka.actor
import akka.util.Timeout
import org.slf4j.MDC
import scala.concurrent.Future
trait MDCContextAware extends Actor with ActorLogging {
import MDCContextAware._
// This is why this needs to be in package akka.actor
override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
val orig = MDC.getCopyOfContextMap
try {
msg match {
case MdcMsg(mdc, origMsg) =>
if (mdc != null)
MDC.setContextMap(mdc)
else
MDC.clear()
super.aroundReceive(receive, origMsg)
case _ =>
super.aroundReceive(receive, msg)
}
} finally {
if (orig != null)
MDC.setContextMap(orig)
else
MDC.clear()
}
}
}
object MDCContextAware {
private case class MdcMsg(mdc: java.util.Map[String, String], msg: Any)
object Implicits {
/**
* Add two new methods that allow MDC info to be passed to MDCContextAware actors.
*
* Do NOT use these methods to send to actors that are not MDCContextAware.
*/
implicit class ContextLocalAwareActorRef(val ref: ActorRef) extends AnyVal {
import akka.pattern.ask
/**
* Send a message to an actor that is MDCContextAware - it will propagate
* the current MDC values. Note: we MUST capture the ActorContext in order for senders
* to be correct! This was a bug from the original author.
*/
def !>(msg: Any)(implicit context: ActorContext) : Unit =
ref.tell(MdcMsg(MDC.getCopyOfContextMap, msg), context.self)
/**
* "Ask" an actor that is MDCContextAware for something - it will propagate
* the current MDC values
*/
def ?>(msg: Any)(implicit context: ActorContext, timeout: Timeout): Future[Any] =
ref.ask(MdcMsg(MDC.getCopyOfContextMap, msg), context.self)
}
}
}

View file

@ -0,0 +1,64 @@
// Taken from http://code.hootsuite.com/logging-contextual-info-in-an-asynchronous-scala-application/
import org.slf4j.MDC
import scala.concurrent.ExecutionContext
trait MDCPropagatingExecutionContext extends ExecutionContext {
// name the self-type "self" so we can refer to it inside the nested class
self =>
override def prepare(): ExecutionContext = new ExecutionContext {
// Save the call-site MDC state
val context = MDC.getCopyOfContextMap
def execute(r: Runnable): Unit = self.execute(new Runnable {
def run(): Unit = {
// Save the existing execution-site MDC state
val oldContext = MDC.getCopyOfContextMap
try {
// Set the call-site MDC state into the execution-site MDC
if (context != null )
MDC.setContextMap(context)
else
MDC.clear()
r.run()
} finally {
// Restore the existing execution-site MDC state
if (oldContext != null)
MDC.setContextMap(oldContext)
else
MDC.clear()
}
}
})
def reportFailure(t: Throwable): Unit = self.reportFailure(t)
}
}
object MDCPropagatingExecutionContext {
object Implicits {
// Convenience wrapper around the Scala global ExecutionContext so you can just do:
// import MDCPropagatingExecutionContext.Implicits.global
implicit lazy val global = MDCPropagatingExecutionContextWrapper(ExecutionContext.Implicits.global)
}
}
/**
* Wrapper around an existing ExecutionContext that makes it propagate MDC information.
*/
class MDCPropagatingExecutionContextWrapper(wrapped: ExecutionContext)
extends ExecutionContext with MDCPropagatingExecutionContext {
override def execute(r: Runnable): Unit = wrapped.execute(r)
override def reportFailure(t: Throwable): Unit = wrapped.reportFailure(t)
}
object MDCPropagatingExecutionContextWrapper {
def apply(wrapped: ExecutionContext): MDCPropagatingExecutionContextWrapper = {
new MDCPropagatingExecutionContextWrapper(wrapped)
}
}

View file

@ -1,15 +1,16 @@
// Copyright (c) 2016 PSForever.net to present // Copyright (c) 2016 PSForever.net to present
import akka.actor.{ActorSystem, Props} import akka.actor.{ActorSystem, Props}
import ch.qos.logback.classic.LoggerContext import ch.qos.logback.classic.LoggerContext
import ch.qos.logback.core.status._
import ch.qos.logback.core.util.StatusPrinter import ch.qos.logback.core.util.StatusPrinter
import com.typesafe.config.ConfigFactory
import psforever.crypto.CryptoInterface import psforever.crypto.CryptoInterface
import org.log4s._
import org.slf4j import org.slf4j
import org.slf4j.LoggerFactory
import org.fusesource.jansi.Ansi._ import org.fusesource.jansi.Ansi._
import org.fusesource.jansi.Ansi.Color._ import org.fusesource.jansi.Ansi.Color._
import scala.collection.JavaConverters._
object PsLogin { object PsLogin {
private val logger = org.log4s.getLogger private val logger = org.log4s.getLogger
@ -22,14 +23,22 @@ object PsLogin {
println println
} }
def loggerHasErrors(context : LoggerContext) = {
val statusUtil = new StatusUtil(context)
statusUtil.getHighestLevel(0) >= Status.WARN
}
def main(args : Array[String]) : Unit = { def main(args : Array[String]) : Unit = {
banner banner
// assume SLF4J is bound to logback in the current environment // assume SLF4J is bound to logback in the current environment
val lc = slf4j.LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] val lc = slf4j.LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext]
// print logback's internal status if(loggerHasErrors(lc)) {
StatusPrinter.printInCaseOfErrorsOrWarnings(lc) StatusPrinter.printInCaseOfErrorsOrWarnings(lc)
sys.exit(1)
}
try { try {
CryptoInterface.initialize() CryptoInterface.initialize()
@ -41,12 +50,19 @@ object PsLogin {
sys.exit(1) sys.exit(1)
} }
logger.info(s"Detected ${Runtime.getRuntime.availableProcessors()} available logical processors")
logger.info("Starting actor subsystems...") logger.info("Starting actor subsystems...")
val system = ActorSystem("PsLogin") val config = Map(
"akka.loggers" -> List("akka.event.slf4j.Slf4jLogger").asJava,
"akka.loglevel" -> "INFO",
"akka.logging-filter" -> "akka.event.slf4j.Slf4jLoggingFilter"
).asJava
//val system = ActorSystem("PsLogin", Some(ConfigFactory.parseMap(config)), None, Some(MDCPropagatingExecutionContextWrapper(ExecutionContext.Implicits.global)))
val system = ActorSystem("PsLogin", ConfigFactory.parseMap(config))
val session = system.actorOf(Props[SessionRouter], "session-router") val session = system.actorOf(Props[SessionRouter], "session-router")
val listener = system.actorOf(Props(new UdpListener(session)), "udp-listener") val listener = system.actorOf(Props(new UdpListener(session)), "udp-listener")
system.awaitTermination()
} }
} }

View file

@ -2,19 +2,23 @@
import java.net.InetSocketAddress import java.net.InetSocketAddress
import akka.actor._ import akka.actor._
import org.log4s.MDC
import scodec.bits._ import scodec.bits._
import scala.collection.mutable import scala.collection.mutable
import MDCContextAware.Implicits._
import akka.actor.MDCContextAware.MdcMsg
final case class RawPacket(data : ByteVector) final case class RawPacket(data : ByteVector)
final case class ResponsePacket(data : ByteVector) final case class ResponsePacket(data : ByteVector)
case class SessionState(id : Long, address : InetSocketAddress, pipeline : List[ActorRef]) { case class SessionState(id : Long, address : InetSocketAddress, pipeline : List[ActorRef]) {
def inject(pkt : RawPacket) = pipeline.head ! pkt def startOfPipe = pipeline.head
def nextOfStart = pipeline.tail.head
} }
class SessionRouter extends Actor { class SessionRouter extends Actor with MDCContextAware {
private[this] val logger = org.log4s.getLogger private[this] val log = org.log4s.getLogger
val idBySocket = mutable.Map[InetSocketAddress, Long]() val idBySocket = mutable.Map[InetSocketAddress, Long]()
val sessionById = mutable.Map[Long, SessionState]() val sessionById = mutable.Map[Long, SessionState]()
@ -45,30 +49,43 @@ class SessionRouter extends Actor {
inputRef = sender() inputRef = sender()
context.become(started) context.become(started)
case _ => case _ =>
logger.error("Unknown message") log.error("Unknown message")
context.stop(self) context.stop(self)
} }
def started : Receive = { def started : Receive = {
case ReceivedPacket(msg, from) => case ReceivedPacket(msg, from) =>
if(idBySocket.contains(from)) { if(idBySocket.contains(from)) {
sessionById{idBySocket{from}}.inject(RawPacket(msg)) MDC("sessionId") = idBySocket{from}.toString
} else {
logger.info("New session from " + from.toString)
log.trace(s"Handling recieved packet")
sessionById{idBySocket{from}}.startOfPipe !> RawPacket(msg)
MDC.clear()
} else {
val session = createNewSession(from) val session = createNewSession(from)
idBySocket{from} = session.id idBySocket{from} = session.id
sessionById{session.id} = session sessionById{session.id} = session
sessionByActor{session.pipeline.head} = session sessionByActor{session.pipeline.head} = session
sessionById{session.id}.inject(RawPacket(msg)) MDC("sessionId") = session.id.toString
log.info("New session from " + from.toString)
// send the initial message with MDC context (give the session ID to the lower layers)
sessionById{session.id}.startOfPipe !> HelloFriend(sessionById{session.id}.nextOfStart)
sessionById{session.id}.startOfPipe ! RawPacket(msg)
MDC.clear()
} }
case ResponsePacket(msg) => case ResponsePacket(msg) =>
val session = sessionByActor{sender()} val session = sessionByActor{sender()}
log.trace(s"Sending response ${msg}")
inputRef ! SendPacket(msg, session.address) inputRef ! SendPacket(msg, session.address)
case _ => logger.error("Unknown message") case _ => log.error("Unknown message")
} }
def createNewSession(address : InetSocketAddress) = { def createNewSession(address : InetSocketAddress) = {
@ -79,9 +96,6 @@ class SessionRouter extends Actor {
val loginSession = context.actorOf(Props[LoginSessionActor], val loginSession = context.actorOf(Props[LoginSessionActor],
"login-session" + id.toString) "login-session" + id.toString)
// start the pipeline setup
cryptoSession ! HelloFriend(loginSession)
SessionState(id, address, List(cryptoSession, loginSession)) SessionState(id, address, List(cryptoSession, loginSession))
} }