import os
import urllib.parse
from threading import Timer, Thread, Event
PATH_TYPES = (str,)
if hasattr(os, "PathLike"):
......@@ -32,3 +33,30 @@ def url_split_auth(url: str):
netloc += ":" + str(split.port)
url = urllib.parse.urlunsplit(split._replace(netloc=netloc))
return url, username, password
class IntervalTimer(Thread):
""" A thread that runs a function over and over until stopped.
t = IntervalTimer(30.0, f, args=None, kwargs=None)
def __init__(self, interval, function, *args, **kwargs):
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.finished = Event()
def set_interval(self, interval):
self.interval = interval
def cancel(self):
def run(self):
while not self.finished.wait(self.interval):
self.function(*self.args, **self.kwargs)
......@@ -3,11 +3,12 @@ Client api implementation. Usually imported directly from :mod:`pycdstar` and no
import os
import threading
import typing
import requests
from pycdstar3._utils import PATH_TYPES, url_split_auth
from pycdstar3._utils import PATH_TYPES, url_split_auth, IntervalTimer
from pycdstar3.model import ApiError, JsonObject, FileDownload, FormUpdate
__all__ = "CDStar", "CDStarVault", "FormUpdate", "ApiError"
......@@ -43,6 +44,7 @@ class CDStar:
self._session = _session or requests.Session()
self._tx = None
self._autocommit = False
self._keepalive_timer = None
def clone(self):
""" Return an independent instance with the same settings.
......@@ -79,20 +81,24 @@ class CDStar:
raise ApiError(rs)
def rest(self, method, *path, expect_status=None, **options) -> JsonObject:
def rest(self, method, *path, expect_status=None, **options) -> typing.Optional[JsonObject]:
""" Just like `raw()`, but expects the response to be JSON and returns
the parsed result instead of the raw response. Non-JSON responses
are errors.
are errors. Empty (204) responses return None.
Disclaimer: Avoid using this method if there is a more specific implementation available.
If you find a feature missing from this class, please submit a feature request instead of
over-using this method.
# TODO: Expect json errors or non-json responses and throw a better error message
return self.raw(method, *path, expect_status=expect_status, **options).json(object_hook=JsonObject)
# TODO: Expect json errors or non-json responses and
# throw a better error message
rs = self.raw(method, *path, expect_status=expect_status, **options)
if rs.status_code == 204:
return None
return rs.json(object_hook=JsonObject)
def begin(self, autocommit=False, readonly=False):
def begin(self, autocommit=False, readonly=False, keepalive=False):
""" Start a new transaction and return self.
Transactions are used to group multiple operations into a single atomic action. After begin(), you have to
......@@ -108,23 +114,28 @@ class CDStar:
closed automatically. If autocommit is true and no exception was raised, it is committed. Otherwise, it is
rolled back.
:param autocommit: Commit this transaction automatically after the with-block without errors? (default: false)
:param readonly: Create a (cheap) read-only transaction.
:param autocommit: Commit this transaction if the with-block ended without errors. (default: False)
:param readonly: Create a (cheaper) read-only transaction. (default: False)
:param keepalive: Automatically call :meth:`keepalive` from a separate thread. This is only
required when waiting for user input for a long time. (default: False)
self._autocommit = autocommit
self._tx ="POST", "_tx", data={"readonly": readonly})
if keepalive:
return self
def tx(self) -> JsonObject:
""" Return the current transaction handle, or None if no transaction is running. """
""" Return the transaction handle, or None if no transaction is running. """
return self._tx
def commit(self):
""" Commit the current transaction. """
if not self._tx:
raise RuntimeError("No transaction running")
self.raw("POST", "_tx",
......@@ -135,17 +146,45 @@ class CDStar:
def rollback(self):
""" Rollback the current transaction, if any. Do nothing otherwise. """
if self._tx:
self.raw("DELETE", "_tx",
self.raw("DELETE", "_tx", self._tx["id"])
self._tx = None
def keepalive(self):
""" If a transaction is running, keep it alive. Otherwise, do nothing. """
""" Keep the current transaction alive (reset the timeout). """
if not self._tx:
raise RuntimeError("No transaction running")
self._tx ="GET", "_tx",
self._tx ="POST", "_tx", self._tx["id"], params={"renew": True})
def _auto_keepalive_start(self):
# leeway: 10% or 2 seconds, but sleep for at least a second
interval = max(1, min(self.tx['ttl'] * 0.9, self.tx['ttl'] - 2))
self._keepalive_timer = IntervalTimer(interval, self._auto_keepalive, self.tx)
self._keepalive_timer.daemon = True
def _auto_keepalive(self, tx):
timer = self._keepalive_timer
if timer != threading.current_thread():
raise RuntimeError()
if not self.tx or self.tx["id"] != tx["id"]:
raise RuntimeError()
# leeway: 10% or 2 seconds, but sleep for at least a second
timer.set_interval(max(1, min(self.tx['ttl'] * 0.9, self.tx['ttl'] - 2)))
def _auto_keepalive_stop(self):
""" Stop the current keepalive timer, if any. """
if self._keepalive_timer:
self._keepalive_timer = None
def __enter__(self):
""" Expect a transaction to be already running. """
