Gitlab Community Edition Instance

Commit 883feeba authored by mhellka's avatar mhellka
Browse files

tus: Improved tus.io protocoll support.

- Advertise creation-defer-length support
- Store upload metadata in separate file
- Pick up old uploads on restart
- Expire uploads 24h after creation
- Implement tus.io protocol more strictly
parent a01ae50d
Pipeline #291696 passed with stages
in 9 minutes and 31 seconds
package de.gwdg.cdstar.rest.ext.tus;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.time.Instant;
import java.util.Date;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.rest.api.Blueprint;
import de.gwdg.cdstar.rest.api.RestBlueprint;
......@@ -23,15 +19,15 @@ public class TusBlueprint implements RestBlueprint {
private static final String TUS_UPLOAD_METADATA = "Upload-Metadata";
private static final String TUS_UPLOAD_EXPIRES = "Upload-Expires";
private static final String TUS_UPLOAD_LENGTH = "Upload-Length";
private static final String TUS_UPLOAD_DEFER_LENGTH = "Upload-Defer-Length";
private static final String TUS_UPLOAD_OFFSET = "Upload-Offset";
private static final String TUS_EXTENSION = "Tus-Extension";
private static final String TUS_RESUMABLE = "Tus-Resumable";
private static final String TUS_EXTENSIONS = "creation,expiration,termination";
private static final String TUS_EXTENSIONS = "creation,creation-defer-length,expiration,termination";
private static final String TUS_VERSION = "1.0.0";
private final TusService chunkService;
public TusBlueprint(TusService service) {
chunkService = service;
}
......@@ -48,28 +44,6 @@ public class TusBlueprint implements RestBlueprint {
.HEAD(this::handleHead);
}
public Void handleCreate(RestContext ctx) {
checkTusHeader(ctx);
// TODO: Access control, DDOS protection
// TODO: Handle Upload-Defer-Length, creation-defer-length extension
final long length = getUploadLength(ctx);
final String meta = getUploadMetadata(ctx);
final TusFile chunk = chunkService.createChunk();
chunk.setLength(length);
chunk.setMeta(meta);
ctx.status(201);
ctx.header(TUS_RESUMABLE, TUS_VERSION);
ctx.header("Location", ctx.resolvePath(chunk.getName(), true));
if (chunk.getExpireMillis() > 0)
ctx.header(TUS_UPLOAD_EXPIRES, Date.from(Instant.ofEpochMilli(chunk.getExpireMillis())));
return null;
}
public Void handleOptions(RestContext ctx) throws IOException {
// The Client SHOULD NOT include the Tus-Resumable header in the request and the
// Server MUST ignore the header.
......@@ -81,130 +55,145 @@ public class TusBlueprint implements RestBlueprint {
return null;
}
public Void handleHead(RestContext ctx) throws IOException {
checkTusHeader(ctx);
final TusFile chunk = getChunk(ctx.getPathParam("chunk"));
public Void handleCreate(RestContext ctx) throws IOException {
checkTusRequestVersion(ctx);
ctx.status(200);
ctx.header("Cache-Control", "no-store");
// TODO: Access control, DDOS protection
long length = "1".equals(ctx.getHeader(TUS_UPLOAD_DEFER_LENGTH))
? -1
: requireLongHeader(ctx, TUS_UPLOAD_LENGTH);
var meta = ctx.getHeader(TUS_UPLOAD_METADATA);
TusUpload upload = chunkService.createNewUpload(length, meta);
ctx.status(201);
ctx.header(TUS_RESUMABLE, TUS_VERSION);
ctx.header(TUS_UPLOAD_OFFSET, chunk.getActualSize());
if (chunk.getLength() > -1)
ctx.header(TUS_UPLOAD_LENGTH, chunk.getLength());
if (chunk.getExpireMillis() > 0)
ctx.header(TUS_UPLOAD_EXPIRES, Date.from(Instant.ofEpochMilli(chunk.getExpireMillis())));
ctx.header(TUS_UPLOAD_EXPIRES, Date.from(upload.getExpiresAt()));
ctx.header("Location", ctx.resolvePath(upload.getName(), true));
return null;
}
public Void handleDelete(RestContext ctx) {
checkTusHeader(ctx);
final TusFile chunk = getChunk(ctx.getPathParam("chunk"));
if (!chunk.tryLock())
throw new ErrorResponse(409, "Conflict", "Failed to open chunk: Resource locked.");
public Void handleHead(RestContext ctx) throws IOException {
checkTusRequestVersion(ctx);
chunkService.removeChunk(chunk);
final TusUpload upload = getUpload(ctx.getPathParam("chunk"));
ctx.status(204);
ctx.header(TUS_RESUMABLE, TUS_VERSION);
ctx.status(200);
addResponseHeaders(ctx, upload);
return null;
}
public Void handlePatch(RestContext ctx) throws IOException {
checkTusHeader(ctx);
final TusFile chunk = getChunk(ctx.getPathParam("chunk"));
checkTusRequestVersion(ctx);
ctx.status(204);
ctx.header(TUS_RESUMABLE, TUS_VERSION);
if (chunk.getLength() > -1)
ctx.header(TUS_UPLOAD_LENGTH, chunk.getLength());
if (chunk.getExpireMillis() > 0)
ctx.header(TUS_UPLOAD_EXPIRES, Date.from(Instant.ofEpochMilli(chunk.getExpireMillis())));
final TusUpload upload = getUpload(ctx.getPathParam("chunk"));
if (!Utils.equal(ctx.getHeader("Content-Type"), "application/offset+octet-stream"))
throw new ErrorResponse(406, "TusError", "Content type MUST be 'application/offset+octet-stream'.");
final long offset = getUploadOffset(ctx);
final long length = getUploadLength(ctx);
if (!upload.tryLock())
throw new ErrorResponse(409, "Conflict", "Failed to open upload: Resource locked.");
if (!chunk.tryLock())
throw new ErrorResponse(409, "Conflict", "Failed to open chunk: Resource locked.");
try {
if (requireLongHeader(ctx, TUS_UPLOAD_OFFSET) != upload.getCurrentOffset())
throw new ErrorResponse(409, "TusError", "Upload-Offset header does not match current upload size.");
if (upload.isComplete())
throw new ErrorResponse(409, "TusError", "Upload complete.");
if (ctx.getHeader(TUS_UPLOAD_LENGTH) != null) {
long length = requireLongHeader(ctx, TUS_UPLOAD_LENGTH);
if (!upload.hasLength()) {
upload.setLength(length);
upload.persistInfo();
} else if (length != upload.getLength()) {
throw new ErrorResponse(400, "TusError", "Cannot change Upload-Length once it has been set.");
}
}
if (offset != chunk.getActualSize()) {
chunk.unlock();
throw new ErrorResponse(409, "TusError", "Upload-Offset header does not match current file size.");
}
// TODO: Limit upload size if target length is known
var channel = upload.getWriteChannel();
new AsyncUpload(ctx, channel).dispatch().whenComplete((totalUpload, err) -> {
Utils.closeQuietly(channel);
upload.unlock();
if (length > -1) {
if (chunk.getLength() == -1) {
chunk.setLength(length);
ctx.header(TUS_UPLOAD_LENGTH, chunk.getLength());
} else if (length != chunk.getLength()) {
chunk.unlock();
throw new ErrorResponse(400, "TusError", "Cannot change Upload-Length once it has a value.");
}
}
if (err != null) {
ctx.abort(err);
return;
}
try {
final FileChannel ch = FileChannel.open(chunk.getPath(), StandardOpenOption.CREATE,
StandardOpenOption.WRITE, StandardOpenOption.APPEND);
Promise.wrap(new AsyncUpload(ctx, ch).dispatch())
.then(r -> {
Utils.closeQuietly(ch);
chunk.unlock();
ctx.header(TUS_UPLOAD_OFFSET, chunk.getActualSize());
ctx.close();
}, err -> {
Utils.closeQuietly(ch);
chunk.unlock();
ctx.header(TUS_UPLOAD_OFFSET, chunk.getActualSize());
ctx.abort(err);
});
ctx.status(204);
addResponseHeaders(ctx, upload);
ctx.close();
});
return null;
} catch (final IOException e) {
chunk.unlock();
} catch (ErrorResponse | IOException e) {
upload.unlock();
throw e;
}
}
private TusFile getChunk(String id) {
public Void handleDelete(RestContext ctx) {
checkTusRequestVersion(ctx);
final TusUpload chunk = getUpload(ctx.getPathParam("chunk"));
if (!chunk.tryLock())
throw new ErrorResponse(409, "Conflict", "Failed to open upload: Resource locked.");
chunkService.removeUpload(chunk);
ctx.status(204);
ctx.header(TUS_RESUMABLE, TUS_VERSION);
return null;
}
private TusUpload getUpload(String id) {
return chunkService
.getChunk(id)
.orElseThrow(() -> new ErrorResponse(404, "TusError", "Chunk not found or expired"));
.getUpload(id)
.orElseThrow(() -> new ErrorResponse(404, "TusError", "Upload not found or expired"));
}
/**
* Add Cache-Control, {@value #TUS_RESUMABLE}, {@value #TUS_UPLOAD_OFFSET},
* {@value #TUS_UPLOAD_LENGTH}/{@value #TUS_UPLOAD_DEFER_LENGTH} and
* {@value #TUS_UPLOAD_EXPIRES} headers as required for HEAD and PATCH requests.
*
* @param ctx
* @param chunk
*/
private void addResponseHeaders(RestContext ctx, final TusUpload upload) {
ctx.header("Cache-Control", "no-store");
ctx.header(TUS_RESUMABLE, TUS_VERSION);
ctx.header(TUS_UPLOAD_OFFSET, upload.getCurrentOffset());
ctx.header(TUS_UPLOAD_EXPIRES, Date.from(upload.getExpiresAt()));
if (upload.hasLength())
ctx.header(TUS_UPLOAD_LENGTH, upload.getLength());
else
ctx.header(TUS_UPLOAD_DEFER_LENGTH, "1");
if (upload.getMeta() != null) // Not required for PATCH
ctx.header(TUS_UPLOAD_METADATA, upload.getMeta());
}
private void checkTusHeader(RestContext ctx) {
private void checkTusRequestVersion(RestContext ctx) {
if (!Utils.equal(ctx.getHeader(TUS_RESUMABLE), TUS_VERSION))
throw new ErrorResponse(400, "TusError", "Tus-Resumable header missing or wrong version.")
.detail("expected", TUS_VERSION);
}
private long getUploadLength(RestContext ctx) {
String lengthHeader = ctx.getHeader(TUS_UPLOAD_LENGTH);
if (lengthHeader == null)
return -1;
private long requireLongHeader(RestContext ctx, String name) {
try {
return Long.parseUnsignedLong(ctx.getHeader(TUS_UPLOAD_LENGTH));
return Long.parseUnsignedLong(ctx.getHeader(name));
} catch (final NumberFormatException e) {
throw new ErrorResponse(400, "TusError", "Upload-Length header present, but invalid.");
throw new ErrorResponse(400, "TusError", name + " header missing or invalid.");
}
}
private long getUploadOffset(RestContext ctx) {
try {
return Long.parseUnsignedLong(ctx.getHeader(TUS_UPLOAD_OFFSET));
} catch (final NumberFormatException e) {
throw new ErrorResponse(400, "TusError", "Upload-Offset header missing or invalid.");
}
}
private String getUploadMetadata(RestContext ctx) {
return ctx.getHeader(TUS_UPLOAD_METADATA);
}
}
\ No newline at end of file
package de.gwdg.cdstar.rest.ext.tus;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class TusFile {
private static final Logger log = LoggerFactory.getLogger(TusService.class);
private long length;
private long expire = -1;
private String meta;
private final String name;
private final Path path;
private boolean locked;
TusFile(String name, Path path) {
this(name, path, -1);
}
TusFile(String name, Path path, long length) {
this.name = name;
this.path = path;
this.length = length;
log.info("New chunk: {} -> {}", name, path);
}
public synchronized boolean tryLock() {
if (locked)
return false;
locked = true;
return locked;
}
public synchronized void unlock() {
locked = false;
}
Path getPath() {
return path;
}
public String getMeta() {
return meta;
}
public void setMeta(String meta) {
this.meta = meta;
}
public long getLength() {
return length;
}
public void setLength(long length) {
this.length = length;
}
public long getActualSize() {
try {
return Files.size(path);
} catch (final IOException e) {
return 0;
}
}
public void expireIn(Duration timeout) {
expire = Instant.now().plus(timeout).toEpochMilli();
}
public void expire() {
expire = 0;
}
public long getExpireMillis() {
return expire;
}
boolean isExpired(Instant now) {
return expire > -1 && expire < now.toEpochMilli();
}
boolean isExpired() {
return isExpired(Instant.now());
}
public String getName() {
return name;
}
}
\ No newline at end of file
package de.gwdg.cdstar.rest.ext.tus;
import java.util.concurrent.TimeUnit;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
......@@ -7,6 +9,7 @@ import de.gwdg.cdstar.rest.v3.async.ArchiveUpdater;
import de.gwdg.cdstar.runtime.Plugin;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import de.gwdg.cdstar.runtime.services.CronService;
/**
* This plugin installs a TUS (tus.io) REST endpoint to create and write to
......@@ -33,24 +36,21 @@ public class TusPlugin implements RuntimeListener {
metrics.register(MetricRegistry.name(pluginName, "count"), new Gauge<Integer>() {
@Override
public Integer getValue() {
return Integer.valueOf(service.getChunkCount());
return Integer.valueOf(service.getUploadCount());
}
});
metrics.register(MetricRegistry.name(pluginName, "bytes"), new Gauge<Long>() {
@Override
public Long getValue() {
return Long.valueOf(service.getChunkTotalSize());
return Long.valueOf(service.getUploadTotalSize());
}
});
});
service.start();
}
@Override
public void onShutdown(RuntimeContext ctx) {
service.removeExpiredChunks();
ctx.lookupRequired(CronService.class)
.scheduleWithFixedDelay(service::cleanupExpiredUploads, 60, 60, TimeUnit.SECONDS);
}
}
......@@ -10,7 +10,6 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Stream;
import org.slf4j.Logger;
......@@ -18,106 +17,91 @@ import org.slf4j.LoggerFactory;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.rest.v3.async.ArchiveUpdater;
import de.gwdg.cdstar.runtime.Plugin;
/**
* Implements a TUD (tus.io) REST endpoint to create and write to anonymous
* Implements a TUS (tus.io) REST endpoint to create and write to anonymous
* temporary files, and a way for {@link ArchiveUpdater} to import from these
* files. All bundled as an optional plugin.
*/
@Plugin(name = "tus")
public class TusService {
public static final String pluginName = "tus-upload";
static final Logger log = LoggerFactory.getLogger(TusService.class);
private Duration defaultExpire = Duration.ofHours(24);
Duration defaultExpire = Duration.ofHours(24);
Path basePath;
Map<String, TusFile> chunks = new ConcurrentHashMap<>();
LongAdder totalSize = new LongAdder();
Map<String, TusUpload> uploads = new ConcurrentHashMap<>();
public TusService(Path basePath) {
this.basePath = basePath;
}
public void start() throws IOException {
Files.createDirectories(basePath);
try (Stream<Path> s = Files.list(basePath)) {
s.filter(this::isChunk).forEach(path -> {
final TusFile chunk = new TusFile(chunkNameFormPath(path), path);
chunk.expireIn(defaultExpire);
chunks.put(chunk.getName(), chunk);
log.info("Scanning {} for existing tus uploads", basePath);
try (Stream<Path> s = Files.list(Files.createDirectories(basePath))) {
s.filter(path -> path.getFileName().toString().endsWith(TusUpload.EXT_INFO)).forEach(infoFile -> {
try {
var tus = new TusUpload(this, infoFile);
log.info("Found TUS upload: {}", tus);
uploads.put(tus.getName(), tus);
} catch (IOException e) {
log.warn("Failed to load {}", infoFile, e);
}
});
}
}
private String chunkFileName(String name) {
return name + ".chunk";
}
private boolean isChunk(Path path) {
return path.getFileName().toString().endsWith(".chunk");
}
private String chunkNameFormPath(Path path) {
final String fname = path.getFileName().toString();
return fname.substring(0, fname.length() - ".chunk".length());
}
public void removeExpiredChunks() {
public void cleanupExpiredUploads() {
final Instant now = Instant.now();
// Collect (and lock) expired chunks in a thread-save way.
final List<TusFile> expired = new ArrayList<>();
chunks.values().forEach(c -> {
if (c.isExpired(now) && c.tryLock()) {
expired.add(c);
final List<TusUpload> expired = new ArrayList<>();
uploads.values().forEach(upload -> {
if (upload.isExpired(now) && upload.tryLock()) {
expired.add(upload);
}
});
// Remove all expired chunks
expired.forEach(this::removeChunk);
expired.forEach(this::removeUpload);
}
public TusFile createChunk() {
TusFile chunk;
// TODO: Only trigger this every X seconds/minutes
if (chunks.size() % 128 == 0) // poor mans rate limiting
removeExpiredChunks();
public TusUpload createNewUpload(long length, String meta) throws IOException {
TusUpload upload;
do {
final String id = Utils.bytesToHex(Utils.randomBytes(16));
final Path path = basePath.resolve(chunkFileName(id));
chunk = new TusFile(id, path);
chunk.expireIn(defaultExpire);
} while (chunks.putIfAbsent(chunk.getName(), chunk) != null);
return chunk;
upload = new TusUpload(this);
if (length >= 0)
upload.setLength(length);
if (meta != null)
upload.setMeta(meta);
} while (uploads.putIfAbsent(upload.getName(), upload) != null);
upload.persistInfo();
return upload;
}
public Optional<TusFile> getChunk(String name) {
// TODO: Load missing chunks from disk. Make the in-memory chunks map a cache
final TusFile chunk = chunks.get(name);
if (chunk == null || chunk.isExpired())
public Optional<TusUpload> getUpload(String name) {
final TusUpload upload = uploads.get(name);