From 0d4a5ad40efa3f20bbbd89e283e3aeacc5870e47 Mon Sep 17 00:00:00 2001 From: Jakob Gillich Date: Sat, 26 Feb 2022 21:34:36 +0100 Subject: [PATCH] basic client it's able to join the world and perform basic state updates. packet parsing is very primitive. --- build.sbt | 8 +- docker-compose.yml | 2 +- .../net/psforever/util/DiffieHellman.scala | 6 +- .../net/psforever/tools/client/Client.scala | 412 +++++++++++++++--- .../net/psforever/tools/client/State.scala | 98 +++++ 5 files changed, 454 insertions(+), 72 deletions(-) create mode 100644 tools/client/src/main/scala/net/psforever/tools/client/State.scala diff --git a/build.sbt b/build.sbt index 3e8bf5ce..aa6fd9f6 100644 --- a/build.sbt +++ b/build.sbt @@ -44,16 +44,16 @@ lazy val psforeverSettings = Seq( "com.typesafe.akka" %% "akka-slf4j" % "2.6.17", "com.typesafe.akka" %% "akka-protobuf-v3" % "2.6.17", "com.typesafe.akka" %% "akka-stream" % "2.6.17", - "com.typesafe.akka" %% "akka-testkit" % "2.6.17" % "test", + "com.typesafe.akka" %% "akka-testkit" % "2.6.17" % "test", "com.typesafe.akka" %% "akka-actor-typed" % "2.6.17", + "com.typesafe.akka" %% "akka-slf4j" % "2.6.17", "com.typesafe.akka" %% "akka-cluster-typed" % "2.6.17", "com.typesafe.akka" %% "akka-coordination" % "2.6.17", "com.typesafe.akka" %% "akka-cluster-tools" % "2.6.17", - "com.typesafe.akka" %% "akka-slf4j" % "2.6.17", "com.typesafe.akka" %% "akka-http" % "10.2.6", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.4", "org.specs2" %% "specs2-core" % "4.13.0" % "test", - "org.scalatest" %% "scalatest" % "3.2.10" % "test", + "org.scalatest" %% "scalatest" % "3.2.10" % "test", "org.scodec" %% "scodec-core" % "1.11.9", "ch.qos.logback" % "logback-classic" % "1.2.6", "org.log4s" %% "log4s" % "1.10.0", @@ -65,7 +65,7 @@ lazy val psforeverSettings = Seq( "io.kamon" %% "kamon-bundle" % "2.3.1", "io.kamon" %% "kamon-apm-reporter" % "2.3.1", "org.json4s" %% "json4s-native" % "4.0.3", - "io.getquill" %% "quill-jasync-postgres" % "3.10.0", + "io.getquill" %% "quill-jasync-postgres" % "3.12.0", "org.flywaydb" % "flyway-core" % "8.0.3", "org.postgresql" % "postgresql" % "42.3.1", "com.typesafe" % "config" % "1.4.1", diff --git a/docker-compose.yml b/docker-compose.yml index d91d6bb0..b7e3d422 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -19,7 +19,7 @@ services: ports: - 51010:8080 db: - image: postgres + image: postgres:12 ports: - 5432:5432 environment: diff --git a/src/main/scala/net/psforever/util/DiffieHellman.scala b/src/main/scala/net/psforever/util/DiffieHellman.scala index d9a963c4..6da4d1c2 100644 --- a/src/main/scala/net/psforever/util/DiffieHellman.scala +++ b/src/main/scala/net/psforever/util/DiffieHellman.scala @@ -9,9 +9,9 @@ import java.security.SecureRandom case class DiffieHellman(p: Array[Byte], g: Array[Byte]) { import DiffieHellman._ - private val _p = BigInt(1, p) - private val _g = BigInt(1, g) - private val privateKey: BigInt = BigInt(128, random) + private val _p = BigInt(1, p) + private val _g = BigInt(1, g) + private val privateKey = BigInt(128, random) val publicKey: Array[Byte] = bytes(_g.modPow(privateKey, _p)) diff --git a/tools/client/src/main/scala/net/psforever/tools/client/Client.scala b/tools/client/src/main/scala/net/psforever/tools/client/Client.scala index e31b0c5b..7f75fca0 100644 --- a/tools/client/src/main/scala/net/psforever/tools/client/Client.scala +++ b/tools/client/src/main/scala/net/psforever/tools/client/Client.scala @@ -2,10 +2,8 @@ package net.psforever.tools.client import java.net.{DatagramPacket, DatagramSocket, InetSocketAddress} import java.security.{SecureRandom, Security} - import akka.actor.typed.ActorRef import akka.io.Udp -import enumeratum.{Enum, EnumEntry} import net.psforever.packet.{ CryptoPacketOpcode, PacketCoding, @@ -15,34 +13,59 @@ import net.psforever.packet.{ PlanetSidePacket } import net.psforever.packet.PacketCoding.CryptoCoding -import net.psforever.packet.control.{ClientStart, ServerStart} -import net.psforever.packet.crypto.{ClientChallengeXchg, ServerChallengeXchg} +import net.psforever.packet.control.{ + ClientStart, + ConnectionClose, + HandleGamePacket, + MultiPacketEx, + ServerStart, + SlottedMetaPacket +} +import net.psforever.packet.crypto.{ClientChallengeXchg, ClientFinished, ServerChallengeXchg, ServerFinished} +import net.psforever.packet.game.{ + BeginZoningMessage, + CharacterInfoMessage, + CharacterRequestAction, + CharacterRequestMessage, + ConnectToWorldRequestMessage, + KeepAliveMessage, + LoadMapMessage, + LoginMessage, + LoginRespMessage, + PlayerStateMessageUpstream, + VNLWorldStatusMessage, + WorldInformation +} +import net.psforever.tools.client.State.Connection +import net.psforever.util.{DiffieHellman, Md5Mac} import org.bouncycastle.jce.provider.BouncyCastleProvider import scodec.{Attempt, Err} import scodec.Attempt.{Failure, Successful} import scodec.bits._ +import javax.crypto.spec.SecretKeySpec +import scala.collection.mutable import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.reflect.ClassTag +import java.util.concurrent.{Executors, TimeUnit} object Client { Security.addProvider(new BouncyCastleProvider) + private[this] val log = org.log4s.getLogger + def main(args: Array[String]): Unit = { val client = new Client("test", "test") client.login(new InetSocketAddress("localhost", 51000)) - } - - sealed trait ClientState extends EnumEntry - - object ClientState extends Enum[ClientState] { - - case object Disconnected extends ClientState - case object WorldSelection extends ClientState - case object AvatarSelection extends ClientState - case object AvatarCreation extends ClientState - - val values: IndexedSeq[ClientState] = findValues + client.joinWorld(client.state.worlds.head) + client.selectCharacter(client.state.characters.head.charId) + client.startTasks() + while (true) { + client.updateAvatar(client.state.avatar.copy(crouching = !client.state.avatar.crouching)) + Thread.sleep(2000) + //Thread.sleep(Int.MaxValue) + } } } @@ -56,65 +79,251 @@ class Client(username: String, password: String) { r } - private val socket = new DatagramSocket() - socket.setSoTimeout(1000) - private var host: Option[InetSocketAddress] = None + private var _state = State() + def state: State = _state + + private[this] val log = org.log4s.getLogger + + private var socket: Option[DatagramSocket] = None private var ref: Option[ActorRef[Udp.Message]] = None private var crypto: Option[CryptoCoding] = None private val buffer = new Array[Byte](65535) val random = new SecureRandom() - private var _state: ClientState = ClientState.Disconnected - def state: ClientState = _state + private val inQueue: mutable.Queue[PlanetSidePacket] = mutable.Queue() + private val splitPackets: mutable.ArrayDeque[(Int, ByteVector)] = mutable.ArrayDeque() + + private val scheduler = Executors.newScheduledThreadPool(2) + + /** Establish encrypted connection */ + private def setupConnection(): Unit = { + assert(state.connection == Connection.Disconnected) + var macBuffer: ByteVector = ByteVector.empty + + send(ClientStart(0)).require + val serverStart = waitFor[ServerStart]().require + assert(serverStart.clientNonce == 0) + + val time = System.currentTimeMillis() / 1000 + val randomChallenge = randomBytes(12) + val clientChallenge = ServerChallengeXchg.getCompleteChallenge(time, randomChallenge) + val p = randomBytes(16) + val g = ByteVector(1.toByte).reverse.padTo(16).reverse + val dh = DiffieHellman(p.toArray, g.toArray) + send(ClientChallengeXchg(time, randomChallenge, p, g)).require + val serverChallengeMsg = waitFor[ServerChallengeXchg](CryptoPacketOpcode.ServerChallengeXchg).require + + val serverChallenge = + ServerChallengeXchg.getCompleteChallenge(serverChallengeMsg.time, serverChallengeMsg.challenge) + val agreedKey = dh.agree(serverChallengeMsg.pubKey.toArray) + + val agreedMessage = ByteVector("master secret".getBytes) ++ clientChallenge ++ + hex"00000000" ++ serverChallenge ++ hex"00000000" + val masterSecret = new Md5Mac(ByteVector.view(agreedKey)).updateFinal(agreedMessage) + val mac = new Md5Mac(masterSecret) + val serverExpansion = ByteVector.view("server expansion".getBytes) ++ hex"0000" ++ serverChallenge ++ + hex"00000000" ++ clientChallenge ++ hex"00000000" + val clientExpansion = ByteVector.view("client expansion".getBytes) ++ hex"0000" ++ serverChallenge ++ + hex"00000000" ++ clientChallenge ++ hex"00000000" + val serverKey = mac.updateFinal(serverExpansion, 64) + val clientKey = mac.updateFinal(clientExpansion, 64) + + send(ClientFinished(16, ByteVector.view(dh.publicKey), ByteVector.empty)).require + crypto = Some( + CryptoCoding( + new SecretKeySpec(clientKey.take(20).toArray, "RC5"), + new SecretKeySpec(serverKey.take(20).toArray, "RC5"), + clientKey.slice(20, 36), + serverKey.slice(20, 36) + ) + ) + waitFor[ServerFinished](CryptoPacketOpcode.ServerFinished).require + } /** Login using given host address */ def login(host: InetSocketAddress): Unit = { - this.host = Some(host) + val sock = new DatagramSocket() + sock.setSoTimeout(10000) + sock.connect(host) + socket = Some(sock) login() } /** Login using given actor ref */ - /* def login(ref: ActorRef[Udp.Message]): Unit = { this.ref = Some(ref) login() } - */ - private def login() = { - assert(state == ClientState.Disconnected) - var macBuffer: ByteVector = ByteVector.empty - - send(ClientStart(0)) - val serverStart = waitFor[ServerStart]().require - assert(serverStart.clientNonce == 0) - - val time = System.currentTimeMillis() - val challenge = randomBytes(12) - val p = randomBytes(16) - val g = ByteVector(1.toByte).reverse.padTo(16).reverse - send(ClientChallengeXchg(time, challenge, p, g)) - - val serverKey = waitFor[ServerChallengeXchg]().require.pubKey + private def login(): Unit = { + setupConnection() + send(LoginMessage(0, 0, "", username, Some(password), None, 0)).require + waitFor[LoginRespMessage]().require + waitFor[VNLWorldStatusMessage]().require + assert(state.connection == Connection.WorldSelection) + disconnect() } - private def waitFor[T]( + def disconnect(): Unit = { + send(ConnectionClose()).require + socket match { + case Some(socket) => socket.disconnect() + case _ => ??? + } + crypto = None + // Server does not send any confirmation for ConnectionClose + _state = state.copy(connection = Connection.Disconnected) + } + + /** Join world */ + def joinWorld(world: WorldInformation): Unit = { + socket match { + case Some(_) => + val sock = new DatagramSocket() + sock.setSoTimeout(60000) + log.info(s"joinWorld ${world.connections.head.address}") + sock.connect(world.connections.head.address) + socket = Some(sock) + case _ => ??? + } + setupConnection() + send(ConnectToWorldRequestMessage("", state.token.get, 0, 0, 0, "", 0)).require + waitFor[CharacterInfoMessage]().require + } + + def selectCharacter(charId: Long): Unit = { + assert(state.connection == Connection.AvatarSelection) + send(CharacterRequestMessage(charId, CharacterRequestAction.Select)).require + waitFor[LoadMapMessage](timeout = 15.seconds).require + } + + def createCharacter(): Unit = { + ??? + } + + def deleteCharacter(charId: Long): Unit = { + ??? // never been tested + assert(state.connection == Connection.AvatarSelection) + send(CharacterRequestMessage(charId, CharacterRequestAction.Delete)).require + } + + def updateAvatar(avatar: State.Avatar): Unit = { + this._state = this.state.copy(avatar = avatar) + } + + /** Start processing tasks. Must be run after login/joinWorld. */ + def startTasks(): Unit = { + scheduler.scheduleAtFixedRate(new Runnable() { override def run(): Unit = tick() }, 0, 250, TimeUnit.MILLISECONDS) + + scheduler.scheduleAtFixedRate( + new Runnable() { + override def run(): Unit = { + receive().foreach { + case Failure(cause) => log.error(s"receive error: ${cause}") + case _ => () + } + while (inQueue.nonEmpty) { + process() + } + } + }, + 0, + 10, + TimeUnit.MILLISECONDS + ) + } + + /** Stop auto processing tasks. */ + def stopTasks(): Unit = { + scheduler.shutdown() + } + + /** recurring task used for keep alive and state updates */ + private def tick(): Unit = { + send(KeepAliveMessage()) + (state.avatar.guid, state.avatar.position) match { + case (Some(guid), Some(pos)) => + send( + PlayerStateMessageUpstream( + guid, + pos, + state.avatar.velocity, + state.avatar.yaw, + state.avatar.pitch, + state.avatar.yawUpper, + 0, + 0, + state.avatar.crouching, + state.avatar.jumping, + jump_thrust = false, + state.avatar.cloaked, + 0, + 0 + ) + ) + case _ => + log.warn("not ready, skipping PlayerStateMessageUpstream") + } + } + + /** Process next queued packet */ + def process(): (State, Option[PlanetSidePacket]) = { + if (inQueue.nonEmpty) { + val packet = inQueue.dequeue() + _process(packet) + (state, Some(packet)) + } else { + (state, None) + } + } + + /** Process next queued packet matching predicate */ + def processFirst(p: PlanetSidePacket => Boolean): (State, Option[PlanetSidePacket]) = { + if (inQueue.nonEmpty) { + val packet = inQueue.dequeueFirst(p) + if (packet.isDefined) { + _process(packet.get) + } + (state, packet) + } else { + (state, None) + } + } + + private def _process(packet: PlanetSidePacket): Unit = { + packet match { + case _: KeepAliveMessage => () + case _: LoadMapMessage => + log.info(s"process: ${packet}") + send(BeginZoningMessage()).require + _state = state.update(packet) + case packet: PlanetSideGamePacket => + _state = state.update(packet) + log.info(s"process: ${packet}") + () + case _ => () + } + } + + private def waitFor[T <: PlanetSidePacket: ClassTag]( cryptoState: CryptoPacketOpcode.Type = CryptoPacketOpcode.Ignore, timeout: FiniteDuration = 5.seconds ): Attempt[T] = { val time = System.currentTimeMillis() var res: Attempt[T] = Failure(Err("timeout")) while (res.isFailure && System.currentTimeMillis() - time < timeout.toMillis) { - receive(cryptoState) match { - case Successful((packet, sequence)) => - packet match { - case packet: T => res = Successful(packet) - case p => - println(s"receive: ${p}") - () - } - case Failure(cause) => ??? - + receive(cryptoState).foreach { + case Failure(cause) => + res = Failure(cause) + case _ => () + } + processFirst { + case packet if implicitly[ClassTag[T]].runtimeClass.isInstance(packet) => true + case _ => false + } match { + case (_, Some(packet: T)) => + res = Successful(packet) + case _ => () } } res @@ -137,6 +346,10 @@ class Client(username: String, password: String) { sequence: Option[Int], crypto: Option[CryptoCoding] ): Attempt[BitVector] = { + packet match { + case _: KeepAliveMessage => () + case _ => log.info(s"send: ${packet}") + } PacketCoding.marshalPacket(packet, sequence, crypto) match { case Successful(payload) => send(payload.toByteArray) @@ -147,24 +360,95 @@ class Client(username: String, password: String) { } private def send(payload: Array[Byte]): Unit = { - (host, ref) match { - case (Some(host), None) => - socket.send(new DatagramPacket(payload, payload.length, host)) - case (None, Some(ref)) => - // ref ! Udp.Received(ByteString(payload), new InetSocketAddress(socket.getInetAddress, socket.getPort)) - case _ => ; + (socket, ref) match { + case (Some(socket), _) => + socket.send(new DatagramPacket(payload, payload.length)) + case (_, Some(ref)) => + // ref ! Udp.Received(ByteString(payload), new InetSocketAddress(socket.getInetAddress, socket.getPort)) + ??? + case _ => ??? } } - private def receive( + def receive( cryptoState: CryptoPacketOpcode.Type = CryptoPacketOpcode.Ignore - ): Attempt[(PlanetSidePacket, Option[Int])] = { - try { - val p = new DatagramPacket(buffer, buffer.length) - socket.receive(p) - PacketCoding.unmarshalPacket(ByteVector.view(p.getData), crypto, cryptoState) - } catch { - case e: Throwable => Failure(Err(e.getMessage)) + ): Seq[Attempt[PlanetSidePacket]] = { + (socket, ref) match { + case (Some(socket), _) => + try { + val p = new DatagramPacket(buffer, buffer.length) + socket.receive(p) + val data = ByteVector.view(p.getData).drop(p.getOffset).take(p.getLength) + PacketCoding.unmarshalPacket(data, crypto, cryptoState) match { + case Successful((packet, sequence)) => + unwrapPacket(packet, sequence).map { + case Successful(packet) => + inQueue.enqueue(packet) + Successful(packet) + case Failure(cause) => + Failure(cause) + } + case Failure(cause) => + Seq(Failure(cause)) + } + + } catch { + case e: Throwable => Seq(Failure(Err(e.getMessage))) + } + case _ => ??? + } + } + + private def unwrapPacket(packet: PlanetSidePacket, sequence: Option[Int]): Seq[Attempt[PlanetSidePacket]] = { + packet match { + case SlottedMetaPacket(slot, _, data) if slot != 4 => + PacketCoding.decodePacket(data) match { + case Successful(packet) => unwrapPacket(packet, sequence) + case Failure(cause) => Seq(Failure(cause)) + } + // SMP4 should be split packet + case SlottedMetaPacket(slot, _, data) if slot == 4 => + PacketCoding.decodePacket(data) match { + case Successful(HandleGamePacket(_, _, _)) => + splitPackets += ((sequence.get, data)) + tryMergePackets() + Seq() + case Successful(packet) => unwrapPacket(packet, sequence) + case Failure(_) if sequence.isDefined => + splitPackets += ((sequence.get, data)) + tryMergePackets() + Seq() + case Failure(cause) => Seq(Failure(cause)) + } + case MultiPacketEx(data) => + data.flatMap { data => + PacketCoding.decodePacket(data) match { + case Successful(packet) => unwrapPacket(packet, sequence) + case Failure(cause) => Seq(Failure(cause)) + } + } + case p => Seq(Successful(p)) + } + } + + private def tryMergePackets(): Unit = { + splitPackets.foreach { + case (sequence, data) => + PacketCoding.decodePacket(data) match { + case Successful(HandleGamePacket(len, bytes, _)) => + val data = + ByteVector.view(bytes.toArray ++ splitPackets.filter(_._1 > sequence).sortBy(_._1).flatMap(_._2.toArray)) + if (data.length == len) { + PacketCoding.decodePacket(data) match { + case Successful(packet) => + inQueue.enqueue(packet) + // may silently remove old incomplete packets but there is no proper solution here + splitPackets.removeAll() + case Failure(cause) => ??? + } + } + case _ => () + } } } diff --git a/tools/client/src/main/scala/net/psforever/tools/client/State.scala b/tools/client/src/main/scala/net/psforever/tools/client/State.scala new file mode 100644 index 00000000..e5e632f3 --- /dev/null +++ b/tools/client/src/main/scala/net/psforever/tools/client/State.scala @@ -0,0 +1,98 @@ +package net.psforever.tools.client + +import enumeratum.{Enum, EnumEntry} +import net.psforever.packet.PlanetSidePacket +import net.psforever.packet.control.ServerStart +import net.psforever.packet.crypto.ServerFinished +import net.psforever.packet.game.{ + AvatarDeadStateMessage, + CharacterInfoMessage, + DeadState, + LoginRespMessage, + ObjectCreateDetailedMessage, + PlayerStateMessage, + SetCurrentAvatarMessage, + VNLWorldStatusMessage, + WorldInformation +} +import net.psforever.tools.client.State.{Avatar, Connection} +import net.psforever.types.{PlanetSideEmpire, PlanetSideGUID, Vector3} + +object State { + sealed trait Connection extends EnumEntry + object Connection extends Enum[Connection] { + case object Disconnected extends Connection + case object CryptoSetup extends Connection + case object Login extends Connection + case object WorldSelection extends Connection + case object AvatarSelection extends Connection + //case object AvatarCreation extends Connection + + val values: IndexedSeq[Connection] = findValues + } + + case class Avatar( + guid: Option[PlanetSideGUID] = None, + state: Option[DeadState.Value] = None, + position: Option[Vector3] = None, + faction: Option[PlanetSideEmpire.Value] = None, + crouching: Boolean = false, + velocity: Option[Vector3] = None, + yaw: Float = 0, + pitch: Float = 0, + yawUpper: Float = 0, + jumping: Boolean = false, + cloaked: Boolean = false + ) { + def update(packet: PlanetSidePacket): Avatar = { + packet match { + case SetCurrentAvatarMessage(guid, _, _) => this.copy(guid = Some(guid)) + case AvatarDeadStateMessage(state, _, _, pos, faction, _) => + this.copy( + state = Some(state), + position = Some(pos), + faction = Some(faction) + ) + // doesn't look like PlayerStateMessage is sent for own avatar + //case PlayerStateMessage(guid, pos, vel, yaw, pitch, yawUpper, _, crouching, jumping, _, cloaked) + // if this.guid.contains(guid) => + // this.copy( + // position = Some(pos), + // velocity = vel, + // crouching = Some(crouching), + // jumping = Some(jumping), + // cloaked = Some(cloaked), + // yaw = Some(yaw), + // pitch = Some(pitch), + // yawUpper = Some(yawUpper) + // ) + + case _ => this + } + } + + } +} + +case class State( + connection: Connection = Connection.Disconnected, + worlds: Seq[WorldInformation] = Seq(), + token: Option[String] = None, + objects: Seq[Integer] = Seq(), + characters: Seq[CharacterInfoMessage] = Seq(), + avatar: Avatar = Avatar() +) { + def update(packet: PlanetSidePacket): State = { + (packet match { + case ServerStart(_, _) => this.copy(connection = Connection.CryptoSetup) + case ServerFinished(_) => this.copy(connection = Connection.Login) + case LoginRespMessage(token, _, _, _, _, _, _) => this.copy(token = Some(token)) + case VNLWorldStatusMessage(_, worlds) => this.copy(worlds = worlds, connection = Connection.WorldSelection) + case ObjectCreateDetailedMessage(_, objectClass, guid, _, _) => this.copy(objects = objects ++ Seq(guid.guid)) + case message @ CharacterInfoMessage(_, _, _, _, _, _) => + this.copy(characters = characters ++ Seq(message), connection = Connection.AvatarSelection) + case _ => this + }).copy(avatar = avatar.update(packet)) + + } +}