cs.taskqueue


Namecs.taskqueue JSON
Version 20240423 PyPI version JSON
download
home_pageNone
SummaryA general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.
upload_time2024-04-23 09:29:38
maintainerNone
docs_urlNone
authorNone
requires_pythonNone
licenseGNU General Public License v3 or later (GPLv3+)
keywords python3
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            A general purpose Task and TaskQueue for running tasks with
dependencies and failure/retry, potentially in parallel.

*Latest release 20240423*:
Small fixes.

## Class `BaseTask(cs.fsm.FSM, cs.resources.RunStateMixin)`

A base class subclassing `cs.fsm.FSM` with a `RunStateMixin`.

Note that this class and the `FSM` base class does not provide
a `FSM_DEFAULT_STATE` attribute; a default `state` value of
`None` will leave `.fsm_state` _unset_.

This behaviour is is chosen mostly to support subclasses
with unusual behaviour, particularly Django's `Model` class
whose `refresh_from_db` method seems to not refresh fields
which already exist, and setting `.fsm_state` from a
`FSM_DEFAULT_STATE` class attribute thus breaks this method.
Subclasses of this class and `Model` should _not_ provide a
`FSM_DEFAULT_STATE` attribute, instead relying on the field
definition to provide this default in the usual way.

*Method `BaseTask.as_dot(self, name=None, **kw)`*:
Return a DOT syntax digraph starting at this `Task`.
Parameters are as for `Task.tasks_as_dot`.

*Method `BaseTask.dot_node_label(self)`*:
The default DOT node label.

*Method `BaseTask.tasks_as_dot(tasks, name=None, *, follow_blocking=False, sep=None)`*:
Return a DOT syntax digraph of the iterable `tasks`.
        Nodes will be coloured according to `DOT_NODE_FILLCOLOR_PALETTE`
        based on their state.

        Parameters:
        * `tasks`: an iterable of `Task`s to populate the graph
        * `name`: optional graph name
        * `follow_blocking`: optional flag to follow each `Task`'s
          `.blocking` attribute recursively and also render those
          `Task`s
        * `sep`: optional node seprator, default `'
'`

*Method `BaseTask.tasks_as_svg(tasks, name=None, **kw)`*:
Return an SVG diagram of the iterable `tasks`.
This takes the same parameters as `tasks_as_dot`.

## Class `BlockedError(TaskError)`

Raised by a blocked `Task` if attempted.

## Function `main(argv)`

Dummy main programme to exercise something.

## Function `make(*tasks, fail_fast=False, queue=None)`

Generator which completes all the supplied `tasks` by dispatching them
once they are no longer blocked.
Yield each task from `tasks` as it completes (or becomes cancelled).

Parameters:
* `tasks`: `Task`s as positional parameters
* `fail_fast`: default `False`; if true, cease evaluation as soon as a
  task completes in a state with is not `DONE`
* `queue`: optional callable to submit a task for execution later
  via some queue such as `Later` or celery

The following rules are applied by this function:
- if a task is being prepared, raise an `FSMError`
- if a task is already running or queued, wait for its completion
- if a task is pending:
  * if any prerequisite has failed, fail this task
  * if any prerequisite is cancelled, cancel this task
  * if any prerequisite is pending, make it first
  * if any prerequisite is not done, fail this task
  * otherwise dispatch this task and then yield it
- if `fail_fast` and the task is not done, return

Examples:

    >>> t1 = Task('t1', lambda: print('doing t1'), track=True)
    >>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)
    >>> list(make(t2))    # doctest: +ELLIPSIS
    t1 PENDING->dispatch->RUNNING
    doing t1
    t1 RUNNING->done->DONE
    t2 PENDING->dispatch->RUNNING
    doing t2
    t2 RUNNING->done->DONE
    [Task('t2',<function <lambda> at ...>,state='DONE')]

## Function `make_later(L, *tasks, fail_fast=False)`

Dispatch the `tasks` via `L:Later` for asynchronous execution
if it is not already completed.
The caller can wait on `t.result` for completion.

This calls `make_now()` in a thread and uses `L.defer` to
queue the task and its prerequisites for execution.

## Function `make_now(*tasks, fail_fast=False, queue=None)`

Run the generator `make(*tasks)` to completion and return the
list of completed tasks.

## Class `Task(BaseTask, cs.threads.HasThreadState)`

A task which may require the completion of other tasks.

