X Tutup
package act.job; /*- * #%L * ACT Framework * %% * Copyright (C) 2014 - 2017 ActFramework * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * #L% */ import static act.util.SimpleProgressGauge.wsJobProgressTag; import static org.osgl.$.F0; import act.Act; import act.app.App; import act.app.event.SysEventId; import act.event.EventBus; import act.event.SysEventListenerBase; import act.route.DuplicateRouteMappingException; import act.util.DestroyableBase; import act.util.ProgressGauge; import act.util.SimpleProgressGauge; import act.ws.WebSocketConnectionManager; import org.osgl.$; import org.osgl.exception.ConfigurationException; import org.osgl.exception.NotAppliedException; import org.osgl.exception.UnexpectedException; import org.osgl.util.C; import org.osgl.util.E; import org.osgl.util.S; import java.util.*; import java.util.concurrent.Callable; /** * A `Job` is a piece of logic that can be run/scheduled in ActFramework */ public class Job extends DestroyableBase implements Runnable { private static class LockableJobList { boolean iterating; List jobList; Job parent; LockableJobList(Job parent) { this.jobList = new ArrayList<>(); this.parent = parent; } synchronized void clear() { jobList.clear(); } synchronized Job add(Job thatJob) { if (parent.isOneTime()) { thatJob.setOneTime(); } if (parent.done() || iterating) { parent.manager.now(thatJob); return parent; } jobList.add(thatJob); return parent; } synchronized void runSubJobs() { runSubJobs(false); } synchronized void runSubJobs(boolean async) { if (jobList.isEmpty()) { return; } final JobManager jobManager = async ? Act.jobManager() : null; iterating = true; try { for (final Job subJob : jobList) { if (null != jobManager) { jobManager.now(new Runnable() { @Override public void run() { subJob.run(); } }); } else { subJob.run(); if (Act.isDev() && subJob.app.hasBlockIssue()) { break; } } } } finally { iterating = false; } } } static final String BRIEF_VIEW = "id,oneTime,executed,trigger"; static final String DETAIL_VIEW = "id,oneTime,executed,trigger,worker"; private static final C.Set> FATAL_EXCEPTIONS = C.set( DuplicateRouteMappingException.class, ConfigurationException.class ); private final String id; private final String jobProgressTag; private App app; private boolean oneTime; private boolean executed; private JobManager manager; private final EventBus eventBus; private JobTrigger trigger; private $.Func0 worker; Object callableResult; Exception callableException; // progress percentage private SimpleProgressGauge progress = new SimpleProgressGauge(); private LockableJobList parallelJobs = new LockableJobList(this); private LockableJobList followingJobs = new LockableJobList(this); private LockableJobList precedenceJobs = new LockableJobList(this); private Job(String id) { this.id = id; this.jobProgressTag = wsJobProgressTag(id); this.eventBus = Act.eventBus(); } Job(String id, JobManager manager) { this(id, manager, ($.Func0)null); } Job(String id, JobManager manager, final Callable callable) { this.id = id; this.manager = $.requireNotNull(manager); this.oneTime = true; this.app = manager.app(); this.jobProgressTag = wsJobProgressTag(id); this.manager.addJob(this); this.eventBus = app.eventBus(); this.worker = new F0() { @Override public Object apply() throws NotAppliedException, $.Break { try { callableResult = callable.call(); } catch (Exception e) { callableException = e; } return null; } }; } Job(String id, JobManager manager, $.Func0 worker) { this(id, manager, worker, true); } Job(String id, JobManager manager, $.Func0 worker, boolean oneTime) { this.id = id; this.manager = $.NPE(manager); this.worker = worker; this.oneTime = oneTime; this.app = manager.app(); this.eventBus = app.eventBus(); this.jobProgressTag = wsJobProgressTag(id); this.manager.addJob(this); } Job(String id, JobManager manager, $.Function worker) { this(id, manager, worker, true); } Job(String id, JobManager manager, $.Function worker, boolean oneTime) { this.id = id; this.manager = $.requireNotNull(manager); $.F1 f1 = $.f1(worker); this.worker = f1.curry(progress); this.oneTime = oneTime; this.app = manager.app(); this.eventBus = app.eventBus(); this.jobProgressTag = wsJobProgressTag(id); this.manager.addJob(this); } public void setProgressGauge(ProgressGauge progressGauge) { progress = SimpleProgressGauge.wrap(progressGauge); progress.setId(getId()); progress.addListener(new ProgressGauge.Listener() { @Override public void onUpdate(ProgressGauge progressGauge) { Map payload = C.Map("act_job_progress", progressGauge); app.getInstance(WebSocketConnectionManager.class).sendJsonToTagged(payload, jobProgressTag); } }); } public SimpleProgressGauge progress() { return progress; } public int getProgressInPercent() { return progress.currentProgressPercent(); } @Override protected void releaseResources() { worker = null; manager = null; parallelJobs.clear(); followingJobs.clear(); precedenceJobs.clear(); super.releaseResources(); } protected String brief() { return S.concat("job[", id, "]\none time job:", S.string(oneTime), "\ntrigger:", S.string(trigger)); } @Override public String toString() { S.Buffer sb = S.buffer(brief()); printSubJobs(parallelJobs.jobList, "parallel jobs", sb); printSubJobs(followingJobs.jobList, "following jobs", sb); printSubJobs(precedenceJobs.jobList, "precedence jobs", sb); return sb.toString(); } private static void printSubJobs(Collection subJobs, String label, S.Buffer sb) { if (null != subJobs && !subJobs.isEmpty()) { sb.append("\n").append(label); for (Job job : subJobs) { sb.append("\n\t").append(job.brief()); } } } Job setOneTime() { oneTime = true; return this; } boolean done() { return executed && oneTime; } final String id() { return id; } final void trigger(JobTrigger trigger) { E.NPE(trigger); this.trigger = trigger; } final Job addParallelJob(Job thatJob) { return parallelJobs.add(thatJob); } final Job addFollowingJob(Job thatJob) { return followingJobs.add(thatJob); } final Job addPrecedenceJob(Job thatJob) { return precedenceJobs.add(thatJob); } @Override public void run() { invokeParallelJobs(); runPrecedenceJobs(); try { if (Act.isDev() && app.isStarted()) { app.checkUpdates(false); } doJob(); } catch (Throwable e) { boolean isFatal = FATAL_EXCEPTIONS.contains(e.getClass()) || Error.class.isInstance(e); Throwable cause = e; if (!isFatal) { cause = e.getCause(); while (null != cause) { isFatal = FATAL_EXCEPTIONS.contains(cause.getClass()); if (isFatal) { break; } cause = cause.getCause(); } } if (isFatal) { if (Act.isDev()) { app.setBlockIssue(e); } else { Act.shutdown(App.instance()); destroy(); if (App.instance().isMainThread()) { if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw E.unexpected(e); } else { logger.fatal(cause, "Fatal error executing job %s", id()); } } return; } // TODO inject Job Exception Handling mechanism here logger.warn(e, "error executing job %s", id()); } finally { if (!isDestroyed()) { executed = true; if (isOneTime()) { App app = App.instance(); if (app.isStarted()) { manager.removeJob(this); } else { app.eventBus().bind(SysEventId.POST_START, new SysEventListenerBase() { @Override public void on(EventObject event) throws Exception { manager.removeJob(Job.this); } }); } } progress.destroy(); } } runFollowingJobs(); } protected void _before() { } protected void doJob(){ JobContext.init(); try { _before(); if (null != worker) { worker.apply(); } } finally { JobContext.clear(); scheduleNextInvocation(); _finally(); } } protected void _finally() {} protected void cancel() { manager.cancel(id()); } private void runPrecedenceJobs() { precedenceJobs.runSubJobs(); } private void runFollowingJobs() { followingJobs.runSubJobs(); } private void invokeParallelJobs() { parallelJobs.runSubJobs(true); } protected final JobManager manager() { return manager; } protected void scheduleNextInvocation() { if (null != trigger) trigger.scheduleFollowingCalls(manager(), this); } private static Job of(String jobId, final Runnable runnable, JobManager manager, boolean oneTime) { return new Job(jobId, manager, new F0() { @Override public Object apply() throws NotAppliedException, $.Break { runnable.run(); return null; } }, oneTime); } private static Job of(final Runnable runnable, JobManager manager, boolean oneTime) { return of(Act.cuid(), runnable, manager, oneTime); } static Job once(final Runnable runnable, JobManager manager) { return of(runnable, manager, true); } static Job once(String jobId, final Runnable runnable, JobManager manager) { return of(jobId, runnable, manager, true); } static Job multipleTimes(final Runnable runnable, JobManager manager) { return of(runnable, manager, false); } static Job multipleTimes(String jobId, final Runnable runnable, JobManager manager) { return of(jobId, runnable, manager, false); } private static String uuid() { return UUID.randomUUID().toString(); } // ---- Java bean accessors public String getId() { return id; } public boolean isExecuted() { return executed; } public boolean isOneTime() { return oneTime; } public JobTrigger trigger() { return trigger; } static Job virtualJob(String jobId, JobManager manager) { Job job = new Job(jobId); job.manager = manager; return job; } }
X Tutup