Gitlab Community Edition Instance

Commit bca37abf authored by mhellka's avatar mhellka
Browse files

Split implementation from public SessionRegistry interface

parent ceacf07d
Pipeline #116800 passed with stages
in 9 minutes and 39 seconds
......@@ -50,7 +50,7 @@ import de.gwdg.cdstar.runtime.services.MetricsFeature;
import de.gwdg.cdstar.runtime.services.PluginLoader;
import de.gwdg.cdstar.runtime.services.PoolServiceImpl;
import de.gwdg.cdstar.runtime.services.ServiceRegistry;
import de.gwdg.cdstar.runtime.services.SessionRegistry;
import de.gwdg.cdstar.runtime.services.SessionRegistryImpl;
import de.gwdg.cdstar.runtime.services.VaultRegistry;
import de.gwdg.cdstar.runtime.services.health.HealthMonitorFeature;
import de.gwdg.cdstar.runtime.tasks.TaskServiceImpl;
......@@ -82,7 +82,7 @@ public class CDStarRuntime implements RuntimeContext {
private final VaultRegistry vaultRegistry;
private final ServiceRegistry services;
private final AuthConfigImpl authConfig;
private final SessionRegistry sessionRegistry;
private final SessionRegistryImpl sessionRegistry;
private final ProfileRegistry profileRegistry;
private final SimpleAuthorizer systemRealm;
......@@ -153,7 +153,7 @@ public class CDStarRuntime implements RuntimeContext {
services = new ServiceRegistry();
authConfig = new AuthConfigImpl();
sessionRegistry = new SessionRegistry();
sessionRegistry = new SessionRegistryImpl();
vaultRegistry = new VaultRegistry(dataPath);
profileRegistry = new ProfileRegistry();
transactionService = new DiskTransactionManager(getServiceDir("tx"));
......@@ -423,7 +423,7 @@ public class CDStarRuntime implements RuntimeContext {
@Override
public CDStarSession resumeSession(String tx) {
return ensureStarted(sessionRegistry).get(tx);
return ensureStarted(sessionRegistry).get(tx).orElse(null);
}
@Override
......
package de.gwdg.cdstar.runtime.services;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.client.CDStarSession;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import de.gwdg.cdstar.runtime.listener.SessionListener;
import de.gwdg.cdstar.runtime.listener.SessionStartListener;
public class SessionRegistry implements SessionStartListener, SessionListener, RuntimeListener {
private final ConcurrentHashMap<String, CDStarSession> allSessions = new ConcurrentHashMap<>();
private ScheduledFuture<?> sessionReaper;
private final AtomicLong totalCounter = new AtomicLong();
private static final Logger log = LoggerFactory.getLogger(SessionRegistry.class);
@Override
public void onStartup(RuntimeContext ctx) throws Exception {
sessionReaper = ctx.lookupRequired(CronService.class).scheduleWithFixedDelay(this::reapExpiredSessions, 60, 10,
TimeUnit.SECONDS);
ctx.lookup(MetricRegistry.class).ifPresent(mr -> {
mr.gauge("sess.open", () -> () -> allSessions.size());
mr.gauge("sess.open.ro", () -> () -> allSessions.values().stream()
.filter(CDStarSession::isReadOnly).count());
mr.gauge("sess.open.rw", () -> () -> allSessions.values().stream()
.filter(((Predicate<CDStarSession>)CDStarSession::isReadOnly).negate()).count());
mr.gauge("sess.total", () -> () -> totalCounter.get());
});
}
private void reapExpiredSessions() {
for (final CDStarSession s : allSessions.values()) {
if (s.isExpired()) {
log.info("Closing expired session: {} (readOnly={})", s.getSessionId(), String.valueOf(s.isReadOnly()));
s.rollback(new TimeoutException("Session expired"));
}
}
}
@Override
public void onShutdown(RuntimeContext ctx) {
sessionReaper.cancel(false);
for (final CDStarSession s : allSessions.values()) {
try {
synchronized (s) {
if (!s.isClosed()) {
log.error("Forcefully closing session: {}", s.getSessionId());
s.rollback(new CancellationException("Runtime shutting down"));
}
}
} catch (final Exception e) {
log.error("Failed to rollback session during shutdown: {}", s.getSessionId(), e);
}
}
}
@Override
public void onSessionStarted(CDStarSession session) {
allSessions.put(session.getSessionId(), session);
totalCounter.incrementAndGet();
session.addListener(this);
}
@Override
public void onCommit(CDStarSession session) {
allSessions.remove(session.getSessionId());
}
@Override
public void onRollback(CDStarSession session) {
allSessions.remove(session.getSessionId());
}
public interface SessionRegistry {
/**
* Return the session with the specified ID, if present.
*/
public CDStarSession get(String tx) {
return allSessions.get(tx);
}
Optional<CDStarSession> get(String tx);
}
}
\ No newline at end of file
package de.gwdg.cdstar.runtime.services;
import java.util.Optional;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricRegistry;
import de.gwdg.cdstar.runtime.RuntimeContext;
import de.gwdg.cdstar.runtime.client.CDStarSession;
import de.gwdg.cdstar.runtime.listener.RuntimeListener;
import de.gwdg.cdstar.runtime.listener.SessionListener;
import de.gwdg.cdstar.runtime.listener.SessionStartListener;
public class SessionRegistryImpl implements SessionStartListener, SessionListener, RuntimeListener, SessionRegistry {
private final ConcurrentHashMap<String, CDStarSession> allSessions = new ConcurrentHashMap<>();
private ScheduledFuture<?> sessionReaper;
private final AtomicLong totalCounter = new AtomicLong();
private static final Logger log = LoggerFactory.getLogger(SessionRegistryImpl.class);
@Override
public void onStartup(RuntimeContext ctx) throws Exception {
sessionReaper = ctx.lookupRequired(CronService.class).scheduleWithFixedDelay(this::reapExpiredSessions, 60, 10,
TimeUnit.SECONDS);
ctx.lookup(MetricRegistry.class).ifPresent(mr -> {
mr.gauge("sess.open", () -> () -> allSessions.size());
mr.gauge("sess.open.ro", () -> () -> allSessions.values().stream()
.filter(CDStarSession::isReadOnly).count());
mr.gauge("sess.open.rw", () -> () -> allSessions.values().stream()
.filter(((Predicate<CDStarSession>)CDStarSession::isReadOnly).negate()).count());
mr.gauge("sess.total", () -> () -> totalCounter.get());
});
}
private void reapExpiredSessions() {
for (final CDStarSession s : allSessions.values()) {
if (s.isExpired()) {
log.info("Closing expired session: {} (readOnly={})", s.getSessionId(), String.valueOf(s.isReadOnly()));
s.rollback(new TimeoutException("Session expired"));
}
}
}
@Override
public void onShutdown(RuntimeContext ctx) {
sessionReaper.cancel(false);
for (final CDStarSession s : allSessions.values()) {
try {
synchronized (s) {
if (!s.isClosed()) {
log.error("Forcefully closing session: {}", s.getSessionId());
s.rollback(new CancellationException("Runtime shutting down"));
}
}
} catch (final Exception e) {
log.error("Failed to rollback session during shutdown: {}", s.getSessionId(), e);
}
}
}
@Override
public void onSessionStarted(CDStarSession session) {
allSessions.put(session.getSessionId(), session);
totalCounter.incrementAndGet();
session.addListener(this);
}
@Override
public void onCommit(CDStarSession session) {
allSessions.remove(session.getSessionId());
}
@Override
public void onRollback(CDStarSession session) {
allSessions.remove(session.getSessionId());
}
@Override
public Optional<CDStarSession> get(String tx) {
return Optional.ofNullable(allSessions.get(tx));
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment