Gitlab Community Edition Instance

Commit b2f9b9f4 authored by mhellka's avatar mhellka
Browse files

Fix: Durable session should not time out during an active request.

Also reworked the way AsyncContext timeouts are tracked.
parent 2466ecbc
Pipeline #127643 passed with stages
in 10 minutes and 5 seconds
......@@ -62,14 +62,23 @@ public interface CDStarSession extends AutoCloseable {
long setTimeout(long millis);
/**
* Return the timeout value, which might be -1 for sessions that never
* expire.
* Disable (or re-enable) timeout processing for this session. Useful for tasks
* that take a long time. Re-enabling the timeout (passing false) will also
* refresh the timeout.
*/
void setTimeoutSuspended(boolean suspend);
boolean isTimeoutSuspended();
/**
* Return the timeout value, which might be -1 for sessions that never expire.
*/
long getTimeout();
/**
* @return Remaining time in milliseconds, which is 0 for expired sessions
* and -1 for sessions that do not expire.
* Return the remaining time in milliseconds, which is 0 for expired sessions
* and -1 for sessions that do not expire, either because they have an infinite
* timeout or the timeout is suspended.
*/
long getRemainingTime();
......
......@@ -130,7 +130,7 @@ public abstract class RestContextBase implements RestContext {
public void abort(Throwable err) {
if (isCommitted()) {
log.error("Cannot write error to committed response.", new IllegalStateException(err));
log.error("Cannot write error to committed or closed response.", new IllegalStateException(err));
close();
return;
}
......
......@@ -2,7 +2,9 @@ package de.gwdg.cdstar.rest.api;
import java.io.Closeable;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import de.gwdg.cdstar.FailableConsumer;
......@@ -99,34 +101,38 @@ public interface AsyncContext extends Closeable {
* any case, the write buffer is no longer needed and can be recycled or re-used
* for the next write request.
*
* @throws IllegalStateException if there is a pending write request.
* If the task takes longer than a given {@code timeout}, it is aborted with a
* {@link TimeoutException}. Null or negative durations will not disable the
* timeout, but set it to a sensible default value.
*
* @throws IllegalStateException if there is a pending write request.
*/
void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback);
void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback, Duration timeout);
/**
 * Convenience overload that delegates to
 * {@link #asyncWrite(ByteBuffer, AsyncResultCallback, Duration)} with a
 * {@code null} timeout.
 *
 * @deprecated Use {@link #asyncWrite(ByteBuffer, AsyncResultCallback, Duration)}
 *             and pass the timeout explicitly.
 */
@Deprecated
default void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback) {
asyncWrite(buffer, callback, null);
}
/**
* Same as {@link #asyncWrite(ByteBuffer, AsyncResultCallback)} but accepts two
* separate callbacks for success and error. If the success callback throws an
* exception, the error callback is also called.
* Same as {@link #asyncWrite(ByteBuffer, AsyncResultCallback, Duration)} but
* accepts two separate callbacks for success and error. If the success callback
* throws an exception, the error callback is also called.
*/
default void asyncWrite(ByteBuffer buffer, FailableConsumer<ByteBuffer> onSuccess,
Consumer<Throwable> onError) {
Consumer<Throwable> onError, Duration timeout) {
asyncWrite(buffer, (a, buff, error) -> {
if (error == null)
onSuccess.accept(buffer);
else
onError.accept(error);
});
}, timeout);
}
/**
* Same as {@link #asyncWrite(ByteBuffer, AsyncResultCallback)} but returns a
* {@link CompletableFuture}.
*/
default CompletableFuture<ByteBuffer> asyncWrite(ByteBuffer buffer) {
final CompletableFuture<ByteBuffer> cf = new CompletableFuture<>();
asyncWrite(buffer, cf::complete, cf::completeExceptionally);
return cf;
/**
 * Convenience overload that delegates to
 * {@link #asyncWrite(ByteBuffer, FailableConsumer, Consumer, Duration)} with a
 * {@code null} timeout.
 *
 * @deprecated Use
 *             {@link #asyncWrite(ByteBuffer, FailableConsumer, Consumer, Duration)}
 *             and pass the timeout explicitly.
 */
@Deprecated
default void asyncWrite(ByteBuffer buffer, FailableConsumer<ByteBuffer> onSuccess,
Consumer<Throwable> onError) {
asyncWrite(buffer, onSuccess, onError, null);
}
/**
......@@ -143,11 +149,20 @@ public interface AsyncContext extends Closeable {
* {@link #endOfStream()} returns true, will raise an
* {@link IllegalStateException}.
*
* If the task takes longer than a given {@code timeout}, it is aborted with a
* {@link TimeoutException}. Null or negative durations will not disable the
* timeout, but set it to a sensible default value.
*
* @throws IllegalStateException if there is a pending read request, if the
* stream is already closed, or if
* {@link #endOfStream()} returns true.
*/
void asyncRead(ByteBuffer buffer, AsyncResultCallback callback);
void asyncRead(ByteBuffer buffer, AsyncResultCallback callback, Duration timeout);
/**
 * Convenience overload that delegates to
 * {@link #asyncRead(ByteBuffer, AsyncResultCallback, Duration)} with a
 * {@code null} timeout.
 *
 * @deprecated Use {@link #asyncRead(ByteBuffer, AsyncResultCallback, Duration)}
 *             and pass the timeout explicitly.
 */
@Deprecated
default void asyncRead(ByteBuffer buffer, AsyncResultCallback callback) {
asyncRead(buffer, callback, null);
}
/**
* Same as {@link #asyncRead(ByteBuffer buffer, AsyncResultCallback)} but
......@@ -155,23 +170,19 @@ public interface AsyncContext extends Closeable {
* throws an exception, the error callback is also called.
*/
default void asyncRead(ByteBuffer buffer, final FailableConsumer<ByteBuffer> onSuccess,
final Consumer<Throwable> onError) {
final Consumer<Throwable> onError, Duration timeout) {
asyncRead(buffer, (a, b, error) -> {
if (error == null)
onSuccess.accept(b);
else
onError.accept(error);
});
}, timeout);
}
/**
* Same as {@link #asyncRead(AsyncResultCallback)} but returns a
* {@link CompletableFuture}.
*/
default CompletableFuture<ByteBuffer> asyncRead(ByteBuffer buffer) {
final CompletableFuture<ByteBuffer> cf = new CompletableFuture<>();
asyncRead(buffer, cf::complete, cf::completeExceptionally);
return cf;
/**
 * Convenience overload that delegates to
 * {@link #asyncRead(ByteBuffer, FailableConsumer, Consumer, Duration)} with a
 * {@code null} timeout.
 *
 * @deprecated Use
 *             {@link #asyncRead(ByteBuffer, FailableConsumer, Consumer, Duration)}
 *             and pass the timeout explicitly.
 */
@Deprecated
default void asyncRead(ByteBuffer buffer, FailableConsumer<ByteBuffer> onSuccess,
Consumer<Throwable> onError) {
asyncRead(buffer, onSuccess, onError, null);
}
/**
......
......@@ -239,9 +239,9 @@ public interface RestContext extends AutoCloseable {
}
/**
* Return true if status and headers have already been sent. Calling
* {@link #status(int)} or {@link #header(String, String)} after this has no
* effect.
* Return true if status and headers have already been sent, or the context was
* closed. Calling {@link #status(int)} or {@link #header(String, String)} after
* this has no effect.
*/
boolean isCommitted();
......
......@@ -2,8 +2,8 @@ package de.gwdg.cdstar.rest.servlet;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
......@@ -32,18 +32,22 @@ import de.gwdg.cdstar.rest.api.AsyncResultCallback;
import de.gwdg.cdstar.rest.api.RestContext;
public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListener {
private static final Logger log = LoggerFactory.getLogger(AsyncContext.class);
private static final Logger log = LoggerFactory.getLogger(AsyncContextImpl.class);
private static final ByteBuffer nullBuffer = ByteBuffer.allocate(0);
private static final BufferPool bufferPool = new BufferPool(16, 1024 * 64);
private static final ScheduledExecutorService scheduler;
private static final Set<AsyncContextImpl> asyncRequests = Collections
.newSetFromMap(new ConcurrentHashMap<AsyncContextImpl, Boolean>());
private static final Set<AsyncContextImpl> asyncRequests = ConcurrentHashMap.newKeySet();
static long defaultReadTimeout = 30 * 1000;
static long defaultWriteTimeout = 30 * 1000;
static long defaultIdleTimeout = 30 * 60 * 1000;
/*
* A client connected via first generation mobile internet (or a 56k modem)
* could fill a default buffer (64k) in under 10 seconds. We allow 30 seconds
* (17kbit/s).
*/
static final long defaultReadTimeout = 30 * 1000;
static final long defaultWriteTimeout = 30 * 1000;
static final long defaultIdleTimeout = 30 * 60 * 1000;
static {
scheduler = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("async-request-reaper").deamon(true));
......@@ -52,8 +56,8 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
for (final AsyncContextImpl ac : asyncRequests)
ac.enforceTimeout(now);
if (asyncRequests.size() > 1000) {
log.warn("Possible async request leak (or DOS attack). Found {} open requests.",
Integer.toString(asyncRequests.size()));
log.warn("Possible async request leak (or DoS attack). Found {} open async requests.",
Integer.toString(asyncRequests.size()));
}
}, 1, 1, TimeUnit.SECONDS);
}
......@@ -67,13 +71,13 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
private AsyncResultCallback readCallback;
private ByteBuffer readBuffer;
private long readStarted;
private long readExpires;
private boolean readEndOfStream;
private boolean readPartial = false;
private AsyncResultCallback writeCallback;
private ByteBuffer writeBuffer;
private long writeStarted;
private long writeExpires;
// Tracks the last time an IO task completed, or #keepAlive() was called.
private long lastActivity;
......@@ -87,28 +91,27 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
/**
* Check if there is a timeout and abort the request with an error if necessary.
*
* The current time is passed in as a value so there is only one (expensive)
* {@link System#currentTimeMillis()} call for all current requests. This may
* result in false negatives (not aborting a timed-out request) but these
* requests are reaped the next round, so this is not an issue.
* The current time is passed in as a value so only one
* {@link System#currentTimeMillis()} call per round is required to check all
* requests.
*
* @return true if the request was reaped, false otherwise.
*/
private synchronized boolean enforceTimeout(long now) {
if (readCallback != null && now - readStarted > readTimeout) {
if (readCallback != null && now > readExpires) {
onError(new TimeoutException("Request timed out (reading)"));
return true;
}
if (writeCallback != null && now - writeStarted > writeTimeout) {
if (writeCallback != null && now > writeExpires) {
onError(new TimeoutException("Request timed out (writing)"));
return true;
}
if (readCallback == null && writeCallback == null && now - lastActivity > idleTimeout) {
log.warn("Async request starved on unresponsive application. No activity for {}/{} ms",
now - lastActivity, idleTimeout);
now - lastActivity, idleTimeout);
onApplicationError(new TimeoutException("Request timed out (idle)"));
return true;
}
......@@ -123,6 +126,9 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
* complete. The idle timeout limits the time the context is kept alive without
* waiting for read or write requests.
*
* Note that slow clients need longer to fill large buffers, so make sure that
* the timeouts are larger than {@code bufferSize/minRate}.
*
* @param read Timeout for read requests.
* @param write Timeout for write requests.
* @param idle Idle timeout. The context is considered idle while no read or
......@@ -225,8 +231,14 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
return ctx;
}
/**
 * Compute the absolute time, on the {@link System#currentTimeMillis()} scale,
 * at which a task that starts now expires.
 *
 * @param userTimeout    Timeout requested by the caller. {@code null} or
 *                       negative durations select {@code defaultTimeout}.
 * @param defaultTimeout Fallback timeout in milliseconds.
 * @return Expiration timestamp in milliseconds. Saturates at
 *         {@link Long#MAX_VALUE} instead of overflowing, so very large timeouts
 *         never turn into an expiration time in the past.
 */
private long getExpireTime(Duration userTimeout, long defaultTimeout) {
	final long now = System.currentTimeMillis();

	long millis;
	if (userTimeout == null || userTimeout.isNegative()) {
		millis = defaultTimeout;
	} else {
		try {
			millis = userTimeout.toMillis();
		} catch (final ArithmeticException tooLarge) {
			// Duration does not fit into a long of milliseconds: treat as "forever".
			millis = Long.MAX_VALUE;
		}
	}

	// Guard the addition: a wrapped (negative) result would expire the task
	// immediately. 'now' is non-negative, so the subtraction cannot overflow.
	if (millis > Long.MAX_VALUE - now)
		return Long.MAX_VALUE;
	return now + millis;
}
@Override
public synchronized void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback) {
public synchronized void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback, Duration timeout) {
if (error != null)
throw new IllegalStateException("Context in error state.");
if (writeCallback != null)
......@@ -234,7 +246,7 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
writeCallback = callback;
writeBuffer = buffer;
writeStarted = System.currentTimeMillis();
writeExpires = getExpireTime(timeout, writeTimeout);
if (output == null) {
try {
......@@ -251,7 +263,7 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
}
@Override
public synchronized void asyncRead(ByteBuffer buffer, AsyncResultCallback callback) {
public synchronized void asyncRead(ByteBuffer buffer, AsyncResultCallback callback, Duration timeout) {
if (error != null)
throw new IllegalStateException("Context in error state.");
if (readCallback != null)
......@@ -263,7 +275,7 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
readCallback = callback;
readBuffer = buffer;
readStarted = System.currentTimeMillis();
readExpires = getExpireTime(timeout, readTimeout);
if (input == null) {
try {
......@@ -288,26 +300,31 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
/**
* Called on read- or write errors or IO timeouts. All three situations are
* non-recoverable and no attempt is made to send an error response, as these
* errors usually mean that the client disconnected already. Pending callbacks
* are notified about the error, though.
* errors usually mean that the client disconnected already. The request is just
* closed. Pending callbacks are notified about the error, though.
*
* TODO: Read timeouts are a situation where a response could be sent. We
* currently only set an appropriate status code (408 Request Timeout).
*/
@Override
public synchronized void onError(Throwable t) {
log.debug("Async request closed (client disconnect?)", t);
log.debug("Async request failed", t);
if (error == null) {
error = t;
final AsyncResultCallback rcb = readCallback;
final AsyncResultCallback wcb = writeCallback;
final ByteBuffer wbuff = writeBuffer;
readCallback = writeCallback = null;
readBuffer = writeBuffer = null;
readStarted = writeStarted = 0;
if (rcb != null)
runInPool(rcb, nullBuffer, t);
if (wcb != null)
runInPool(wcb, wbuff, t);
ctx.status(error instanceof TimeoutException ? 408 : 500);
ctx.close();
} else {
error.addSuppressed(t);
......@@ -315,8 +332,9 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
}
/**
* Called on unhandled application errors (thrown from callbacks) or timeouts
* (while idle). Tries to send an error response, then closes the request.
* Called on idle timeouts (application dropped the ball) or if the error
* handler of a read/write task failed. We have no one to ask for help now. Try
* to send an error response and close the request.
*/
public synchronized void onApplicationError(Throwable t) {
if (readCallback != null || writeCallback != null)
......@@ -324,10 +342,14 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
if (error == null) {
error = t;
if (!(ctx.isClosed() || ctx.isCommitted()))
// Error handlers may override this, but it's nice to set a sensible default
ctx.status(error instanceof TimeoutException ? 408 : 500);
if (!ctx.isCommitted())
ctx.abort(t);
else
ctx.close();
ctx.close();
} else {
error.addSuppressed(t);
}
......@@ -347,7 +369,7 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
do {
final int n;
if (input.isFinished()
|| (n = input.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining())) < 0) {
|| (n = input.read(buf.array(), buf.arrayOffset() + buf.position(), buf.remaining())) < 0) {
readEndOfStream = true;
} else {
buf.position(buf.position() + n);
......@@ -359,7 +381,6 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
readCallback = null;
readBuffer = null;
readStarted = 0;
keepAlive();
runInPool(cb, buf, null);
......@@ -442,7 +463,6 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
// All data sent.
writeCallback = null;
writeBuffer = null;
writeStarted = 0;
keepAlive();
runInPool(callback, b, null);
......@@ -459,34 +479,30 @@ public class AsyncContextImpl implements AsyncContext, WriteListener, ReadListen
@Override
public CompletableFuture<Void> consumeRequest() {
if (readEndOfStream) {
if (endOfStream())
return CompletableFuture.completedFuture(null);
}
// TODO: Switch to input.seek() and bypass the readTask creation?
final CompletableFuture<Void> cf = new CompletableFuture<>();
final AsyncResultCallback callback = new AsyncResultCallback() {
@Override
public void done(AsyncContext ac, ByteBuffer buffer, Throwable error) throws Exception {
try {
if (error != null) {
cf.completeExceptionally(error);
} else if (!buffer.hasRemaining()) {
ac.recycleBuffer(buffer);
cf.complete(null);
} else {
buffer.clear();
ac.asyncRead(buffer, this);
try {
asyncRead(getBuffer(), new AsyncResultCallback() {
@Override
public void done(AsyncContext ac, ByteBuffer buffer, Throwable error) throws Exception {
try {
if (error != null) {
cf.completeExceptionally(error);
} else if (ac.endOfStream()) {
ac.recycleBuffer(buffer);
cf.complete(null);
} else {
buffer.clear();
ac.asyncRead(buffer, this, null);
}
} catch (final Exception e) {
cf.completeExceptionally(e);
}
} catch (final Exception e) {
cf.completeExceptionally(e);
}
}
};
try {
asyncRead(getBuffer(), callback);
}, null);
} catch (final Exception e) {
cf.completeExceptionally(e);
}
......
......@@ -79,7 +79,7 @@ public class ServletRestContext extends RestContextBase {
@Override
public boolean isCommitted() {
return rs.isCommitted();
return isClosed() || rs.isCommitted();
}
@Override
......
package de.gwdg.cdstar.rest.testutils;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
......@@ -49,7 +50,7 @@ public class TestAsyncContext implements AsyncContext {
}
@Override
public synchronized void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback) {
public synchronized void asyncWrite(ByteBuffer buffer, AsyncResultCallback callback, Duration timeout) {
if (ctx.isClosed())
throw new IllegalStateException("Context closed");
if (writePending)
......@@ -69,7 +70,7 @@ public class TestAsyncContext implements AsyncContext {
}
@Override
public synchronized void asyncRead(ByteBuffer buffer, AsyncResultCallback callback) {
public synchronized void asyncRead(ByteBuffer buffer, AsyncResultCallback callback, Duration timeout) {
if (!buffer.hasRemaining())
throw new IllegalStateException("Buffer has no space left for reading");
if (readPending)
......
......@@ -3,6 +3,7 @@ package de.gwdg.cdstar.rest.utils;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.Base64;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -75,10 +76,47 @@ public class SessionHelper {
});
}
// Ensure that the session does not time out while we are using it.
if(session.getTimeout() > -1)
suspendSessionTimeout(ctx, session);
ctx.setAttribute("cdstar.session", session);
return session;
}
/**
 * Keep a {@link CDStarSession} from timing out while the given
 * {@link RestContext} is alive. Meant for requests that may take a long time to
 * process.
 *
 * Unlike calling {@link CDStarSession#setTimeoutSuspended(boolean)} directly,
 * this helper is reference counted and serialized on the session monitor:
 * several {@link RestContext}s may suspend the same {@link CDStarSession}, and
 * the timeout is re-enabled only when the last of them has run its
 * after-request handler.
 */
private static void suspendSessionTimeout(RestContext ctx, CDStarSession session) {
	final AtomicInteger suspendCount;

	// Count this suspend request. The counter lives in the session context so all
	// requests bound to the same session share it. Only the first one actually
	// suspends the timeout.
	synchronized (session) {
		suspendCount = (AtomicInteger) session.getContext()
				.computeIfAbsent("suspendCounter", key -> new AtomicInteger());
		final boolean firstSuspender = suspendCount.getAndIncrement() == 0;
		if (firstSuspender)
			session.setTimeoutSuspended(true);
	}

	// Undo our suspend request after the request. The one that brings the counter
	// back to zero re-enables the session timeout.
	ctx.runAfterRequest(() -> {
		synchronized (session) {
			final boolean lastSuspender = suspendCount.decrementAndGet() == 0;
			if (lastSuspender)
				session.setTimeoutSuspended(false);
		}
	});
}
public static CDStarSession createDurableSession(RestContext ctx, boolean readOnly, Mode mode, Duration timeout) {
final Subject subject = getSubject(ctx);
final RuntimeContext runtime = ctx.getService(RuntimeContext.class);
......
......@@ -70,6 +70,11 @@ public class TransactionEndpoint implements RestBlueprint {
return null;
}
/**
* Get a specific session NOT USING the {@link SessionHelper}, as that would
* have unwanted side-effects (e.g. binding the session, refreshing or suspending
* session timeouts, ...)
*/
private CDStarSession getTxOr404(RestContext ctx) {
final String txid = ctx.getPathParam("tx");
if (txid == null || txid.isEmpty())
......
......@@ -153,7 +153,7 @@ public class ArchiveImporter {
public Promise<ArchiveUpdated> dispatch() {
Utils.assertTrue(ac == null, "Already started");
ac = ctx.startAsync();
ac.asyncRead(ac.getBuffer(), this::onRead, this::onError);
ac.asyncRead(ac.getBuffer(), this::onRead, this::onError, null);
return promise;
}
......
......@@ -75,7 +75,7 @@ public class ArchiveUpdater {
public CompletableFuture<ArchiveUpdated> dispatch() {
Utils.assertTrue(ac == null, "Already started");
ac = ctx.startAsync();
ac.asyncRead(ac.getBuffer(), this::onRead, this::onError);
ac.asyncRead(ac.getBuffer(), this::onRead, this::onError, null);
return future;