properties)
{
_taskExecutor = taskExecutor;
_timerExecutor = timerExecutor;
_loggerFactory = loggerFactory;
_properties = properties;
_allLogger = loggerFactory.getLogger(LOGGER_BASE + ":all");
_rootLogger = loggerFactory.getLogger(LOGGER_BASE + ":root");
}
/**
 * Looks up an engine property by name in the property collection that was
 * supplied when this engine was constructed.
 *
 * @param key the name of the property to look up
 * @return whatever the underlying property collection returns for
 *         {@code key} (presumably {@code null} when the key is absent;
 *         verify against the declared type of the collection)
 */
public Object getProperty(String key)
{
  final Object value = _properties.get(key);
  return value;
}
/**
 * Runs the given task with its own context. Use {@code Tasks.seq} and
 * {@code Tasks.par} to create and run composite tasks.
 * <p>
 * If the engine is no longer in the {@code RUN} state the task is not
 * started; it is cancelled with an {@code EngineShutdownException} instead.
 *
 * @param task the task to run; must not be {@code null}
 * @throws NullPointerException if {@code task} is {@code null}
 */
public void run(final Task task)
{
  // Reject null before touching engine state. Without this check a null task
  // would only fail further down (at task.getClass()), after the pending
  // count had already been incremented, and nothing in this method would
  // undo that increment, so the engine could never reach a pending count of
  // zero and therefore never terminate.
  if (task == null)
  {
    throw new NullPointerException("task");
  }

  // Atomically claim a pending slot, but only while the engine is still
  // accepting work. The CAS is retried when another thread changed the
  // state between the read and the update.
  State currState, newState;
  do
  {
    currState = _stateRef.get();
    if (currState._stateName != StateName.RUN)
    {
      task.cancel(new EngineShutdownException("Task submitted after engine shutdown"));
      return;
    }
    newState = new State(StateName.RUN, currState._pendingCount + 1);
  } while (!_stateRef.compareAndSet(currState, newState));

  // Each call to run() is a separate plan with its own id, loggers and
  // serial executor.
  final long planId = NEXT_PLAN_ID.getAndIncrement();
  final Logger planLogger = _loggerFactory.getLogger(LOGGER_BASE + ":planClass=" + task.getClass().getName());
  final TaskLogger taskLogger = new TaskLogger(task, _allLogger, _rootLogger, planLogger);
  // If the serial executor rejects work for this plan, the handler cancels
  // the plan's task.
  final Executor taskExecutor = new SerialExecutor(_taskExecutor, new CancelPlanRejectionHandler(task));
  new ContextImpl(new PlanContext(planId, this, taskExecutor, _timerExecutor, taskLogger), task).runTask();

  // NOTE(review): _taskDoneListener is defined outside this section;
  // presumably it releases the pending slot claimed above once the task
  // completes. Confirm before reordering this relative to runTask().
  InternalUtil.unwildcardTask(task).addListener(_taskDoneListener);
}
/**
 * Starts an orderly shutdown if the engine is currently running. From that
 * point on no new tasks are accepted, while tasks that are already running
 * are allowed to finish. Use
 * {@link #awaitTermination(int, java.util.concurrent.TimeUnit)} to wait for
 * the engine to shutdown.
 *
 * Calling this method on an engine that is already shutting down or stopped
 * has no effect.
 */
public void shutdown()
{
  if (!tryTransitionShutdown())
  {
    // Some earlier call already moved the engine out of the RUN state.
    return;
  }

  // With nothing pending the engine can go straight to TERMINATED.
  tryTransitionTerminate();
}
/**
 * Reports whether the engine has left the running state, i.e. shutdown has
 * been started or the engine has already terminated. Use
 * {@link #isTerminated()} to determine if the engine is actually stopped and
 * {@link #awaitTermination(int, java.util.concurrent.TimeUnit)} to wait for
 * the engine to stop.
 *
 * @return {@code true} if the engine has started shutting down or if it has
 *         finished shutting down.
 */