The model here may not be quite as expected; it is aimed at
tasks which can be repaired and rerun.
As such, if `self.run(func,...)` raises an exception from
`func` then this `Task` will still block dependent `Task`s.
Dually, a `Task` which completes without an exception is
considered complete and does not block dependent `Task`s.

Keyword parameters:
* `cancel_on_exception`: if true, cancel this `Task` if `.run`
  raises an exception; the default is `False`, allowing repair
  and retry
* `cancel_on_result`: optional callable to test the `Task.result`
  after `.run`; if the callable returns `True` the `Task` is marked
  as cancelled, allowing repair and retry
* `func`: the function to call to complete the `Task`;
  it will be called as `func(*func_args,**func_kwargs)`
* `func_args`: optional positional arguments, default `()`
* `func_kwargs`: optional keyword arguments, default `{}`
* `lock`: optional lock, default an `RLock`
* `state`: initial state, default from `self._state.initial_state`,
  which is initally '`PENDING`'
* `track`: default `False`;
  if `True` then apply a callback for all states to print task transitions;
  otherwise it should be a callback function suitable for `FSM.fsm_callback`
Other arguments are passed to the `Result` initialiser.

Example:

    t1 = Task(name="task1")
    t1.bg(time.sleep, 10)
    t2 = Task(name="task2")
    # prevent t2 from running until t1 completes
    t2.require(t1)
    # try to run sleep(5) for t2 immediately after t1 completes
    t1.notify(t2.call, sleep, 5)

Users wanting more immediate semantics can supply
`cancel_on_exception` and/or `cancel_on_result` to control
these behaviours.

Example:

    t1 = Task(name="task1")
    t1.bg(time.sleep, 2)
    t2 = Task(name="task2")
    # prevent t2 from running until t1 completes
    t2.require(t1)
    # try to run sleep(5) for t2 immediately after t1 completes
    t1.notify(t2.call, sleep, 5)

*Method `Task.__call__(self)`*:
Block on `self.result` awaiting completion
by calling `self.result()`.

*Method `Task.bg(self)`*:
Dispatch a function to complete the `Task` in a separate `Thread`,
returning the `Thread`.
This raises `BlockedError` for a blocked task.
otherwise the thread runs `self.dispatch()`.

*Method `Task.block(self, otask)`*:
Block another task until we are complete.
The converse of `.require()`.

*Method `Task.blockers(self)`*:
A generator yielding tasks from `self.required`
which should block this task.
Aborted tasks are not blockers
but if we encounter one we do abort the current task.

*Method `Task.cancel(self)`*:
Transition this `Task` to `CANCELLED` state.
If the task is running, set `.cancelled` on the `RunState`,
allowing clean task cancellation and subsequent transition
(mediated by the `.run()` method).
Otherwise fire the `'cancel'` event directly.

*Method `Task.dispatch(self)`*:
Dispatch the `Task`:
If the task is blocked, raise `BlockedError`.
If a prerequisite is aborted, fire the 'abort' method.
Otherwise fire the `'dispatch'` event and then run the
task's function via the `.run()` method.

*Method `Task.isblocked(self)`*:
A task is blocked if any prerequisite is not complete.

*Method `Task.iscompleted(self)`*:
This task is completed (even if failed) and does not block contingent tasks.

*Method `Task.join(self)`*:
Wait for this task to complete.

*Method `Task.make(self, fail_fast=False)`*:
Complete `self` and its prerequisites.
This calls the global `make()` function with `self`.
It returns a Boolean indicating whether this task completed.

*`Task.perthread_state`*

*Method `Task.require(self, otask: 'TaskSubType')`*:
Add a requirement that `otask` be complete before we proceed.
This is the simple `Task` only version of `.then()`.

*Method `Task.run(self)`*:
Run the function associated with this task,
completing the `self.result` `Result` appropriately when finished.

*WARNING*: this _ignores_ the current state and any blocking `Task`s.
You should usually use `dispatch` or `make`.

During the run the thread local `Task.default()`
will be `self` and the `self.runstate` will be running.

