Lifetime scopes¶
A Scope is the runtime concept behind the SCOPED lifecycle.
It is a bounded cache plus teardown registry: while the scope
is open, every SCOPED binding resolves to the same instance;
on exit, every cached instance that exposes close or
aclose is collected for the eventual teardown propagation
(3.9). The canonical use case is request-scoped dependency
injection in a web framework - one scope per request, the
resolver hands out the same Request, Logger, DBConnection
throughout, and the container closes them on response.
API¶
class Scope:
def lookup(self, token: DependencyToken) -> Any: ...
def remember(self, token: DependencyToken, instance: Any) -> None: ...
def teardowns(self) -> tuple[Closeable | AsyncCloseable, ...]: ...
def current_scope() -> Scope | None: ...
@contextmanager
def lifetime_scope() -> Iterator[Scope]: ...
@asynccontextmanager
async def alifetime_scope() -> AsyncIterator[Scope]: ...
Scope is constructed exclusively by the scope context
managers; users do not instantiate it directly outside tests.
current_scope() reads the active scope (or None) from the
backing ContextVar, which is what the resolver consults to
decide whether a SCOPED binding has a home.
Opening a scope¶
from tripack_runtime import lifetime_scope, Resolver
resolver = Resolver(graph)
with lifetime_scope() as scope:
handler = resolver.resolve(RequestHandler)
# any `SCOPED` resolution inside this block shares `scope`'s cache
Inside the block, current_scope() is scope. On exit, the
previous value (typically None) is restored. The scope
object survives the block - callers can still inspect
scope.teardowns() after the with exits to know what was
cached. The teardown propagation lands in 3.9; for now the
scope only collects targets.
The async variant alifetime_scope has identical semantics:
Each asyncio.Task opened inside the surrounding context
inherits its own copy of the backing ContextVar, so two
coroutines launched under asyncio.gather each open and close
their own scope without interfering. Nested scopes work the
same way: the inner scope shadows the outer for the duration of
its block, and the outer is restored on exit.
SCOPED lifecycle dispatch¶
When the resolver encounters a SCOPED binding, it:
- reads
current_scope(); - if no scope is active, raises
ScopeErrorimmediately (the cycle-detection stack is not pushed - the error names the missing scope, not a phantom resolution frame); - otherwise consults
scope.lookup(token)for a cached instance, returns it on a hit; - on a miss, opens a cycle-detection frame, invokes the
factory, then
scope.remember(token, instance).
graph.register(
Binding(token=Clock, factory=Clock, lifecycle=Lifecycle.SCOPED)
)
# No active scope: ScopeError
resolver.resolve(Clock)
# -> ScopeError: Token <class 'Clock'> has SCOPED lifecycle but no
# scope is active; open one with lifetime_scope() or alifetime_scope()
# before resolving.
# Inside a scope: cached for the scope duration
with lifetime_scope():
a = resolver.resolve(Clock)
b = resolver.resolve(Clock)
assert a is b
# Across scopes: separate instances
with lifetime_scope():
c = resolver.resolve(Clock)
with lifetime_scope():
d = resolver.resolve(Clock)
assert c is not d
Teardown propagation¶
SCOPED instances exposing close or aclose are appended to
scope.teardowns() in construction order and torn down in
LIFO order on scope exit. TRANSIENT instances are never
registered (no owner to close them); SINGLETON instances are
registered on the resolver instead.
with lifetime_scope() as scope:
pool = resolver.resolve(ConnectionPool) # SCOPED + closeable
cache = resolver.resolve(RedisCache) # SCOPED + closeable
# pool and cache are closed in reverse order on the `with` exit:
# cache.close() runs first, then pool.close()
The async variant alifetime_scope awaits aclose on each
target on exit, falling back to sync close for sync-only
ones:
async with alifetime_scope() as scope:
pool = await resolver.aresolve(AsyncConnectionPool)
# scope's aclose() runs: await pool.aclose() then ...
Two convenience guarantees:
- Teardown happens even when the body raises. The scope's
close/acloseis invoked from the context manager'sfinally, so resources do not leak on the error path. - Single failing target does not skip its siblings. Errors
are collected and surfaced as one
ExceptionGroupat the end; every target gets a chance to close. A target'scloseis required to be idempotent by theCloseablecontract, so a double invocation is safe.
If the sync lifetime_scope ends but the scope holds async-
only teardowns (only aclose, no close), those targets are
skipped silently - reach them through alifetime_scope
instead. The skip is intentional: a sync exit cannot await.
Teardown registry¶
with lifetime_scope() as scope:
pool = resolver.resolve(ConnectionPool) # SCOPED + closeable
assert scope.teardowns() == (pool,)
The classifier is structural duck-typing (any object with a
callable close or aclose qualifies), matching the spirit
of the Closeable / AsyncCloseable Protocols which are
intentionally not @runtime_checkable.
Factory error semantics¶
A SCOPED factory that raises does NOT poison the cache: the
next call within the same scope retries. A factory that
recursively resolves its own token trips the cycle detector and
leaves the scope's cache empty.
Idempotent registration¶
Scope.remember is idempotent at the cache level. If the same
token has already been cached, the existing entry wins: the
cache is not overwritten, the teardown list is not extended,
and the existing instance is the return value.
scope = Scope()
first = ConnectionPool()
second = ConnectionPool()
a = scope.remember(ConnectionPool, first)
b = scope.remember(ConnectionPool, second)
assert a is first
assert b is first # second discarded, first wins
assert scope.teardowns() == (first,) # only the first is registered
This protects against the async race where two coroutines both
build a SCOPED instance after observing an empty cache: the
first one to reach remember wins, and the second receives the
canonical instance back. The discarded instance's lifetime is
the caller's responsibility - the runtime cannot retroactively
close a value the factory already produced. Sync resolution is
single-threaded within one task, so the race only matters for
async paths.
The same guard sits in the resolver's SINGLETON dispatch: two
concurrent aresolve(Token) calls on a SINGLETON binding both
complete their factory but return the same canonical instance,
with a single teardown registration.
Boundary with other lifecycles¶
| Lifecycle | Cache lives on | Survives scope exit |
|---|---|---|
TRANSIENT |
nowhere (no cache) | n/a |
SCOPED |
the active Scope |
no - new scope, new instance |
SINGLETON |
the Resolver |
yes - shared across all scopes |
A SINGLETON resolved inside a lifetime_scope() block is
still the same instance everywhere; the open scope simply does
not see it. A TRANSIENT resolution inside a scope is still a
fresh instance every call, and the scope's teardown list stays
empty regardless.