Gitlab Community Edition Instance

Commit d62a1631 authored by mhellka's avatar mhellka
Browse files

Improved rabbitmq-sink

- More robust error handling (retry until send queue overflows)
- Added metrics (experimental)
parent 8d7e8536
Pipeline #104117 passed with stage
in 4 minutes and 40 seconds
package de.gwdg.cdstar.ext.rabbitmq;
import com.codahale.metrics.Counting;
import com.codahale.metrics.Metric;
@FunctionalInterface
public interface FCounter extends Metric, Counting {
}
......@@ -3,13 +3,16 @@ package de.gwdg.cdstar.ext.rabbitmq;
import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -18,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import de.gwdg.cdstar.Utils;
......@@ -36,149 +38,258 @@ class RabbitMQConnector implements Closeable {
public static final String PROP_BROKER = "broker";
public static final String PROP_EXCHANGE = "exchange";
private volatile boolean blocked = false;
private static final long BUSY_WAIT_MS = 1000;
private static final long ERROR_WAIT_MS = 10000;
private volatile boolean closed = false;
private Connection connection;
private Channel channel;
private Stats stats;
private final String exchangeName;
private final URI brokerUri;
private BlockingQueue<ChangeEvent> sendQueue;
private int sendQueueSize = 1024;
private int maxInFlightEvents = 128;
private BlockingQueue<ChangeEvent> sendQueue = new ArrayBlockingQueue<>(sendQueueSize);
SortedMap<Long, ChangeEvent> inFlight = new TreeMap<>();
private volatile boolean blocked = false;
private int sendQueueSize = 1024;
private Thread sendThread;
private ConnectionFactory connectionFactory;
private Channel rmqChannel;
public RabbitMQConnector(URI brokerUri, String exchangeName) {
this.brokerUri = brokerUri;
this.exchangeName = exchangeName;
try {
connectionFactory = new ConnectionFactory();
connectionFactory.setUri(brokerUri);
// We do NOT enable autoRecovery as we do that ourself
} catch (KeyManagementException | NoSuchAlgorithmException | URISyntaxException e) {
throw new IllegalArgumentException("Failed to configure RabbitMQ connector", e);
}
}
public int getSendQueueSize() {
return sendQueueSize;
}
public void setSendQueueSize(int sendQueueSize) {
public synchronized void setSendQueueSize(int sendQueueSize) {
this.sendQueueSize = sendQueueSize;
final BlockingQueue<ChangeEvent> oldQueue = sendQueue;
sendQueue = sendQueueSize > 0 ? new ArrayBlockingQueue<>(sendQueueSize) : new LinkedBlockingQueue<>();
if (oldQueue != null)
oldQueue.drainTo(sendQueue);
sendQueue = new ArrayBlockingQueue<>(sendQueueSize);
for (ChangeEvent event; (event = oldQueue.poll()) != null;)
enqueueOrDrop(event);
}
public int getMaxInFlightEvents() {
return maxInFlightEvents;
}
public void setMaxInFlightEvents(int maxInFlightEvents) {
this.maxInFlightEvents = maxInFlightEvents;
}
public synchronized void start() throws IOException, TimeoutException {
if (sendThread != null)
throw new IllegalStateException("Already started");
sendThread = new Thread(null, this::sendLoop, "rabbitmq-send");
sendThread.start();
}
public Stats getStats() {
return stats;
}
/**
* Try (multiple times) to connect a channel, or raise the last exception.
* Connect or reconnect to RabbitMQ. Rezurn the opened channel
*
* @return
*/
public synchronized void connect() throws Exception {
setSendQueueSize(sendQueueSize);
final ConnectionFactory factory = new ConnectionFactory();
factory.setUri(brokerUri);
factory.setAutomaticRecoveryEnabled(true);
private synchronized Channel getOrConnectChannel() throws IOException, TimeoutException {
if (rmqChannel != null && rmqChannel.isOpen())
return rmqChannel;
connection = factory.newConnection();
channel = connection.createChannel();
// Clean state
rmqChannel = null;
blocked = false;
handleNack(Long.MAX_VALUE, true);
try {
channel.exchangeDeclarePassive(exchangeName);
} catch (final IOException e) {
log.error("Exchange does not exist: {}", e);
rmqChannel = connectionFactory.newConnection().createChannel();
rmqChannel.exchangeDeclarePassive(exchangeName);
rmqChannel.confirmSelect();
rmqChannel.addConfirmListener(this::handleAck, this::handleNack);
rmqChannel.getConnection().addBlockedListener(this::handleBlocked, this::handleUnblocked);
return rmqChannel;
} catch (Exception e) {
if (rmqChannel != null)
Utils.closeQuietly(rmqChannel.getConnection());
rmqChannel = null;
throw e;
}
channel.confirmSelect();
channel.addConfirmListener(this::handleAck, this::handleNack);
connection.addBlockedListener(this::handleBlocked, this::handleUnblocked);
sendThread = new Thread(null, this::sendLoop, "rabbitmq-send");
sendThread.start();
}
/**
* Thread for sending events from the {@link #sendQueue}.
*/
private void sendLoop() {
ChangeEvent event = null;
try {
while (!closed) {
while ((event = sendQueue.poll(1000, TimeUnit.MILLISECONDS)) != null) {
sendOne(event);
Channel channel;
try {
channel = getOrConnectChannel();
} catch (Exception e) {
log.warn("Failed to connect to RabbitMQ", e);
stats.connectionFailed.increment();
Thread.sleep(ERROR_WAIT_MS);
continue;
}
if (inFlight.size() > maxInFlightEvents) {
Thread.sleep(BUSY_WAIT_MS);
continue;
}
if (blocked) {
Thread.sleep(BUSY_WAIT_MS);
continue;
}
ChangeEvent event = sendQueue.peek();
if (event == null) {
Thread.sleep(BUSY_WAIT_MS);
continue;
}
try {
sendOne(channel, event);
} catch (Exception e) {
if (log.isWarnEnabled())
log.warn("Failed to send event: {}", toJson(event), e);
stats.sendFailed.increment();
Thread.sleep(ERROR_WAIT_MS);
continue;
}
// Success -> Remove event from queue
sendQueue.poll();
}
} catch (final InterruptedException e) {
} catch (InterruptedException e) {
} finally {
close();
}
}
/**
* Try (multiple times) to send an event, or drop it.
*/
private void sendOne(ChangeEvent event) {
private long sendOne(Channel channel, ChangeEvent event) throws IOException {
final long seqNo = channel.getNextPublishSeqNo();
final BasicProperties props = new BasicProperties();
final byte[] payload = toJson(event);
final String routingKey = event.getVault() + ":" + event.getArchive();
try {
final long seqNo = channel.getNextPublishSeqNo();
channel.basicPublish(exchangeName, routingKey, props, payload);
synchronized (inFlight) {
inFlight.put(seqNo, event);
}
} catch (final Exception e) {
drop(event, e);
channel.basicPublish(exchangeName, routingKey, props, payload);
synchronized (inFlight) {
inFlight.put(seqNo, event);
}
return seqNo;
}
/**
* Enqueue an event, or drop it and return false if the send queue is full.
*/
public boolean enqueueOrDrop(ChangeEvent event) {
stats.eventQueued.increment();
if (sendQueue.offer(event))
return true;
drop(event, new RuntimeException("Send queue overflow"));
drop(event, "Send queue overflow");
return false;
}
private boolean retryOrDrop(ChangeEvent event, String dropReason) {
if (sendQueue.offer(event))
return true;
drop(event, dropReason);
return false;
}
private void drop(ChangeEvent event, String reason) {
stats.eventDropped.increment();
// TODO: Implement dead-letter log to file?
log.warn("Event dropped ({}): {}", reason, new String(toJson(event), StandardCharsets.UTF_8));
}
@Override
public synchronized void close() {
if (closed)
return;
closed = true;
// TODO: Wait some time for the send queue to drain?
Utils.closeQuietly(channel);
Utils.closeQuietly(connection);
inFlight.values().forEach(event -> drop(event, new RuntimeException("shutdown")));
inFlight.clear();
// Drop all remaining events
for (ChangeEvent event; (event = sendQueue.poll()) != null;)
drop(event, "shutdown");
synchronized (inFlight) {
inFlight.forEach((key, ev) -> drop(ev, "shutdown"));
inFlight.clear();
}
}
void handleBlocked(String reason) throws IOException {
blocked = true;
log.warn("Connection blocked");
}
/**
* Try to re-enqueue NACKd events, or drop them if queue is full.
*
* This results in out-of-order delivery, but that's better than dropping.
*/
void handleNack(long deliveryTag, boolean multiple) {
void handleUnblocked() throws IOException {
blocked = false;
log.info("Connection unblocked");
}
Consumer<ChangeEvent> onNack = event -> {
retryOrDrop(event, "NACK");
stats.sendFailed.increment();
if (log.isWarnEnabled())
log.debug("Recieved NACK from server for event: {}", toJson(event));
};
void handleNack(long deliveryTag, boolean multiple) throws IOException {
synchronized (inFlight) {
if (multiple) {
final SortedMap<Long, ChangeEvent> failed = inFlight.headMap(deliveryTag + 1);
failed.values().forEach(event -> drop(event, new IOException("NACK")));
failed.values().forEach(onNack);
failed.clear();
} else {
final ChangeEvent event = inFlight.remove(deliveryTag);
if (event != null)
drop(event, new IOException("NACK"));
if (event != null) {
onNack.accept(event);
}
}
}
}
synchronized void handleAck(long deliveryTag, boolean multiple) throws IOException {
synchronized void handleAck(long deliveryTag, boolean multiple) {
synchronized (inFlight) {
if (multiple)
inFlight.headMap(deliveryTag + 1).clear();
else
inFlight.remove(deliveryTag);
if (multiple) {
final SortedMap<Long, ChangeEvent> acked = inFlight.headMap(deliveryTag + 1);
stats.eventDelivered.add(acked.size());
acked.clear();
} else {
if (inFlight.remove(deliveryTag) != null)
stats.eventDelivered.increment();
}
}
}
static byte[] toJson(ChangeEvent event) {
private void handleBlocked(String reason) {
blocked = true;
log.info("Connection blocked: {}", reason);
}
private void handleUnblocked() {
blocked = false;
log.info("Connection unblocked");
}
private static byte[] toJson(ChangeEvent event) {
try {
return mapper.writeValueAsBytes(event);
} catch (final IOException e) {
......@@ -186,8 +297,4 @@ class RabbitMQConnector implements Closeable {
}
}
private void drop(ChangeEvent event, Exception e) {
log.warn("Event dropped: {}", new String(toJson(event), StandardCharsets.UTF_8), e);
}
}
......@@ -2,6 +2,8 @@ package de.gwdg.cdstar.ext.rabbitmq;
import java.net.URI;
import com.codahale.metrics.MetricRegistry;
import de.gwdg.cdstar.Utils;
import de.gwdg.cdstar.event.ChangeEvent;
import de.gwdg.cdstar.runtime.Config;
......@@ -37,6 +39,17 @@ public class RabbitMQSink implements RuntimeListener {
onEvent(event);
}
});
conn = new RabbitMQConnector(brokerUri, exchangeName);
MetricRegistry metrics = runtime.lookupRequired(MetricRegistry.class);
Stats stats = conn.getStats();
// TODO: Define and document a naming scheme for plugin metrics
metrics.register("rmqsink.total", (FCounter) () -> stats.eventsTotal());
metrics.register("rmqsink.dropped", (FCounter) () -> stats.eventsDelivered());
metrics.register("rmqsink.delivered", (FCounter) () -> stats.eventsDropped());
metrics.register("rmqsink.waiting", (FCounter) () -> stats.eventsWaiting());
metrics.register("rmqsink.connectErrors", (FCounter) () -> stats.connectErrors());
metrics.register("rmqsink.sendErrors", (FCounter) () -> stats.sendErrors());
}
void onEvent(ChangeEvent event) {
......@@ -45,8 +58,7 @@ public class RabbitMQSink implements RuntimeListener {
@Override
public void onStartup(RuntimeContext ctx) throws Exception {
conn = new RabbitMQConnector(brokerUri, exchangeName);
conn.connect();
conn.start();
}
@Override
......
package de.gwdg.cdstar.ext.rabbitmq;
import java.util.concurrent.atomic.LongAdder;
class Stats {
LongAdder eventQueued = new LongAdder();
LongAdder eventDropped = new LongAdder();
LongAdder eventDelivered = new LongAdder();
LongAdder connectionFailed = new LongAdder();
LongAdder sendFailed = new LongAdder();
public long eventsTotal() {
return eventQueued.longValue();
}
public long eventsDropped() {
return eventDropped.longValue();
}
public long eventsDelivered() {
return eventDelivered.longValue();
}
public long eventsWaiting() {
return eventsTotal() - eventsDropped() - eventsDelivered();
}
public long connectErrors() {
return connectionFailed.longValue();
}
public long sendErrors() {
return sendFailed.longValue();
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment