Gitlab Community Edition Instance

Commit 15854854 authored by mhellka's avatar mhellka
Browse files

Prevent idle timeout during large archive update fetch operations.

parent 883feeba
......@@ -750,7 +750,7 @@ public class Utils {
private static final DateTimeFormatter ISO8601 = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSX");
private static final DateTimeFormatter ISO8601_buggy = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSX");
public static final ZoneId GMT = ZoneId.of("GMT");
public static String toIsoDate(Instant date) {
......@@ -1007,10 +1007,19 @@ public class Utils {
}
}
public static <T, E extends Exception> T wrapError(FailableSupplier<T, E> cb) {
try {
return cb.supply();
} catch (final RuntimeException e) {
throw e;
} catch (final Exception e) {
throw new RuntimeException("Wrapped exception", e);
}
}
public static <T> void forEach(T[] array, Consumer<T> callback) {
for (final T t : array)
callback.accept(t);
}
/**
......
package de.gwdg.cdstar.rest.ext.tus;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
......@@ -10,6 +11,8 @@ import de.gwdg.cdstar.runtime.Plugin;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import de.gwdg.cdstar.runtime.services.CronService;
import de.gwdg.cdstar.runtime.services.FeatureRegistry;
import de.gwdg.cdstar.runtime.services.PoolService;
/**
* This plugin installs a TUS (tus.io) REST endpoint to create and write to
......@@ -48,7 +51,11 @@ public class TusPlugin implements RuntimeListener {
});
});
service.start();
service.start(ctx.lookupRequired(PoolService.class).getNamedPool("tus"));
ctx.lookup(FeatureRegistry.class)
.ifPresent(fr -> fr.addFeature("tus", "1.0"));
ctx.lookupRequired(CronService.class)
.scheduleWithFixedDelay(service::cleanupExpiredUploads, 60, 60, TimeUnit.SECONDS);
}
......
......@@ -10,6 +10,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.stream.Stream;
import org.slf4j.Logger;
......@@ -31,12 +32,15 @@ public class TusService {
Path basePath;
Map<String, TusUpload> uploads = new ConcurrentHashMap<>();
private Executor pool;
public TusService(Path basePath) {
this.basePath = basePath;
}
public void start() throws IOException {
public void start(Executor pool) throws IOException {
this.pool = pool;
log.info("Scanning {} for existing tus uploads", basePath);
try (Stream<Path> s = Files.list(Files.createDirectories(basePath))) {
......@@ -104,4 +108,8 @@ public class TusService {
.sum();
}
public Executor getPool() {
return pool;
}
}
......@@ -9,7 +9,6 @@ import java.util.concurrent.CompletableFuture;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.rest.v3.async.UrlFetchService;
import de.gwdg.cdstar.runtime.client.CDStarFile;
/**
* An {@link UrlFetchService} to allow other components (e.g. the archive update
......@@ -17,6 +16,7 @@ import de.gwdg.cdstar.runtime.client.CDStarFile;
*/
public class TusUrlFetch implements UrlFetchService {
private static final int MAX_COPY_CHUNK = 1024 * 1024 * 128;
private final TusService tus;
private final String scheme;
......@@ -31,36 +31,41 @@ public class TusUrlFetch implements UrlFetchService {
}
@Override
public CompletableFuture<Void> fetchInto(URI uri, CDStarFile target) {
public CompletableFuture<Void> fetchInto(URI uri, WritableByteChannel target, Runnable onProgress) {
final CompletableFuture<Void> cf = new CompletableFuture<>();
try {
final TusUpload upload = tus.getUpload(uri.getSchemeSpecificPart())
.orElseThrow(() -> new IOException("Upload not found"));
if (!upload.tryLock())
throw new IllegalStateException("TUS upload is locked");
tus.getPool().execute(() -> {
try {
if (!upload.isComplete())
throw new IllegalStateException("TUS upload is incomplete");
final TusUpload upload = tus.getUpload(uri.getSchemeSpecificPart())
.orElseThrow(() -> new IOException("Upload not found"));
if (!upload.tryLock())
throw new IllegalStateException("TUS upload is locked");
try (FileChannel src = upload.getReadChannel();
WritableByteChannel dst = target.getWriteChannel()) {
long copied = 0;
long total = upload.getLength();
while (copied < total)
copied += src.transferTo(copied, total - copied, dst);
try {
if (!upload.isComplete())
throw new IllegalStateException("TUS upload is incomplete");
try (FileChannel src = upload.getReadChannel(); target) {
long copied = 0;
long total = upload.getLength();
while (copied < total) {
if (cf.isCompletedExceptionally())
break; // job cancelled
var maxBytes = Math.min(MAX_COPY_CHUNK, total - copied);
copied += src.transferTo(copied, maxBytes, target);
onProgress.run();
}
}
} finally {
upload.unlock();
}
} finally {
upload.unlock();
cf.complete(null);
} catch (final Exception e) {
cf.completeExceptionally(e);
}
cf.complete(null);
} catch (final IOException e) {
cf.completeExceptionally(e);
}
});
return cf;
}
......
......@@ -6,12 +6,14 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Executor;
import java.util.function.IntConsumer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import de.gwdg.cdstar.rest.RestConfigImpl;
import de.gwdg.cdstar.rest.testutils.TestClient;
......@@ -32,6 +34,7 @@ public class TusServiceTest {
public void setUp() throws IOException {
rest = new RestConfigImpl(null);
tus = new TusService(tmp.newFolder().toPath());
tus.start(Mockito.mock(Executor.class));
tusApi = new TusBlueprint(tus);
rest.install(tusApi);
client = new TestClient(rest);
......
package de.gwdg.cdstar.rest.ext.tus;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.Channels;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import de.gwdg.cdstar.rest.RestConfigImpl;
public class TusUrlFetchTest {
@Rule
public TemporaryFolder tmp = new TemporaryFolder();
private TusService tus;
private RestConfigImpl rest;
@Before
public void setUp() throws IOException {
rest = new RestConfigImpl(null);
tus = new TusService(tmp.newFolder().toPath());
var mockPool = Mockito.mock(Executor.class);
Mockito.doAnswer(invocation -> {
((Runnable)invocation.getArguments()[0]).run();
return null;
}).when(mockPool).execute(Mockito.any(Runnable.class));
tus.start(mockPool);
}
@Test
public void test() throws IOException, URISyntaxException, InterruptedException, ExecutionException {
var upload = tus.createNewUpload(1024, null);
assertTrue(upload.tryLock());
try(var writeChannel = upload.getWriteChannel()) {
Channels.newOutputStream(writeChannel).write(new byte[1024]);
}
upload.unlock();
var fetchService = new TusUrlFetch(tus, "tus");
assertTrue(fetchService.canHandle(new URI("tus", upload.getName(), null)));
assertFalse(fetchService.canHandle(new URI("notus", upload.getName(), null)));
var targetFile = tmp.newFile();
var chan = Files.newByteChannel(targetFile.toPath(), StandardOpenOption.WRITE);
var cf = fetchService.fetchInto(new URI("tus", upload.getName(), null), chan, () -> {});
cf.get();
assertFalse(chan.isOpen());
assertEquals(upload.getLength(), Files.size(targetFile.toPath()));
}
}
......@@ -12,6 +12,7 @@ import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.slf4j.Logger;
......@@ -59,7 +60,6 @@ public class ArchiveUpdater {
protected final FormParser parser;
protected final Deque<FormPart> parts = new ArrayDeque<>();
protected final RestContext ctx;
private boolean replaceExisting;
CompletableFuture<ArchiveUpdated> future;
private ArchiveOrSnapshot target;
......@@ -80,11 +80,6 @@ public class ArchiveUpdater {
return future;
}
public ArchiveUpdater setReplaceExisting(boolean replaceExisting) {
this.replaceExisting = replaceExisting;
return this;
}
/**
* Called by {@link #start()} once, or {@link #process()} if it runs out of
* parts to process. Triggers {@link #process()} if new parts are available, or
......@@ -143,6 +138,8 @@ public class ArchiveUpdater {
}
private void onError(Throwable t) {
while(t instanceof CompletionException)
t = t.getCause();
future.completeExceptionally(t);
}
......@@ -207,21 +204,7 @@ public class ArchiveUpdater {
}
private CDStarFile getFileForWriting(String fileName) throws FileExists, InvalidFileName {
Objects.requireNonNull(fileName);
CDStarArchive archive = assumeArchive();
if (replaceExisting && archive.hasFile(fileName)) {
try {
final CDStarFile file = archive.getFile(fileName);
if (file.getSize() > 0)
file.truncate(0);
return file;
} catch (final IOException e) {
throw Utils.wtf(e);
}
}
return archive.createFile(fileName);
return assumeArchive().createFile(Objects.requireNonNull(fileName));
}
private CompletableFuture<Void> handleTextCommand(FormPart part) {
......@@ -317,7 +300,6 @@ public class ArchiveUpdater {
target.setMediaType(source.getMediaType(), source.getContentEncoding());
try {
target.truncate(0);
target.transferFrom(source);
} catch (IOException e) {
throw new ErrorResponse(500, "CopyFailed", "File copy operation failed");
......@@ -340,7 +322,6 @@ public class ArchiveUpdater {
target.setMediaType(source.getMediaType(), source.getContentEncoding());
try {
target.truncate(0);
target.transferFrom(source);
} catch (IOException e) {
throw new ErrorResponse(500, "CloneFailed", "File clone operation failed");
......@@ -359,17 +340,10 @@ public class ArchiveUpdater {
private CompletableFuture<Void> handleMove(String fileName, String fieldValue)
throws FileNotFound, FileExists, InvalidFileName {
CDStarArchive archive = assumeArchive();
final CDStarFile source = archive.getFile(fieldValue);
if (replaceExisting && archive.hasFile(fileName)) {
final CDStarFile remove = archive.getFile(fileName);
report.rememberFile(remove);
remove.remove();
}
report.rememberFile(source);
var source = assumeArchive().getFile(fieldValue);
source.setName(fileName);
report.rememberFile(source);
return CompletableFuture.completedFuture(null);
}
......@@ -410,13 +384,18 @@ public class ArchiveUpdater {
throw ApiErrors.notImplemented("Fetch operation not available for the given URI.");
final CDStarFile target = getFileForWriting(fileName);
return fetchHandler.fetchInto(uri, target).handle((res, err) -> {
if (err != null)
throw new ErrorResponse(500, "FetchFailed", "Fetch operation failed.").detail("error",
err.getMessage());
report.rememberFile(target);
return res;
});
final WritableByteChannel writeChannel = Utils.wrapError(target::getWriteChannel);
return fetchHandler.fetchInto(uri, writeChannel, () -> ac.keepAlive())
.handle((res, err) -> {
if (err != null)
throw new ErrorResponse(500, "FetchFailed", "Fetch operation failed.")
.detail("error", err.getMessage());
Utils.closeQuietly(writeChannel); // Keep open on errors
report.rememberFile(target);
return res;
});
}
private CompletableFuture<Void> handleType(String fileName, String fieldValue) throws FileNotFound {
......
package de.gwdg.cdstar.rest.v3.async;
import java.net.URI;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
import de.gwdg.cdstar.runtime.client.CDStarFile;
public interface UrlFetchService {
boolean canHandle(URI uri);
CompletableFuture<Void> fetchInto(URI uri, CDStarFile target);
/**
* Fetch a given {@link URI} into an existing {@link WritableByteChannel} file. The
* onProgress runnable should be triggered in reasonable intervals to allow the
* caller to monitor progress or prevent timeouts. Exceptions thrown from the
* progress runnable or closing the returned feature should abort the fetch
* operation.
*/
CompletableFuture<Void> fetchInto(URI uri, WritableByteChannel target, Runnable onProgress);
}
......@@ -6,21 +6,20 @@ import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Form;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response.Status;
import org.glassfish.jersey.media.multipart.FormDataMultiPart;
import org.glassfish.jersey.media.multipart.file.StreamDataBodyPart;
import org.junit.Before;
......@@ -29,10 +28,16 @@ import org.junit.Test;
import de.gwdg.cdstar.MimeUtils;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.rest.BaseRestTest;
import de.gwdg.cdstar.rest.v3.async.UrlFetchService;
import de.gwdg.cdstar.runtime.CDStarRuntime;
import de.gwdg.cdstar.runtime.lts.LTSConfig;
import de.gwdg.cdstar.runtime.lts.bagit.BagitTarget;
import de.gwdg.cdstar.runtime.profiles.ProfileRegistry;
import de.gwdg.cdstar.web.common.model.ErrorResponse;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.core.Form;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response.Status;
public class ArchiveUpdateTest extends BaseRestTest {
......@@ -46,6 +51,27 @@ public class ArchiveUpdateTest extends BaseRestTest {
@Override
public void initRuntime(CDStarRuntime runtime) throws Exception {
runtime.register(new BagitTarget("zip", runtime.getServiceDir("zip")));
runtime.register(new UrlFetchService() {
@Override
public boolean canHandle(URI uri) {
return "base64".equals(uri.getScheme());
}
@Override
public CompletableFuture<Void> fetchInto(URI uri, WritableByteChannel target, Runnable onProgress) {
try {
onProgress.run();
target.write(ByteBuffer.wrap(Utils.base64decode(uri.getSchemeSpecificPart())));
return CompletableFuture.completedFuture((Void) null);
} catch (IllegalArgumentException e) {
return CompletableFuture.failedFuture(new ErrorResponse(400, "BadFetchData", "Fetch data URL invalid"));
} catch (Exception e) {
return CompletableFuture.failedFuture(e);
}
}
});
}
@Before
......@@ -315,6 +341,31 @@ public class ArchiveUpdateTest extends BaseRestTest {
assertTrue(getJson("files.1.meta").path("dc:title").isMissingNode());
}
@Test
public void testFetch() {
final String id = create();
var payload = "test";
field("fetch:/target.txt", "base64:" + Utils.base64encode(payload));
update(id);
load(id);
assertEquals("target.txt", getJsonString("files.0.name"));
assertEquals(Utils.sha256("test"), getJsonString("files.0.digests.sha256"));
}
@Test
public void testFetchError() {
final String id = create();
field("fetch:/target.txt", "http://foo");
updateNoCheck(id);
assertError(501, "NotImplemented");
field("fetch:/target.txt", "base64:bad.input");
updateNoCheck(id);
assertError(500, "FetchFailed");
}
@Test
public void testClone() {
file("example.txt");
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment