resolving cherry-pick of commit

This commit is contained in:
FateJH 2017-10-24 20:47:33 -04:00
parent 3bb878ac10
commit 33403c24dd
4 changed files with 212 additions and 113 deletions

View file

@ -13,34 +13,31 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
class TaskResolver() extends Actor {
/** list of all work currently managed by this TaskResolver */
/** list of all work currently managed by this resolver */
private val tasks : ListBuffer[TaskResolver.TaskEntry] = new ListBuffer[TaskResolver.TaskEntry]
/** scheduled examination of all managed work */
/** scheduled termination of all managed work */
private var timeoutCleanup : Cancellable = TaskResolver.DefaultCancellable
//private[this] val log = org.log4s.getLogger
/** logging utilities; default to tracing */
private[this] val log = org.log4s.getLogger
private def trace(msg : String) = log.trace(msg)
/**
* Deal with any tasks that are still enqueued with this expiring `TaskResolver`.<br>
* <br>
* First, eliminate all timed-out tasks.
* Secondly, deal with all tasks that have reported "success" but have not yet been handled.
* Finally, all other remaining tasks should be treated as if they had failed.
* Deal with any tasks that are still enqueued with this expiring `TaskResolver`.
*/
override def aroundPostStop() = {
super.aroundPostStop()
/*
First, eliminate all timed-out tasks.
Secondly, deal with all tasks that have reported "success" but have not yet been handled.
Finally, all other remaining tasks should be treated as if they had failed.
*/
timeoutCleanup.cancel()
TimeoutCleanup()
OnSuccess()
val ex : Throwable = new Exception(s"a task is being stopped")
OnFailure(ex)
tasks.indices.foreach({index =>
val entry = tasks(index)
PropagateAbort(index, ex)
if(entry.isASubtask) {
entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed
}
tasks.foreach(entry => {
OnFailure(entry.task, ex)
})
super.aroundPostStop()
}
def receive : Receive = {
@ -51,15 +48,21 @@ class TaskResolver() extends Actor {
QueueSubtasks(aTask, subtasks)
case TaskResolver.GiveSubtask(aTask, subtasks, resolver) =>
QueueSubtasks(aTask, subtasks, true, resolver)
QueueSubtasks(aTask, subtasks, resolver)
case TaskResolver.CompletedSubtask() =>
ExecuteNewTasks()
case TaskResolver.CompletedSubtask(obj) => //inter-resolver calls
ExecuteNewTasks(obj)
case Success(_) => //ignore the contents as unreliable
case Success(obj : Task) => //inter-resolver calls
OnSuccess(obj)
case Success | Success(_) => //success redirected from called event
OnSuccess()
case Failure(ex) =>
case TaskResolver.Failure(obj, ex) => //inter-resolver calls
OnFailure(obj, ex)
case Failure(ex) => //failure redirected from called event
OnFailure(ex)
case TaskResolver.AbortTask(task, ex) =>
@ -68,7 +71,8 @@ class TaskResolver() extends Actor {
case TaskResolver.TimeoutCleanup() =>
TimeoutCleanup()
case _ => ;
case msg =>
log.warn(s"$self received an unexpected message $msg from $sender")
}
/**
@ -78,7 +82,8 @@ class TaskResolver() extends Actor {
private def GiveTask(aTask : Task) : Unit = {
val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(aTask)
tasks += entry
entry.Execute(self) //send this Actor; aesthetically pleasant expression
trace(s"enqueue and start task $aTask")
entry.Execute(self)
StartTimeoutCheck()
}
@ -101,16 +106,15 @@ class TaskResolver() extends Actor {
* The parent of this `TaskResolver` is the router logic for all brethren `TaskResolver` `Actors`.
* @param task the work to be completed
* @param subtasks other work that needs to be completed first
* @param isSubTask `true`, if this task counts as internal or as a leaf in the chain of `Task` dependency;
* `false`, by default, if we are the top of the chain fo dependency
* @param resolver the `TaskResolver` that distributed this work, thus determining that this work is a sub-task;
* by default, no one, as the work is identified as a main task
*/
private def QueueSubtasks(task : Task, subtasks : List[TaskResolver.GiveTask], isSubTask : Boolean = false, resolver : ActorRef = Actor.noSender) : Unit = {
val sublist : List[Task] = subtasks.map(task => task.task)
val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(task, sublist, isSubTask, resolver)
private def QueueSubtasks(task : Task, subtasks : List[TaskResolver.GiveTask], resolver : ActorRef = Actor.noSender) : Unit = {
val entry : TaskResolver.TaskEntry = TaskResolver.TaskEntry(task, subtasks.map(task => task.task), resolver)
tasks += entry
if(sublist.isEmpty) { //a leaf in terms of task dependency; so, not dependent on any other work
trace(s"enqueue task $task")
if(subtasks.isEmpty) { //a leaf in terms of task dependency; so, not dependent on any other work
trace(s"start task $task")
entry.Execute(self)
}
else {
@ -123,65 +127,121 @@ class TaskResolver() extends Actor {
/**
* Perform these checks when a task has reported successful completion to this TaskResolver.
* Since the `Success(_)` can not be associated with a specific task, every task and subtask will be checked.
* Every task and subtask will be checked, starting from the end of the list of queued entries
* and only the first discovered one will be used.
*/
private def OnSuccess(): Unit = {
//by reversing the List, we can remove TaskEntries without disrupting the order
TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Success).foreach({index =>
val entry = tasks(index)
entry.task.onSuccess()
if(entry.isASubtask) {
entry.supertaskRef ! TaskResolver.CompletedSubtask() //alert our dependent task's resolver that we have completed
}
TaskCleanup(index)
})
//by reversing the List, we find the most outstanding Task with the completion state
TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Success) match {
case Some(index) =>
GeneralOnSuccess(index)
case None => ;
}
}
/**
* Perform these checks when a task has reported successful completion to this TaskResolver.
* @param task a `Task` object
*/
private def OnSuccess(task : Task): Unit = {
//find specific task and dequeue
TaskResolver.findTask(tasks.iterator, task) match {
case Some(index) =>
GeneralOnSuccess(index)
case None => ;
}
}
/**
* Perform these checks when a task has reported successful completion to this TaskResolver.
* This is what actually happens upon completion.
* @param index the `TaskEntry` index
*/
private def GeneralOnSuccess(index : Int) : Unit = {
val entry = tasks(index)
entry.task.onSuccess()
trace(s"success with this task ${entry.task}")
if(entry.supertaskRef != ActorRef.noSender) {
entry.supertaskRef ! TaskResolver.CompletedSubtask(entry.task) //alert our dependent task's resolver that we have completed
}
TaskCleanup(index)
}
/**
* Scan across a group of sub-tasks and determine if the associated main `Task` may execute.
* All of the sub-tasks must report a `Success` completion status before the main work can begin.
* @param subtask a `Task` that is a subtask of some parent task in this resolver's group
*/
private def ExecuteNewTasks() : Unit = {
tasks.filter({taskEntry => taskEntry.subtasks.nonEmpty}).foreach(entry => {
if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Success)) {
entry.Execute(self)
StartTimeoutCheck()
}
})
private def ExecuteNewTasks(subtask : Task) : Unit = {
TaskResolver.findTaskWithSubtask(tasks.iterator, subtask) match {
case Some(index) =>
val entry = tasks(index)
if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Success)) {
trace(s"start new task ${entry.task}")
entry.Execute(self)
StartTimeoutCheck()
}
case None => ;
}
}
/**
* Perform these checks when a task has reported failure to this TaskResolver.
* Since the `Failure(Throwable)` can not be associated with a specific task, every task and subtask will be checked.
* Perform these checks when a task has reported failure to this `TaskResolver`.
* Since the `Failure(Throwable)` can not be associated with a specific task,
* every task and subtask will be checked, starting from the end of the list of queued entries,
* and only the first discovered one will be used.
* Consequently, the specific `Throwable` that contains the error message may have nothing to do with the failed task.
* @param ex a `Throwable` that reports what happened to the task
*/
private def OnFailure(ex : Throwable) : Unit = {
TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Failure).foreach({index =>
val entry = tasks(index)
PropagateAbort(index, ex)
entry.task.onFailure(ex) //TODO let the error be disjoint?
if(entry.isASubtask) {
entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed
}
})
FaultSubtasks()
//by reversing the List, we find the most outstanding Task with the completion state
TaskResolver.filterCompletion(tasks.indices.reverseIterator, tasks.toList, Task.Resolution.Failure) match {
case Some(index) =>
GeneralOnFailure(index, ex)
case None => ;
}
}
/**
* Scan across a group of sub-tasks and, if any have reported `Failure`, report to the main `Task` that it should fail as well.
* Perform these checks when a task has reported failure to this `TaskResolver`.
* @param subtask the task that had reported failure from some other resolver
* @param ex a `Throwable` that reports what happened to the task
*/
private def FaultSubtasks() : Unit = {
tasks.indices.filter({index => tasks(index).subtasks.nonEmpty}).reverse.foreach(index => {
val entry = tasks(index)
if(TaskResolver.filterCompletionMatch(entry.subtasks.iterator, Task.Resolution.Failure)) {
val ex : Throwable = new Exception(s"a task ${entry.task} had a subtask that failed")
entry.task.onFailure(ex)
if(entry.isASubtask) {
entry.supertaskRef ! Failure(ex) //alert our superior task's resolver we have completed
}
TaskCleanup(index)
}
private def OnFailure(subtask : Task, ex : Throwable) : Unit = {
TaskResolver.findTaskWithSubtask(tasks.iterator, subtask) match {
case Some(index) =>
GeneralOnFailure(index, ex)
case None => ;
}
}
/**
* Perform these checks when a task has reported failure to this `TaskResolver`.
* This is what actually happens upon completion.
* @param index the `TaskEntry` index
* @param ex a `Throwable` that reports what happened to the task
*/
private def GeneralOnFailure(index : Int, ex : Throwable) : Unit = {
val entry = tasks(index)
val task = entry.task
trace(s"failure with this task $task")
task.onAbort(ex)
task.onFailure(ex)
if(entry.supertaskRef != ActorRef.noSender) {
entry.supertaskRef ! TaskResolver.Failure(task, ex) //alert our superior task's resolver we have completed
}
FaultSubtasks(entry)
TaskCleanup(index)
}
/**
* Instruct all subtasks of a given `Task` to fail.
* @param entry the target parent entry (that has failed)
*/
private def FaultSubtasks(entry : TaskResolver.TaskEntry) : Unit = {
val ex : Throwable = new Exception(s"a task ${entry.task} had a subtask that failed")
entry.subtasks.foreach(subtask => {
context.parent ! Broadcast(TaskResolver.Failure(subtask, ex)) //we have no clue where this subtask was hosted
})
}
@ -191,9 +251,10 @@ class TaskResolver() extends Actor {
* @param ex a `Throwable` that reports what happened to the work
*/
private def OnAbort(task : Task, ex : Throwable) : Unit = {
TaskResolver.findTaskIndex(tasks.iterator, task) match {
TaskResolver.findTask(tasks.iterator, task) match {
case Some(index) =>
PropagateAbort(index, ex)
TaskCleanup(index)
case None => ;
}
}
@ -206,11 +267,11 @@ class TaskResolver() extends Actor {
private def PropagateAbort(index : Int, ex : Throwable) : Unit = {
tasks(index).subtasks.foreach({subtask =>
if(subtask.isComplete == Task.Resolution.Success) {
trace(s"aborting task $subtask")
subtask.onAbort(ex)
}
context.parent ! Broadcast(TaskResolver.AbortTask(subtask, ex))
})
TaskCleanup(index)
}
/**
@ -260,9 +321,17 @@ object TaskResolver {
private final case class TimeoutCleanup()
/**
*
* A specific kind of `Failure` that reports on which specific `Task` has reported failure.
* @param obj a task object
* @param ex information about what went wrong
*/
private final case class CompletedSubtask()
private final case class Failure(obj : Task, ex : Throwable)
/**
* A specific kind of `Success` that reports on which specific `Task` has reported Success where that `Task` was some other `Task`'s subtask.
* @param obj a task object
*/
private final case class CompletedSubtask(obj : Task)
/**
* A `Broadcast` message designed to find and remove a particular task from this series of routed `Actors`.
@ -275,10 +344,10 @@ object TaskResolver {
* Storage unit for a specific unit of work, plus extra information.
* @param task the work to be completed
* @param subtasks other work that needs to be completed first
* @param isASubtask whether this work is intermediary or the last in a dependency chain
* //@param isASubtask whether this work is intermediary or the last in a dependency chain
* @param supertaskRef the `TaskResolver` that will handle work that depends on the outcome of this work
*/
private final case class TaskEntry(task : Task, subtasks : List[Task] = Nil, isASubtask : Boolean = false, supertaskRef : ActorRef = Actor.noSender) {
private final case class TaskEntry(task : Task, subtasks : List[Task] = Nil, supertaskRef : ActorRef = Actor.noSender) {
private var start : Long = 0L
private var isExecuting : Boolean = false
@ -286,10 +355,14 @@ object TaskResolver {
def Executing : Boolean = isExecuting
/**
* Only execute each task once.
* @param ref the `TaskResolver` currently handling this `Task`/`TaskEntry`
*/
def Execute(ref : ActorRef) : Unit = {
if(!isExecuting) {
start = Task.TimeNow
isExecuting = true
start = Task.TimeNow
task.Execute(ref)
}
}
@ -304,45 +377,22 @@ object TaskResolver {
}
/**
* Find the index of the targeted `Task`, if it is enqueued here.
* @param iter an `Iterator` of
* @param task a target `Task`
* @param index the current index in the aforementioned `List`;
* defaults to 0
* @return the index of the discovered task, or `None`
* Scan across a group of tasks to determine which ones match the target completion status.
* @param iter an `Iterator` of enqueued `TaskEntry` indices
* @param resolution the target completion status
* @return the first valid index when `TaskEntry` has its primary `Task` matching the completion status
*/
@tailrec private def findTaskIndex(iter : Iterator[TaskResolver.TaskEntry], task : Task, index : Int = 0) : Option[Int] = {
@tailrec private def filterCompletion(iter : Iterator[Int], tasks : List[TaskEntry], resolution : Task.Resolution.Value) : Option[Int] = {
if(!iter.hasNext) {
None
}
else {
if(iter.next.task == task) {
val index : Int = iter.next
if(tasks(index).task.isComplete == resolution) {
Some(index)
}
else {
findTaskIndex(iter, task, index + 1)
}
}
}
/**
* Scan across a group of tasks to determine which ones match the target completion status.
* @param iter an `Iterator` of enqueued `TaskEntry` indices
* @param resolution the target completion status
* @param indexList a persistent `List` of indices
* @return the `List` of all valid `Task` indices
*/
@tailrec private def filterCompletion(iter : Iterator[Int], tasks : List[TaskEntry], resolution : Task.Resolution.Value, indexList : List[Int] = Nil) : List[Int] = {
if(!iter.hasNext) {
indexList
}
else {
val index : Int = iter.next
if(tasks(index).task.isComplete == resolution) {
filterCompletion(iter, tasks, resolution, indexList :+ index)
}
else {
filterCompletion(iter, tasks, resolution, indexList)
filterCompletion(iter, tasks, resolution)
}
}
}
@ -390,4 +440,49 @@ object TaskResolver {
}
}
}
/**
* Find the index of the targeted `Task`, if it is enqueued here.
* @param iter an `Iterator` of entries
* @param target a target `Task`
* @param index the current index in the aforementioned `List`;
* defaults to 0
* @return the index of the discovered task, or `None`
*/
@tailrec private def findTask(iter : Iterator[TaskEntry], target : Task, index : Int = 0) : Option[Int] = {
if(!iter.hasNext) {
None
}
else {
if(iter.next.task == target) {
Some(index)
}
else {
findTask(iter, target, index + 1)
}
}
}
/**
* Find the index of the targeted `Task`, if it is enqueued here, given a specific "subtask" of that `Task`.
* @param iter an `Iterator` of entries
* @param target a target subtask
* @param index the current index in the aforementioned `List`;
* defaults to 0
* @return the index of the discovered task, or `None`
*/
@tailrec private def findTaskWithSubtask(iter : Iterator[TaskEntry], target : Task, index : Int = 0) : Option[Int] = {
if(!iter.hasNext) {
None
}
else {
val tEntry = iter.next
if(tEntry.subtasks.contains(target)) {
Some(index)
}
else {
findTaskWithSubtask(iter, target, index + 1)
}
}
}
}

View file

@ -2,6 +2,7 @@
package net.psforever.objects.zones
import akka.actor.{ActorContext, ActorRef, Props}
import akka.routing.RandomPool
import net.psforever.objects.serverobject.doors.Base
import net.psforever.objects.{PlanetSideGameObject, Player}
import net.psforever.objects.equipment.Equipment
@ -66,7 +67,7 @@ class Zone(private val zoneId : String, zoneMap : ZoneMap, zoneNumber : Int) {
def Init(implicit context : ActorContext) : Unit = {
if(accessor == ActorRef.noSender) {
implicit val guid : NumberPoolHub = this.guid //passed into builderObject.Build implicitly
accessor = context.actorOf(Props(classOf[UniqueNumberSystem], guid, UniqueNumberSystem.AllocateNumberPoolActors(guid)), s"$Id-uns")
accessor = context.actorOf(RandomPool(25).props(Props(classOf[UniqueNumberSystem], guid, UniqueNumberSystem.AllocateNumberPoolActors(guid))), s"$Id-uns")
ground = context.actorOf(Props(classOf[ZoneGroundActor], equipmentOnGround), s"$Id-ground")
Map.LocalObjects.foreach({ builderObject =>

View file

@ -229,6 +229,8 @@ object PsLogin {
LocalObject(DoorObjectBuilder(door, 330))
LocalObject(DoorObjectBuilder(door, 332))
LocalObject(DoorObjectBuilder(door, 370))
LocalObject(DoorObjectBuilder(door, 371))
LocalObject(DoorObjectBuilder(door, 372))
LocalObject(DoorObjectBuilder(door, 373))
LocalObject(IFFLockObjectBuilder(lock_external, 556))

View file

@ -586,7 +586,7 @@ class WorldSessionActor extends Actor with MDCContextAware {
avatarService ! AvatarServiceMessage(tplayer.Continent, AvatarAction.EquipmentInHand(player_guid, slot, item))
}
case None =>
continent.Actor ! Zone.DropItemOnGround(item, item.Position, item.Orientation) //restore
continent.Ground ! Zone.DropItemOnGround(item, item.Position, item.Orientation) //restore
}
case ItemHacking(tplayer, target, tool_guid, delta, completeAction, tickAction) =>
@ -1390,7 +1390,7 @@ class WorldSessionActor extends Actor with MDCContextAware {
def Execute(resolver : ActorRef) : Unit = {
localTarget.Slot(localIndex).Equipment = localObject
resolver ! scala.util.Success(localObject)
resolver ! scala.util.Success(this)
}
override def onSuccess() : Unit = {
@ -1442,8 +1442,9 @@ class WorldSessionActor extends Actor with MDCContextAware {
}
def Execute(resolver : ActorRef) : Unit = {
log.info(s"Player $localPlayer is registered")
resolver ! scala.util.Success(this)
localAnnounce ! PlayerLoaded(localPlayer) //alerts WSA
resolver ! scala.util.Success(localPlayer)
}
override def onFailure(ex : Throwable) : Unit = {
@ -1536,7 +1537,7 @@ class WorldSessionActor extends Actor with MDCContextAware {
def Execute(resolver : ActorRef) : Unit = {
localTarget.Slot(localIndex).Equipment = None
resolver ! scala.util.Success(localObject)
resolver ! scala.util.Success(this)
}
override def onSuccess() : Unit = {