diff --git a/build.sbt b/build.sbt index 574d91af4..72efcd569 100644 --- a/build.sbt +++ b/build.sbt @@ -11,6 +11,7 @@ lazy val commonSettings = Seq( "org.scodec" %% "scodec-core" % "1.8.3", "org.scodec" %% "scodec-akka" % "0.1.0-SNAPSHOT", "net.java.dev.jna" % "jna" % "4.2.1", + "com.typesafe.akka" %% "akka-slf4j" % "2.4.4", "ch.qos.logback" % "logback-classic" % "1.1.7", "org.log4s" %% "log4s" % "1.3.0", "org.fusesource.jansi" % "jansi" % "1.12" diff --git a/pslogin/src/main/scala/CryptoSessionActor.scala b/pslogin/src/main/scala/CryptoSessionActor.scala index ea4e0020b..e99ced01d 100644 --- a/pslogin/src/main/scala/CryptoSessionActor.scala +++ b/pslogin/src/main/scala/CryptoSessionActor.scala @@ -1,21 +1,21 @@ // Copyright (c) 2016 PSForever.net to present import java.net.{InetAddress, InetSocketAddress} -import akka.actor.{ActorRef, Identify, Actor} -import psforever.crypto.CryptoInterface.{CryptoStateWithMAC, CryptoState} +import akka.actor.{Actor, ActorLogging, ActorRef, DiagnosticActorLogging, Identify, MDCContextAware} +import psforever.crypto.CryptoInterface.{CryptoState, CryptoStateWithMAC} import psforever.crypto.CryptoInterface import psforever.net._ -import scodec.Attempt.{Successful, Failure} +import scodec.Attempt.{Failure, Successful} import scodec.bits._ -import scodec.{Err, Attempt, Codec} -import scodec.codecs.{uint16L, uint8L, bytes} +import scodec.{Attempt, Codec, Err} +import scodec.codecs.{bytes, uint16L, uint8L} import java.security.SecureRandom /** * Actor that stores crypto state for a connection and filters away any packet metadata. */ -class CryptoSessionActor extends Actor { - private[this] val logger = org.log4s.getLogger +class CryptoSessionActor extends Actor with MDCContextAware { + private[this] val log = org.log4s.getLogger var leftRef : ActorRef = ActorRef.noSender var rightRef : ActorRef = ActorRef.noSender @@ -37,23 +37,25 @@ class CryptoSessionActor extends Actor { def Initializing : Receive = { case HelloFriend(right) => + import MDCContextAware.Implicits._ leftRef = sender() rightRef = right.asInstanceOf[ActorRef] // who ever we send to has to send something back to us - rightRef ! HelloFriend(self) + rightRef !> HelloFriend(self) + + log.trace(s"Left sender ${leftRef.path.name}") context.become(NewClient) - case _ => - logger.error("Unknown message") + case default => + log.error("Unknown message " + default) context.stop(self) } def NewClient : Receive = { case RawPacket(msg) => - // PacketCoding.DecodePacket PacketCoding.UnmarshalPacket(msg) match { - case Failure(e) => logger.error("Could not decode packet: " + e) + case Failure(e) => log.error("Could not decode packet: " + e) case Successful(p) => //println("RECV: " + p) @@ -63,16 +65,16 @@ class CryptoSessionActor extends Actor { context.become(CryptoExchange) case default => - logger.error("Unexpected packet type " + p) + log.error(s"Unexpected packet type ${p} in state NewClient") } } - case default => logger.error(s"Invalid message received ${default}") + case default => log.error(s"Invalid message '$default' received in state NewClient") } def CryptoExchange : Receive = { case RawPacket(msg) => PacketCoding.UnmarshalPacket(msg, CryptoPacketOpcode.ClientChallengeXchg) match { - case Failure(e) => logger.error("Could not decode packet: " + e) + case Failure(e) => log.error("Could not decode packet: " + e) case Successful(p) => //println("RECV: " + p) @@ -102,16 +104,16 @@ class CryptoSessionActor extends Actor { serverMACBuffer ++= sentPacket.drop(3) context.become(CryptoSetupFinishing) - case default => logger.error("Unexpected packet type " + p) + case default => log.error(s"Unexpected packet type $p in state CryptoExchange") } } - case default => logger.error(s"Invalid message received ${default}") + case default => log.error(s"Invalid message '$default' received in state CryptoExchange") } def CryptoSetupFinishing : Receive = { case RawPacket(msg) => PacketCoding.UnmarshalPacket(msg, CryptoPacketOpcode.ClientFinished) match { - case Failure(e) => logger.error("Could not decode packet: " + e) + case Failure(e) => log.error("Could not decode packet: " + e) case Successful(p) => //println("RECV: " + p) @@ -186,10 +188,10 @@ class CryptoSessionActor extends Actor { sendResponse(packet) context.become(Established) - case default => failWithError("Unexpected packet type " + default) + case default => failWithError(s"Unexpected packet type $default in state CryptoSetupFinished") } } - case default => failWithError(s"Invalid message received ${default}") + case default => failWithError(s"Invalid message '$default' received in state CryptoSetupFinished") } def Established : Receive = { @@ -205,12 +207,12 @@ class CryptoSessionActor extends Actor { self ! packet case Failure(e) => - logger.error("Failed to decode encrypted packet: " + e) + log.error("Failed to decode encrypted packet: " + e) } - case default => failWithError("Unexpected packet type " + default) + case default => failWithError(s"Unexpected packet type $default in state Established") } - case Failure(e) => logger.error("Could not decode raw packet: " + e) + case Failure(e) => log.error("Could not decode raw packet: " + e) } case ctrl @ ControlPacket(_, _) => val from = sender() @@ -220,11 +222,11 @@ class CryptoSessionActor extends Actor { val from = sender() handleEstablishedPacket(from, game) - case default => failWithError(s"Invalid message received ${default}") + case default => failWithError(s"Invalid message '$default' received in state Established") } def failWithError(error : String) = { - logger.error(error) + log.error(error) } def resetState() : Unit = { @@ -255,7 +257,7 @@ class CryptoSessionActor extends Actor { val packet = PacketCoding.encryptPacket(cryptoState.get, cont).require sendResponse(packet) } else { - logger.error("Invalid sender") + log.error("Invalid sender") } } diff --git a/pslogin/src/main/scala/LoginSessionActor.scala b/pslogin/src/main/scala/LoginSessionActor.scala index fcc58334f..770821554 100644 --- a/pslogin/src/main/scala/LoginSessionActor.scala +++ b/pslogin/src/main/scala/LoginSessionActor.scala @@ -1,13 +1,13 @@ // Copyright (c) 2016 PSForever.net to present import java.net.{InetAddress, InetSocketAddress} -import akka.actor.{ActorRef, Identify, Actor, ActorLogging} +import akka.actor.{Actor, ActorLogging, ActorRef, Identify, MDCContextAware} import psforever.net._ import scodec.Attempt.{Failure, Successful} import scodec.bits._ -class LoginSessionActor extends Actor { - private[this] val logger = org.log4s.getLogger +class LoginSessionActor extends Actor with MDCContextAware { + private[this] val log = org.log4s.getLogger var leftRef : ActorRef = ActorRef.noSender var rightRef : ActorRef = ActorRef.noSender @@ -21,7 +21,7 @@ class LoginSessionActor extends Actor { context.become(Started) case _ => - logger.error("Unknown message") + log.error("Unknown message") context.stop(self) } @@ -38,7 +38,7 @@ class LoginSessionActor extends Actor { case SlottedMetaPacket(innerPacket) => PacketCoding.DecodePacket(innerPacket) match { case Successful(p) => - logger.trace("RECV[INNER]: " + p) + log.trace("RECV[INNER]: " + p) val packet = LoginRespMessage("AAAABBBBCCCCDDDD", hex"00000000 18FABE0C 00000000 00000000", @@ -55,7 +55,7 @@ class LoginSessionActor extends Actor { )) sendResponse(PacketCoding.CreateGamePacket(0, msg)) - case Failure(e) => logger.error("Failed to decode inner packet " + e) + case Failure(e) => log.error("Failed to decode inner packet " + e) } } } @@ -65,12 +65,12 @@ class LoginSessionActor extends Actor { } def failWithError(error : String) = { - logger.error(error) + log.error(error) sendResponse(PacketCoding.CreateControlPacket(ConnectionClose())) } def sendResponse(cont : PlanetSidePacketContainer) = { - logger.trace("LOGIN SEND: " + cont) + log.trace("LOGIN SEND: " + cont) rightRef ! cont } } diff --git a/pslogin/src/main/scala/MDCContextAware.scala b/pslogin/src/main/scala/MDCContextAware.scala new file mode 100644 index 000000000..eb5a27d4b --- /dev/null +++ b/pslogin/src/main/scala/MDCContextAware.scala @@ -0,0 +1,65 @@ +// Taken from http://code.hootsuite.com/logging-contextual-info-in-an-asynchronous-scala-application/ +package akka.actor + +import akka.util.Timeout +import org.slf4j.MDC + +import scala.concurrent.Future + +trait MDCContextAware extends Actor with ActorLogging { + import MDCContextAware._ + + // This is why this needs to be in package akka.actor + override protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit = { + val orig = MDC.getCopyOfContextMap + try { + msg match { + case MdcMsg(mdc, origMsg) => + if (mdc != null) + MDC.setContextMap(mdc) + else + MDC.clear() + super.aroundReceive(receive, origMsg) + case _ => + super.aroundReceive(receive, msg) + } + } finally { + if (orig != null) + MDC.setContextMap(orig) + else + MDC.clear() + } + } +} + +object MDCContextAware { + private case class MdcMsg(mdc: java.util.Map[String, String], msg: Any) + + object Implicits { + + /** + * Add two new methods that allow MDC info to be passed to MDCContextAware actors. + * + * Do NOT use these methods to send to actors that are not MDCContextAware. + */ + implicit class ContextLocalAwareActorRef(val ref: ActorRef) extends AnyVal { + + import akka.pattern.ask + + /** + * Send a message to an actor that is MDCContextAware - it will propagate + * the current MDC values. Note: we MUST capture the ActorContext in order for senders + * to be correct! This was a bug from the original author. + */ + def !>(msg: Any)(implicit context: ActorContext) : Unit = + ref.tell(MdcMsg(MDC.getCopyOfContextMap, msg), context.self) + + /** + * "Ask" an actor that is MDCContextAware for something - it will propagate + * the current MDC values + */ + def ?>(msg: Any)(implicit context: ActorContext, timeout: Timeout): Future[Any] = + ref.ask(MdcMsg(MDC.getCopyOfContextMap, msg), context.self) + } + } +} \ No newline at end of file diff --git a/pslogin/src/main/scala/MDCPropagatingExecutionContext.scala b/pslogin/src/main/scala/MDCPropagatingExecutionContext.scala new file mode 100644 index 000000000..8f2d28b15 --- /dev/null +++ b/pslogin/src/main/scala/MDCPropagatingExecutionContext.scala @@ -0,0 +1,64 @@ +// Taken from http://code.hootsuite.com/logging-contextual-info-in-an-asynchronous-scala-application/ + +import org.slf4j.MDC + +import scala.concurrent.ExecutionContext + +trait MDCPropagatingExecutionContext extends ExecutionContext { + // name the self-type "self" so we can refer to it inside the nested class + self => + + override def prepare(): ExecutionContext = new ExecutionContext { + // Save the call-site MDC state + val context = MDC.getCopyOfContextMap + + def execute(r: Runnable): Unit = self.execute(new Runnable { + def run(): Unit = { + // Save the existing execution-site MDC state + val oldContext = MDC.getCopyOfContextMap + try { + // Set the call-site MDC state into the execution-site MDC + if (context != null ) + MDC.setContextMap(context) + else + MDC.clear() + + r.run() + } finally { + // Restore the existing execution-site MDC state + if (oldContext != null) + MDC.setContextMap(oldContext) + else + MDC.clear() + } + } + }) + + def reportFailure(t: Throwable): Unit = self.reportFailure(t) + } +} + +object MDCPropagatingExecutionContext { + object Implicits { + // Convenience wrapper around the Scala global ExecutionContext so you can just do: + // import MDCPropagatingExecutionContext.Implicits.global + implicit lazy val global = MDCPropagatingExecutionContextWrapper(ExecutionContext.Implicits.global) + } +} + +/** + * Wrapper around an existing ExecutionContext that makes it propagate MDC information. + */ +class MDCPropagatingExecutionContextWrapper(wrapped: ExecutionContext) + extends ExecutionContext with MDCPropagatingExecutionContext { + + override def execute(r: Runnable): Unit = wrapped.execute(r) + + override def reportFailure(t: Throwable): Unit = wrapped.reportFailure(t) +} + +object MDCPropagatingExecutionContextWrapper { + def apply(wrapped: ExecutionContext): MDCPropagatingExecutionContextWrapper = { + new MDCPropagatingExecutionContextWrapper(wrapped) + } +} \ No newline at end of file diff --git a/pslogin/src/main/scala/PsLogin.scala b/pslogin/src/main/scala/PsLogin.scala index 77a2f14d1..01fa40719 100644 --- a/pslogin/src/main/scala/PsLogin.scala +++ b/pslogin/src/main/scala/PsLogin.scala @@ -1,15 +1,16 @@ // Copyright (c) 2016 PSForever.net to present import akka.actor.{ActorSystem, Props} import ch.qos.logback.classic.LoggerContext +import ch.qos.logback.core.status._ import ch.qos.logback.core.util.StatusPrinter +import com.typesafe.config.ConfigFactory import psforever.crypto.CryptoInterface -import org.log4s._ import org.slf4j -import org.slf4j.LoggerFactory - import org.fusesource.jansi.Ansi._ import org.fusesource.jansi.Ansi.Color._ +import scala.collection.JavaConverters._ + object PsLogin { private val logger = org.log4s.getLogger @@ -22,14 +23,22 @@ object PsLogin { println } + def loggerHasErrors(context : LoggerContext) = { + val statusUtil = new StatusUtil(context) + + statusUtil.getHighestLevel(0) >= Status.WARN + } + def main(args : Array[String]) : Unit = { banner // assume SLF4J is bound to logback in the current environment val lc = slf4j.LoggerFactory.getILoggerFactory.asInstanceOf[LoggerContext] - // print logback's internal status - StatusPrinter.printInCaseOfErrorsOrWarnings(lc) + if(loggerHasErrors(lc)) { + StatusPrinter.printInCaseOfErrorsOrWarnings(lc) + sys.exit(1) + } try { CryptoInterface.initialize() @@ -41,12 +50,19 @@ object PsLogin { sys.exit(1) } + logger.info(s"Detected ${Runtime.getRuntime.availableProcessors()} available logical processors") + logger.info("Starting actor subsystems...") - val system = ActorSystem("PsLogin") + val config = Map( + "akka.loggers" -> List("akka.event.slf4j.Slf4jLogger").asJava, + "akka.loglevel" -> "INFO", + "akka.logging-filter" -> "akka.event.slf4j.Slf4jLoggingFilter" + ).asJava + + //val system = ActorSystem("PsLogin", Some(ConfigFactory.parseMap(config)), None, Some(MDCPropagatingExecutionContextWrapper(ExecutionContext.Implicits.global))) + val system = ActorSystem("PsLogin", ConfigFactory.parseMap(config)) val session = system.actorOf(Props[SessionRouter], "session-router") val listener = system.actorOf(Props(new UdpListener(session)), "udp-listener") - - system.awaitTermination() } } diff --git a/pslogin/src/main/scala/SessionRouter.scala b/pslogin/src/main/scala/SessionRouter.scala index 8eadcab57..4cae2792c 100644 --- a/pslogin/src/main/scala/SessionRouter.scala +++ b/pslogin/src/main/scala/SessionRouter.scala @@ -2,19 +2,23 @@ import java.net.InetSocketAddress import akka.actor._ +import org.log4s.MDC import scodec.bits._ import scala.collection.mutable +import MDCContextAware.Implicits._ +import akka.actor.MDCContextAware.MdcMsg final case class RawPacket(data : ByteVector) final case class ResponsePacket(data : ByteVector) case class SessionState(id : Long, address : InetSocketAddress, pipeline : List[ActorRef]) { - def inject(pkt : RawPacket) = pipeline.head ! pkt + def startOfPipe = pipeline.head + def nextOfStart = pipeline.tail.head } -class SessionRouter extends Actor { - private[this] val logger = org.log4s.getLogger +class SessionRouter extends Actor with MDCContextAware { + private[this] val log = org.log4s.getLogger val idBySocket = mutable.Map[InetSocketAddress, Long]() val sessionById = mutable.Map[Long, SessionState]() @@ -45,30 +49,43 @@ class SessionRouter extends Actor { inputRef = sender() context.become(started) case _ => - logger.error("Unknown message") + log.error("Unknown message") context.stop(self) } def started : Receive = { case ReceivedPacket(msg, from) => if(idBySocket.contains(from)) { - sessionById{idBySocket{from}}.inject(RawPacket(msg)) - } else { - logger.info("New session from " + from.toString) + MDC("sessionId") = idBySocket{from}.toString + log.trace(s"Handling recieved packet") + sessionById{idBySocket{from}}.startOfPipe !> RawPacket(msg) + + MDC.clear() + } else { val session = createNewSession(from) idBySocket{from} = session.id sessionById{session.id} = session sessionByActor{session.pipeline.head} = session - sessionById{session.id}.inject(RawPacket(msg)) + MDC("sessionId") = session.id.toString + + log.info("New session from " + from.toString) + + // send the initial message with MDC context (give the session ID to the lower layers) + sessionById{session.id}.startOfPipe !> HelloFriend(sessionById{session.id}.nextOfStart) + sessionById{session.id}.startOfPipe ! RawPacket(msg) + + MDC.clear() } case ResponsePacket(msg) => val session = sessionByActor{sender()} + log.trace(s"Sending response ${msg}") + inputRef ! SendPacket(msg, session.address) - case _ => logger.error("Unknown message") + case _ => log.error("Unknown message") } def createNewSession(address : InetSocketAddress) = { @@ -79,9 +96,6 @@ class SessionRouter extends Actor { val loginSession = context.actorOf(Props[LoginSessionActor], "login-session" + id.toString) - // start the pipeline setup - cryptoSession ! HelloFriend(loginSession) - SessionState(id, address, List(cryptoSession, loginSession)) }