From 33403c24dd166e0a593c6b63d5e6dd57cc69f9ee Mon Sep 17 00:00:00 2001 From: FateJH Date: Tue, 24 Oct 2017 20:47:33 -0400 Subject: [PATCH] resolving cherry-pick of commit --- .../psforever/objects/guid/TaskResolver.scala | 311 ++++++++++++------ .../net/psforever/objects/zones/Zone.scala | 3 +- pslogin/src/main/scala/PsLogin.scala | 2 + .../src/main/scala/WorldSessionActor.scala | 9 +- 4 files changed, 212 insertions(+), 113 deletions(-) diff --git a/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala b/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala index bea02e66..275416cc 100644 --- a/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala +++ b/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala @@ -13,34 +13,31 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.util.{Failure, Success} class TaskResolver() extends Actor { - /** list of all work currently managed by this TaskResolver */ + /** list of all work currently managed by this resolver */ private val tasks : ListBuffer[TaskResolver.TaskEntry] = new ListBuffer[TaskResolver.TaskEntry] - /** scheduled examination of all managed work */ + /** scheduled termination of all managed work */ private var timeoutCleanup : Cancellable = TaskResolver.DefaultCancellable - //private[this] val log = org.log4s.getLogger + /** logging utilities; default to tracing */ + private[this] val log = org.log4s.getLogger + private def trace(msg : String) = log.trace(msg) /** - * Deal with any tasks that are still enqueued with this expiring `TaskResolver`.
- *
- * First, eliminate all timed-out tasks. - * Secondly, deal with all tasks that have reported "success" but have not yet been handled. - * Finally, all other remaining tasks should be treated as if they had failed. + * Deal with any tasks that are still enqueued with this expiring `TaskResolver`. */ override def aroundPostStop() = { - super.aroundPostStop() - + /* + First, eliminate all timed-out tasks. + Secondly, deal with all tasks that have reported "success" but have not yet been handled. + Finally, all other remaining tasks should be treated as if they had failed. + */ timeoutCleanup.cancel() TimeoutCleanup() OnSuccess() val ex : Throwable = new Exception(s"a task is being stopped") - OnFailure(ex) - tasks.indices.foreach({index => - val entry = tasks(index) - PropagateAbort(index, ex) - if(entry.isASubtask) { - entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed - } + tasks.foreach(entry => { + OnFailure(entry.task, ex) }) + super.aroundPostStop() } def receive : Receive = { @@ -51,15 +48,21 @@ class TaskResolver() extends Actor { QueueSubtasks(aTask, subtasks) case TaskResolver.GiveSubtask(aTask, subtasks, resolver) => - QueueSubtasks(aTask, subtasks, true, resolver) + QueueSubtasks(aTask, subtasks, resolver) - case TaskResolver.CompletedSubtask() => - ExecuteNewTasks() + case TaskResolver.CompletedSubtask(obj) => //inter-resolver calls + ExecuteNewTasks(obj) - case Success(_) => //ignore the contents as unreliable + case Success(obj : Task) => //inter-resolver calls + OnSuccess(obj) + + case Success | Success(_) => //success redirected from called event OnSuccess() - case Failure(ex) => + case TaskResolver.Failure(obj, ex) => //inter-resolver calls + OnFailure(obj, ex) + + case Failure(ex) => //failure redirected from called event OnFailure(ex) case TaskResolver.AbortTask(task, ex) => @@ -68,7 +71,8 @@ class TaskResolver() extends Actor { case TaskResolver.TimeoutCleanup() => TimeoutCleanup() - case _ => ; + case msg => + log.warn(s"$self received an unexpected message $msg from $sender") } /** @@ -78,7 +82,8 @@ class TaskResolver() extends Actor { private def GiveTask(aTask : Task) : Unit = { val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(aTask) tasks += entry - entry.Execute(self) //send this Actor; aesthetically pleasant expression + trace(s"enqueue and start task $aTask") + entry.Execute(self) StartTimeoutCheck() } @@ -101,16 +106,15 @@ class TaskResolver() extends Actor { * The parent of this `TaskResolver` is the router logic for all brethren `TaskResolver` `Actors`. * @param task the work to be completed * @param subtasks other work that needs to be completed first - * @param isSubTask `true`, if this task counts as internal or as a leaf in the chain of `Task` dependency; - * `false`, by default, if we are the top of the chain fo dependency * @param resolver the `TaskResolver` that distributed this work, thus determining that this work is a sub-task; * by default, no one, as the work is identified as a main task */ - private def QueueSubtasks(task : Task, subtasks : List[TaskResolver.GiveTask], isSubTask : Boolean = false, resolver : ActorRef = Actor.noSender) : Unit = { - val sublist : List[Task] = subtasks.map(task => task.task) - val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(task, sublist, isSubTask, resolver) + private def QueueSubtasks(task : Task, subtasks : List[TaskResolver.GiveTask], resolver : ActorRef = Actor.noSender) : Unit = { + val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(task, subtasks.map(task => task.task), resolver) tasks += entry - if(sublist.isEmpty) { //a leaf in terms of task dependency; so, not dependent on any other work + trace(s"enqueue task $task") + if(subtasks.isEmpty) { //a leaf in terms of task dependency; so, not dependent on any other work + trace(s"start task $task") entry.Execute(self) } else { @@ -123,65 +127,121 @@ class TaskResolver() extends Actor { /** * Perform these checks when a task has reported successful completion to this TaskResolver. - * Since the `Success(_)` can not be associated with a specific task, every task and subtask will be checked. + * Every task and subtask will be checked, starting from the end of the list of queued entries + * and only the first discovered one will be used. */ private def OnSuccess(): Unit = { - //by reversing the List, we can remove TaskEntries without disrupting the order - TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Success).foreach({index => - val entry = tasks(index) - entry.task.onSuccess() - if(entry.isASubtask) { - entry.supertaskRef ! TaskResolver.CompletedSubtask() //alert our dependent task's resolver that we have completed - } - TaskCleanup(index) - }) + //by reversing the List, we find the most outstanding Task with the completion state + TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Success) match { + case Some(index) => + GeneralOnSuccess(index) + case None => ; + } + } + + /** + * Perform these checks when a task has reported successful completion to this TaskResolver. + * @param task a `Task` object + */ + private def OnSuccess(task : Task): Unit = { + //find specific task and dequeue + TaskResolver.findTask(tasks.iterator, task) match { + case Some(index) => + GeneralOnSuccess(index) + case None => ; + } + } + + /** + * Perform these checks when a task has reported successful completion to this TaskResolver. + * This is what actually happens upon completion. + * @param index the `TaskEntry` index + */ + private def GeneralOnSuccess(index : Int) : Unit = { + val entry = tasks(index) + entry.task.onSuccess() + trace(s"success with this task ${entry.task}") + if(entry.supertaskRef != ActorRef.noSender) { + entry.supertaskRef ! TaskResolver.CompletedSubtask(entry.task) //alert our dependent task's resolver that we have completed + } + TaskCleanup(index) } /** * Scan across a group of sub-tasks and determine if the associated main `Task` may execute. * All of the sub-tasks must report a `Success` completion status before the main work can begin. + * @param subtask a `Task` that is a subtask of some parent task in this resolver's group */ - private def ExecuteNewTasks() : Unit = { - tasks.filter({taskEntry => taskEntry.subtasks.nonEmpty}).foreach(entry => { - if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Success)) { - entry.Execute(self) - StartTimeoutCheck() - } - }) + private def ExecuteNewTasks(subtask : Task) : Unit = { + TaskResolver.findTaskWithSubtask(tasks.iterator, subtask) match { + case Some(index) => + val entry = tasks(index) + if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Success)) { + trace(s"start new task ${entry.task}") + entry.Execute(self) + StartTimeoutCheck() + } + case None => ; + } } /** - * Perform these checks when a task has reported failure to this TaskResolver. - * Since the `Failure(Throwable)` can not be associated with a specific task, every task and subtask will be checked. + * Perform these checks when a task has reported failure to this `TaskResolver`. + * Since the `Failure(Throwable)` can not be associated with a specific task, + * every task and subtask will be checked, starting from the end of the list of queued entries, + * and only the first discovered one will be used. * Consequently, the specific `Throwable` that contains the error message may have nothing to do with the failed task. * @param ex a `Throwable` that reports what happened to the task */ private def OnFailure(ex : Throwable) : Unit = { - TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Failure).foreach({index => - val entry = tasks(index) - PropagateAbort(index, ex) - entry.task.onFailure(ex) //TODO let the error be disjoint? - if(entry.isASubtask) { - entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed - } - }) - FaultSubtasks() + //by reversing the List, we find the most outstanding Task with the completion state + TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Failure) match { + case Some(index) => + GeneralOnFailure(index, ex) + case None => ; + } } /** - * Scan across a group of sub-tasks and, if any have reported `Failure`, report to the main `Task` that it should fail as well. + * Perform these checks when a task has reported failure to this `TaskResolver`. + * @param subtask the task that had reported failure from some other resolver + * @param ex a `Throwable` that reports what happened to the task */ - private def FaultSubtasks() : Unit = { - tasks.indices.filter({index => tasks(index).subtasks.nonEmpty}).reverse.foreach(index => { - val entry = tasks(index) - if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Failure)) { - val ex : Throwable = new Exception(s"a task ${entry.task} had a subtask that failed") - entry.task.onFailure(ex) - if(entry.isASubtask) { - entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed - } - TaskCleanup(index) - } + private def OnFailure(subtask : Task, ex : Throwable) : Unit = { + TaskResolver.findTaskWithSubtask(tasks.iterator, subtask) match { + case Some(index) => + GeneralOnFailure(index, ex) + case None => ; + } + } + + /** + * Perform these checks when a task has reported failure to this `TaskResolver`. + * This is what actually happens upon completion. + * @param index the `TaskEntry` index + * @param ex a `Throwable` that reports what happened to the task + */ + private def GeneralOnFailure(index : Int, ex : Throwable) : Unit = { + val entry = tasks(index) + val task = entry.task + trace(s"failure with this task $task") + task.onAbort(ex) + task.onFailure(ex) + if(entry.supertaskRef != ActorRef.noSender) { + entry.supertaskRef ! TaskResolver.Failure(task, ex) //alert our superior task's resolver we have completed + } + FaultSubtasks(entry) + TaskCleanup(index) + } + + /** + * Instruct all subtasks of a given `Task` to fail. + * @param entry the target parent entry (that has failed) + */ + private def FaultSubtasks(entry : TaskResolver.TaskEntry) : Unit = { + val ex : Throwable = new Exception(s"a task ${entry.task} had a subtask that failed") + entry.subtasks.foreach(subtask => { + context.parent ! Broadcast(TaskResolver.Failure(subtask, ex)) //we have no clue where this subtask was hosted }) } @@ -191,9 +251,10 @@ class TaskResolver() extends Actor { * @param ex a `Throwable` that reports what happened to the work */ private def OnAbort(task : Task, ex : Throwable) : Unit = { - TaskResolver.findTaskIndex(tasks.iterator, task) match { + TaskResolver.findTask(tasks.iterator, task) match { case Some(index) => PropagateAbort(index, ex) + TaskCleanup(index) case None => ; } } @@ -206,11 +267,11 @@ class TaskResolver() extends Actor { private def PropagateAbort(index : Int, ex : Throwable) : Unit = { tasks(index).subtasks.foreach({subtask => if(subtask.isComplete == Task.Resolution.Success) { + trace(s"aborting task $subtask") subtask.onAbort(ex) } context.parent ! Broadcast(TaskResolver.AbortTask(subtask, ex)) }) - TaskCleanup(index) } /** @@ -260,9 +321,17 @@ object TaskResolver { private final case class TimeoutCleanup() /** - * + * A specific kind of `Failure` that reports on which specific `Task` has reported failure. + * @param obj a task object + * @param ex information about what went wrong */ - private final case class CompletedSubtask() + private final case class Failure(obj : Task, ex : Throwable) + + /** + * A specific kind of `Success` that reports on which specific `Task` has reported Success where that `Task` was some other `Task`'s subtask. + * @param obj a task object + */ + private final case class CompletedSubtask(obj : Task) /** * A `Broadcast` message designed to find and remove a particular task from this series of routed `Actors`. @@ -275,10 +344,10 @@ object TaskResolver { * Storage unit for a specific unit of work, plus extra information. * @param task the work to be completed * @param subtasks other work that needs to be completed first - * @param isASubtask whether this work is intermediary or the last in a dependency chain + * //@param isASubtask whether this work is intermediary or the last in a dependency chain * @param supertaskRef the `TaskResolver` that will handle work that depends on the outcome of this work */ - private final case class TaskEntry(task : Task, subtasks : List[Task] = Nil, isASubtask : Boolean = false, supertaskRef : ActorRef = Actor.noSender) { + private final case class TaskEntry(task : Task, subtasks : List[Task] = Nil, supertaskRef : ActorRef = Actor.noSender) { private var start : Long = 0L private var isExecuting : Boolean = false @@ -286,10 +355,14 @@ object TaskResolver { def Executing : Boolean = isExecuting + /** + * Only execute each task once. + * @param ref the `TaskResolver` currently handling this `Task`/`TaskEntry` + */ def Execute(ref : ActorRef) : Unit = { if(!isExecuting) { - start = Task.TimeNow isExecuting = true + start = Task.TimeNow task.Execute(ref) } } @@ -304,45 +377,22 @@ object TaskResolver { } /** - * Find the index of the targeted `Task`, if it is enqueued here. - * @param iter an `Iterator` of - * @param task a target `Task` - * @param index the current index in the aforementioned `List`; - * defaults to 0 - * @return the index of the discovered task, or `None` + * Scan across a group of tasks to determine which ones match the target completion status. + * @param iter an `Iterator` of enqueued `TaskEntry` indices + * @param resolution the target completion status + * @return the first valid index when `TaskEntry` has its primary `Task` matching the completion status */ - @tailrec private def findTaskIndex(iter : Iterator[TaskResolver.TaskEntry], task : Task, index : Int = 0) : Option[Int] = { + @tailrec private def filterCompletion(iter : Iterator[Int], tasks : List[TaskEntry], resolution : Task.Resolution.Value) : Option[Int] = { if(!iter.hasNext) { None } else { - if(iter.next.task == task) { + val index : Int = iter.next + if(tasks(index).task.isComplete == resolution) { Some(index) } else { - findTaskIndex(iter, task, index + 1) - } - } - } - - /** - * Scan across a group of tasks to determine which ones match the target completion status. - * @param iter an `Iterator` of enqueued `TaskEntry` indices - * @param resolution the target completion status - * @param indexList a persistent `List` of indices - * @return the `List` of all valid `Task` indices - */ - @tailrec private def filterCompletion(iter : Iterator[Int], tasks : List[TaskEntry], resolution : Task.Resolution.Value, indexList : List[Int] = Nil) : List[Int] = { - if(!iter.hasNext) { - indexList - } - else { - val index : Int = iter.next - if(tasks(index).task.isComplete == resolution) { - filterCompletion(iter, tasks, resolution, indexList :+ index) - } - else { - filterCompletion(iter, tasks, resolution, indexList) + filterCompletion(iter, tasks, resolution) } } } @@ -390,4 +440,49 @@ object TaskResolver { } } } + + /** + * Find the index of the targeted `Task`, if it is enqueued here. + * @param iter an `Iterator` of entries + * @param target a target `Task` + * @param index the current index in the aforementioned `List`; + * defaults to 0 + * @return the index of the discovered task, or `None` + */ + @tailrec private def findTask(iter : Iterator[TaskEntry], target : Task, index : Int = 0) : Option[Int] = { + if(!iter.hasNext) { + None + } + else { + if(iter.next.task == target) { + Some(index) + } + else { + findTask(iter, target, index + 1) + } + } + } + + /** + * Find the index of the targeted `Task`, if it is enqueued here, given a specific "subtask" of that `Task`. + * @param iter an `Iterator` of entries + * @param target a target subtask + * @param index the current index in the aforementioned `List`; + * defaults to 0 + * @return the index of the discovered task, or `None` + */ + @tailrec private def findTaskWithSubtask(iter : Iterator[TaskEntry], target : Task, index : Int = 0) : Option[Int] = { + if(!iter.hasNext) { + None + } + else { + val tEntry = iter.next + if(tEntry.subtasks.contains(target)) { + Some(index) + } + else { + findTaskWithSubtask(iter, target, index + 1) + } + } + } } diff --git a/common/src/main/scala/net/psforever/objects/zones/Zone.scala b/common/src/main/scala/net/psforever/objects/zones/Zone.scala index 68415c50..94c30d49 100644 --- a/common/src/main/scala/net/psforever/objects/zones/Zone.scala +++ b/common/src/main/scala/net/psforever/objects/zones/Zone.scala @@ -2,6 +2,7 @@ package net.psforever.objects.zones import akka.actor.{ActorContext, ActorRef, Props} +import akka.routing.RandomPool import net.psforever.objects.serverobject.doors.Base import net.psforever.objects.{PlanetSideGameObject, Player} import net.psforever.objects.equipment.Equipment @@ -66,7 +67,7 @@ class Zone(private val zoneId : String, zoneMap : ZoneMap, zoneNumber : Int) { def Init(implicit context : ActorContext) : Unit = { if(accessor == ActorRef.noSender) { implicit val guid : NumberPoolHub = this.guid //passed into builderObject.Build implicitly - accessor = context.actorOf(Props(classOf[UniqueNumberSystem], guid, UniqueNumberSystem.AllocateNumberPoolActors(guid)), s"$Id-uns") + accessor = context.actorOf(RandomPool(25).props(Props(classOf[UniqueNumberSystem], guid, UniqueNumberSystem.AllocateNumberPoolActors(guid))), s"$Id-uns") ground = context.actorOf(Props(classOf[ZoneGroundActor], equipmentOnGround), s"$Id-ground") Map.LocalObjects.foreach({ builderObject => diff --git a/pslogin/src/main/scala/PsLogin.scala b/pslogin/src/main/scala/PsLogin.scala index 6e172cbe..e3e4bc1b 100644 --- a/pslogin/src/main/scala/PsLogin.scala +++ b/pslogin/src/main/scala/PsLogin.scala @@ -229,6 +229,8 @@ object PsLogin { LocalObject(DoorObjectBuilder(door, 330)) LocalObject(DoorObjectBuilder(door, 332)) + LocalObject(DoorObjectBuilder(door, 370)) + LocalObject(DoorObjectBuilder(door, 371)) LocalObject(DoorObjectBuilder(door, 372)) LocalObject(DoorObjectBuilder(door, 373)) LocalObject(IFFLockObjectBuilder(lock_external, 556)) diff --git a/pslogin/src/main/scala/WorldSessionActor.scala b/pslogin/src/main/scala/WorldSessionActor.scala index c202a572..1c9cc53d 100644 --- a/pslogin/src/main/scala/WorldSessionActor.scala +++ b/pslogin/src/main/scala/WorldSessionActor.scala @@ -586,7 +586,7 @@ class WorldSessionActor extends Actor with MDCContextAware { avatarService ! AvatarServiceMessage(tplayer.Continent, AvatarAction.EquipmentInHand(player_guid, slot, item)) } case None => - continent.Actor ! Zone.DropItemOnGround(item, item.Position, item.Orientation) //restore + continent.Ground ! Zone.DropItemOnGround(item, item.Position, item.Orientation) //restore } case ItemHacking(tplayer, target, tool_guid, delta, completeAction, tickAction) => @@ -1390,7 +1390,7 @@ class WorldSessionActor extends Actor with MDCContextAware { def Execute(resolver : ActorRef) : Unit = { localTarget.Slot(localIndex).Equipment = localObject - resolver ! scala.util.Success(localObject) + resolver ! scala.util.Success(this) } override def onSuccess() : Unit = { @@ -1442,8 +1442,9 @@ class WorldSessionActor extends Actor with MDCContextAware { } def Execute(resolver : ActorRef) : Unit = { + log.info(s"Player $localPlayer is registered") + resolver ! scala.util.Success(this) localAnnounce ! PlayerLoaded(localPlayer) //alerts WSA - resolver ! scala.util.Success(localPlayer) } override def onFailure(ex : Throwable) : Unit = { @@ -1536,7 +1537,7 @@ class WorldSessionActor extends Actor with MDCContextAware { def Execute(resolver : ActorRef) : Unit = { localTarget.Slot(localIndex).Equipment = None - resolver ! scala.util.Success(localObject) + resolver ! scala.util.Success(this) } override def onSuccess() : Unit = {