Otherwise run `func_result=self.func(*self.func_args,**self.func_kwargs)`
with the following effects:
* if the function raises a `CancellationError`, cancel the `Task`
* if the function raises another exception,
  if `self.cancel_on_exception` then cancel the task
  else complete `self.result` with the exception
  and fire the `'error'` `event
* if `self.runstate.canceled` or `self.cancel_on_result`
  was provided and `self.cancel_on_result(func_result)` is
  true, cancel the task
* otherwise complete `self.result` with `func_result`
  and fire the `'done'` event

*Method `Task.then(self, func: Union[str, Callable, ForwardRef('TaskSubType')], *a, func_args=(), func_kwargs=None, **task_kw)`*:
Prepare a new `Task` or function which may not run before `self` completes.
This may be called in two ways:
- `task.then(some_Task): block the `Task` instance `some_Task` behind `self`
- `task.then([name,]func[,func_args=][,func_kwargs=][,Task_kwargs...]):
  make a new `Task` to be blocked behind `self`
Return the new `Task`.

This supports preparing a chain of actions:

    >>> t_root = Task("t_root", lambda: 0)
    >>> t_leaf = t_root.then(lambda: 1).then(lambda: 2)
    >>> t_root.iscompleted()   # the root task has not yet run
    False
    >>> t_leaf.iscompleted()   # the final task has not yet run
    False
    >>> # t_leaf is blocked by t_root
    >>> t_leaf.dispatch()      # doctest: +ELLIPSIS
    Traceback (most recent call last):
      ...
    cs.taskqueue.BlockedError: ...
    >>> t_leaf.make()          # make the leaf, but make t_root first
    True
    >>> t_root.iscompleted()   # implicitly completed by make
    True
    >>> t_leaf.iscompleted()
    True

## Class `TaskError(cs.fsm.FSMError)`

Raised by `Task` related errors.

## Class `TaskQueue`

A task queue for managing and running a set of related tasks.

Unlike `make` and `Task.make`, this is aimed at a "dispatch" worker
which dispatches individual tasks as required.

Example 1, put 2 dependent tasks in a queue and run:

     >>> t1 = Task("t1", lambda: print("t1"))
     >>> t2 = t1.then("t2", lambda: print("t2"))
     >>> q = TaskQueue(t1, t2)
     >>> for _ in q.run(): pass
     ...
     t1
     t2

Example 2, put 1 task in a queue and run.
The queue only runs the specified tasks:

     >>> t1 = Task("t1", lambda: print("t1"))
     >>> t2 = t1.then("t2", lambda: print("t2"))
     >>> q = TaskQueue(t1)
     >>> for _ in q.run(): pass
     ...
     t1

Example 2, put 1 task in a queue with `run_dependent_tasks=True` and run.
The queue pulls in the dependencies of completed tasks and also runs those:

     >>> t1 = Task("t1", lambda: print("t1"))
     >>> t2 = t1.then("t2", lambda: print("t2"))
     >>> q = TaskQueue(t1, run_dependent_tasks=True)
     >>> for _ in q.run(): pass
     ...
     t1
     t2

*Method `TaskQueue.__init__(self, *tasks, run_dependent_tasks=False)`*:
Initialise the queue with the supplied `tasks`.

*Method `TaskQueue.add(self, task)`*:
Add a task to the tasks managed by this queue.

*Method `TaskQueue.as_dot(self, name=None, **kw)`*:
Compute a DOT syntax graph description of the tasks in the queue.

*Method `TaskQueue.get(self)`*:
Pull a completed or an unblocked pending task from the queue.
Return the task or `None` if nothing is available.

The returned task is no longer tracked by this queue.

*Method `TaskQueue.run(self, runstate=None, once=False)`*:
Process tasks in the queue until the queue has no completed tasks,
yielding each task, immediately if `task.iscompleted()`
otherwise after `taks.dispatch()`.

An optional `RunState` may be provided to allow early termination
via `runstate.cancel()`.

An incomplete task is `dispatch`ed before `yield`;
ideally it will be complete when the yield happens,
but its semantics might mean it is in another state such as `CANCELLED`.
The consumer of `run` must handle these situations.

# Release Log



*Release 20240423*:
Small fixes.

*Release 20230401*:
Add missing requirement to DISTINFO.

*Release 20230331*:
* Task: subclass BaseTask instead of (FSM, RunStateMixin).
* BaseTask.__init__: use @uses_runstate to ensure we've got a RunState.

*Release 20230217*:
Task: subclass HasThreadState, drop .current_task() class method.

*Release 20221207*:
* Pull out core stuff from Task into BaseTask, aids subclassing.
* BaseTask: explainatory docustring about unusual FSM_DEFAULT_STATE design choice.
* BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.
* BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.

*Release 20220805*:
Initial PyPI release.


            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "cs.taskqueue",
    "maintainer": null,
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": null,
    "keywords": "python3",
    "author": null,
    "author_email": "Cameron Simpson <cs@cskk.id.au>",
    "download_url": "https://files.pythonhosted.org/packages/f1/5d/92a0bba4861b023fd90ccb13349791cd5b56588320ab3a67d8db5f2978e9/cs.taskqueue-20240423.tar.gz",
    "platform": null,
    "description": "A general purpose Task and TaskQueue for running tasks with\ndependencies and failure/retry, potentially in parallel.\n\n*Latest release 20240423*:\nSmall fixes.\n\n## Class `BaseTask(cs.fsm.FSM, cs.resources.RunStateMixin)`\n\nA base class subclassing `cs.fsm.FSM` with a `RunStateMixin`.\n\nNote that this class and the `FSM` base class does not provide\na `FSM_DEFAULT_STATE` attribute; a default `state` value of\n`None` will leave `.fsm_state` _unset_.\n\nThis behaviour is is chosen mostly to support subclasses\nwith unusual behaviour, particularly Django's `Model` class\nwhose `refresh_from_db` method seems to not refresh fields\nwhich already exist, and setting `.fsm_state` from a\n`FSM_DEFAULT_STATE` class attribute thus breaks this method.\nSubclasses of this class and `Model` should _not_ provide a\n`FSM_DEFAULT_STATE` attribute, instead relying on the field\ndefinition to provide this default in the usual way.\n\n*Method `BaseTask.as_dot(self, name=None, **kw)`*:\nReturn a DOT syntax digraph starting at this `Task`.\nParameters are as for `Task.tasks_as_dot`.\n\n*Method `BaseTask.dot_node_label(self)`*:\nThe default DOT node label.\n\n*Method `BaseTask.tasks_as_dot(tasks, name=None, *, follow_blocking=False, sep=None)`*:\nReturn a DOT syntax digraph of the iterable `tasks`.\n        Nodes will be coloured according to `DOT_NODE_FILLCOLOR_PALETTE`\n        based on their state.\n\n        Parameters:\n        * `tasks`: an iterable of `Task`s to populate the graph\n        * `name`: optional graph name\n        * `follow_blocking`: optional flag to follow each `Task`'s\n          `.blocking` attribute recursively and also render those\n          `Task`s\n        * `sep`: optional node seprator, default `'\n'`\n\n*Method `BaseTask.tasks_as_svg(tasks, name=None, **kw)`*:\nReturn an SVG diagram of the iterable `tasks`.\nThis takes the same parameters as `tasks_as_dot`.\n\n## Class `BlockedError(TaskError)`\n\nRaised by a blocked `Task` if attempted.\n\n## Function `main(argv)`\n\nDummy main programme to exercise something.\n\n## Function `make(*tasks, fail_fast=False, queue=None)`\n\nGenerator which completes all the supplied `tasks` by dispatching them\nonce they are no longer blocked.\nYield each task from `tasks` as it completes (or becomes cancelled).\n\nParameters:\n* `tasks`: `Task`s as positional parameters\n* `fail_fast`: default `False`; if true, cease evaluation as soon as a\n  task completes in a state with is not `DONE`\n* `queue`: optional callable to submit a task for execution later\n  via some queue such as `Later` or celery\n\nThe following rules are applied by this function:\n- if a task is being prepared, raise an `FSMError`\n- if a task is already running or queued, wait for its completion\n- if a task is pending:\n  * if any prerequisite has failed, fail this task\n  * if any prerequisite is cancelled, cancel this task\n  * if any prerequisite is pending, make it first\n  * if any prerequisite is not done, fail this task\n  * otherwise dispatch this task and then yield it\n- if `fail_fast` and the task is not done, return\n\nExamples:\n\n    >>> t1 = Task('t1', lambda: print('doing t1'), track=True)\n    >>> t2 = t1.then('t2', lambda: print('doing t2'), track=True)\n    >>> list(make(t2))    # doctest: +ELLIPSIS\n    t1 PENDING->dispatch->RUNNING\n    doing t1\n    t1 RUNNING->done->DONE\n    t2 PENDING->dispatch->RUNNING\n    doing t2\n    t2 RUNNING->done->DONE\n    [Task('t2',<function <lambda> at ...>,state='DONE')]\n\n## Function `make_later(L, *tasks, fail_fast=False)`\n\nDispatch the `tasks` via `L:Later` for asynchronous execution\nif it is not already completed.\nThe caller can wait on `t.result` for completion.\n\nThis calls `make_now()` in a thread and uses `L.defer` to\nqueue the task and its prerequisites for execution.\n\n## Function `make_now(*tasks, fail_fast=False, queue=None)`\n\nRun the generator `make(*tasks)` to completion and return the\nlist of completed tasks.\n\n## Class `Task(BaseTask, cs.threads.HasThreadState)`\n\nA task which may require the completion of other tasks.\n\nThe model here may not be quite as expected; it is aimed at\ntasks which can be repaired and rerun.\nAs such, if `self.run(func,...)` raises an exception from\n`func` then this `Task` will still block dependent `Task`s.\nDually, a `Task` which completes without an exception is\nconsidered complete and does not block dependent `Task`s.\n\nKeyword parameters:\n* `cancel_on_exception`: if true, cancel this `Task` if `.run`\n  raises an exception; the default is `False`, allowing repair\n  and retry\n* `cancel_on_result`: optional callable to test the `Task.result`\n  after `.run`; if the callable returns `True` the `Task` is marked\n  as cancelled, allowing repair and retry\n* `func`: the function to call to complete the `Task`;\n  it will be called as `func(*func_args,**func_kwargs)`\n* `func_args`: optional positional arguments, default `()`\n* `func_kwargs`: optional keyword arguments, default `{}`\n* `lock`: optional lock, default an `RLock`\n* `state`: initial state, default from `self._state.initial_state`,\n  which is initally '`PENDING`'\n* `track`: default `False`;\n  if `True` then apply a callback for all states to print task transitions;\n  otherwise it should be a callback function suitable for `FSM.fsm_callback`\nOther arguments are passed to the `Result` initialiser.\n\nExample:\n\n    t1 = Task(name=\"task1\")\n    t1.bg(time.sleep, 10)\n    t2 = Task(name=\"task2\")\n    # prevent t2 from running until t1 completes\n    t2.require(t1)\n    # try to run sleep(5) for t2 immediately after t1 completes\n    t1.notify(t2.call, sleep, 5)\n\nUsers wanting more immediate semantics can supply\n`cancel_on_exception` and/or `cancel_on_result` to control\nthese behaviours.\n\nExample:\n\n    t1 = Task(name=\"task1\")\n    t1.bg(time.sleep, 2)\n    t2 = Task(name=\"task2\")\n    # prevent t2 from running until t1 completes\n    t2.require(t1)\n    # try to run sleep(5) for t2 immediately after t1 completes\n    t1.notify(t2.call, sleep, 5)\n\n*Method `Task.__call__(self)`*:\nBlock on `self.result` awaiting completion\nby calling `self.result()`.\n\n*Method `Task.bg(self)`*:\nDispatch a function to complete the `Task` in a separate `Thread`,\nreturning the `Thread`.\nThis raises `BlockedError` for a blocked task.\notherwise the thread runs `self.dispatch()`.\n\n*Method `Task.block(self, otask)`*:\nBlock another task until we are complete.\nThe converse of `.require()`.\n\n*Method `Task.blockers(self)`*:\nA generator yielding tasks from `self.required`\nwhich should block this task.\nAborted tasks are not blockers\nbut if we encounter one we do abort the current task.\n\n*Method `Task.cancel(self)`*:\nTransition this `Task` to `CANCELLED` state.\nIf the task is running, set `.cancelled` on the `RunState`,\nallowing clean task cancellation and subsequent transition\n(mediated by the `.run()` method).\nOtherwise fire the `'cancel'` event directly.\n\n*Method `Task.dispatch(self)`*:\nDispatch the `Task`:\nIf the task is blocked, raise `BlockedError`.\nIf a prerequisite is aborted, fire the 'abort' method.\nOtherwise fire the `'dispatch'` event and then run the\ntask's function via the `.run()` method.\n\n*Method `Task.isblocked(self)`*:\nA task is blocked if any prerequisite is not complete.\n\n*Method `Task.iscompleted(self)`*:\nThis task is completed (even if failed) and does not block contingent tasks.\n\n*Method `Task.join(self)`*:\nWait for this task to complete.\n\n*Method `Task.make(self, fail_fast=False)`*:\nComplete `self` and its prerequisites.\nThis calls the global `make()` function with `self`.\nIt returns a Boolean indicating whether this task completed.\n\n*`Task.perthread_state`*\n\n*Method `Task.require(self, otask: 'TaskSubType')`*:\nAdd a requirement that `otask` be complete before we proceed.\nThis is the simple `Task` only version of `.then()`.\n\n*Method `Task.run(self)`*:\nRun the function associated with this task,\ncompleting the `self.result` `Result` appropriately when finished.\n\n*WARNING*: this _ignores_ the current state and any blocking `Task`s.\nYou should usually use `dispatch` or `make`.\n\nDuring the run the thread local `Task.default()`\nwill be `self` and the `self.runstate` will be running.\n\nOtherwise run `func_result=self.func(*self.func_args,**self.func_kwargs)`\nwith the following effects:\n* if the function raises a `CancellationError`, cancel the `Task`\n* if the function raises another exception,\n  if `self.cancel_on_exception` then cancel the task\n  else complete `self.result` with the exception\n  and fire the `'error'` `event\n* if `self.runstate.canceled` or `self.cancel_on_result`\n  was provided and `self.cancel_on_result(func_result)` is\n  true, cancel the task\n* otherwise complete `self.result` with `func_result`\n  and fire the `'done'` event\n\n*Method `Task.then(self, func: Union[str, Callable, ForwardRef('TaskSubType')], *a, func_args=(), func_kwargs=None, **task_kw)`*:\nPrepare a new `Task` or function which may not run before `self` completes.\nThis may be called in two ways:\n- `task.then(some_Task): block the `Task` instance `some_Task` behind `self`\n- `task.then([name,]func[,func_args=][,func_kwargs=][,Task_kwargs...]):\n  make a new `Task` to be blocked behind `self`\nReturn the new `Task`.\n\nThis supports preparing a chain of actions:\n\n    >>> t_root = Task(\"t_root\", lambda: 0)\n    >>> t_leaf = t_root.then(lambda: 1).then(lambda: 2)\n    >>> t_root.iscompleted()   # the root task has not yet run\n    False\n    >>> t_leaf.iscompleted()   # the final task has not yet run\n    False\n    >>> # t_leaf is blocked by t_root\n    >>> t_leaf.dispatch()      # doctest: +ELLIPSIS\n    Traceback (most recent call last):\n      ...\n    cs.taskqueue.BlockedError: ...\n    >>> t_leaf.make()          # make the leaf, but make t_root first\n    True\n    >>> t_root.iscompleted()   # implicitly completed by make\n    True\n    >>> t_leaf.iscompleted()\n    True\n\n## Class `TaskError(cs.fsm.FSMError)`\n\nRaised by `Task` related errors.\n\n## Class `TaskQueue`\n\nA task queue for managing and running a set of related tasks.\n\nUnlike `make` and `Task.make`, this is aimed at a \"dispatch\" worker\nwhich dispatches individual tasks as required.\n\nExample 1, put 2 dependent tasks in a queue and run:\n\n     >>> t1 = Task(\"t1\", lambda: print(\"t1\"))\n     >>> t2 = t1.then(\"t2\", lambda: print(\"t2\"))\n     >>> q = TaskQueue(t1, t2)\n     >>> for _ in q.run(): pass\n     ...\n     t1\n     t2\n\nExample 2, put 1 task in a queue and run.\nThe queue only runs the specified tasks:\n\n     >>> t1 = Task(\"t1\", lambda: print(\"t1\"))\n     >>> t2 = t1.then(\"t2\", lambda: print(\"t2\"))\n     >>> q = TaskQueue(t1)\n     >>> for _ in q.run(): pass\n     ...\n     t1\n\nExample 2, put 1 task in a queue with `run_dependent_tasks=True` and run.\nThe queue pulls in the dependencies of completed tasks and also runs those:\n\n     >>> t1 = Task(\"t1\", lambda: print(\"t1\"))\n     >>> t2 = t1.then(\"t2\", lambda: print(\"t2\"))\n     >>> q = TaskQueue(t1, run_dependent_tasks=True)\n     >>> for _ in q.run(): pass\n     ...\n     t1\n     t2\n\n*Method `TaskQueue.__init__(self, *tasks, run_dependent_tasks=False)`*:\nInitialise the queue with the supplied `tasks`.\n\n*Method `TaskQueue.add(self, task)`*:\nAdd a task to the tasks managed by this queue.\n\n*Method `TaskQueue.as_dot(self, name=None, **kw)`*:\nCompute a DOT syntax graph description of the tasks in the queue.\n\n*Method `TaskQueue.get(self)`*:\nPull a completed or an unblocked pending task from the queue.\nReturn the task or `None` if nothing is available.\n\nThe returned task is no longer tracked by this queue.\n\n*Method `TaskQueue.run(self, runstate=None, once=False)`*:\nProcess tasks in the queue until the queue has no completed tasks,\nyielding each task, immediately if `task.iscompleted()`\notherwise after `taks.dispatch()`.\n\nAn optional `RunState` may be provided to allow early termination\nvia `runstate.cancel()`.\n\nAn incomplete task is `dispatch`ed before `yield`;\nideally it will be complete when the yield happens,\nbut its semantics might mean it is in another state such as `CANCELLED`.\nThe consumer of `run` must handle these situations.\n\n# Release Log\n\n\n\n*Release 20240423*:\nSmall fixes.\n\n*Release 20230401*:\nAdd missing requirement to DISTINFO.\n\n*Release 20230331*:\n* Task: subclass BaseTask instead of (FSM, RunStateMixin).\n* BaseTask.__init__: use @uses_runstate to ensure we've got a RunState.\n\n*Release 20230217*:\nTask: subclass HasThreadState, drop .current_task() class method.\n\n*Release 20221207*:\n* Pull out core stuff from Task into BaseTask, aids subclassing.\n* BaseTask: explainatory docustring about unusual FSM_DEFAULT_STATE design choice.\n* BaseTask.tasks_as_dot: express the edges using the node ids instead of their labels.\n* BaseTask: new tasks_as_svg() method like tasks_as_dot() but returning SVG.\n\n*Release 20220805*:\nInitial PyPI release.\n\n",
    "bugtrack_url": null,
    "license": "GNU General Public License v3 or later (GPLv3+)",
    "summary": "A general purpose Task and TaskQueue for running tasks with dependencies and failure/retry, potentially in parallel.",
    "version": "20240423",
    "project_urls": {
        "URL": "https://bitbucket.org/cameron_simpson/css/commits/all"
    },
    "split_keywords": [
        "python3"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "aa8dc75fe44f3cb1502057adca3539fd9b6bc4bc05e8e61eaf64749fd5a959cf",
                "md5": "4678d6d11eb858684870b8ce96d1e6cf",
                "sha256": "c65d20abf95fffdec3c5e3672df2ce64835dd0cd7e9aaf5d015b9504805f07d4"
            },
            "downloads": -1,
            "filename": "cs.taskqueue-20240423-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "4678d6d11eb858684870b8ce96d1e6cf",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 15044,
            "upload_time": "2024-04-23T09:29:36",
            "upload_time_iso_8601": "2024-04-23T09:29:36.563500Z",
            "url": "https://files.pythonhosted.org/packages/aa/8d/c75fe44f3cb1502057adca3539fd9b6bc4bc05e8e61eaf64749fd5a959cf/cs.taskqueue-20240423-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "f15d92a0bba4861b023fd90ccb13349791cd5b56588320ab3a67d8db5f2978e9",
                "md5": "79a273b6c03ab31475dd1a26c8a70654",
                "sha256": "b0123a558c22a844e012e0f2ea730460fa383f4391f044a0d2475b8464a32215"
            },
            "downloads": -1,
            "filename": "cs.taskqueue-20240423.tar.gz",
            "has_sig": false,
            "md5_digest": "79a273b6c03ab31475dd1a26c8a70654",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 16242,
            "upload_time": "2024-04-23T09:29:38",
            "upload_time_iso_8601": "2024-04-23T09:29:38.158221Z",
            "url": "https://files.pythonhosted.org/packages/f1/5d/92a0bba4861b023fd90ccb13349791cd5b56588320ab3a67d8db5f2978e9/cs.taskqueue-20240423.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-23 09:29:38",
    "github": false,
    "gitlab": false,
    "bitbucket": true,
    "codeberg": false,
    "bitbucket_user": "cameron_simpson",
    "bitbucket_project": "css",
    "lcname": "cs.taskqueue"
}
        
Elapsed time: 0.24318s