Gitlab Community Edition Instance

Commit 8a8e4a7e authored by mhellka's avatar mhellka
Browse files

Replaced apache http(async)client with HttpClient

parent 52d7d9a9
package de.gwdg.cdstar;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.function.Function;
public class UriBuilder {
private String scheme;
private String auth;
private String host;
private int port;
private StringBuilder path = new StringBuilder();
private StringBuilder query = new StringBuilder();
private String fragment;
public UriBuilder() {
}
public UriBuilder(URI base) {
scheme = base.getScheme();
auth = base.getRawUserInfo();
host = base.getHost();
port = base.getPort();
if(base.getRawPath() != null)
path.append(base.getRawPath());
if(base.getRawQuery() != null)
query.append(base.getRawQuery());
fragment = base.getRawFragment();
}
public UriBuilder scheme(String scheme) {
this.scheme = scheme;
return this;
}
public UriBuilder auth(String auth) {
this.auth = auth;
return this;
}
public UriBuilder host(String host) {
this.host = host;
return this;
}
public UriBuilder port(int port) {
this.port = port;
return this;
}
public UriBuilder path(String ... parts) {
if(parts.length == 0)
path.setLength(0);
for(var part: parts)
path.append("/").append(URLEncoder.encode(part, StandardCharsets.UTF_8).replace("+", "%20"));
return this;
}
public UriBuilder query() {
this.query.setLength(0);
return this;
}
public UriBuilder query(String key, String value) {
if(query.length() > 0)
query.append("&");
query.append(URLEncoder.encode(key, StandardCharsets.UTF_8));
if(value != null)
query.append("=").append(URLEncoder.encode(key, StandardCharsets.UTF_8));
return this;
}
public UriBuilder fragment(String fragment) {
this.fragment = fragment;
return this;
}
public URI build() {
try {
return new URI(scheme, auth, host, port, path.toString(), query.toString(), fragment);
} catch (URISyntaxException e) {
throw new IllegalStateException("Cannot build URI", e);
}
}
public <T> T map(Function<URI, T> mapping) {
return mapping.apply(build());
}
}
......@@ -19,15 +19,5 @@
<groupId>de.gwdg.cdstar</groupId>
<artifactId>cdstar-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpasyncclient</artifactId>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
\ No newline at end of file
package de.gwdg.cdstar.ext.elastic;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.http.client.methods.HttpPost;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.NullTerminatedInterable;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.BulkRequest.ESBulkResult;
......@@ -81,19 +78,17 @@ public class BulkRequest implements ElasticSearchClient.ESPreparedRequest<ESBulk
if (bulks.isEmpty())
return Promise.ofError(new IllegalStateException("Empty bulk request"));
// Produce a destructive iterator so that BulkEntries can be
// garbage-collected as soon as they are sent (as long as no reference
// to the full BulkRequest is hold).
final ArrayDeque<Object> partsDeque = new ArrayDeque<>();
for (final BulkEntry p : bulks) {
partsDeque.add(p.command);
if (p.data != null)
partsDeque.add(p.data);
}
final Iterable<Object> partsIterator = new NullTerminatedInterable<>(partsDeque::pollFirst);
final HttpPost rq = new HttpPost(esClient.uri("_bulk"));
return esClient.sendBulk(rq, partsIterator).map(r -> new ESBulkResult(r));
Collection<Object> copy = new ArrayList<>(bulks.size() * 2);
bulks.forEach(item -> {
copy.add(item.command);
if (item.data != null)
copy.add(item.data);
});
bulks.clear(); // Allow GC
return esClient
.sendBulk(esClient.buildRequest("_bulk"), copy)
.map(r -> new ESBulkResult(r));
}
public class BulkEntry {
......
package de.gwdg.cdstar.ext.elastic;
import org.apache.http.client.methods.HttpPut;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
......@@ -21,8 +19,7 @@ public class CreateIndexRequest implements ElasticSearchClient.ESPreparedRequest
@Override
public Promise<CereateIndexResult> submit() {
final HttpPut rq = new HttpPut(esClient.uri(index));
return esClient.send(rq, payload).map(r -> new CereateIndexResult(r));
return esClient.put(esClient.buildRequest(index), payload).map(r -> new CereateIndexResult(r));
}
public class CereateIndexResult {
......
package de.gwdg.cdstar.ext.elastic;
import org.apache.http.client.methods.HttpPost;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
......@@ -25,8 +23,8 @@ public class DeleteByQueryRequest implements ElasticSearchClient.ESPreparedReque
@Override
public Promise<DeleteByQueryResult> submit() {
final HttpPost rq = new HttpPost(esClient.uri(index, "_delete_by_query"));
return esClient.send(rq, query).map(r -> new DeleteByQueryResult(r));
return esClient.post(esClient.buildRequest(index, "_delete_by_query"), query)
.map(r -> new DeleteByQueryResult(r));
}
public class DeleteByQueryResult {
......
package de.gwdg.cdstar.ext.elastic;
import org.apache.http.client.methods.HttpDelete;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.DeleteIndexRequest.DeleteIndexResult;
......@@ -17,15 +15,15 @@ public class DeleteIndexRequest implements ElasticSearchClient.ESPreparedRequest
@Override
public Promise<DeleteIndexResult> submit() {
final HttpDelete rq = new HttpDelete(esClient.uri(index));
return esClient.send(rq).map(r -> new DeleteIndexResult(), e -> {
if (e instanceof ESErrorResponse
&& ((ESErrorResponse) e).getType().equals(ESErrorResponse.TYPE_INDEX_NOT_FOUND)) {
// Deleting a non-existing index is always a success (?)
return new DeleteIndexResult();
}
throw e;
});
return esClient.send(esClient.buildRequest(index).DELETE())
.map(r -> new DeleteIndexResult(), e -> {
if (e instanceof ESErrorResponse
&& ((ESErrorResponse) e).getType().equals(ESErrorResponse.TYPE_INDEX_NOT_FOUND)) {
// Deleting a non-existing index is always a success (?)
return new DeleteIndexResult();
}
throw e;
});
}
......
package de.gwdg.cdstar.ext.elastic;
import org.apache.http.client.methods.HttpDelete;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
......@@ -21,8 +19,9 @@ public class DeleteRequest implements ElasticSearchClient.ESPreparedRequest<Dele
@Override
public Promise<DeleteResult> submit() {
final HttpDelete rq = new HttpDelete(esClient.uri(index, ElasticSearchClient.TYPE_NAME, id));
return esClient.send(rq).map(r -> new DeleteResult(r));
return esClient
.send(esClient.buildRequest(index, ElasticSearchClient.TYPE_NAME, id).DELETE())
.map(r -> new DeleteResult(r));
}
public class DeleteResult {
......
package de.gwdg.cdstar.ext.elastic;
import org.apache.http.HttpResponse;
import java.net.http.HttpResponse;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -10,11 +10,11 @@ public class ESErrorResponse extends Exception {
public static final String TYPE_INDEX_ALREADY_EXISTS = "resource_already_exists_exception";
public static final String TYPE_INDEX_NOT_FOUND = "index_not_found_exception";
private final HttpResponse response;
private final HttpResponse<?> response;
private final ObjectNode json;
public ESErrorResponse(HttpResponse response, ObjectNode objectNode) {
super(response.getStatusLine().getStatusCode() + ": " + objectNode.toString());
public ESErrorResponse(HttpResponse<?> response, ObjectNode objectNode) {
super(response.statusCode() + ": " + objectNode.toString());
this.response = response;
json = objectNode;
}
......@@ -27,7 +27,7 @@ public class ESErrorResponse extends Exception {
return json.path("error").path("type").asText("missing_error_type");
}
public HttpResponse getResponse() {
public HttpResponse<?> getResponse() {
return response;
}
}
......@@ -2,28 +2,23 @@ package de.gwdg.cdstar.ext.elastic;
import java.io.Closeable;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.http.HttpEntityEnclosingRequest;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URIUtils;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.EntityTemplate;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.client.HttpAsyncClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -49,11 +44,10 @@ import de.gwdg.cdstar.Utils;
*/
public class ElasticSearchClient implements Closeable {
private static final ContentType BULK_CONTENT_TYPE = ContentType.create("application/x-ndjson");
private static final ContentType JSON_CONTENT_TYPE = ContentType.APPLICATION_JSON;
public static ObjectMapper om = new ObjectMapper();
static final Logger log = LoggerFactory.getLogger(ElasticSearchClient.class);
public static final String TYPE_NAME = "doc";
private static final byte[] NL = new byte[] { '\n' };
Set<Promise<?>> inFlight = ConcurrentHashMap.newKeySet();
......@@ -68,98 +62,82 @@ public class ElasticSearchClient implements Closeable {
om.disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS);
}
private final CloseableHttpAsyncClient http;
final URI baseUri;
private EntityTemplate entity;
private HttpClient http;
public ElasticSearchClient(URI clusterUri) {
baseUri = clusterUri;
final RequestConfig rc = RequestConfig.custom()
.setSocketTimeout(60_000)
.setConnectTimeout(60_000)
.setConnectionRequestTimeout(60_000)
http = HttpClient.newBuilder()
.version(Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(60))
.build();
http = HttpAsyncClients.custom().setDefaultRequestConfig(rc).build();
http.start();
}
@Override
public void close() {
Utils.closeQuietly(http);
inFlight.forEach(Promise::cancel);
}
Promise<ObjectNode> sendBulk(HttpEntityEnclosingRequest rq, Iterable<Object> bodyParts) {
entity = new EntityTemplate((stream) -> {
for (final Object b : bodyParts) {
om.writeValue(stream, b);
stream.write('\n');
stream.flush();
Promise<ObjectNode> sendBulk(HttpRequest.Builder rq, Collection<Object> bodyParts) {
rq.setHeader("Content-Type", "application/x-ndjson");
var lines = new ArrayList<byte[]>(bodyParts.size() * 2);
try {
for (var part : bodyParts) {
lines.add(om.writeValueAsBytes(part));
lines.add(NL);
}
});
entity.setContentType(BULK_CONTENT_TYPE.toString());
rq.setEntity(entity);
return send((HttpUriRequest) rq);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
rq.POST(BodyPublishers.ofByteArrays(lines));
return send(rq);
}
Promise<ObjectNode> post(HttpRequest.Builder rq, Object body) {
try {
rq.POST(BodyPublishers.ofByteArray(om.writeValueAsBytes(body)));
rq.setHeader("Content-Type", "application/json");
return send(rq);
} catch (final Exception e) {
return Promise.ofError(e);
}
}
Promise<ObjectNode> send(HttpEntityEnclosingRequest rq, Object body) {
Promise<ObjectNode> put(HttpRequest.Builder rq, Object body) {
try {
final byte[] payload = body != null ? om.writeValueAsBytes(body) : new byte[] {};
rq.setEntity(new ByteArrayEntity(payload, JSON_CONTENT_TYPE));
rq.PUT(BodyPublishers.ofByteArray(om.writeValueAsBytes(body)));
rq.setHeader("Content-Type", "application/json");
return send(rq);
} catch (final Exception e) {
return Promise.ofError(e);
}
}
return send((HttpUriRequest) rq);
public Builder buildRequest(String... parts) {
return HttpRequest.newBuilder(baseUri.resolve(Utils.join("/", parts)));
}
public URI uri(String... parts) {
return URIUtils.resolve(baseUri, Utils.join("/", parts));
public Builder buildRequest(URI relativeUri) {
return HttpRequest.newBuilder(baseUri.resolve(relativeUri));
}
/**
* Return an cancelable promise for the response.
*/
Promise<ObjectNode> send(HttpUriRequest rq) {
final Promise<ObjectNode> p = Promise.empty(null);
Promise<ObjectNode> send(HttpRequest.Builder rq) {
var p = Promise
.wrap(http.sendAsync(rq.build(), BodyHandlers.ofInputStream()))
.map(rs -> {
var json = om.readValue(rs.body(), ObjectNode.class);
if (rs.statusCode() > 299)
throw new ESErrorResponse(rs, json);
return json;
});
inFlight.add(p);
final Future<HttpResponse> future = http.execute(rq, new FutureCallback<HttpResponse>() {
@Override
public void failed(Exception ex) {
// TODO: Wrap this exception to give some context to the stack
// trace.
p.reject(ex);
}
@Override
public void completed(HttpResponse result) {
try {
final ObjectNode json = (ObjectNode) om.readTree(result.getEntity().getContent());
if (result.getStatusLine().getStatusCode() > 299)
p.reject(new ESErrorResponse(result, json));
else
p.resolve(json);
} catch (final Exception e) {
p.reject(e);
}
}
@Override
public void cancelled() {
p.cancel();
}
});
// Cancel the request if the promise is manually cancelled. Remove
// promise from inFlight list in any case.
p.then((v, e) -> {
inFlight.remove(p);
if (e != null)
future.cancel(true);
});
return p;
......@@ -231,9 +209,8 @@ public class ElasticSearchClient implements Closeable {
}
public Promise<String> waitForYellow() {
final HttpGet rq = new HttpGet(uri("_cluster", "health?wait_for_status=yellow"));
var rq = buildRequest("_cluster", "health?wait_for_status=yellow");
return send(rq).map(r -> r.path("status").asText("unknown"));
}
}
package de.gwdg.cdstar.ext.elastic;
import org.apache.http.client.methods.HttpGet;
import de.gwdg.cdstar.Promise;
public class FetchRequest implements ElasticSearchClient.ESPreparedRequest<ESDocument> {
......@@ -17,7 +15,7 @@ public class FetchRequest implements ElasticSearchClient.ESPreparedRequest<ESDoc
@Override
public Promise<ESDocument> submit() {
final HttpGet searchRequest = new HttpGet(esClient.uri(index, ElasticSearchClient.TYPE_NAME, id));
return esClient.send(searchRequest).map(r -> new ESDocument(r));
var rq = esClient.buildRequest(index, ElasticSearchClient.TYPE_NAME, id);
return esClient.send(rq).map(r -> new ESDocument(r));
}
}
\ No newline at end of file
package de.gwdg.cdstar.ext.elastic;
import java.net.http.HttpRequest;
import java.util.concurrent.TimeUnit;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.UriBuilder;
import de.gwdg.cdstar.ext.elastic.HealthRequest.HealthResult;
public class HealthRequest implements ElasticSearchClient.ESPreparedRequest<HealthResult> {
......@@ -32,12 +31,12 @@ public class HealthRequest implements ElasticSearchClient.ESPreparedRequest<Heal
@Override
public Promise<HealthResult> submit() {
final URIBuilder urib = new URIBuilder(esClient.uri("cluster", "health"));
var ub = new UriBuilder(esClient.baseUri).path("cluster", "health");
if (waitForStatus != null)
urib.addParameter("wait_for_status", waitForStatus.name().toLowerCase());
ub.query("wait_for_status", waitForStatus.name().toLowerCase());
if (timeout >= 0)
urib.addParameter("timeout", Long.toString(timeout) + "ms");
final HttpGet rq = new HttpGet(urib.toString());
ub.query("timeout", Long.toString(timeout) + "ms");
var rq = ub.map(HttpRequest::newBuilder);
return esClient.send(rq).map(r -> new HealthResult(r));
}
......
package de.gwdg.cdstar.ext.elastic;
import java.net.URI;
import org.apache.http.client.methods.HttpPut;
import java.net.http.HttpRequest.Builder;
import com.fasterxml.jackson.databind.node.ObjectNode;
......@@ -25,14 +23,13 @@ public class IndexRequest implements ElasticSearchClient.ESPreparedRequest<Index
@Override
public Promise<IndexResponse> submit() {
URI uri;
Builder rq;
if (id == null)
uri = esClient.uri(index, ElasticSearchClient.TYPE_NAME);
rq = esClient.buildRequest(index, ElasticSearchClient.TYPE_NAME);
else
uri = esClient.uri(index, ElasticSearchClient.TYPE_NAME, id);
rq = esClient.buildRequest(index, ElasticSearchClient.TYPE_NAME, id);
final HttpPut rq = new HttpPut(uri);
return esClient.send(rq, document).map(r -> new IndexResponse(r));
return esClient.put(rq, document).map(r -> new IndexResponse(r));
}
public class IndexResponse {
......