X Tutup
Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions src/main/java/com/github/dockerjava/api/command/EventsCmd.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import java.util.concurrent.ExecutorService;


/**
* Get events
*
* @param since - Show all events created since timestamp
* @param until - Stream events until this timestamp
*/
public interface EventsCmd extends DockerCmd<Void> {
public interface EventsCmd extends DockerCmd<ExecutorService> {
public EventsCmd withSince(String since);

public EventsCmd withUntil(String until);
Expand All @@ -18,11 +19,9 @@ public interface EventsCmd extends DockerCmd<Void> {
public String getUntil();

public EventCallback getEventCallback();

public EventsCmd withEventCallback(EventCallback eventCallback);

public ExecutorService getExecutorService();

public void stop();

public static interface Exec extends DockerCmdExec<EventsCmd, Void> {
public static interface Exec extends DockerCmdExec<EventsCmd, ExecutorService> {
}
}
31 changes: 12 additions & 19 deletions src/main/java/com/github/dockerjava/core/command/EventsCmdImpl.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
package com.github.dockerjava.core.command;

import java.util.concurrent.ExecutorService;

import com.github.dockerjava.api.command.EventCallback;
import com.github.dockerjava.api.command.EventsCmd;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Stream docker events
*/
public class EventsCmdImpl extends AbstrDockerCmd<EventsCmd, Void> implements EventsCmd {
public class EventsCmdImpl extends AbstrDockerCmd<EventsCmd, ExecutorService> implements EventsCmd {

private final EventCallback eventCallback;

private ExecutorService executorService = Executors.newSingleThreadExecutor();
private String since;
private String until;
private EventCallback eventCallback;

public EventsCmdImpl(EventsCmd.Exec exec, EventCallback eventCallback) {
super(exec);
this.eventCallback = eventCallback;
withEventCallback(eventCallback);
}

@Override
Expand All @@ -33,6 +30,12 @@ public EventsCmd withUntil(String until) {
this.until = until;
return this;
}

@Override
public EventsCmd withEventCallback(EventCallback eventCallback) {
this.eventCallback = eventCallback;
return this;
}

@Override
public String getSince() {
Expand All @@ -50,17 +53,7 @@ public EventCallback getEventCallback() {
}

@Override
public ExecutorService getExecutorService() {
return executorService;
}

@Override
public void stop() {
executorService.shutdown();
}

@Override
public Void exec() {
public ExecutorService exec() {
return super.exec();
}

Expand Down
62 changes: 57 additions & 5 deletions src/main/java/com/github/dockerjava/jaxrs/EventsCmdExec.java
Original file line number Diff line number Diff line change
@@ -1,28 +1,80 @@
package com.github.dockerjava.jaxrs;

import java.io.InputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.dockerjava.api.command.EventCallback;
import com.github.dockerjava.api.command.EventsCmd;
import com.github.dockerjava.api.model.Event;
import com.github.dockerjava.api.model.EventNotifier;
import com.google.common.base.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;

public class EventsCmdExec extends AbstrDockerCmdExec<EventsCmd, Void> implements EventsCmd.Exec {
public class EventsCmdExec extends AbstrDockerCmdExec<EventsCmd, ExecutorService> implements EventsCmd.Exec {
private static final Logger LOGGER = LoggerFactory.getLogger(EventsCmdExec.class);

public EventsCmdExec(WebTarget baseResource) {
super(baseResource);
}

@Override
protected Void execute(EventsCmd command) {
protected ExecutorService execute(EventsCmd command) {
ExecutorService executorService = Executors.newSingleThreadExecutor();

WebTarget webResource = getBaseResource().path("/events")
.queryParam("since", command.getSince())
.queryParam("until", command.getUntil());

LOGGER.trace("GET: {}", webResource);
EventNotifier eventNotifier = EventNotifier.create(command.getEventCallback(), webResource);
command.getExecutorService().submit(eventNotifier);
return null;
executorService.submit(eventNotifier);
return executorService;
}

private static class EventNotifier implements Callable<Void> {
private static final JsonFactory JSON_FACTORY = new JsonFactory();
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private final EventCallback eventCallback;
private final WebTarget webTarget;

private EventNotifier(EventCallback eventCallback, WebTarget webTarget) {
this.eventCallback = eventCallback;
this.webTarget = webTarget;
}

public static EventNotifier create(EventCallback eventCallback, WebTarget webTarget) {
Preconditions.checkNotNull(eventCallback, "An EventCallback must be provided");
Preconditions.checkNotNull(webTarget, "An WebTarget must be provided");
return new EventNotifier(eventCallback, webTarget);
}

@Override
public Void call() throws Exception {
Response response = webTarget.request().get(Response.class);
InputStream inputStream = response.readEntity(InputStream.class);
try {
JsonParser jp = JSON_FACTORY.createParser(inputStream);
while (jp.nextToken() != JsonToken.END_OBJECT && !jp.isClosed()) {
eventCallback.onEvent(OBJECT_MAPPER.readValue(jp, Event.class));
}
} finally {
if (response != null) {
response.close();
}
}
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.github.dockerjava.api.command.EventsCmd;
import com.github.dockerjava.api.model.Event;
import com.github.dockerjava.client.AbstractDockerClientTest;

import org.testng.ITestResult;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterTest;
Expand All @@ -16,6 +17,7 @@
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class EventsCmdImplTest extends AbstractDockerClientTest {
Expand Down Expand Up @@ -59,11 +61,11 @@ public void testEventStreamTimeBound() throws InterruptedException, IOException
EventCallback eventCallback = new EventCallbackTest(countDownLatch);

EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(startTime).withUntil(endTime);
eventsCmd.exec();
ExecutorService executorService = eventsCmd.exec();

boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS);

eventsCmd.stop();
executorService.shutdown();
assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]");
}

Expand All @@ -76,12 +78,12 @@ public void testEventStreaming() throws InterruptedException, IOException {
EventCallback eventCallback = new EventCallbackTest(countDownLatch);

EventsCmd eventsCmd = dockerClient.eventsCmd(eventCallback).withSince(getEpochTime());
eventsCmd.exec();
ExecutorService executorService = eventsCmd.exec();

generateEvents();

boolean zeroCount = countDownLatch.await(5, TimeUnit.SECONDS);
eventsCmd.stop();
executorService.shutdown();
assertTrue(zeroCount, "Expected 4 events, [create, start, die, stop]");
}

Expand Down
X Tutup