Source code for pyworkflow.utils.retry_streaming



import logging
import os
import time
import random
import sqlite3
from functools import wraps

from pyworkflow.utils import yellowStr, redStr

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Stream state constants (mirrors pyworkflow.object.Set)
# Integer stored under _STREAM_STATE_KEY while the stream is still open.
_STREAM_OPEN = 1
# Key of the row in the SQLite "Properties" table that holds the stream state.
_STREAM_STATE_KEY = '_streamState'


def safeIsStreamOpen(setObj) -> bool:
    """Check if a Set's stream is open using an independent SQLite connection.

    Opens a short-lived, read-only connection to the Set's backing SQLite
    file and queries ONLY the _streamState property. This avoids:

    - Reusing the Set's internal mapper connection (no mapper contention)
    - Holding protocol-level locks during the check
    - The heavyweight loadAllProperties() call that reads ALL properties

    Safe to call from any thread while the producer may be writing
    concurrently.

    Parameters:
        setObj  Object exposing ``getFileName()`` that returns the path of
                its backing SQLite file. Objects without that method are
                treated as having no file.

    Returns:
        True only when the ``Properties`` row keyed by ``_streamState``
        exists and its value equals the "open" state. Returns False (stream
        closed) on any error, since a missing/corrupt DB means no more data
        is coming.
    """
    # Local import: only needed here to build a correctly escaped URI.
    from pathlib import Path

    dbPath = getattr(setObj, 'getFileName', lambda: None)()
    if not dbPath or not os.path.exists(dbPath):
        return False

    try:
        # The path must be percent-encoded before being embedded in a SQLite
        # URI: a raw '#' or '?' in the file name would otherwise be parsed as
        # the start of the fragment/query, making SQLite open a different
        # file and silently drop the mode=ro restriction.
        dbUri = Path(dbPath).absolute().as_uri() + "?mode=ro"
        conn = sqlite3.connect(dbUri, uri=True, timeout=3)
        try:
            cursor = conn.execute(
                "SELECT value FROM Properties WHERE key=?",
                (_STREAM_STATE_KEY,)
            )
            row = cursor.fetchone()
            if row is None:
                # No stream-state row stored: treat as closed.
                return False
            return int(row[0]) == _STREAM_OPEN
        finally:
            # Always release the connection, even if the query fails.
            conn.close()
    except Exception:
        # Deliberate best-effort: any failure is reported as "closed".
        return False
def refreshStreamState(setObj) -> None:
    """Synchronise a Set's in-memory stream state with the database.

    The current state is read through safeIsStreamOpen(), i.e. over an
    independent connection, and then written back onto the object with
    setStreamState(). Subsequent setObj.isStreamOpen() calls can therefore
    return the fresh value without needing loadAllProperties().

    Parameters:
        setObj  The Set whose stream state should be refreshed.
    """
    from pyworkflow.object import Set

    # Query the database first, then pick the matching Set constant.
    freshState = Set.STREAM_OPEN if safeIsStreamOpen(setObj) else Set.STREAM_CLOSED
    setObj.setStreamState(freshState)
def is_sqlite_lock_error(exc: Exception) -> bool:
    """Tell whether ``exc`` is a SQLite OperationalError for a locked/busy DB.

    Parameters:
        exc  The exception to classify.

    Returns:
        True when ``exc`` is a ``sqlite3.OperationalError`` whose message
        (compared case-insensitively) mentions a locked or busy database;
        False for every other exception.
    """
    # Anything that is not an OperationalError is never a lock/busy error.
    if not isinstance(exc, sqlite3.OperationalError):
        return False

    lowered = str(exc).lower()
    # Common variants reported by SQLite, from most to least specific.
    markers = ("database is locked", "database is busy", "locked", "busy")
    return any(marker in lowered for marker in markers)
def retry_on_sqlite_lock(
    max_attempts: int = 15,
    initial_delay: float = 0.25,
    backoff_factor: float = 1.7,
    max_delay: float = 10,
    jitter: float = 0.05,
    log=None,
    predicate=is_sqlite_lock_error,
):
    """
    Decorator that retries the wrapped function when SQLite signals lock/busy.

    Behavior:
      - Exponential backoff with small jitter to avoid retry synchronization.
      - Only retries when predicate(exc) is True (i.e., lock/busy by default).
      - Propagates any non-retriable exceptions immediately.
      - If max attempts are exhausted, re-raises the original exception.

    Parameters:
      max_attempts    Max number of tries (including the first call).
      initial_delay   Initial sleep before the first retry (seconds).
      backoff_factor  Multiply delay on each retry.
      max_delay       Cap the delay (seconds).
      jitter          Random noise added to delay (seconds).
      log             Optional logger (e.g., logging.getLogger(__name__)).
      predicate       Function deciding whether the exception should be retried.

    Returns:
      A decorator; the decorated function returns whatever the wrapped
      function returns on its first successful call.

    Raises:
      Whatever the wrapped function raises, either immediately (when the
      predicate rejects the exception) or after the last allowed attempt.
    """

    def decorator(func):
        # Resolve a printable name once. Callables such as functools.partial
        # or instances defining __call__ have neither __qualname__ nor
        # __name__, so fall back to repr() instead of raising AttributeError
        # inside the exception handler (which would mask the real error).
        who = (
            getattr(func, "__qualname__", None)
            or getattr(func, "__name__", None)
            or repr(func)
        )

        @wraps(func)
        def wrapper(*args, **kwargs):
            attempts = 0
            delay = float(initial_delay)
            while True:
                try:
                    return func(*args, **kwargs)
                except Exception as exc:
                    # Not a lock/busy error -> propagate untouched.
                    if not predicate(exc):
                        raise

                    attempts += 1
                    exhausted = attempts >= max_attempts
                    # Only draw jitter when a sleep will really happen; clamp
                    # at zero so a negative delay/jitter cannot make
                    # time.sleep() raise and hide the original exception.
                    sleep_for = (
                        0.0 if exhausted
                        else max(0.0, delay + random.uniform(0.0, jitter))
                    )

                    if log:
                        if attempts == 1:
                            log.error(yellowStr(f"[{who}] SQLite locked/busy; retrying up to {max_attempts} attempts"))
                        if exhausted:
                            # No sleep follows the last attempt, so do not
                            # claim one in the message.
                            log.error(yellowStr(f"[{who}] attempt {attempts}/{max_attempts} -> {exc}"))
                            log.error(redStr(f"[{who}] exhausted retries; raising"))
                        else:
                            log.error(yellowStr(f"[{who}] attempt {attempts}/{max_attempts} -> {exc}; sleeping {sleep_for:.2f}s"))

                    if exhausted:
                        # Bare raise keeps the original exception and traceback.
                        raise

                    time.sleep(sleep_for)
                    # Grow the base delay for the next retry, up to max_delay.
                    delay = min(delay * backoff_factor, max_delay)

        return wrapper

    return decorator