Gitlab Community Edition Instance

Commit a7d10828 authored by mhellka's avatar mhellka
Browse files

Allow limited introspection into running tasks.

parent be5a1158
Pipeline #120053 passed with stage
in 4 minutes and 57 seconds
package de.gwdg.cdstar.rest.v3;
import java.util.HashMap;
import de.gwdg.cdstar.rest.api.RestBlueprint;
import de.gwdg.cdstar.rest.api.RestConfig;
import de.gwdg.cdstar.rest.api.RestContext;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.tasks.Task;
import de.gwdg.cdstar.runtime.tasks.TaskService;
import de.gwdg.cdstar.web.common.model.ErrorResponse;
import de.gwdg.cdstar.web.common.model.TaskInfo;
/**
* Show and modify (cancel) tasks. Anyone knowing a task id can view and cancel
* it.
*
* TODO: Support tasks that can only be canceled and/or viewed in detail by
* admins or specific users.
*/
public class ServiceTasksEndpoint implements RestBlueprint {
private TaskService taskService;
@Override
public void configure(RestConfig cfg) {
taskService = cfg.lookup(RuntimeContext.class).lookupRequired(TaskService.class);
cfg.route("/_tasks/<id>").GET(this::getTaskInfo);
cfg.route("/_tasks/<id>").DELETE(this::cancelTask);
}
public TaskInfo getTaskInfo(RestContext ctx) {
final String taskId = ctx.getPathParam("id");
final Task task = taskService.getTask(taskId).orElseThrow(
() -> new ErrorResponse(404, "TaskNotFound", "No task with this id, or task result is expired"));
final TaskInfo info = new TaskInfo();
info.id = task.getId();
info.name = task.getName();
info.params = task.getParameterMap();
info.state = new HashMap<>(task.getStateMap());
info.state.keySet().removeIf(key -> key.startsWith("priv:") || key.startsWith("sys:"));
info.created = task.getCreated();
info.started = task.getStarted().orElse(null);
return info;
}
public Void cancelTask(RestContext ctx) {
final String taskId = ctx.getPathParam("id");
final Task task = taskService.getTask(taskId).orElseThrow(
() -> new ErrorResponse(404, "TaskNotFound", "No task with this id, or task result is expired"));
task.cancel();
return null;
}
}
package de.gwdg.cdstar.runtime.lts;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
......@@ -30,8 +31,7 @@ class MigrationTaskRunner implements TaskRunner, RuntimeListener {
}
private String getRequiredParameter(Task task, String param) {
return task
.getOptional(param)
return Optional.ofNullable(task.getParameterMap().get(param))
.orElseThrow(() -> new IllegalArgumentException("Missing task parameter: " + param));
}
......
package de.gwdg.cdstar.runtime.tasks;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
......@@ -10,33 +11,69 @@ public interface Task {
String getName();
/**
* Returns an immutable map of string parameters)
* Returns an immutable map of string parameters, as defined during task
* creation.
*/
Map<String, String> getParameterMap();
/**
* Return a parameter, or null if the parameter is not defined.
* Returns a mutable and thread-save map of task runtime state. These can be
* set by the TaskRunner, e.g. to communicate the current progress of a long
* running task. The state map is NOT persisted in any way.
*
* Keys starting with <code>priv:</code> or <code>sys:</code> are usually
* not shown or exposed to normal users. The <code>sys:</code> key namespace
* may have special meaning. All other keys should be considered public
* knowlege.
*/
default String get(String key) {
return getParameterMap().get(key);
}
default Optional<String> getOptional(String key) {
return Optional.ofNullable(get(key));
}
Map<String, String> getStateMap();
/**
* Try to cancel this task. If it was not started yet, it is immediately
* removed from the task queue and not started. After it was passed to a
* {@link TaskRunner} however, it depends on the runner implementation if
* and how fast canceled tasks are stopped. Canceling a task that is already
* finished, failed, canceled or interrupted has no effect.
*/
void cancel();
/**
* Return true of the task was canceled.
* Return true if the task was canceled.
*/
boolean isCanceled();
/**
* Return true if the task failed.
*/
boolean isFailed();
/**
* Return true if the task completed (successfully or not).
*/
default boolean isCompleted() {
return getCompleted().isPresent();
}
/**
* Return true if the task processing should be stopped as soon as possible,
* e.g. because the runtime is currently shutting down.
*/
boolean isInterrupted();
/**
* Return the point in time this task was submitted to the
* {@link TaskService}.
*/
Instant getCreated();
/**
* Return the point in time this task was passed to its {@link TaskRunner}.
*/
Optional<Instant> getStarted();
/**
* Return the point in time this task was completed.
*/
Optional<Instant> getCompleted();
}
\ No newline at end of file
package de.gwdg.cdstar.runtime.tasks;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import de.gwdg.cdstar.ta.UserTransaction;
......@@ -12,6 +14,7 @@ class TaskImpl implements PreparedTask {
private final String id;
private final String name;
private final Map<String, String> params;
private Map<String, String> state;
private volatile boolean canceled;
private CompletableFuture<Void> future;
private volatile boolean interrupted;
......@@ -21,12 +24,17 @@ class TaskImpl implements PreparedTask {
boolean submitted = false;
boolean bound = false;
private Instant completed;
private Instant started;
private final Instant created;
private Throwable failed;
TaskImpl(TaskServiceImpl owner, String id, String name, Map<String, String> params) {
this.owner = owner;
this.id = id;
this.name = name;
this.params = params == null ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<>(params));
created = Instant.now();
}
@Override
......@@ -55,13 +63,41 @@ class TaskImpl implements PreparedTask {
}
@Override
public Optional<String> getOptional(String key) {
return Optional.ofNullable(params.get(key));
public synchronized Map<String, String> getStateMap() {
if (state == null)
state = new ConcurrentHashMap<>();
return state;
}
@Override
public Instant getCreated() {
return created;
}
@Override
public Optional<Instant> getStarted() {
return Optional.ofNullable(started);
}
@Override
public Optional<Instant> getCompleted() {
return Optional.ofNullable(completed);
}
void setFailed(Throwable ex) {
setCompleted();
failed = ex;
}
@Override
public boolean isFailed() {
return failed != null;
}
@Override
public synchronized void cancel() {
canceled = true;
setCompleted();
owner.rollbackTask(this);
if (future != null)
future.cancel(false);
......@@ -83,6 +119,16 @@ class TaskImpl implements PreparedTask {
return interrupted;
}
void setStarted() {
if (started == null)
started = Instant.now();
}
void setCompleted() {
if (completed == null)
completed = Instant.now();
}
/**
* Bind a future to this task. If the task was canceled or interrupted,
* immediately cancel or interrupt the future.
......
......@@ -14,9 +14,7 @@ import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
import de.gwdg.cdstar.GromitIterable;
......@@ -42,14 +40,8 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
private static final Logger log = LoggerFactory.getLogger(TaskServiceImpl.class);
private Counter mTotal;
private Counter mRunning;
private Counter mFinished;
private Counter mFailed;
private Timer mTimer;
private TaskServiceStats stats;
private final GromitIterable<TaskRunner> runners = new GromitIterable<>();
private final Map<String, TaskImpl> enqueuedTasks = new ConcurrentHashMap<>();
private final Map<String, BoundTaskGroup> boundTaskGroups = new ConcurrentHashMap<>();
......@@ -77,13 +69,7 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
public void onStartup(RuntimeContext ctx) throws Exception {
io.createPaths();
final MetricRegistry metrics = ctx.lookupRequired(MetricRegistry.class);
mTotal = metrics.counter("tasks.total");
mRunning = metrics.counter("tasks.running");
mFinished = metrics.counter("tasks.finished");
mFailed = metrics.counter("tasks.failed");
mTimer = metrics.timer("tasks.runtime");
stats = new TaskServiceStats("tasks", ctx.lookupRequired(MetricRegistry.class));
ctx.lookupAll(TaskRunner.class).forEach(runners::addIfNotPresent);
pool = ctx.lookupRequired(PoolService.class).getNamedPool("cdstar/tasks");
cron = ctx.lookupRequired(CronService.class);
......@@ -108,7 +94,6 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
}
@Override
public synchronized void onShutdown(RuntimeContext ctx) {
for (final TaskImpl task : enqueuedTasks.values()) {
......@@ -186,7 +171,7 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
void enqueueTask(TaskImpl task) {
final String taskId = task.getId();
enqueuedTasks.put(taskId, task);
mTotal.inc();
stats.total.inc();
pool.execute(() -> tryRunTask(taskId));
log.debug("Task enqueued: {}", task);
}
......@@ -238,7 +223,7 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
* task runner.
*/
private void tryRunTask(String taskId) {
mRunning.inc();
stats.running.inc();
log.debug("Task started: {}", taskId);
final TaskImpl task = enqueuedTasks.get(taskId);
......@@ -256,11 +241,12 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
if (runner == null)
throw new MissingTaskRunnerException(task.getName());
task.setStarted();
final CompletableFuture<Void> cf = runner.run(task);
if (cf == null)
throw new FatalTaskException("Task runner returned null");
final Context timer = mTimer.time();
final Context timer = stats.runtime.time();
cf.whenComplete((the, err) -> timer.close());
task.setFuture(cf);
......@@ -272,7 +258,7 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
}
private void whenComplete(TaskImpl task, Throwable err) {
mRunning.dec();
stats.running.dec();
final String taskId = task.getId();
// Ensure task is marked as canceled/interrupted if needed
......@@ -285,7 +271,8 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
// even if the actual task failed)
if (err == null || task.isCanceled()) {
log.debug("Task finished: {}", task);
mFinished.inc();
task.setCompleted();
stats.finished.inc();
io.forget(task.getId(), true);
enqueuedTasks.remove(taskId);
return; // Done
......@@ -300,7 +287,8 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
// Task exceeded retry limits or failed with an unrecoverable error
if (task.errorCount > maxErrors || err instanceof FatalTaskException || err instanceof Error) {
mFailed.inc();
stats.failed.inc();
task.setFailed(err);
enqueuedTasks.remove(taskId);
log.error("Task failed (no retry): {}", task, err);
return;
......@@ -320,19 +308,19 @@ public class TaskServiceImpl implements RuntimeListener, TaskService {
}
public long getTasksTotal() {
return mTotal.getCount();
return stats.total.getCount();
}
public long getTasksRunning() {
return mRunning.getCount();
return stats.running.getCount();
}
public long getTasksFinished() {
return mFinished.getCount();
return stats.finished.getCount();
}
public long getTasksFailed() {
return mFailed.getCount();
return stats.failed.getCount();
}
public void setDelay(long minDelay, long maxDelay, TimeUnit unit) {
......
package de.gwdg.cdstar.runtime.tasks;
import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
class TaskServiceStats {
public Counter total;
public Counter running;
public Counter finished;
public Counter failed;
public Timer runtime;
public TaskServiceStats(String prefix, MetricRegistry metrics) {
total = metrics.counter(prefix + ".total");
running = metrics.counter(prefix + ".running");
finished = metrics.counter(prefix + ".finished");
failed = metrics.counter(prefix + ".failed");
runtime = metrics.timer(prefix + ".runtime");
}
}
\ No newline at end of file
......@@ -192,7 +192,7 @@ public class TaskServiceTest {
latch.countDown();
throw new IllegalStateException("Will fail once, then succeed");
}
assertEquals("value", task.getOptional("param").get());
assertEquals("value", task.getParameterMap().get("param"));
latch.countDown();
return CompletableFuture.completedFuture(null);
}
......
package de.gwdg.cdstar.web.common.model;
import java.time.Instant;
import java.util.Map;
public class TaskInfo {
public String id;
public String name;
public Instant created;
public Instant started;
public Map<String, String> params;
public Map<String, String> state;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment