A general purpose Task and TaskQueue for running tasks with
dependencies and failure/retry, potentially in parallel.
*Latest release 20240423*:
Small fixes.
## Class `BaseTask(cs.fsm.FSM, cs.resources.RunStateMixin)`
A base class subclassing `cs.fsm.FSM` with a `RunStateMixin`.
Note that this class and the `FSM` base class do not provide
a `FSM_DEFAULT_STATE` attribute; a default `state` value of
`None` will leave `.fsm_state` _unset_.
This behaviour is chosen mostly to support subclasses
with unusual behaviour, particularly Django's `Model` class
whose `refresh_from_db` method seems to not refresh fields
which already exist, and setting `.fsm_state` from a
`FSM_DEFAULT_STATE` class attribute thus breaks this method.
Subclasses of this class and `Model` should _not_ provide a
`FSM_DEFAULT_STATE` attribute, instead relying on the field
definition to provide this default in the usual way.
*Method `BaseTask.as_dot(self, name=None, **kw)`*:
Return a DOT syntax digraph starting at this `Task`.
Parameters are as for `Task.tasks_as_dot`.
*Method `BaseTask.dot_node_label(self)`*:
The default DOT node label.
*Method `BaseTask.tasks_as_dot(tasks, name=None, *, follow_blocking=False, sep=None)`*:
Return a DOT syntax digraph of the iterable `tasks`.
Nodes will be coloured according to `DOT_NODE_FILLCOLOR_PALETTE`
based on their state.
Parameters:
* `tasks`: an iterable of `Task`s to populate the graph
* `name`: optional graph name
* `follow_blocking`: optional flag to follow each `Task`'s
`.blocking` attribute recursively and also render those
`Task`s
* `sep`: optional node separator, default `'\n'`
*Method `BaseTask.tasks_as_svg(tasks, name=None, **kw)`*:
Return an SVG diagram of the iterable `tasks`.
This takes the same parameters as `tasks_as_dot`.
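To give a sense of what a DOT rendering of a task graph looks like, here is a minimal standalone sketch. It does not use `cs.taskqueue`: the `tasks_to_dot` function, the `PALETTE` mapping and the `(name, state, blocking)` tuples are hypothetical stand-ins, and the real output of `tasks_as_dot` may well differ.

```python
# Hypothetical stand-in for DOT_NODE_FILLCOLOR_PALETTE: state -> fill colour.
PALETTE = {"PENDING": "yellow", "RUNNING": "cyan", "DONE": "green"}


def tasks_to_dot(tasks, name="tasks", sep="\n"):
    """Render `tasks` as a DOT digraph.

    `tasks` is an iterable of `(task_name, state, blocking_names)` tuples,
    where `blocking_names` lists the tasks blocked by this task.
    """
    lines = [f"digraph {name} {{"]
    for task_name, state, blocking in tasks:
        colour = PALETTE.get(state, "white")
        # One node per task, filled according to its state.
        lines.append(
            f'  "{task_name}" [label="{task_name}\\n{state}",'
            f' style=filled, fillcolor={colour}];'
        )
        # One edge from this task to each task it blocks.
        for blocked in blocking:
            lines.append(f'  "{task_name}" -> "{blocked}";')
    lines.append("}")
    return sep.join(lines)


dot = tasks_to_dot([("t1", "DONE", ["t2"]), ("t2", "PENDING", [])])
print(dot)
```

The resulting text can be fed to Graphviz (for example `dot -Tsvg`) to obtain an SVG diagram.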
## Class `BlockedError(TaskError)`
Raised when an attempt is made to run a blocked `Task`.
## Function `main(argv)`
Dummy main programme to exercise something.
## Function `make(*tasks, fail_fast=False, queue=None)`
Generator which completes all the supplied `tasks` by dispatching them
once they are no longer blocked.
Yield each task from `tasks` as it completes (or becomes cancelled).
Parameters:
* `tasks`: `Task`s as positional parameters
* `fail_fast`: default `False`; if true, cease evaluation as soon as a
task completes in a state which is not `DONE`
* `queue`: optional callable to submit a task for execution later
via some queue such as `Later` or celery
The following rules are applied by this function:
- if a task is being prepared, raise an `FSMError`
- if a task is already running or queued, wait for its completion
- if a task is pending:
  * if any prerequisite has failed, fail this task
  * if any prerequisite is cancelled, cancel this task
  * if any prerequisite is pending, make it first
  * if any prerequisite is not done, fail this task
  * otherwise dispatch this task and then yield it
- if `fail_fast` and the task is not done, return
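The rules above can be sketched with a small standalone generator. This is not the `cs.taskqueue` implementation: `MiniTask` and `mini_make` are hypothetical names, states are plain strings, everything runs synchronously, and pending prerequisites are made first before the failed/cancelled/not-done checks are applied.

```python
class MiniTask:
    """A hypothetical, much simplified task: a name, a function, prerequisites."""

    def __init__(self, name, func, required=()):
        self.name = name
        self.func = func
        self.required = list(required)
        self.state = "PENDING"

    def dispatch(self):
        """Run the function, ending in DONE or FAILED."""
        self.state = "RUNNING"
        try:
            self.func()
        except Exception:
            self.state = "FAILED"
        else:
            self.state = "DONE"


def mini_make(*tasks, fail_fast=False):
    """Yield each of `tasks` once it has left the PENDING state."""
    for task in tasks:
        if task.state == "PENDING":
            # Make pending prerequisites first, discarding what they yield.
            for prereq in task.required:
                if prereq.state == "PENDING":
                    for _ in mini_make(prereq):
                        pass
            states = {prereq.state for prereq in task.required}
            if "FAILED" in states:
                task.state = "FAILED"
            elif "CANCELLED" in states:
                task.state = "CANCELLED"
            elif states - {"DONE"}:
                # Some prerequisite is neither done, failed nor cancelled.
                task.state = "FAILED"
            else:
                task.dispatch()
        yield task
        if fail_fast and task.state != "DONE":
            return


log = []
a = MiniTask("a", lambda: log.append("a"))
b = MiniTask("b", lambda: log.append("b"), required=[a])
made = list(mini_make(b))
print(log, [t.state for t in made])
```

Only `b` is yielded, because only `b` was requested; `a` is run as a side effect of making `b`.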
Examples:

    >>> t1 = Task('t1', lambda: print('doing t1'), track=True)
    >>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
    >>> list(make(t2)) # doctest: +ELLIPSIS
    t1 PENDING->dispatch->RUNNING
    doing t1
    t1 RUNNING->done->DONE
    t2 PENDING->dispatch->RUNNING
    doing t2
    t2 RUNNING->done->DONE
    [Task('t2',<function <lambda> at ...>,state='DONE')]
## Function `make_later(L, *tasks, fail_fast=False)`
Dispatch the `tasks` via `L:Later` for asynchronous execution
if it is not already completed.
The caller can wait on `t.result` for completion.
This calls `make_now()` in a thread and uses `L.defer` to
queue the task and its prerequisites for execution.
## Function `make_now(*tasks, fail_fast=False, queue=None)`
Run the generator `make(*tasks)` to completion and return the
list of completed tasks.
## Class `Task(BaseTask, cs.threads.HasThreadState)`
A task which may require the completion of other tasks.
The model here may not be quite as expected; it is aimed at
tasks which can be repaired and rerun.
As such, if `self.run(func,...)` raises an exception from
`func` then this `Task` will still block dependent `Task`s.
Dually, a `Task` which completes without an exception is
considered complete and does not block dependent `Task`s.
Keyword parameters:
* `cancel_on_exception`: if true, cancel this `Task` if `.run`
raises an exception; the default is `False`, allowing repair
and retry
* `cancel_on_result`: optional callable to test the `Task.result`
after `.run`; if the callable returns `True` the `Task` is marked
as cancelled, allowing repair and retry
* `func`: the function to call to complete the `Task`;
it will be called as `func(*func_args,**func_kwargs)`
* `func_args`: optional positional arguments, default `()`
* `func_kwargs`: optional keyword arguments, default `{}`
* `lock`: optional lock, default an `RLock`
* `state`: initial state, default from `self._state.initial_state`,
which is initially '`PENDING`'
* `track`: default `False`;
if `True` then apply a callback for all states to print task transitions;
otherwise it should be a callback function suitable for `FSM.fsm_callback`
Other arguments are passed to the `Result` initialiser.
Example:

    t1 = Task(name="task1")
    t1.bg(time.sleep, 10)
    t2 = Task(name="task2")
    # prevent t2 from running until t1 completes
    t2.require(t1)
    # try to run time.sleep(5) for t2 immediately after t1 completes
    t1.notify(t2.call, time.sleep, 5)
Users wanting more immediate semantics can supply
`cancel_on_exception` and/or `cancel_on_result` to control
these behaviours.
Example:

    t1 = Task(name="task1")
    t1.bg(time.sleep, 2)
    t2 = Task(name="task2")
    # prevent t2 from running until t1 completes
    t2.require(t1)
    # try to run time.sleep(5) for t2 immediately after t1 completes
    t1.notify(t2.call, time.sleep, 5)
*Method `Task.__call__(self)`*:
Block on `self.result` awaiting completion
by calling `self.result()`.
*Method `Task.bg(self)`*:
Dispatch a function to complete the `Task` in a separate `Thread`,
returning the `Thread`.
This raises `BlockedError` for a blocked task;
otherwise the thread runs `self.dispatch()`.
*Method `Task.block(self, otask)`*:
Block another task until we are complete.
The converse of `.require()`.
*Method `Task.blockers(self)`*:
A generator yielding tasks from `self.required`
which should block this task.
Aborted tasks are not blockers
but if we encounter one we do abort the current task.
*Method `Task.cancel(self)`*:
Transition this `Task` to `CANCELLED` state.
If the task is running, set `.cancelled` on the `RunState`,
allowing clean task cancellation and subsequent transition
(mediated by the `.run()` method).
Otherwise fire the `'cancel'` event directly.
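Cancelling a running task by setting a flag is a cooperative pattern: the running function has to check the flag and stop of its own accord. A minimal standalone sketch follows, using only the standard library; `MiniRunState` and `count_steps` are hypothetical names and are not the `RunState` class used by this package.

```python
import threading


class MiniRunState:
    """Hypothetical stand-in for a run state carrying a cancellation flag."""

    def __init__(self):
        self._cancel_event = threading.Event()

    @property
    def cancelled(self):
        return self._cancel_event.is_set()

    def cancel(self):
        """Request cancellation; the running function must notice the flag."""
        self._cancel_event.set()


def count_steps(runstate, steps):
    """Do up to `steps` units of work, stopping early once cancelled."""
    completed = 0
    for _ in range(steps):
        if runstate.cancelled:
            break
        completed += 1
    return completed


rs = MiniRunState()
first = count_steps(rs, 3)   # not cancelled: all 3 steps run
rs.cancel()
second = count_steps(rs, 3)  # cancelled beforehand: no steps run
print(first, second)
```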
*Method `Task.dispatch(self)`*:
Dispatch the `Task`:
If the task is blocked, raise `BlockedError`.
If a prerequisite is aborted, fire the `'abort'` event.
Otherwise fire the `'dispatch'` event and then run the
task's function via the `.run()` method.
*Method `Task.isblocked(self)`*:
A task is blocked if any prerequisite is not complete.
*Method `Task.iscompleted(self)`*:
This task is completed (even if failed) and does not block contingent tasks.
*Method `Task.join(self)`*:
Wait for this task to complete.
*Method `Task.make(self, fail_fast=False)`*:
Complete `self` and its prerequisites.
This calls the global `make()` function with `self`.
It returns a Boolean indicating whether this task completed.
*`Task.perthread_state`*
*Method `Task.require(self, otask: 'TaskSubType')`*:
Add a requirement that `otask` be complete before we proceed.
This is the simple `Task` only version of `.then()`.
*Method `Task.run(self)`*:
Run the function associated with this task,
completing the `self.result` `Result` appropriately when finished.
*WARNING*: this _ignores_ the current state and any blocking `Task`s.
You should usually use `dispatch` or `make`.
During the run the thread local `Task.default()`
will be `self` and the `self.runstate` will be running.
Otherwise run `func_result=self.func(*self.func_args,**self.func_kwargs)`
with the following effects:
* if the function raises a `CancellationError`, cancel the `Task`
* if the function raises another exception,
if `self.cancel_on_exception` then cancel the task
else complete `self.result` with the exception
and fire the `'error'` event
* if `self.runstate.canceled` or `self.cancel_on_result`
was provided and `self.cancel_on_result(func_result)` is
true, cancel the task
* otherwise complete `self.result` with `func_result`
and fire the `'done'` event
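The decision logic in the list above can be sketched as a plain function returning the event that would be fired. This is an illustration only: `run_outcome`, its parameters and the local `CancellationError` class are hypothetical stand-ins, not the package's own code.

```python
class CancellationError(Exception):
    """Hypothetical stand-in for the cancellation exception named above."""


def run_outcome(func, *, cancel_on_exception=False, cancel_on_result=None,
                runstate_cancelled=False):
    """Call `func()` and return `(event, value)` following the rules above."""
    try:
        result = func()
    except CancellationError:
        return "cancel", None
    except Exception as e:
        if cancel_on_exception:
            return "cancel", None
        # Keep the exception as the result, leaving room for repair and retry.
        return "error", e
    if runstate_cancelled or (
        cancel_on_result is not None and cancel_on_result(result)
    ):
        return "cancel", None
    return "done", result


def cancelling_func():
    """A function which cancels itself."""
    raise CancellationError("stop")


print(run_outcome(lambda: 42))
print(run_outcome(lambda: None, cancel_on_result=lambda r: r is None))
print(run_outcome(cancelling_func))
```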
*Method `Task.then(self, func: Union[str, Callable, ForwardRef('TaskSubType')], *a, func_args=(), func_kwargs=None, **task_kw)`*:
Prepare a new `Task` or function which may not run before `self` completes.
This may be called in two ways:
- `task.then(some_Task)`: block the `Task` instance `some_Task` behind `self`
- `task.then([name,]func[,func_args=][,func_kwargs=][,Task_kwargs...])`:
make a new `Task` to be blocked behind `self`
Return the new `Task`.
This supports preparing a chain of actions:

    >>> t_root = Task("t_root", lambda: 0)
    >>> t_leaf = t_root.then(lambda: 1).then(lambda: 2)
    >>> t_root.iscompleted() # the root task has not yet run
    False
    >>> t_leaf.iscompleted() # the final task has not yet run
    False
    >>> # t_leaf is blocked by t_root
    >>> t_leaf.dispatch() # doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    cs.taskqueue.BlockedError: ...
    >>> t_leaf.make() # make the leaf, but make t_root first
    True
    >>> t_root.iscompleted() # implicitly completed by make
    True
    >>> t_leaf.iscompleted()
    True
## Class `TaskError(cs.fsm.FSMError)`
Raised by `Task` related errors.
## Class `TaskQueue`
A task queue for managing and running a set of related tasks.
Unlike `make` and `Task.make`, this is aimed at a "dispatch" worker
which dispatches individual tasks as required.
Example 1, put 2 dependent tasks in a queue and run:

    >>> t1 = Task("t1", lambda: print("t1"))
    >>> t2 = t1.then("t2", lambda: print("t2"))
    >>> q = TaskQueue(t1, t2)
    >>> for _ in q.run(): pass
    ...
    t1
    t2
Example 2, put 1 task in a queue and run.
The queue only runs the specified tasks:

    >>> t1 = Task("t1", lambda: print("t1"))
    >>> t2 = t1.then("t2", lambda: print("t2"))
    >>> q = TaskQueue(t1)
    >>> for _ in q.run(): pass
    ...
    t1
Example 3, put 1 task in a queue with `run_dependent_tasks=True` and run.
The queue pulls in the tasks which depend on completed tasks and also runs those:

    >>> t1 = Task("t1", lambda: print("t1"))
    >>> t2 = t1.then("t2", lambda: print("t2"))
    >>> q = TaskQueue(t1, run_dependent_tasks=True)
    >>> for _ in q.run(): pass
    ...
    t1
    t2
*Method `TaskQueue.__init__(self, *tasks, run_dependent_tasks=False)`*:
Initialise the queue with the supplied `tasks`.
*Method `TaskQueue.add(self, task)`*:
Add a task to the tasks managed by this queue.
*Method `TaskQueue.as_dot(self, name=None, **kw)`*:
Compute a DOT syntax graph description of the tasks in the queue.
*Method `TaskQueue.get(self)`*:
Pull a completed or an unblocked pending task from the queue.
Return the task or `None` if nothing is available.
The returned task is no longer tracked by this queue.
*Method `TaskQueue.run(self, runstate=None, once=False)`*:
Process tasks in the queue until the queue has no completed tasks,
yielding each task, immediately if `task.iscompleted()`
otherwise after `task.dispatch()`.
An optional `RunState` may be provided to allow early termination
via `runstate.cancel()`.
An incomplete task is `dispatch`ed before `yield`;
ideally it will be complete when the yield happens,
but its semantics might mean it is in another state such as `CANCELLED`.
The consumer of `run` must handle these situations.
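The `get`/`run` behaviour described above can be sketched with a small standalone queue. This does not import `cs.taskqueue`: `QTask` and `MiniQueue` are hypothetical classes, tasks run synchronously, and there is no `RunState` or `once` support.

```python
class QTask:
    """Hypothetical minimal task with prerequisites and dependents."""

    def __init__(self, name, func):
        self.name = name
        self.func = func
        self.required = []   # tasks which must complete before this one
        self.blocking = []   # tasks waiting on this one
        self.state = "PENDING"

    def then(self, name, func):
        """Create a new task blocked behind this one."""
        task = QTask(name, func)
        task.required.append(self)
        self.blocking.append(task)
        return task

    def iscompleted(self):
        return self.state in ("DONE", "FAILED", "CANCELLED")

    def isblocked(self):
        return any(not t.iscompleted() for t in self.required)

    def dispatch(self):
        try:
            self.func()
        except Exception:
            self.state = "FAILED"
        else:
            self.state = "DONE"


class MiniQueue:
    def __init__(self, *tasks, run_dependent_tasks=False):
        self.tasks = list(tasks)
        self.run_dependent_tasks = run_dependent_tasks

    def get(self):
        """Remove and return a completed or unblocked task, or None."""
        for task in self.tasks:
            if task.iscompleted() or not task.isblocked():
                self.tasks.remove(task)
                return task
        return None

    def run(self):
        """Yield tasks until none is available, dispatching incomplete ones."""
        while True:
            task = self.get()
            if task is None:
                return
            if not task.iscompleted():
                task.dispatch()
            if self.run_dependent_tasks:
                # Pull in tasks which were waiting on this one.
                for dep in task.blocking:
                    if dep not in self.tasks and not dep.iscompleted():
                        self.tasks.append(dep)
            yield task


ran = []
t1 = QTask("t1", lambda: ran.append("t1"))
t2 = t1.then("t2", lambda: ran.append("t2"))
for task in MiniQueue(t1, run_dependent_tasks=True).run():
    # The consumer inspects the state; a task may not be DONE here.
    print(task.name, task.state)
```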
# Release Log
*Release 20240423*:
Small fixes.
*Release 20230401*:
Add missing requirement to DISTINFO.
*Release 20230331*:
* Task: subclass BaseTask instead of (FSM, RunStateMixin).
* BaseTask.__init__: use @uses_runstate to ensure we've got a RunState.
*Release 20230217*:
Task: subclass HasThreadState, drop .current_task() class method.
*Release 20221207*:
* Pull out core stuff from Task into BaseTask, aids subclassing.
* BaseTask: explanatory docstring about unusual FSM_DEFAULT_STATE design choice.
* BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.
* BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.
*Release 20220805*:
Initial PyPI release.
Raw data
{
"_id": null,
"home_page": null,
"name": "cs.taskqueue",
"maintainer": null,
"docs_url": null,
"requires_python": null,
"maintainer_email": null,
"keywords": "python3",
"author": null,
"author_email": "Cameron Simpson <cs@cskk.id.au>",
"download_url": "https://files.pythonhosted.org/packages/f1/5d/92a0bba4861b023fd90ccb13349791cd5b56588320ab3a67d8db5f2978e9/cs.taskqueue-20240423.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "GNU General Public License v3 or later (GPLv3+)",
"summary": "A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.",
"version": "20240423",
"project_urls": {
"URL": "https://bitbucket.org/cameron_simpson/css/commits/all"
},
"split_keywords": [
"python3"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "aa8dc75fe44f3cb1502057adca3539fd9b6bc4bc05e8e61eaf64749fd5a959cf",
"md5": "4678d6d11eb858684870b8ce96d1e6cf",
"sha256": "c65d20abf95fffdec3c5e3672df2ce64835dd0cd7e9aaf5d015b9504805f07d4"
},
"downloads": -1,
"filename": "cs.taskqueue-20240423-py3-none-any.whl",
"has_sig": false,
"md5_digest": "4678d6d11eb858684870b8ce96d1e6cf",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": null,
"size": 15044,
"upload_time": "2024-04-23T09:29:36",
"upload_time_iso_8601": "2024-04-23T09:29:36.563500Z",
"url": "https://files.pythonhosted.org/packages/aa/8d/c75fe44f3cb1502057adca3539fd9b6bc4bc05e8e61eaf64749fd5a959cf/cs.taskqueue-20240423-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "f15d92a0bba4861b023fd90ccb13349791cd5b56588320ab3a67d8db5f2978e9",
"md5": "79a273b6c03ab31475dd1a26c8a70654",
"sha256": "b0123a558c22a844e012e0f2ea730460fa383f4391f044a0d2475b8464a32215"
},
"downloads": -1,
"filename": "cs.taskqueue-20240423.tar.gz",
"has_sig": false,
"md5_digest": "79a273b6c03ab31475dd1a26c8a70654",
"packagetype": "sdist",
"python_version": "source",
"requires_python": null,
"size": 16242,
"upload_time": "2024-04-23T09:29:38",
"upload_time_iso_8601": "2024-04-23T09:29:38.158221Z",
"url": "https://files.pythonhosted.org/packages/f1/5d/92a0bba4861b023fd90ccb13349791cd5b56588320ab3a67d8db5f2978e9/cs.taskqueue-20240423.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-04-23 09:29:38",
"github": false,
"gitlab": false,
"bitbucket": true,
"codeberg": false,
"bitbucket_user": "cameron_simpson",
"bitbucket_project": "css",
"lcname": "cs.taskqueue"
}