diff --git a/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala b/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala
index bea02e66..275416cc 100644
--- a/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala
+++ b/common/src/main/scala/net/psforever/objects/guid/TaskResolver.scala
@@ -13,34 +13,31 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
class TaskResolver() extends Actor {
- /** list of all work currently managed by this TaskResolver */
+ /** list of all work currently managed by this resolver */
private val tasks : ListBuffer[TaskResolver.TaskEntry] = new ListBuffer[TaskResolver.TaskEntry]
- /** scheduled examination of all managed work */
+ /** scheduled termination of all managed work */
private var timeoutCleanup : Cancellable = TaskResolver.DefaultCancellable
- //private[this] val log = org.log4s.getLogger
+ /** logging utilities; default to tracing */
+ private[this] val log = org.log4s.getLogger
+ private def trace(msg : String) = log.trace(msg)
/**
- * Deal with any tasks that are still enqueued with this expiring `TaskResolver`.
- *
- * First, eliminate all timed-out tasks.
- * Secondly, deal with all tasks that have reported "success" but have not yet been handled.
- * Finally, all other remaining tasks should be treated as if they had failed.
+ * Deal with any tasks that are still enqueued with this expiring `TaskResolver`.
*/
override def aroundPostStop() = {
- super.aroundPostStop()
-
+ /*
+ First, eliminate all timed-out tasks.
+ Secondly, deal with all tasks that have reported "success" but have not yet been handled.
+ Finally, all other remaining tasks should be treated as if they had failed.
+ */
timeoutCleanup.cancel()
TimeoutCleanup()
OnSuccess()
val ex : Throwable = new Exception(s"a task is being stopped")
- OnFailure(ex)
- tasks.indices.foreach({index =>
- val entry = tasks(index)
- PropagateAbort(index, ex)
- if(entry.isASubtask) {
- entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed
- }
+ tasks.foreach(entry => {
+ OnFailure(entry.task, ex)
})
+ super.aroundPostStop()
}
def receive : Receive = {
@@ -51,15 +48,21 @@ class TaskResolver() extends Actor {
QueueSubtasks(aTask, subtasks)
case TaskResolver.GiveSubtask(aTask, subtasks, resolver) =>
- QueueSubtasks(aTask, subtasks, true, resolver)
+ QueueSubtasks(aTask, subtasks, resolver)
- case TaskResolver.CompletedSubtask() =>
- ExecuteNewTasks()
+ case TaskResolver.CompletedSubtask(obj) => //inter-resolver calls
+ ExecuteNewTasks(obj)
- case Success(_) => //ignore the contents as unreliable
+ case Success(obj : Task) => //inter-resolver calls
+ OnSuccess(obj)
+
+ case Success | Success(_) => //success redirected from called event
OnSuccess()
- case Failure(ex) =>
+ case TaskResolver.Failure(obj, ex) => //inter-resolver calls
+ OnFailure(obj, ex)
+
+ case Failure(ex) => //failure redirected from called event
OnFailure(ex)
case TaskResolver.AbortTask(task, ex) =>
@@ -68,7 +71,8 @@ class TaskResolver() extends Actor {
case TaskResolver.TimeoutCleanup() =>
TimeoutCleanup()
- case _ => ;
+ case msg =>
+ log.warn(s"$self received an unexpected message $msg from $sender")
}
/**
@@ -78,7 +82,8 @@ class TaskResolver() extends Actor {
private def GiveTask(aTask : Task) : Unit = {
val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(aTask)
tasks += entry
- entry.Execute(self) //send this Actor; aesthetically pleasant expression
+ trace(s"enqueue and start task $aTask")
+ entry.Execute(self)
StartTimeoutCheck()
}
@@ -101,16 +106,15 @@ class TaskResolver() extends Actor {
* The parent of this `TaskResolver` is the router logic for all brethren `TaskResolver` `Actors`.
* @param task the work to be completed
* @param subtasks other work that needs to be completed first
- * @param isSubTask `true`, if this task counts as internal or as a leaf in the chain of `Task` dependency;
- * `false`, by default, if we are the top of the chain fo dependency
* @param resolver the `TaskResolver` that distributed this work, thus determining that this work is a sub-task;
* by default, no one, as the work is identified as a main task
*/
- private def QueueSubtasks(task : Task, subtasks : List[TaskResolver.GiveTask], isSubTask : Boolean = false, resolver : ActorRef = Actor.noSender) : Unit = {
- val sublist : List[Task] = subtasks.map(task => task.task)
- val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(task, sublist, isSubTask, resolver)
+ private def QueueSubtasks(task : Task, subtasks : List[TaskResolver.GiveTask], resolver : ActorRef = Actor.noSender) : Unit = {
+ val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(task, subtasks.map(task => task.task), resolver)
tasks += entry
- if(sublist.isEmpty) { //a leaf in terms of task dependency; so, not dependent on any other work
+ trace(s"enqueue task $task")
+ if(subtasks.isEmpty) { //a leaf in terms of task dependency; so, not dependent on any other work
+ trace(s"start task $task")
entry.Execute(self)
}
else {
@@ -123,65 +127,121 @@ class TaskResolver() extends Actor {
/**
* Perform these checks when a task has reported successful completion to this TaskResolver.
- * Since the `Success(_)` can not be associated with a specific task, every task and subtask will be checked.
+ * Every task and subtask will be checked, starting from the end of the list of queued entries
+ * and only the first discovered one will be used.
*/
private def OnSuccess(): Unit = {
- //by reversing the List, we can remove TaskEntries without disrupting the order
- TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Success).foreach({index =>
- val entry = tasks(index)
- entry.task.onSuccess()
- if(entry.isASubtask) {
- entry.supertaskRef ! TaskResolver.CompletedSubtask() //alert our dependent task's resolver that we have completed
- }
- TaskCleanup(index)
- })
+ //by reversing the List, we find the most outstanding Task with the completion state
+ TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Success) match {
+ case Some(index) =>
+ GeneralOnSuccess(index)
+ case None => ;
+ }
+ }
+
+ /**
+ * Perform these checks when a task has reported successful completion to this TaskResolver.
+ * @param task a `Task` object
+ */
+ private def OnSuccess(task : Task): Unit = {
+ //find specific task and dequeue
+ TaskResolver.findTask(tasks.iterator, task) match {
+ case Some(index) =>
+ GeneralOnSuccess(index)
+ case None => ;
+ }
+ }
+
+ /**
+ * Perform these checks when a task has reported successful completion to this TaskResolver.
+ * This is what actually happens upon completion.
+ * @param index the `TaskEntry` index
+ */
+ private def GeneralOnSuccess(index : Int) : Unit = {
+ val entry = tasks(index)
+ entry.task.onSuccess()
+ trace(s"success with this task ${entry.task}")
+ if(entry.supertaskRef != ActorRef.noSender) {
+ entry.supertaskRef ! TaskResolver.CompletedSubtask(entry.task) //alert our dependent task's resolver that we have completed
+ }
+ TaskCleanup(index)
}
/**
* Scan across a group of sub-tasks and determine if the associated main `Task` may execute.
* All of the sub-tasks must report a `Success` completion status before the main work can begin.
+ * @param subtask a `Task` that is a subtask of some parent task in this resolver's group
*/
- private def ExecuteNewTasks() : Unit = {
- tasks.filter({taskEntry => taskEntry.subtasks.nonEmpty}).foreach(entry => {
- if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Success)) {
- entry.Execute(self)
- StartTimeoutCheck()
- }
- })
+ private def ExecuteNewTasks(subtask : Task) : Unit = {
+ TaskResolver.findTaskWithSubtask(tasks.iterator, subtask) match {
+ case Some(index) =>
+ val entry = tasks(index)
+ if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Success)) {
+ trace(s"start new task ${entry.task}")
+ entry.Execute(self)
+ StartTimeoutCheck()
+ }
+ case None => ;
+ }
}
/**
- * Perform these checks when a task has reported failure to this TaskResolver.
- * Since the `Failure(Throwable)` can not be associated with a specific task, every task and subtask will be checked.
+ * Perform these checks when a task has reported failure to this `TaskResolver`.
+ * Since the `Failure(Throwable)` can not be associated with a specific task,
+ * every task and subtask will be checked, starting from the end of the list of queued entries,
+ * and only the first discovered one will be used.
* Consequently, the specific `Throwable` that contains the error message may have nothing to do with the failed task.
* @param ex a `Throwable` that reports what happened to the task
*/
private def OnFailure(ex : Throwable) : Unit = {
- TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Failure).foreach({index =>
- val entry = tasks(index)
- PropagateAbort(index, ex)
- entry.task.onFailure(ex) //TODO let the error be disjoint?
- if(entry.isASubtask) {
- entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed
- }
- })
- FaultSubtasks()
+ //by reversing the List, we find the most outstanding Task with the completion state
+ TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Failure) match {
+ case Some(index) =>
+ GeneralOnFailure(index, ex)
+ case None => ;
+ }
}
/**
- * Scan across a group of sub-tasks and, if any have reported `Failure`, report to the main `Task` that it should fail as well.
+ * Perform these checks when a task has reported failure to this `TaskResolver`.
+ * @param subtask the task that had reported failure from some other resolver
+ * @param ex a `Throwable` that reports what happened to the task
*/
- private def FaultSubtasks() : Unit = {
- tasks.indices.filter({index => tasks(index).subtasks.nonEmpty}).reverse.foreach(index => {
- val entry = tasks(index)
- if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Failure)) {
- val ex : Throwable = new Exception(s"a task ${entry.task} had a subtask that failed")
- entry.task.onFailure(ex)
- if(entry.isASubtask) {
- entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed
- }
- TaskCleanup(index)
- }
+ private def OnFailure(subtask : Task, ex : Throwable) : Unit = {
+ TaskResolver.findTaskWithSubtask(tasks.iterator, subtask) match {
+ case Some(index) =>
+ GeneralOnFailure(index, ex)
+ case None => ;
+ }
+ }
+
+ /**
+ * Perform these checks when a task has reported failure to this `TaskResolver`.
+ * This is what actually happens upon completion.
+ * @param index the `TaskEntry` index
+ * @param ex a `Throwable` that reports what happened to the task
+ */
+ private def GeneralOnFailure(index : Int, ex : Throwable) : Unit = {
+ val entry = tasks(index)
+ val task = entry.task
+ trace(s"failure with this task $task")
+ task.onAbort(ex)
+ task.onFailure(ex)
+ if(entry.supertaskRef != ActorRef.noSender) {
+ entry.supertaskRef ! TaskResolver.Failure(task, ex) //alert our superior task's resolver we have completed
+ }
+ FaultSubtasks(entry)
+ TaskCleanup(index)
+ }
+
+ /**
+ * Instruct all subtasks of a given `Task` to fail.
+ * @param entry the target parent entry (that has failed)
+ */
+ private def FaultSubtasks(entry : TaskResolver.TaskEntry) : Unit = {
+ val ex : Throwable = new Exception(s"a task ${entry.task} had a subtask that failed")
+ entry.subtasks.foreach(subtask => {
+ context.parent ! Broadcast(TaskResolver.Failure(subtask, ex)) //we have no clue where this subtask was hosted
})
}
@@ -191,9 +251,10 @@ class TaskResolver() extends Actor {
* @param ex a `Throwable` that reports what happened to the work
*/
private def OnAbort(task : Task, ex : Throwable) : Unit = {
- TaskResolver.findTaskIndex(tasks.iterator, task) match {
+ TaskResolver.findTask(tasks.iterator, task) match {
case Some(index) =>
PropagateAbort(index, ex)
+ TaskCleanup(index)
case None => ;
}
}
@@ -206,11 +267,11 @@ class TaskResolver() extends Actor {
private def PropagateAbort(index : Int, ex : Throwable) : Unit = {
tasks(index).subtasks.foreach({subtask =>
if(subtask.isComplete == Task.Resolution.Success) {
+ trace(s"aborting task $subtask")
subtask.onAbort(ex)
}
context.parent ! Broadcast(TaskResolver.AbortTask(subtask, ex))
})
- TaskCleanup(index)
}
/**
@@ -260,9 +321,17 @@ object TaskResolver {
private final case class TimeoutCleanup()
/**
- *
+ * A specific kind of `Failure` that reports on which specific `Task` has reported failure.
+ * @param obj a task object
+ * @param ex information about what went wrong
*/
- private final case class CompletedSubtask()
+ private final case class Failure(obj : Task, ex : Throwable)
+
+ /**
+ * A specific kind of `Success` that reports on which specific `Task` has reported Success where that `Task` was some other `Task`'s subtask.
+ * @param obj a task object
+ */
+ private final case class CompletedSubtask(obj : Task)
/**
* A `Broadcast` message designed to find and remove a particular task from this series of routed `Actors`.
@@ -275,10 +344,10 @@ object TaskResolver {
* Storage unit for a specific unit of work, plus extra information.
* @param task the work to be completed
* @param subtasks other work that needs to be completed first
- * @param isASubtask whether this work is intermediary or the last in a dependency chain
+ * //@param isASubtask whether this work is intermediary or the last in a dependency chain
* @param supertaskRef the `TaskResolver` that will handle work that depends on the outcome of this work
*/
- private final case class TaskEntry(task : Task, subtasks : List[Task] = Nil, isASubtask : Boolean = false, supertaskRef : ActorRef = Actor.noSender) {
+ private final case class TaskEntry(task : Task, subtasks : List[Task] = Nil, supertaskRef : ActorRef = Actor.noSender) {
private var start : Long = 0L
private var isExecuting : Boolean = false
@@ -286,10 +355,14 @@ object TaskResolver {
def Executing : Boolean = isExecuting
+ /**
+ * Only execute each task once.
+ * @param ref the `TaskResolver` currently handling this `Task`/`TaskEntry`
+ */
def Execute(ref : ActorRef) : Unit = {
if(!isExecuting) {
- start = Task.TimeNow
isExecuting = true
+ start = Task.TimeNow
task.Execute(ref)
}
}
@@ -304,45 +377,22 @@ object TaskResolver {
}
/**
- * Find the index of the targeted `Task`, if it is enqueued here.
- * @param iter an `Iterator` of
- * @param task a target `Task`
- * @param index the current index in the aforementioned `List`;
- * defaults to 0
- * @return the index of the discovered task, or `None`
+ * Scan across a group of tasks to determine which ones match the target completion status.
+ * @param iter an `Iterator` of enqueued `TaskEntry` indices
+ * @param resolution the target completion status
+ * @return the first valid index when `TaskEntry` has its primary `Task` matching the completion status
*/
- @tailrec private def findTaskIndex(iter : Iterator[TaskResolver.TaskEntry], task : Task, index : Int = 0) : Option[Int] = {
+ @tailrec private def filterCompletion(iter : Iterator[Int], tasks : List[TaskEntry], resolution : Task.Resolution.Value) : Option[Int] = {
if(!iter.hasNext) {
None
}
else {
- if(iter.next.task == task) {
+ val index : Int = iter.next
+ if(tasks(index).task.isComplete == resolution) {
Some(index)
}
else {
- findTaskIndex(iter, task, index + 1)
- }
- }
- }
-
- /**
- * Scan across a group of tasks to determine which ones match the target completion status.
- * @param iter an `Iterator` of enqueued `TaskEntry` indices
- * @param resolution the target completion status
- * @param indexList a persistent `List` of indices
- * @return the `List` of all valid `Task` indices
- */
- @tailrec private def filterCompletion(iter : Iterator[Int], tasks : List[TaskEntry], resolution : Task.Resolution.Value, indexList : List[Int] = Nil) : List[Int] = {
- if(!iter.hasNext) {
- indexList
- }
- else {
- val index : Int = iter.next
- if(tasks(index).task.isComplete == resolution) {
- filterCompletion(iter, tasks, resolution, indexList :+ index)
- }
- else {
- filterCompletion(iter, tasks, resolution, indexList)
+ filterCompletion(iter, tasks, resolution)
}
}
}
@@ -390,4 +440,49 @@ object TaskResolver {
}
}
}
+
+ /**
+ * Find the index of the targeted `Task`, if it is enqueued here.
+ * @param iter an `Iterator` of entries
+ * @param target a target `Task`
+ * @param index the current index in the aforementioned `List`;
+ * defaults to 0
+ * @return the index of the discovered task, or `None`
+ */
+ @tailrec private def findTask(iter : Iterator[TaskEntry], target : Task, index : Int = 0) : Option[Int] = {
+ if(!iter.hasNext) {
+ None
+ }
+ else {
+ if(iter.next.task == target) {
+ Some(index)
+ }
+ else {
+ findTask(iter, target, index + 1)
+ }
+ }
+ }
+
+ /**
+ * Find the index of the targeted `Task`, if it is enqueued here, given a specific "subtask" of that `Task`.
+ * @param iter an `Iterator` of entries
+ * @param target a target subtask
+ * @param index the current index in the aforementioned `List`;
+ * defaults to 0
+ * @return the index of the discovered task, or `None`
+ */
+ @tailrec private def findTaskWithSubtask(iter : Iterator[TaskEntry], target : Task, index : Int = 0) : Option[Int] = {
+ if(!iter.hasNext) {
+ None
+ }
+ else {
+ val tEntry = iter.next
+ if(tEntry.subtasks.contains(target)) {
+ Some(index)
+ }
+ else {
+ findTaskWithSubtask(iter, target, index + 1)
+ }
+ }
+ }
}
diff --git a/common/src/main/scala/net/psforever/objects/zones/Zone.scala b/common/src/main/scala/net/psforever/objects/zones/Zone.scala
index 68415c50..94c30d49 100644
--- a/common/src/main/scala/net/psforever/objects/zones/Zone.scala
+++ b/common/src/main/scala/net/psforever/objects/zones/Zone.scala
@@ -2,6 +2,7 @@
package net.psforever.objects.zones
import akka.actor.{ActorContext, ActorRef, Props}
+import akka.routing.RandomPool
import net.psforever.objects.serverobject.doors.Base
import net.psforever.objects.{PlanetSideGameObject, Player}
import net.psforever.objects.equipment.Equipment
@@ -66,7 +67,7 @@ class Zone(private val zoneId : String, zoneMap : ZoneMap, zoneNumber : Int) {
def Init(implicit context : ActorContext) : Unit = {
if(accessor == ActorRef.noSender) {
implicit val guid : NumberPoolHub = this.guid //passed into builderObject.Build implicitly
- accessor = context.actorOf(Props(classOf[UniqueNumberSystem], guid, UniqueNumberSystem.AllocateNumberPoolActors(guid)), s"$Id-uns")
+ accessor = context.actorOf(RandomPool(25).props(Props(classOf[UniqueNumberSystem], guid, UniqueNumberSystem.AllocateNumberPoolActors(guid))), s"$Id-uns")
ground = context.actorOf(Props(classOf[ZoneGroundActor], equipmentOnGround), s"$Id-ground")
Map.LocalObjects.foreach({ builderObject =>
diff --git a/pslogin/src/main/scala/PsLogin.scala b/pslogin/src/main/scala/PsLogin.scala
index 6e172cbe..e3e4bc1b 100644
--- a/pslogin/src/main/scala/PsLogin.scala
+++ b/pslogin/src/main/scala/PsLogin.scala
@@ -229,6 +229,8 @@ object PsLogin {
LocalObject(DoorObjectBuilder(door, 330))
LocalObject(DoorObjectBuilder(door, 332))
+ LocalObject(DoorObjectBuilder(door, 370))
+ LocalObject(DoorObjectBuilder(door, 371))
LocalObject(DoorObjectBuilder(door, 372))
LocalObject(DoorObjectBuilder(door, 373))
LocalObject(IFFLockObjectBuilder(lock_external, 556))
diff --git a/pslogin/src/main/scala/WorldSessionActor.scala b/pslogin/src/main/scala/WorldSessionActor.scala
index c202a572..1c9cc53d 100644
--- a/pslogin/src/main/scala/WorldSessionActor.scala
+++ b/pslogin/src/main/scala/WorldSessionActor.scala
@@ -586,7 +586,7 @@ class WorldSessionActor extends Actor with MDCContextAware {
avatarService ! AvatarServiceMessage(tplayer.Continent, AvatarAction.EquipmentInHand(player_guid, slot, item))
}
case None =>
- continent.Actor ! Zone.DropItemOnGround(item, item.Position, item.Orientation) //restore
+ continent.Ground ! Zone.DropItemOnGround(item, item.Position, item.Orientation) //restore
}
case ItemHacking(tplayer, target, tool_guid, delta, completeAction, tickAction) =>
@@ -1390,7 +1390,7 @@ class WorldSessionActor extends Actor with MDCContextAware {
def Execute(resolver : ActorRef) : Unit = {
localTarget.Slot(localIndex).Equipment = localObject
- resolver ! scala.util.Success(localObject)
+ resolver ! scala.util.Success(this)
}
override def onSuccess() : Unit = {
@@ -1442,8 +1442,9 @@ class WorldSessionActor extends Actor with MDCContextAware {
}
def Execute(resolver : ActorRef) : Unit = {
+ log.info(s"Player $localPlayer is registered")
+ resolver ! scala.util.Success(this)
localAnnounce ! PlayerLoaded(localPlayer) //alerts WSA
- resolver ! scala.util.Success(localPlayer)
}
override def onFailure(ex : Throwable) : Unit = {
@@ -1536,7 +1537,7 @@ class WorldSessionActor extends Actor with MDCContextAware {
def Execute(resolver : ActorRef) : Unit = {
localTarget.Slot(localIndex).Equipment = None
- resolver ! scala.util.Success(localObject)
+ resolver ! scala.util.Success(this)
}
override def onSuccess() : Unit = {