This document describes the current stable version of Celery (5.0).
celery.contrib.testing.manager
API Reference
Integration testing utilities.
class celery.contrib.testing.manager.Manager(app, **kwargs)
    Test helpers for task integration tests.
class celery.contrib.testing.manager.ManagerMixin
    Mixin that adds Manager capabilities.
    assert_result_tasks_in_progress_or_completed(async_results, interval=0.5, desc='waiting for tasks to be started or completed', **policy)
    ensure_not_for_a_while(fun, catch, desc='thing', max_retries=20, interval_start=0.1, interval_step=0.02, interval_max=1.0, emit_warning=False, **options)
        Make sure something does not happen (at least for a while).
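The idea behind ensure_not_for_a_while can be illustrated with a small standalone sketch. This is not Celery's implementation: assert_stays_absent, NotSeen, and both probe functions are hypothetical names introduced here, and the sketch assumes that catch plays the same role as in wait_for, that is, the exception raised while the event has not happened.

```python
import time
from typing import Any, Callable, Tuple, Type


def assert_stays_absent(
    fun: Callable[[], Any],
    catch: Tuple[Type[BaseException], ...],
    max_retries: int = 20,
    interval: float = 0.01,
) -> None:
    """Hypothetical helper: fail if fun ever succeeds within the retry budget."""
    for _ in range(max_retries + 1):
        try:
            fun()
        except catch:
            # Still "not happened": wait a little and probe again.
            time.sleep(interval)
        else:
            # fun returned normally, so the unwanted event occurred.
            raise AssertionError("Should not have happened")


class NotSeen(Exception):
    """Illustrative exception meaning the event has not been observed."""


def never_happens() -> None:
    raise NotSeen()


def happens_immediately() -> str:
    return "event"


# Passes silently: the probe keeps raising NotSeen until retries run out.
assert_stays_absent(never_happens, (NotSeen,), max_retries=3)

# Fails: the probe returns, meaning the event did happen.
failure = None
try:
    assert_stays_absent(happens_immediately, (NotSeen,), max_retries=3)
except AssertionError as exc:
    failure = str(exc)
```

In this sketch a passing check only shows that the event was not observed during the polling window, which matches the "at least for a while" wording above.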
    wait_for(fun: Callable, catch: Sequence[Any], desc: str = 'thing', args: Tuple = (), kwargs: Dict = None, errback: Callable = None, max_retries: int = 10, interval_start: float = 0.1, interval_step: float = 0.5, interval_max: float = 5.0, emit_warning: bool = False, **options: Any) → Any
        Wait for an event to happen.
        The catch argument specifies the exception(s) that indicate the event has not happened yet.
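The polling pattern that wait_for describes can be sketched in plain Python. The code below is a minimal illustration under stated assumptions, not Celery's implementation: poll_until, NotReady, and check_ready are hypothetical names, and the exact retry count and interval schedule of the real method may differ.

```python
import time
from typing import Any, Callable, Tuple, Type


def poll_until(
    fun: Callable[[], Any],
    catch: Tuple[Type[BaseException], ...],
    max_retries: int = 10,
    interval_start: float = 0.1,
    interval_step: float = 0.5,
    interval_max: float = 5.0,
) -> Any:
    """Hypothetical helper: call fun until it stops raising a catch exception."""
    interval = interval_start
    for attempt in range(max_retries + 1):
        try:
            return fun()
        except catch:
            if attempt == max_retries:
                # Retry budget exhausted: let the exception propagate.
                raise
            time.sleep(interval)
            # Grow the delay step by step, capped at interval_max.
            interval = min(interval + interval_step, interval_max)


class NotReady(Exception):
    """Illustrative exception meaning the event has not happened yet."""


calls = {"count": 0}


def check_ready() -> str:
    # Pretend the event only becomes visible on the third probe.
    calls["count"] += 1
    if calls["count"] < 3:
        raise NotReady()
    return "ready"


result = poll_until(
    check_ready,
    (NotReady,),
    interval_start=0.01,
    interval_step=0.01,
    interval_max=0.05,
)
```

Any exception not listed in catch propagates immediately in this sketch, so only the "not yet" condition triggers another attempt.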