public boolean isShutdown()
{
  final StateName phase = _stateRef.get()._stateName;
  return phase != StateName.RUN;
}
/**
 * Reports whether the engine has completely stopped. Use
 * {@link #shutdown()} to start engine shutdown and
 * {@link #awaitTermination(int, java.util.concurrent.TimeUnit)} to wait for
 * the engine to terminate.
 *
 * @return {@code true} if the engine has completely stopped.
 */
public boolean isTerminated()
{
  final StateName phase = _stateRef.get()._stateName;
  return phase == StateName.TERMINATED;
}
/**
 * Blocks the calling thread until the engine has stopped or the given
 * timeout has elapsed. This method does not itself start a shutdown; use
 * {@link #shutdown()} for that.
 *
 * @param time the amount of time to wait
 * @param unit the unit for the time to wait
 * @return {@code true} if shutdown completed within the specified time or
 *         {@code false} if not.
 * @throws InterruptedException if this thread is interrupted while waiting
 *                              for the engine to stop.
 */
public boolean awaitTermination(final int time, final TimeUnit unit) throws InterruptedException
{
  final boolean stoppedInTime = _terminated.await(time, unit);
  return stoppedInTime;
}
/**
 * Attempts to move the engine from {@code RUN} to {@code SHUTDOWN}, carrying
 * the current pending count over unchanged.
 *
 * @return {@code true} if this call performed the transition, {@code false}
 *         if the engine was not in the {@code RUN} state.
 */
private boolean tryTransitionShutdown()
{
  for (;;)
  {
    final State observed = _stateRef.get();
    if (observed._stateName != StateName.RUN)
    {
      return false;
    }

    final State shuttingDown = new State(StateName.SHUTDOWN, observed._pendingCount);
    if (_stateRef.compareAndSet(observed, shuttingDown))
    {
      return true;
    }
    // Another thread changed the state in the meantime; re-read and retry.
  }
}
/**
 * Attempts to move the engine from {@code SHUTDOWN} to the terminated state.
 * The transition only happens when the pending count is zero; on success the
 * termination latch is counted down so that threads blocked in
 * {@code awaitTermination} are released. Otherwise this method does nothing.
 */
private void tryTransitionTerminate()
{
  for (;;)
  {
    final State observed = _stateRef.get();
    final boolean readyToTerminate =
        observed._stateName == StateName.SHUTDOWN && observed._pendingCount == 0;
    if (!readyToTerminate)
    {
      return;
    }

    if (_stateRef.compareAndSet(observed, TERMINATED))
    {
      _terminated.countDown();
      return;
    }
    // Lost a race with a concurrent state change; re-read and retry.
  }
}
/**
 * Immutable snapshot of the engine's lifecycle. Instances are swapped
 * atomically through the engine's state reference, so the lifecycle phase and
 * the pending count always change together.
 */
private static class State
{
  // Lifecycle phase the engine is in (RUN, SHUTDOWN or TERMINATED).
  private final StateName _stateName;

  // Incremented by run() for every accepted task; must be zero before the
  // engine can move from SHUTDOWN to TERMINATED.
  private final long _pendingCount;

  private State(final StateName stateName, final long pendingCount)
  {
    this._stateName = stateName;
    this._pendingCount = pendingCount;
  }
}
private static class CancelPlanRejectionHandler implements RejectedSerialExecutionHandler
{
private final Task _task;
private CancelPlanRejectionHandler(Task task)
{
_task = task;
}
@Override
public void rejectedExecution(Throwable error)
{
final String msg = "Serial executor loop failed for plan: " + _task.getName();
final SerialExecutionException ex = new SerialExecutionException(msg, error);
final boolean wasCancelled = _task.cancel(ex);
LOG.error(msg + ". The plan was " + (wasCancelled ? "" : "not ") + "cancelled.", ex);
}
}
}