Gitlab Community Edition Instance

Commit 7e67c7d2 authored by mhellka's avatar mhellka
Browse files

Make UrlFetchService (e.g. TUS) fully asynchronous.

parent 759bbb00
......@@ -2,6 +2,7 @@ package de.gwdg.cdstar.rest.ext.tus;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
......@@ -175,12 +176,11 @@ class TusUpload {
};
}
public synchronized FileChannel getReadChannel() throws IOException {
public synchronized AsynchronousFileChannel getReadChannel() throws IOException {
if (writeChannel != null && writeChannel.isOpen())
throw new IllegalStateException("Write in progress");
if (!locked)
throw new IllegalStateException("Upload not locked");
return FileChannel.open(getChunkPath(), StandardOpenOption.READ);
return AsynchronousFileChannel.open(getChunkPath(), StandardOpenOption.READ);
}
}
\ No newline at end of file
......@@ -2,10 +2,11 @@ package de.gwdg.cdstar.rest.ext.tus;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.channels.CompletionHandler;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.rest.v3.async.UrlFetchService;
......@@ -16,7 +17,6 @@ import de.gwdg.cdstar.rest.v3.async.UrlFetchService;
*/
public class TusUrlFetch implements UrlFetchService {
private static final int MAX_COPY_CHUNK = 1024 * 1024 * 128;
private final TusService tus;
private final String scheme;
......@@ -31,42 +31,59 @@ public class TusUrlFetch implements UrlFetchService {
}
@Override
public CompletableFuture<Void> fetchInto(URI uri, WritableByteChannel target, Runnable onProgress) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
public FetchHandle resolve(URI uri) throws IOException {
tus.getPool().execute(() -> {
try {
final TusUpload upload = tus.getUpload(uri.getSchemeSpecificPart())
.orElseThrow(() -> new IOException("Upload not found"));
var upload = tus.getUpload(uri.getSchemeSpecificPart())
.orElseThrow(() -> new IOException("TUS Upload not found"));
if (!upload.tryLock())
throw new IllegalStateException("TUS upload is locked");
if (!upload.tryLock())
throw new IOException("TUS upload locked");
if (!upload.isComplete())
throw new IOException("TUS upload not completed");
return new FetchHandle() {
private AsynchronousFileChannel readChannel;
boolean closed;
long position = 0;
@Override
public long size() {
return upload.getLength();
}
@Override
public void read(ByteBuffer dst, BiConsumer<Integer, Throwable> handler) {
try {
if (!upload.isComplete())
throw new IllegalStateException("TUS upload is incomplete");
try (FileChannel src = upload.getReadChannel(); target) {
long copied = 0;
long total = upload.getLength();
while (copied < total) {
if (cf.isCompletedExceptionally())
break; // job cancelled
var maxBytes = Math.min(MAX_COPY_CHUNK, total - copied);
copied += src.transferTo(copied, maxBytes, target);
onProgress.run();
if (readChannel == null)
readChannel = upload.getReadChannel();
readChannel.read(dst, position, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (result > 0)
position += result;
handler.accept(result, null);
}
}
} finally {
upload.unlock();
@Override
public void failed(Throwable exc, Void attachment) {
handler.accept(null, exc);
}
});
} catch (Exception e) {
handler.accept(null, e);
}
}
cf.complete(null);
} catch (final Exception e) {
cf.completeExceptionally(e);
@Override
public synchronized void close() {
if (!closed) {
upload.unlock();
Utils.closeQuietly(readChannel);
closed = true;
}
}
});
};
return cf;
}
}
\ No newline at end of file
......@@ -7,11 +7,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import org.junit.Before;
import org.junit.Rule;
......@@ -36,18 +39,19 @@ public class TusUrlFetchTest {
var mockPool = Mockito.mock(Executor.class);
Mockito.doAnswer(invocation -> {
((Runnable)invocation.getArguments()[0]).run();
return null;
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(mockPool).execute(Mockito.any(Runnable.class));
tus.start(mockPool);
}
@Test
public void test() throws IOException, URISyntaxException, InterruptedException, ExecutionException {
public void test()
throws IOException, URISyntaxException, InterruptedException, ExecutionException, TimeoutException {
var upload = tus.createNewUpload(1024, null);
assertTrue(upload.tryLock());
try(var writeChannel = upload.getWriteChannel()) {
try (var writeChannel = upload.getWriteChannel()) {
Channels.newOutputStream(writeChannel).write(new byte[1024]);
}
upload.unlock();
......@@ -57,13 +61,23 @@ public class TusUrlFetchTest {
assertTrue(fetchService.canHandle(new URI("tus", upload.getName(), null)));
assertFalse(fetchService.canHandle(new URI("notus", upload.getName(), null)));
var targetFile = tmp.newFile();
var chan = Files.newByteChannel(targetFile.toPath(), StandardOpenOption.WRITE);
var cf = fetchService.fetchInto(new URI("tus", upload.getName(), null), chan, () -> {});
cf.get();
assertFalse(chan.isOpen());
assertEquals(upload.getLength(), Files.size(targetFile.toPath()));
var fetch = fetchService.resolve(new URI("tus", upload.getName(), null));
assertEquals(upload.getLength(), fetch.size());
var cf = new CompletableFuture<Void>();
var buf = ByteBuffer.allocate(1024);
fetch.read(buf, new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer r, Throwable e) {
if (e != null)
cf.completeExceptionally(e);
else if (r >= 0 && buf.remaining() > 0) {
fetch.read(buf, this);
} else
cf.complete(null);
};
});
cf.get(10, TimeUnit.SECONDS);
}
}
......@@ -11,8 +11,10 @@ import java.util.Deque;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
......@@ -25,6 +27,7 @@ import de.gwdg.cdstar.rest.api.RestContext;
import de.gwdg.cdstar.rest.utils.form.FormParser;
import de.gwdg.cdstar.rest.utils.form.FormParserException;
import de.gwdg.cdstar.rest.utils.form.FormPart;
import de.gwdg.cdstar.rest.v3.async.UrlFetchService.FetchHandle;
import de.gwdg.cdstar.rest.v3.errors.ApiErrors;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.client.CDStarACLEntry;
......@@ -138,7 +141,7 @@ public class ArchiveUpdater {
}
private void onError(Throwable t) {
while(t instanceof CompletionException)
while (t instanceof CompletionException)
t = t.getCause();
future.completeExceptionally(t);
}
......@@ -369,33 +372,97 @@ public class ArchiveUpdater {
try {
uri = new URI(fieldValue);
} catch (final URISyntaxException e) {
throw ApiErrors.badRequest("Not an URI.");
throw ApiErrors.badRequest("Bad fetch URI.");
}
UrlFetchService fetchHandler = null;
for (final UrlFetchService ads : ctx.getConfig().lookup(RuntimeContext.class)
.lookupAll(UrlFetchService.class)) {
if (ads.canHandle(uri)) {
fetchHandler = ads;
break;
}
UrlFetchService fetchHandler = ctx.getConfig()
.lookup(RuntimeContext.class)
.lookupAll(UrlFetchService.class)
.stream()
.filter(c -> c.canHandle(uri))
.findFirst()
.orElseThrow(() -> ApiErrors.notImplemented("Fetch handler not found for the given URI."));
FetchHandle fetch;
try {
fetch = fetchHandler.resolve(uri);
} catch (Exception e) {
throw new ErrorResponse(500, "FetchFailed", "Fetch operation failed.")
.detail("error", e.getMessage());
}
if (fetchHandler == null)
throw ApiErrors.notImplemented("Fetch operation not available for the given URI.");
final CDStarFile target = getFileForWriting(fileName);
final WritableByteChannel writeChannel = Utils.wrapError(target::getWriteChannel);
return fetchHandler.fetchInto(uri, writeChannel, () -> ac.keepAlive())
.handle((res, err) -> {
if (err != null)
throw new ErrorResponse(500, "FetchFailed", "Fetch operation failed.")
.detail("error", err.getMessage());
Utils.closeQuietly(writeChannel); // Keep open on errors
report.rememberFile(target);
return res;
});
CDStarFile target;
WritableByteChannel writeChannel;
try {
target = getFileForWriting(fileName);
writeChannel = Utils.wrapError(target::getWriteChannel);
} catch (Exception e) {
Utils.closeQuietly(fetch);
// Keep writeChannel open on errors, so it triggers a rollback later!
throw e;
}
var buf = ac.getBuffer();
var cf = new CompletableFuture<Void>();
var fetchReadHandler = new BiConsumer<Integer, Throwable>() {
long bytesFetched = 0;
@Override
public void accept(Integer result, Throwable err) {
if (err != null) {
cf.completeExceptionally(err);
close();
return;
}
if (cf.isDone()) {
close();
return;
}
try {
buf.flip();
bytesFetched += buf.remaining();
while (buf.remaining() > 0)
writeChannel.write(buf);
if (bytesFetched < fetch.size()) {
ac.keepAlive();
buf.clear();
fetch.read(buf, this);
} else {
cf.complete(null);
}
} catch (Exception e) {
cf.completeExceptionally(e);
close();
}
}
private void close() {
ac.recycleBuffer(buf);
Utils.closeQuietly(fetch);
}
};
try {
fetch.read(buf, fetchReadHandler);
} catch (Exception e) {
cf.completeExceptionally(e);
}
return cf.handle((res, err) -> {
if (err instanceof CancellationException)
throw new ErrorResponse(500, "FetchFailed", "Fetch operation canceled.");
if (err != null) {
log.warn("Fetch operation failed", err);
throw new ErrorResponse(500, "FetchFailed", "Fetch operation failed.")
.detail("error", err.getMessage());
}
Utils.closeQuietly(writeChannel); // Keep open on errors
report.rememberFile(target);
return res;
});
}
private CompletableFuture<Void> handleType(String fileName, String fieldValue) throws FileNotFound {
......
package de.gwdg.cdstar.rest.v3.async;
import java.io.IOException;
import java.net.URI;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
import java.nio.ByteBuffer;
import java.util.function.BiConsumer;
public interface UrlFetchService {
boolean canHandle(URI uri);
/**
* Fetch a given {@link URI} into an existing {@link WritableByteChannel} file. The
* onProgress runnable should be triggered in reasonable intervals to allow the
* caller to monitor progress or prevent timeouts. Exceptions thrown from the
* progress runnable or closing the returned feature should abort the fetch
* operation.
*/
CompletableFuture<Void> fetchInto(URI uri, WritableByteChannel target, Runnable onProgress);
FetchHandle resolve(URI uri) throws IOException;
interface FetchHandle extends AutoCloseable {
long size();
/**
* Read some data into a {@link ByteBuffer}.
*/
void read(ByteBuffer dst, BiConsumer<Integer, Throwable> handler);
@Override
void close();
}
}
......@@ -9,15 +9,14 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BooleanSupplier;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
......@@ -33,7 +32,6 @@ import de.gwdg.cdstar.runtime.CDStarRuntime;
import de.gwdg.cdstar.runtime.lts.LTSConfig;
import de.gwdg.cdstar.runtime.lts.bagit.BagitTarget;
import de.gwdg.cdstar.runtime.profiles.ProfileRegistry;
import de.gwdg.cdstar.web.common.model.ErrorResponse;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Form;
import jakarta.ws.rs.core.MediaType;
......@@ -59,16 +57,31 @@ public class ArchiveUpdateTest extends BaseRestTest {
}
@Override
public CompletableFuture<Void> fetchInto(URI uri, WritableByteChannel target, Runnable onProgress) {
try {
onProgress.run();
target.write(ByteBuffer.wrap(Utils.base64decode(uri.getSchemeSpecificPart())));
return CompletableFuture.completedFuture((Void) null);
} catch (IllegalArgumentException e) {
return CompletableFuture.failedFuture(new ErrorResponse(400, "BadFetchData", "Fetch data URL invalid"));
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
public FetchHandle resolve(URI uri) {
ByteBuffer data = ByteBuffer.wrap(Utils.base64decode(uri.getSchemeSpecificPart()));
return new FetchHandle() {
@Override
public long size() {
return data.limit();
}
@Override
public void read(ByteBuffer dst, BiConsumer<Integer, Throwable> handler) {
var available = data.remaining();
if(available == 0)
handler.accept(-1, null);
else {
dst.put(data);
handler.accept(available- data.remaining(), null);
}
}
@Override
public void close() {
data.position(data.limit());
}
};
}
});
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment