mirror of
https://github.com/2revoemag/PSF-BotServer.git
synced 2026-01-19 18:14:44 +00:00
Create UDP network simulator to create WAN environments
This commit is contained in:
parent
ae6687c38f
commit
05a0239e22
|
|
@ -173,11 +173,23 @@ object PsLogin {
|
|||
val loginServerPort = 51000
|
||||
val worldServerPort = 51001
|
||||
|
||||
|
||||
// Uncomment for network simulation
|
||||
// TODO: make this config or command flag
|
||||
/*
|
||||
val netParams = NetworkSimulatorParameters(
|
||||
packetLoss = 0.02,
|
||||
packetDelay = 500,
|
||||
packetReorderingChance = 0.005,
|
||||
packetReorderingTime = 400
|
||||
)
|
||||
*/
|
||||
|
||||
/** Create two actors for handling the login and world server endpoints */
|
||||
val listener = system.actorOf(Props(new UdpListener(Props(new SessionRouter("Login", loginTemplate)), "login-session-router",
|
||||
LoginConfig.serverIpAddress, loginServerPort)), "login-udp-endpoint")
|
||||
LoginConfig.serverIpAddress, loginServerPort, None)), "login-udp-endpoint")
|
||||
val worldListener = system.actorOf(Props(new UdpListener(Props(new SessionRouter("World", worldTemplate)), "world-session-router",
|
||||
LoginConfig.serverIpAddress, worldServerPort)), "world-udp-endpoint")
|
||||
LoginConfig.serverIpAddress, worldServerPort, None)), "world-udp-endpoint")
|
||||
|
||||
logger.info(s"NOTE: Set client.ini to point to ${LoginConfig.serverIpAddress.getHostAddress}:$loginServerPort")
|
||||
|
||||
|
|
|
|||
|
|
@ -13,7 +13,11 @@ final case class SendPacket(msg : ByteVector, to : InetSocketAddress)
|
|||
final case class Hello()
|
||||
final case class HelloFriend(sessionId : Long, next: ActorRef)
|
||||
|
||||
class UdpListener(nextActorProps : Props, nextActorName : String, address : InetAddress, port : Int) extends Actor {
|
||||
class UdpListener(nextActorProps : Props,
|
||||
nextActorName : String,
|
||||
listenAddress : InetAddress,
|
||||
port : Int,
|
||||
netParams : Option[NetworkSimulatorParameters]) extends Actor {
|
||||
private val log = org.log4s.getLogger(self.path.name)
|
||||
|
||||
override def supervisorStrategy = OneForOneStrategy() {
|
||||
|
|
@ -21,7 +25,16 @@ class UdpListener(nextActorProps : Props, nextActorName : String, address : Inet
|
|||
}
|
||||
|
||||
import context.system
|
||||
IO(Udp) ! Udp.Bind(self, new InetSocketAddress(address, port))
|
||||
|
||||
// If we have network parameters, start the network simulator
|
||||
if(netParams.isDefined) {
|
||||
// See http://www.cakesolutions.net/teamblogs/understanding-akkas-recommended-practice-for-actor-creation-in-scala
|
||||
// For why we cant do Props(new Actor) here
|
||||
val sim = context.actorOf(Props(classOf[UdpNetworkSimulator], self, netParams.get))
|
||||
IO(Udp).tell(Udp.Bind(sim, new InetSocketAddress(listenAddress, port)), sim)
|
||||
} else {
|
||||
IO(Udp) ! Udp.Bind(self, new InetSocketAddress(listenAddress, port))
|
||||
}
|
||||
|
||||
var bytesRecevied = 0L
|
||||
var bytesSent = 0L
|
||||
|
|
@ -32,7 +45,6 @@ class UdpListener(nextActorProps : Props, nextActorName : String, address : Inet
|
|||
log.info(s"Now listening on UDP:$local")
|
||||
|
||||
createNextActor()
|
||||
|
||||
context.become(ready(sender()))
|
||||
case Udp.CommandFailed(Udp.Bind(_, address, _)) =>
|
||||
log.error("Failed to bind to the network interface: " + address)
|
||||
|
|
|
|||
134
pslogin/src/main/scala/UdpNetworkSimulator.scala
Normal file
134
pslogin/src/main/scala/UdpNetworkSimulator.scala
Normal file
|
|
@ -0,0 +1,134 @@
|
|||
// Copyright (c) 2016 PSForever.net to present
|
||||
import akka.actor.{Actor, ActorRef}
|
||||
import akka.io._
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.util.Random
|
||||
import scala.collection.mutable
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/** Parameters for the Network simulator
|
||||
*
|
||||
* @param packetLoss The percentage from [0.0, 1.0] that a packet will be lost
|
||||
* @param packetDelay The end-to-end delay (ping) of all packets
|
||||
* @param packetReorderingChance The percentage from [0.0, 1.0] that a packet will be reordered
|
||||
* @param packetReorderingTime The absolute adjustment in milliseconds that a packet can have (either
|
||||
* forward or backwards in time)
|
||||
*/
|
||||
case class NetworkSimulatorParameters(packetLoss : Double,
|
||||
packetDelay : Int,
|
||||
packetReorderingChance : Double,
|
||||
packetReorderingTime : Int) {
|
||||
assert(packetLoss >= 0.0 && packetLoss <= 1.0)
|
||||
assert(packetDelay >= 0)
|
||||
assert(packetReorderingChance >= 0.0 && packetReorderingChance <= 1.0)
|
||||
assert(packetReorderingTime >= 0)
|
||||
}
|
||||
|
||||
|
||||
class UdpNetworkSimulator(server : ActorRef, params : NetworkSimulatorParameters) extends Actor {
|
||||
private val log = org.log4s.getLogger
|
||||
|
||||
import scala.concurrent.ExecutionContext.Implicits.global
|
||||
|
||||
//******* Internal messages
|
||||
private final case class ProcessInputQueue()
|
||||
private final case class ProcessOutputQueue()
|
||||
|
||||
//******* Variables
|
||||
val packetDelayDuration = (params.packetDelay/2).milliseconds
|
||||
|
||||
type QueueItem = (Udp.Message, Long)
|
||||
|
||||
// sort in ascending order (older things get dequeued first)
|
||||
implicit val QueueItem = Ordering.by[QueueItem, Long](_._2).reverse
|
||||
|
||||
val inPacketQueue = mutable.PriorityQueue[QueueItem]()
|
||||
val outPacketQueue = mutable.PriorityQueue[QueueItem]()
|
||||
|
||||
val chaos = new Random()
|
||||
var interface = ActorRef.noSender
|
||||
|
||||
def receive = {
|
||||
case ProcessInputQueue() =>
|
||||
val time = System.nanoTime()
|
||||
var exit = false
|
||||
|
||||
while(inPacketQueue.nonEmpty && !exit) {
|
||||
val lastTime = time - inPacketQueue.head._2
|
||||
|
||||
// this packet needs to be sent within 20 milliseconds or more
|
||||
if (lastTime >= 20000000) {
|
||||
server.tell(inPacketQueue.dequeue._1, interface)
|
||||
} else {
|
||||
schedule(lastTime.nanoseconds, outbound = false)
|
||||
exit = true
|
||||
}
|
||||
}
|
||||
case ProcessOutputQueue() =>
|
||||
val time = System.nanoTime()
|
||||
var exit = false
|
||||
|
||||
while(outPacketQueue.nonEmpty && !exit) {
|
||||
val lastTime = time - outPacketQueue.head._2
|
||||
|
||||
// this packet needs to be sent within 20 milliseconds or more
|
||||
if (lastTime >= 20000000) {
|
||||
interface.tell(outPacketQueue.dequeue._1, server)
|
||||
} else {
|
||||
schedule(lastTime.nanoseconds, outbound = true)
|
||||
exit = true
|
||||
}
|
||||
}
|
||||
// outbound messages
|
||||
case msg @ Udp.Send(payload, target, _) =>
|
||||
handlePacket(msg, outPacketQueue, outbound = true)
|
||||
// inbound messages
|
||||
case msg @ Udp.Received(payload, sender) =>
|
||||
handlePacket(msg, inPacketQueue, outbound = false)
|
||||
case msg @ Udp.Bound(address) =>
|
||||
interface = sender()
|
||||
log.info(s"Hooked ${server.path} for network simulation")
|
||||
server.tell(msg, self) // make sure the server sends *us* the packets
|
||||
case default =>
|
||||
val from = sender()
|
||||
|
||||
if(from == server)
|
||||
interface.tell(default, server)
|
||||
else if(from == interface)
|
||||
server.tell(default, interface)
|
||||
else
|
||||
log.error("Unexpected sending Actor " + from.path)
|
||||
}
|
||||
|
||||
def handlePacket(message : Udp.Message, queue : mutable.PriorityQueue[QueueItem], outbound : Boolean) = {
|
||||
val name : String = if(outbound) "OUT" else "IN"
|
||||
val queue : mutable.PriorityQueue[QueueItem] = if(outbound) outPacketQueue else inPacketQueue
|
||||
|
||||
if(chaos.nextDouble() > params.packetLoss) {
|
||||
// if the message queue is empty, then we need to reschedule our task
|
||||
if(queue.isEmpty)
|
||||
schedule(packetDelayDuration, outbound)
|
||||
|
||||
// perform a reordering
|
||||
if(chaos.nextDouble() <= params.packetReorderingChance) {
|
||||
// creates the range (-1.0, 1.0)
|
||||
// time adjustment to move the packet (forward or backwards in time)
|
||||
val adj = (2*(chaos.nextDouble()-0.5)*params.packetReorderingTime).toLong
|
||||
queue += ((message, System.nanoTime() + adj*1000000))
|
||||
|
||||
log.debug(s"Reordered $name by ${adj}ms - $message")
|
||||
} else { // normal message
|
||||
queue += ((message, System.nanoTime()))
|
||||
}
|
||||
} else {
|
||||
log.debug(s"Dropped $name - $message")
|
||||
}
|
||||
}
|
||||
|
||||
def schedule(duration : FiniteDuration, outbound : Boolean) = context.system.scheduler.scheduleOnce(
|
||||
packetDelayDuration,
|
||||
self,
|
||||
if(outbound) ProcessOutputQueue() else ProcessInputQueue()
|
||||
)
|
||||
}
|
||||
Loading…
Reference in a new issue