Gitlab Community Edition Instance

Commit 709049ff authored by mhellka's avatar mhellka
Browse files

Support multiple elasticsearch and opensearch versions

Subtile differences make it hard to support ElasticSearch 6, 7, 8 and OpenSearch at the same time. Support is still experimental.
parent c9a8d508
Pipeline #291586 passed with stages
in 8 minutes and 47 seconds
......@@ -3,7 +3,6 @@
cache:
paths:
- .m2/
- .pip-cache/
variables:
MAVEN_OPTS: "-Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=WARN -Dorg.slf4j.simpleLogger.showDateTime=true -Djava.awt.headless=true -Dmaven.repo.local=.m2"
......@@ -11,24 +10,75 @@ variables:
PIP_CACHE_DIR: .pip-cache/
stages:
- build
- test
- verify
- deploy
test-java-11:
stage: test
# This is mostly used to download dependencies and warm up the .m2 cache
build:
stage: build
image: maven:3-jdk-11
script:
- useradd -m notroot
- chown -R notroot:notroot .
- su notroot -c "$MVN clean test verify"
- su notroot -c "$MVN clean package -DskipTests=true"
test-java-latest:
test-jdk-11: &default-test
stage: test
image: maven:3-jdk-11
script:
- useradd -m notroot
- chown -R notroot:notroot .
- su notroot -c "$MVN test"
test-jdk-latest:
<<: *default-test
image: maven:3
verify-es6: &full-test
stage: verify
image: maven:3-jdk-11
script:
- useradd -m notroot
- chown -R notroot:notroot .
- su notroot -c "$MVN clean test verify"
- su notroot -c "$MVN verify"
services:
- name: "elasticsearch:6.8.23"
alias: search
command: ["bin/elasticsearch", "-Ediscovery.type=single-node"]
variables:
ES_JAVA_OPTS: "-Xms512m -Xmx512m"
variables:
TEST_ES_URI: "http://search:9200/"
verify-es7:
<<: *full-test
services:
- name: "elasticsearch:7.17.1"
alias: search
command: ["bin/elasticsearch", "-Ediscovery.type=single-node"]
variables:
ES_JAVA_OPTS: "-Xms512m -Xmx512m"
verify-es8:
<<: *full-test
services:
- name: "elasticsearch:8.0.1"
alias: search
variables:
ES_JAVA_OPTS: "-Xms512m -Xmx512m"
ES_SETTING_DISCOVERY_TYPE: "single-node"
ES_SETTING_XPACK_SECURITY_ENABLED: "false"
verify-os1:
<<: *full-test
services:
- name: "opensearchproject/opensearch:1.2.4"
alias: search
command: ["bin/opensearch", "-Ediscovery.type=single-node", "-Eplugins.security.disabled=true"]
variables:
OPENSEARCH_JAVA_OPTS: "-Xms512m -Xmx512m"
deploy-docker-release:
stage: deploy
......
......@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.BulkRequest.ESBulkResult;
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient.BaseResult;
public class BulkRequest implements ElasticSearchClient.ESPreparedRequest<ESBulkResult> {
private final ElasticSearchClient esClient;
......@@ -18,41 +19,36 @@ public class BulkRequest implements ElasticSearchClient.ESPreparedRequest<ESBulk
private final List<BulkEntry> bulks = new ArrayList<>();
public BulkRequest index(String index, String id, Object data) {
private BulkRequest add(String type, String index, String id, ObjectNode data) {
final ObjectNode command = esClient.json();
command.with("index")
command.with(type)
.put("_index", index)
.put("_type", ElasticSearchClient.TYPE_NAME)
.put("_id", id);
if (!(data instanceof ObjectNode))
data = ElasticSearchClient.om.valueToTree(data);
bulks.add(new BulkEntry(command, (ObjectNode) data));
if (esClient.version().needsDoc())
command.with(type).put("_type", ElasticSearchClient.DEFAULT_TYPE_NAME);
bulks.add(new BulkEntry(command, data));
return this;
}
public BulkRequest index(String index, String id, Object data) {
if (data != null && !(data instanceof ObjectNode))
data = ElasticSearchClient.om.valueToTree(data);
return add("index", index, id, (ObjectNode) data);
}
public BulkRequest delete(String index, String id) {
final ObjectNode command = esClient.json();
command.with("delete")
.put("_index", index)
.put("_type", ElasticSearchClient.TYPE_NAME)
.put("_id", id);
bulks.add(new BulkEntry(command, null));
return this;
return add("delete", index, id, null);
}
public BulkRequest update(String index, String id, Object data) {
final ObjectNode command = esClient.json();
command.with("update")
.put("_index", index)
.put("_type", ElasticSearchClient.TYPE_NAME)
.put("_id", id);
if (!(data instanceof ObjectNode))
if (data != null && !(data instanceof ObjectNode))
data = ElasticSearchClient.om.valueToTree(data);
ObjectNode ob = (ObjectNode) data;
if (!(ob.has("script") || ob.has("doc")))
ob = (ObjectNode) esClient.json().set("doc", ob);
bulks.add(new BulkEntry(command, ob));
return this;
return add("update", index, id, ob);
}
@Override
......@@ -108,20 +104,14 @@ public class BulkRequest implements ElasticSearchClient.ESPreparedRequest<ESBulk
}
}
public static class ESBulkResult {
private final ObjectNode result;
public static class ESBulkResult extends BaseResult {
public ESBulkResult(ObjectNode result) {
this.result = result;
super(result);
}
public boolean hasErrors() {
return result.get("errors").asBoolean();
}
public ObjectNode getResult() {
return result;
return getResult().get("errors").asBoolean();
}
}
......
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.CreateIndexRequest.CereateIndexResult;
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient.BaseResult;
public class CreateIndexRequest implements ElasticSearchClient.ESPreparedRequest<CereateIndexResult> {
......@@ -22,17 +23,30 @@ public class CreateIndexRequest implements ElasticSearchClient.ESPreparedRequest
return esClient.put(esClient.buildRequest(index), payload).map(r -> new CereateIndexResult(r));
}
public class CereateIndexResult {
public class CereateIndexResult extends BaseResult {
public CereateIndexResult(ObjectNode r) {
super(r);
}
}
public CreateIndexRequest mappings(ObjectNode mappings) {
mappings().setAll(mappings);
return this;
}
public ObjectNode mappings() {
return payload.with("mappings").with(ElasticSearchClient.TYPE_NAME);
return esClient.version().needsDoc()
? payload.with("mappings").with(ElasticSearchClient.DEFAULT_TYPE_NAME)
: payload.with("mappings");
}
public ObjectNode settings() {
return payload.with("settings");
}
public CreateIndexRequest settings(ObjectNode settings) {
settings().setAll(settings);
return this;
}
}
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.DeleteByQueryRequest.DeleteByQueryResult;
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient.BaseResult;
public class DeleteByQueryRequest implements ElasticSearchClient.ESPreparedRequest<DeleteByQueryResult> {
......@@ -27,12 +28,13 @@ public class DeleteByQueryRequest implements ElasticSearchClient.ESPreparedReque
.map(r -> new DeleteByQueryResult(r));
}
public class DeleteByQueryResult {
public class DeleteByQueryResult extends BaseResult {
public DeleteByQueryResult(ObjectNode r) {
// TODO Auto-generated constructor stub
public DeleteByQueryResult(ObjectNode answer) {
super(answer);
}
}
}
package de.gwdg.cdstar.ext.elastic;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.DeleteIndexRequest.DeleteIndexResult;
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient.BaseResult;
public class DeleteIndexRequest implements ElasticSearchClient.ESPreparedRequest<DeleteIndexResult> {
......@@ -15,19 +18,14 @@ public class DeleteIndexRequest implements ElasticSearchClient.ESPreparedRequest
@Override
public Promise<DeleteIndexResult> submit() {
return esClient.send(esClient.buildRequest(index).DELETE())
.map(r -> new DeleteIndexResult(), e -> {
if (e instanceof ESErrorResponse
&& ((ESErrorResponse) e).getType().equals(ESErrorResponse.TYPE_INDEX_NOT_FOUND)) {
// Deleting a non-existing index is always a success (?)
return new DeleteIndexResult();
}
throw e;
});
return esClient.send(esClient.buildRequest(index).DELETE()).map(DeleteIndexResult::new);
}
public class DeleteIndexResult {
public class DeleteIndexResult extends BaseResult {
public DeleteIndexResult(ObjectNode answer) {
super(answer);
}
}
}
......@@ -4,6 +4,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.ext.elastic.DeleteRequest.DeleteResult;
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient.BaseResult;
public class DeleteRequest implements ElasticSearchClient.ESPreparedRequest<DeleteResult> {
......@@ -20,13 +21,17 @@ public class DeleteRequest implements ElasticSearchClient.ESPreparedRequest<Dele
@Override
public Promise<DeleteResult> submit() {
return esClient
.send(esClient.buildRequest(index, ElasticSearchClient.TYPE_NAME, id).DELETE())
.send(
(esClient.version().needsDoc()
? esClient.buildRequest(index, ElasticSearchClient.DEFAULT_TYPE_NAME, id)
: esClient.buildRequest(index, id)).DELETE())
.map(r -> new DeleteResult(r));
}
public class DeleteResult {
public class DeleteResult extends BaseResult {
public DeleteResult(ObjectNode r) {
public DeleteResult(ObjectNode answer) {
super(answer);
// TODO Auto-generated constructor stub
}
......
......@@ -3,16 +3,16 @@ package de.gwdg.cdstar.ext.elastic;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.node.ObjectNode;
public class ESDocument {
import de.gwdg.cdstar.ext.elastic.ElasticSearchClient.BaseResult;
protected final ObjectNode doc;
public class ESDocument extends BaseResult {
public ESDocument(ObjectNode doc) {
this.doc = doc;
super(doc);
}
public ObjectNode source() {
return doc.with("_source");
return getResult().with("_source");
}
public <T> T source(Class<T> type) throws JsonProcessingException {
......@@ -20,6 +20,6 @@ public class ESDocument {
}
public String getId() {
return doc.get("_id").asText();
return getResult().get("_id").asText();
}
}
\ No newline at end of file
......@@ -11,6 +11,8 @@ import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
......@@ -29,15 +31,16 @@ import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.node.ObjectNode;
import de.gwdg.cdstar.Promise;
import de.gwdg.cdstar.UriBuilder;
import de.gwdg.cdstar.Utils;
/**
* A basic elasticsearch REST client based on Apache {@link HttpAsyncClient},
* cdstar {@link Promise}s and jackson (for json processing). The elasticsearch
* java libraries are not used.
* A basic elasticsearch REST client based on Apache {@link HttpClient}, cdstar
* {@link Promise}s and jackson (for json processing). The elasticsearch java
* libraries are not used.
*
* This implementation aims to be more usable and convenient than the official
* low-level client, but less defendant on the whole elasticsearch stack and
* low-level client, but less dependant on the whole elasticsearch stack and
* more stable than the official high-level client. Only the most commonly used
* parts of the ES API are implemented, but direct API access is allowed, so any
* missing parts can be worked around by manually building the REST calls.
......@@ -46,7 +49,7 @@ import de.gwdg.cdstar.Utils;
public class ElasticSearchClient implements Closeable {
public static ObjectMapper om = new ObjectMapper();
static final Logger log = LoggerFactory.getLogger(ElasticSearchClient.class);
public static final String TYPE_NAME = "doc";
public static final String DEFAULT_TYPE_NAME = "doc";
private static final byte[] NL = new byte[] { '\n' };
Set<Promise<?>> inFlight = ConcurrentHashMap.newKeySet();
......@@ -64,14 +67,56 @@ public class ElasticSearchClient implements Closeable {
final URI baseUri;
private HttpClient http;
private ESVersion version;
public ElasticSearchClient(URI clusterUri) {
baseUri = clusterUri;
this.baseUri = clusterUri;
this.version = ESVersion.AUTODETECT;
http = HttpClient.newBuilder()
.version(Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(60))
.build();
.version(Version.HTTP_1_1)
.connectTimeout(Duration.ofSeconds(60))
.build();
}
/**
* If version is still {@link ESVersion#AUTODETECT}, then try to figure out
* version now. Otherwise, return whatever was detected previously or configured
* explicitly.
*
* @throws InterruptedException
* @throws ExecutionException
* @throws CancellationException
*
*/
public ESVersion resolveVersion() {
if (version != ESVersion.AUTODETECT)
return version;
ObjectNode clusterInfo;
try {
clusterInfo = get(buildRequest(), null).get();
} catch (CancellationException | ExecutionException | InterruptedException e) {
throw new RuntimeException("Unable to detect ES cluister version", e);
}
var dist = clusterInfo.path("version").path("distribution").textValue();
var num = clusterInfo.path("version").path("number").textValue();
if ("opensearch".equals(dist)) {
if (num.startsWith("1."))
version = ESVersion.OS1;
} else {
if (num.startsWith("6."))
version = ESVersion.ES6;
else if (num.startsWith("7."))
version = ESVersion.ES7;
else if (num.startsWith("8."))
version = ESVersion.ES8;
}
if (version == ESVersion.AUTODETECT)
throw new RuntimeException("Unsupported ES version: " + num + "(" + dist + ")");
return version;
}
@Override
......@@ -94,47 +139,62 @@ public class ElasticSearchClient implements Closeable {
return send(rq);
}
Promise<ObjectNode> post(HttpRequest.Builder rq, Object body) {
Promise<ObjectNode> sendMethod(String method, HttpRequest.Builder rq, Object body) {
try {
rq.POST(BodyPublishers.ofByteArray(om.writeValueAsBytes(body)));
rq.setHeader("Content-Type", "application/json");
if (body != null) {
rq.setHeader("Content-Type", "application/json");
rq.method(method, BodyPublishers.ofByteArray(om.writeValueAsBytes(body)));
} else {
rq.method(method, BodyPublishers.noBody());
}
return send(rq);
} catch (final Exception e) {
return Promise.ofError(e);
}
}
Promise<ObjectNode> get(HttpRequest.Builder rq, Object body) {
return sendMethod("GET", rq, body);
}
Promise<ObjectNode> post(HttpRequest.Builder rq, Object body) {
return sendMethod("POST", rq, body);
}
Promise<ObjectNode> put(HttpRequest.Builder rq, Object body) {
try {
rq.PUT(BodyPublishers.ofByteArray(om.writeValueAsBytes(body)));
rq.setHeader("Content-Type", "application/json");
return send(rq);
} catch (final Exception e) {
return Promise.ofError(e);
}
return sendMethod("PUT", rq, body);
}
public UriBuilder buildUri(String... path) {
return new UriBuilder(baseUri).path(path);
}
public Builder buildRequest(String... parts) {
return HttpRequest.newBuilder(baseUri.resolve(Utils.join("/", parts)));
return buildUri(parts).map(HttpRequest::newBuilder);
}
public Builder buildRequest(URI relativeUri) {
return HttpRequest.newBuilder(baseUri.resolve(relativeUri));
public Builder buildRequest(Map<String, String> params, String... parts) {
var urib = buildUri(parts);
params.forEach(urib::query);
return urib.map(HttpRequest::newBuilder);
}
/**
* Return an cancelable promise for the response.
*/
Promise<ObjectNode> send(HttpRequest.Builder rq) {
var rqb = rq.build();
var p = Promise
.wrap(http.sendAsync(rq.build(), BodyHandlers.ofInputStream()))
.map(rs -> {
var json = om.readValue(rs.body(), ObjectNode.class);
if (rs.statusCode() > 299)
throw new ESErrorResponse(rs, json);
return json;
});
.wrap(http.sendAsync(rqb, BodyHandlers.ofInputStream()))
.map(rs -> {
var json = om.readValue(rs.body(), ObjectNode.class);
if (rs.statusCode() > 299)
throw new ESErrorResponse(rs, json);
return json;
})
.then(
r -> log.debug("ES Result: {}", r),
e -> log.warn("Search error:", e));
inFlight.add(p);
p.then((v, e) -> {
inFlight.remove(p);
......@@ -171,6 +231,18 @@ public class ElasticSearchClient implements Closeable {
return new CreateIndexRequest(this, index);
}
public GetIndexRequest getIndex(String index) {
return new GetIndexRequest(this, index);
}
public UpdateIndexSettingRequest updateIndexSettings(String index) {
return new UpdateIndexSettingRequest(this, index);
}
public UpdateIndexMappingRequest updateIndexMapping(String index) {
return new UpdateIndexMappingRequest(this, index);
}
public DeleteIndexRequest deleteIndex(String index) {
return new DeleteIndexRequest(this, index);
}
......@@ -191,26 +263,84 @@ public class ElasticSearchClient implements Closeable {
}
}
public interface ESPreparedRequest<T> {
public interface ESPreparedRequest<T extends BaseResult> {
Promise<T> submit();
default CompletableFuture<T> submitFuture() {
return submit().toCompletableFuture();
}
default T execute() throws CancellationException, ExecutionException, InterruptedException {
return submit().get();
default T execute() throws CancellationException, InterruptedException, ESErrorResponse {
try {
return submit().get();
} catch (ExecutionException e) {
var cause = e.getCause();
if (cause instanceof ESErrorResponse)
throw (ESErrorResponse) cause;
if (cause instanceof RuntimeException)
throw (RuntimeException) cause;
throw new RuntimeException(cause);
}
}
default T execute(long timeout, TimeUnit unit)
throws CancellationException, ExecutionException, InterruptedException, TimeoutException {
return submit().get(timeout, unit);
throws CancellationException, InterruptedException, TimeoutException, ESErrorResponse {