cs.later


Namecs.later JSON
Version 20240412 PyPI version JSON
download
home_pageNone
SummaryQueue functions for execution later in priority and time order.
upload_time2024-04-12 05:13:34
maintainerNone
docs_urlNone
authorNone
requires_pythonNone
licenseGNU General Public License v3 or later (GPLv3+)
keywords python3
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            Queue functions for execution later in priority and time order.

*Latest release 20240412*:
Later: new optional default=False parameter, set to true to have the Later push itself as the default for HasThreadStates.

I use `Later` objects for convenient queuing of functions whose
execution occurs later in a priority order with capacity constraints.

Why not futures?
I already had this before futures came out,
I prefer its naming scheme and interface,
and futures did not then support prioritised execution.

Use is simple enough: create a `Later` instance and typically queue
functions with the `.defer()` method::

    L = Later(4)      # a Later with a parallelism of 4
    ...
    LF = L.defer(func, *args, **kwargs)
    ...
    x = LF()          # collect result

The `.defer` method and its siblings return a `LateFunction`,
which is a subclass of `cs.result.Result`.
As such it is a callable,
so to collect the result you just call the `LateFunction`.

## Function `defer(func, *a, **kw)`

Queue a function using the current default Later.
Return the `LateFunction`.

## Class `LateFunction(cs.result.Result)`

State information about a pending function,
a subclass of `cs.result.Result`.

A `LateFunction` is callable,
so a synchronous call can be done like this:

    def func():
      return 3
    L = Later(4)
    LF = L.defer(func)
    x = LF()
    print(x)        # prints 3

Used this way, if the called function raises an exception it is visible:

    LF = L.defer()
    try:
      x = LF()
    except SomeException as e:
      # handle the exception ...

To avoid handling exceptions with try/except the .wait()
method should be used:

    LF = L.defer()
    x, exc_info = LF.wait()
    if exc_info:
      # handle exception
      exc_type, exc_value, exc_traceback = exc_info
      ...
    else:
      # use `x`, the function result

TODO: .cancel(), timeout for wait().

*Method `LateFunction.__init__(self, func, name=None, retry_delay=None)`*:
Initialise a `LateFunction`.

Parameters:
* `func` is the callable for later execution.
* `name`, if supplied, specifies an identifying name for the `LateFunction`.
* `retry_local`: time delay before retry of this function on RetryError.
  Default from `later.retry_delay`.

*Method `LateFunction.wait(self)`*:
Obsolete name for `.join`.

## Class `LatePool(cs.context.ContextManagerMixin)`

A context manager after the style of subprocess.Pool
but with deferred completion.

Example usage:

    L = Later(4)    # a 4 thread Later
    with LatePool(L) as LP:
      # several calls to LatePool.defer, perhaps looped
      LP.defer(func, *args, **kwargs)
      LP.defer(func, *args, **kwargs)
    # now we can LP.join() to block for all `LateFunctions`
    #
    # or iterate over LP to collect `LateFunction`s as they complete
    for LF in LP:
      result = LF()
      print(result)

*Method `LatePool.__init__(self, *, later: Optional[inspect._empty] = <function <lambda> at 0x10a2b0160>, priority=None, delay=None, when=None, pfx=None, block=False)`*:
Initialise the `LatePool`.

Parameters:
* `later`: optional `Later` instance, default from `Later.default()`
* `priority`, `delay`, `when`, `name`, `pfx`:
  default values passed to Later.submit.
* `block`: if true, wait for `LateFunction` completion
  before leaving `__exit__`.

*Method `LatePool.__enter_exit__(self)`*:
Generator supporting `__enter__` and `__exit__`.

*Method `LatePool.__iter__(self)`*:
Report completion of the `LateFunction`s.

*Method `LatePool.add(self, LF)`*:
Add a `LateFunction` to those to be tracked by this LatePool.

*Method `LatePool.defer(self, func, *a, **kw)`*:
Defer a function using the LatePool's default parameters.

*Method `LatePool.join(self)`*:
Wait for completion of all the `LateFunction`s.

*Method `LatePool.submit(self, func, **params)`*:
Submit a function using the LatePool's default parameters, overridden by `params`.

## Class `Later(cs.resources.MultiOpenMixin, cs.threads.HasThreadState)`

A management class to queue function calls for later execution.

Methods are provided for submitting functions to run ASAP or
after a delay or after other pending functions. These methods
return `LateFunction`s, a subclass of `cs.result.Result`.

A Later instance' close method closes the Later for further
submission.
Shutdown does not imply that all submitted functions have
completed or even been dispatched.
Callers may wait for completion and optionally cancel functions.

TODO: __enter__ returns a SubLater, __exit__ closes the SubLater.

TODO: drop global default Later.

*Method `Later.__init__(self, capacity, name=None, inboundCapacity=0, retry_delay=None, default=False)`*:
Initialise the Later instance.

Parameters:
* `capacity`: resource contraint on this Later; if an int, it is used
  to size a Semaphore to constrain the number of dispatched functions
  which may be in play at a time; if not an int it is presumed to be a
  suitable Semaphore-like object, perhaps shared with other subsystems.
* `name`: optional identifying name for this instance.
* `inboundCapacity`: if >0, used as a limit on the number of
  undispatched functions that may be queued up; the default is 0 (no
  limit).  Calls to submit functions when the inbound limit is reached
  block until some functions are dispatched.
* `retry_delay`: time delay for requeued functions.
  Default: `DEFAULT_RETRY_DELAY`.

*Method `Later.__call__(self, func, *a, **kw)`*:
A Later object can be called with a function and arguments
with the effect of deferring the function and waiting for
it to complete, returning its return value.

Example:

  def f(a):
    return a*2
  x = L(f, 3)   # x == 6

*Method `Later.__enter_exit__(self)`*:
Run both the inherited context managers.

*Method `Later.after(self, LFs, R, func, *a, **kw)`*:
Queue the function `func` for later dispatch after completion of `LFs`.
Return a `Result` for collection of the result of `func`.

This function will not be submitted until completion of
the supplied `LateFunction`s `LFs`.
If `R` is `None` a new `Result` is allocated to
accept the function return value.
After `func` completes, its return value is passed to `R.put()`.

Typical use case is as follows: suppose you're submitting
work via this `Later` object, and a submitted function itself
might submit more `LateFunction`s for which it must wait.
Code like this:

      def f():
        LF = L.defer(something)
        return LF()

may deadlock if the Later is at capacity. The `after()` method
addresses this:

      def f():
        LF1 = L.defer(something)
        LF2 = L.defer(somethingelse)
        R = L.after( [LF1, LF2], None, when_done )
        return R

This submits the `when_done()` function after the LFs have
completed without spawning a thread or using the `Later`'s
capacity.

See the retry method for a convenience method that uses the
above pattern in a repeating style.

*Method `Later.bg(self, func, *a, **kw)`*:
Queue a function to run right now,
ignoring the `Later`'s capacity and priority system.

This is really just an easy way to utilise the `Later`'s thread pool
and get back a handy `LateFunction` for result collection.
Frankly, you're probably better off using `cs.result.bg` instead.

It can be useful for transient control functions that themselves
queue things through the `Later` queuing system but do not want to
consume capacity themselves, thus avoiding deadlock at the cost of
transient overthreading.

The premise here is that the capacity limit
is more about managing compute contention than pure `Thread` count,
and that control functions should arrange other subfunctions
and then block or exit,
thus consuming neglible compute.
It is common to want to dispatch a higher order operation
via such a control function,
but that higher order would itself normally consume some
of the capacity
thus requiring an an hoc increase to the required capacity
to avoid deadlock.

*Method `Later.complete(self, outstanding=None, until_idle=False)`*:
Generator which waits for outstanding functions to complete and yields them.

Parameters:
* `outstanding`: if not None, an iterable of `LateFunction`s;
  default `self.outstanding`.
* `until_idle`: if true,
  continue until `self.outstanding` is empty.
  This requires the `outstanding` parameter to be `None`.

*Method `Later.debug(self, *a, **kw)`*:
Issue a debug message with `later_name` in `'extra'`.

*Method `Later.defer(self, func, *a, **kw)`*:
Queue the function `func` for later dispatch using the
default priority with the specified arguments `*a` and `**kw`.
Return the corresponding `LateFunction` for result collection.

`func` may optionally be preceeded by one or both of:
* a string specifying the function's descriptive name,
* a mapping containing parameters for `priority`,
  `delay`, and `when`.

Equivalent to:

    submit(functools.partial(func, *a, **kw), **params)

*Method `Later.defer_iterable(self, it: Iterable, outQ, *, greedy: bool = False, test_ready: Optional[Callable[[], bool]] = None)`*:
Submit an iterable `it` for asynchronous stepwise iteration
to put results onto the queue `outQ`.
Return a `Result` for final synchronisation.

This prepares a function to perform a single iteration of
`it`, call `outQ.put(result)` with the result, and to queue
itself again until the iterator is exhausted.
That function is queued.

Parameters:
* `it`: the iterable for for asynchronous stepwise iteration
* `outQ`: an `IterableQueue`like object
  with a `.put` method to accept items
  and a `.close` method to indicate the end of items.
  When the iteration is complete,
  call `outQ.close()` and complete the `Result`.
  If iteration ran to completion then the `Result`'s `.result`
  will be the number of iterations, otherwise if an iteration
  raised an exception the the `Result`'s .exc_info will contain
  the exception information.
* `test_ready`: if not `None`, a callable to test if iteration
  is presently permitted; iteration will be deferred until
  the callable returns a true value.

*Method `Later.error(self, *a, **kw)`*:
Issue an error message with `later_name` in `'extra'`.

*Property `Later.finished`*:
Probe the finishedness.

*Method `Later.info(self, *a, **kw)`*:
Issue an info message with `later_name` in `'extra'`.

*Method `Later.is_submittable(self) -> bool`*:
Test whether this `Later` is accepting new submissions.

*`Later.later_perthread_state`*

*Method `Later.logTo(self, filename, logger=None, log_level=None)`*:
Log to the file specified by `filename` using the specified
logger named `logger` (default the module name, cs.later) at the
specified log level `log_level` (default logging.INFO).

*Method `Later.log_status(self)`*:
Log the current delayed, pending and running state.

*Method `Later.pool(self, *a, **kw)`*:
Return a `LatePool` to manage some tasks run with this `Later`.

*Method `Later.priority(self, pri)`*:
A context manager to temporarily set the default priority.

Example:

    L = Later(4)
    with L.priority(1):
      L.defer(f)  # queue f() with priority 1
    with L.priority(2):
      L.defer(f, 3)  # queue f(3) with priority 2

WARNING: this is NOT thread safe!

TODO: is a thread safe version even a sane idea without a
      per-thread priority stack?

*Method `Later.ready(self, **kwargs)`*:
Awful name.
Return a context manager to block until the `Later` provides a timeslot.

*Method `Later.state(self, new_state, *a)`*:
Update the state of this Later.

*Method `Later.submit(self, func, priority=None, delay=None, when=None, name=None, pfx=None, LF=None, retry_delay=None)`*:
Submit the callable `func` for later dispatch.
Return the corresponding `LateFunction` for result collection.

If the parameter `priority` is not None then use it as the priority
otherwise use the default priority.

If the parameter `delay` is not None, delay consideration of
this function until `delay` seconds from now.

If the parameter `when` is not None, delay consideration of
this function until the time `when`.
It is an error to specify both `when` and `delay`.

If the parameter `name` is not None, use it to name the `LateFunction`.

If the parameter `pfx` is not None, submit pfx.partial(func);
  see the cs.logutils.Pfx.partial method for details.

If the parameter `LF` is not None, construct a new `LateFunction` to
  track function completion.

*Method `Later.wait(self, timeout=None)`*:
Wait for the `Later` to be finished.
Return the result of `self.finished_event.wait(timeout)`.

*Method `Later.wait_outstanding(self, until_idle=False)`*:
Wrapper for complete(), to collect and discard completed `LateFunction`s.

*Method `Later.warning(self, *a, **kw)`*:
Issue a warning message with `later_name` in `'extra'`.

*Method `Later.with_result_of(self, callable1, func, *a, **kw)`*:
Defer `callable1`, then append its result to the arguments for
`func` and defer `func`.
Return the `LateFunction` for `func`.

## Function `retry(retry_interval, func, *a, **kw)`

Call the callable `func` with the supplied arguments.

If it raises `RetryError`,
run `time.sleep(retry_interval)`
and then call again until it does not raise `RetryError`.

## Class `RetryError(builtins.Exception)`

Exception raised by functions which should be resubmitted to the queue.

## Class `SubLater`

A class for managing a group of deferred tasks using an existing `Later`.

*Method `SubLater.__init__(self, *, later: Optional[cs.later.Later] = <function <lambda> at 0x10a2b0160>)`*:
Initialise the `SubLater` with its parent `Later`.

TODO: accept `discard=False` param to suppress the queue and
associated checks.

*Method `SubLater.__iter__(self)`*:
Iteration over the `SubLater`
iterates over the queue of completed `LateFUnction`s.

*Method `SubLater.close(self)`*:
Close the SubLater.

This prevents further deferrals.

*Method `SubLater.defer(self, func, *a, **kw)`*:
Defer a function, return its `LateFunction`.

The resulting `LateFunction` will queue itself for collection
on completion.

*Method `SubLater.reaper(self, handler=None)`*:
Dispatch a `Thread` to collect completed `LateFunction`s.
Return the `Thread`.

`handler`: an optional callable to be passed each `LateFunction`
as it completes.

# Release Log



*Release 20240412*:
Later: new optional default=False parameter, set to true to have the Later push itself as the default for HasThreadStates.

*Release 20240305*:
Later: new thread_states=True parameter to propagate all HasThreadStates to the LateFUnction Threads; adjust LateFunction to match.

*Release 20230612*:
Updates stemming from cs.threads changes.

*Release 20230212.1*:
Bugfix LateFunction.__init__: the thread must run self.run_func(self.func) in order to collect the result/exception.

*Release 20230212*:
* SubLater.reaper: use HasThreadState.Thread to prepare the reap Thread.
* Some finalisation fixes etc.

*Release 20230125*:
Later: use HasThreadState mixin, provide @uses_later decorator.

*Release 20221228*:
* Later: replace submittable checks with decorator accepting a force=True override.
* Later.defer_iterable: implement greedy vs nongreedy.

*Release 20220918*:
* Later.wait: new optional timeout, replaces hardwired 5s timeout; return the Event.finished return.
* Later: expose the finished Event as .finished_event.
* Later.finished_event logic fixes.

*Release 20220805*:
Update for recent changes to Result.

*Release 20220605*:
* Later: replace the default = _ThreadLocal with a default = ThreadState(current=None).
* Later: fold startup/shutdown/__enter__/__exit__ into the startup_shutdown context manager, fixes MultiOpenMixin misbehaviour.

*Release 20201021*:
* Later: subclass MultiOpenMixin.
* Later._defer: make a shallow copy of the keyword parameters as we do for the positional parameters.

*Release 20191007*:
Drop pipeline functionality, moved to new cs.pipeline module.

*Release 20181231*:
* New SubLater class to provide a grouping for deferred functions and an iteration to collect them as they complete.
* Drop WorkerThreadPool (leaks idle Threads, brings little benefit).
* Later: drop worker queue thread and semaphore, just try a dispatch on submit or complete.
* Later: drop tracking code. Drop capacity context manager, never used.

*Release 20181109*:
* Updates for cs.asynchron renamed to cs.result.
* Later: no longer subclass MultiOpenMixin, users now call close to end submission, shutdown to terminate activity and wait to await finalisation.
* Clean lint, add docstrings, minor bugfixes.

*Release 20160828*:
* Use "install_requires" instead of "requires" in DISTINFO.
* Add LatePool, a context manager after the flavour of subprocess.Pool.
* Python 2 fix.
* Rename NestingOpenCloseMixin to MultiOpenMixin - easier to type, say and remember, not to mention being more accurate.
* Add RetryError exception for use by Later.retriable.
* LateFunction: support RetryError exception from function, causing requeue.
* LateFunction: accept retry_delay parameter, used to delay function retry.
* Later.defer_iterable: accept `test_ready` callable to support deferring iteration until the callable returns truthiness.
* New function retry(retry_interval, func, *a, **kw) to call func until it does not raise RetryError.
* Later: wrap several methods in @MultiOpenMixin.is_opened.
* Assorted bugfixes and improvements.

*Release 20150115*:
First PyPI release.


            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "cs.later",
    "maintainer": null,
    "docs_url": null,
    "requires_python": null,
    "maintainer_email": null,
    "keywords": "python3",
    "author": null,
    "author_email": "Cameron Simpson <cs@cskk.id.au>",
    "download_url": "https://files.pythonhosted.org/packages/6a/72/82869b453784eb77244ae81b5e5945e17b4b766a84ab873ac94584781af4/cs.later-20240412.tar.gz",
    "platform": null,
    "description": "Queue functions for execution later in priority and time order.\n\n*Latest release 20240412*:\nLater: new optional default=False parameter, set to true to have the Later push itself as the default for HasThreadStates.\n\nI use `Later` objects for convenient queuing of functions whose\nexecution occurs later in a priority order with capacity constraints.\n\nWhy not futures?\nI already had this before futures came out,\nI prefer its naming scheme and interface,\nand futures did not then support prioritised execution.\n\nUse is simple enough: create a `Later` instance and typically queue\nfunctions with the `.defer()` method::\n\n    L = Later(4)      # a Later with a parallelism of 4\n    ...\n    LF = L.defer(func, *args, **kwargs)\n    ...\n    x = LF()          # collect result\n\nThe `.defer` method and its siblings return a `LateFunction`,\nwhich is a subclass of `cs.result.Result`.\nAs such it is a callable,\nso to collect the result you just call the `LateFunction`.\n\n## Function `defer(func, *a, **kw)`\n\nQueue a function using the current default Later.\nReturn the `LateFunction`.\n\n## Class `LateFunction(cs.result.Result)`\n\nState information about a pending function,\na subclass of `cs.result.Result`.\n\nA `LateFunction` is callable,\nso a synchronous call can be done like this:\n\n    def func():\n      return 3\n    L = Later(4)\n    LF = L.defer(func)\n    x = LF()\n    print(x)        # prints 3\n\nUsed this way, if the called function raises an exception it is visible:\n\n    LF = L.defer()\n    try:\n      x = LF()\n    except SomeException as e:\n      # handle the exception ...\n\nTo avoid handling exceptions with try/except the .wait()\nmethod should be used:\n\n    LF = L.defer()\n    x, exc_info = LF.wait()\n    if exc_info:\n      # handle exception\n      exc_type, exc_value, exc_traceback = exc_info\n      ...\n    else:\n      # use `x`, the function result\n\nTODO: .cancel(), timeout for wait().\n\n*Method `LateFunction.__init__(self, func, name=None, retry_delay=None)`*:\nInitialise a `LateFunction`.\n\nParameters:\n* `func` is the callable for later execution.\n* `name`, if supplied, specifies an identifying name for the `LateFunction`.\n* `retry_local`: time delay before retry of this function on RetryError.\n  Default from `later.retry_delay`.\n\n*Method `LateFunction.wait(self)`*:\nObsolete name for `.join`.\n\n## Class `LatePool(cs.context.ContextManagerMixin)`\n\nA context manager after the style of subprocess.Pool\nbut with deferred completion.\n\nExample usage:\n\n    L = Later(4)    # a 4 thread Later\n    with LatePool(L) as LP:\n      # several calls to LatePool.defer, perhaps looped\n      LP.defer(func, *args, **kwargs)\n      LP.defer(func, *args, **kwargs)\n    # now we can LP.join() to block for all `LateFunctions`\n    #\n    # or iterate over LP to collect `LateFunction`s as they complete\n    for LF in LP:\n      result = LF()\n      print(result)\n\n*Method `LatePool.__init__(self, *, later: Optional[inspect._empty] = <function <lambda> at 0x10a2b0160>, priority=None, delay=None, when=None, pfx=None, block=False)`*:\nInitialise the `LatePool`.\n\nParameters:\n* `later`: optional `Later` instance, default from `Later.default()`\n* `priority`, `delay`, `when`, `name`, `pfx`:\n  default values passed to Later.submit.\n* `block`: if true, wait for `LateFunction` completion\n  before leaving `__exit__`.\n\n*Method `LatePool.__enter_exit__(self)`*:\nGenerator supporting `__enter__` and `__exit__`.\n\n*Method `LatePool.__iter__(self)`*:\nReport completion of the `LateFunction`s.\n\n*Method `LatePool.add(self, LF)`*:\nAdd a `LateFunction` to those to be tracked by this LatePool.\n\n*Method `LatePool.defer(self, func, *a, **kw)`*:\nDefer a function using the LatePool's default parameters.\n\n*Method `LatePool.join(self)`*:\nWait for completion of all the `LateFunction`s.\n\n*Method `LatePool.submit(self, func, **params)`*:\nSubmit a function using the LatePool's default parameters, overridden by `params`.\n\n## Class `Later(cs.resources.MultiOpenMixin, cs.threads.HasThreadState)`\n\nA management class to queue function calls for later execution.\n\nMethods are provided for submitting functions to run ASAP or\nafter a delay or after other pending functions. These methods\nreturn `LateFunction`s, a subclass of `cs.result.Result`.\n\nA Later instance' close method closes the Later for further\nsubmission.\nShutdown does not imply that all submitted functions have\ncompleted or even been dispatched.\nCallers may wait for completion and optionally cancel functions.\n\nTODO: __enter__ returns a SubLater, __exit__ closes the SubLater.\n\nTODO: drop global default Later.\n\n*Method `Later.__init__(self, capacity, name=None, inboundCapacity=0, retry_delay=None, default=False)`*:\nInitialise the Later instance.\n\nParameters:\n* `capacity`: resource contraint on this Later; if an int, it is used\n  to size a Semaphore to constrain the number of dispatched functions\n  which may be in play at a time; if not an int it is presumed to be a\n  suitable Semaphore-like object, perhaps shared with other subsystems.\n* `name`: optional identifying name for this instance.\n* `inboundCapacity`: if >0, used as a limit on the number of\n  undispatched functions that may be queued up; the default is 0 (no\n  limit).  Calls to submit functions when the inbound limit is reached\n  block until some functions are dispatched.\n* `retry_delay`: time delay for requeued functions.\n  Default: `DEFAULT_RETRY_DELAY`.\n\n*Method `Later.__call__(self, func, *a, **kw)`*:\nA Later object can be called with a function and arguments\nwith the effect of deferring the function and waiting for\nit to complete, returning its return value.\n\nExample:\n\n  def f(a):\n    return a*2\n  x = L(f, 3)   # x == 6\n\n*Method `Later.__enter_exit__(self)`*:\nRun both the inherited context managers.\n\n*Method `Later.after(self, LFs, R, func, *a, **kw)`*:\nQueue the function `func` for later dispatch after completion of `LFs`.\nReturn a `Result` for collection of the result of `func`.\n\nThis function will not be submitted until completion of\nthe supplied `LateFunction`s `LFs`.\nIf `R` is `None` a new `Result` is allocated to\naccept the function return value.\nAfter `func` completes, its return value is passed to `R.put()`.\n\nTypical use case is as follows: suppose you're submitting\nwork via this `Later` object, and a submitted function itself\nmight submit more `LateFunction`s for which it must wait.\nCode like this:\n\n      def f():\n        LF = L.defer(something)\n        return LF()\n\nmay deadlock if the Later is at capacity. The `after()` method\naddresses this:\n\n      def f():\n        LF1 = L.defer(something)\n        LF2 = L.defer(somethingelse)\n        R = L.after( [LF1, LF2], None, when_done )\n        return R\n\nThis submits the `when_done()` function after the LFs have\ncompleted without spawning a thread or using the `Later`'s\ncapacity.\n\nSee the retry method for a convenience method that uses the\nabove pattern in a repeating style.\n\n*Method `Later.bg(self, func, *a, **kw)`*:\nQueue a function to run right now,\nignoring the `Later`'s capacity and priority system.\n\nThis is really just an easy way to utilise the `Later`'s thread pool\nand get back a handy `LateFunction` for result collection.\nFrankly, you're probably better off using `cs.result.bg` instead.\n\nIt can be useful for transient control functions that themselves\nqueue things through the `Later` queuing system but do not want to\nconsume capacity themselves, thus avoiding deadlock at the cost of\ntransient overthreading.\n\nThe premise here is that the capacity limit\nis more about managing compute contention than pure `Thread` count,\nand that control functions should arrange other subfunctions\nand then block or exit,\nthus consuming neglible compute.\nIt is common to want to dispatch a higher order operation\nvia such a control function,\nbut that higher order would itself normally consume some\nof the capacity\nthus requiring an an hoc increase to the required capacity\nto avoid deadlock.\n\n*Method `Later.complete(self, outstanding=None, until_idle=False)`*:\nGenerator which waits for outstanding functions to complete and yields them.\n\nParameters:\n* `outstanding`: if not None, an iterable of `LateFunction`s;\n  default `self.outstanding`.\n* `until_idle`: if true,\n  continue until `self.outstanding` is empty.\n  This requires the `outstanding` parameter to be `None`.\n\n*Method `Later.debug(self, *a, **kw)`*:\nIssue a debug message with `later_name` in `'extra'`.\n\n*Method `Later.defer(self, func, *a, **kw)`*:\nQueue the function `func` for later dispatch using the\ndefault priority with the specified arguments `*a` and `**kw`.\nReturn the corresponding `LateFunction` for result collection.\n\n`func` may optionally be preceeded by one or both of:\n* a string specifying the function's descriptive name,\n* a mapping containing parameters for `priority`,\n  `delay`, and `when`.\n\nEquivalent to:\n\n    submit(functools.partial(func, *a, **kw), **params)\n\n*Method `Later.defer_iterable(self, it: Iterable, outQ, *, greedy: bool = False, test_ready: Optional[Callable[[], bool]] = None)`*:\nSubmit an iterable `it` for asynchronous stepwise iteration\nto put results onto the queue `outQ`.\nReturn a `Result` for final synchronisation.\n\nThis prepares a function to perform a single iteration of\n`it`, call `outQ.put(result)` with the result, and to queue\nitself again until the iterator is exhausted.\nThat function is queued.\n\nParameters:\n* `it`: the iterable for for asynchronous stepwise iteration\n* `outQ`: an `IterableQueue`like object\n  with a `.put` method to accept items\n  and a `.close` method to indicate the end of items.\n  When the iteration is complete,\n  call `outQ.close()` and complete the `Result`.\n  If iteration ran to completion then the `Result`'s `.result`\n  will be the number of iterations, otherwise if an iteration\n  raised an exception the the `Result`'s .exc_info will contain\n  the exception information.\n* `test_ready`: if not `None`, a callable to test if iteration\n  is presently permitted; iteration will be deferred until\n  the callable returns a true value.\n\n*Method `Later.error(self, *a, **kw)`*:\nIssue an error message with `later_name` in `'extra'`.\n\n*Property `Later.finished`*:\nProbe the finishedness.\n\n*Method `Later.info(self, *a, **kw)`*:\nIssue an info message with `later_name` in `'extra'`.\n\n*Method `Later.is_submittable(self) -> bool`*:\nTest whether this `Later` is accepting new submissions.\n\n*`Later.later_perthread_state`*\n\n*Method `Later.logTo(self, filename, logger=None, log_level=None)`*:\nLog to the file specified by `filename` using the specified\nlogger named `logger` (default the module name, cs.later) at the\nspecified log level `log_level` (default logging.INFO).\n\n*Method `Later.log_status(self)`*:\nLog the current delayed, pending and running state.\n\n*Method `Later.pool(self, *a, **kw)`*:\nReturn a `LatePool` to manage some tasks run with this `Later`.\n\n*Method `Later.priority(self, pri)`*:\nA context manager to temporarily set the default priority.\n\nExample:\n\n    L = Later(4)\n    with L.priority(1):\n      L.defer(f)  # queue f() with priority 1\n    with L.priority(2):\n      L.defer(f, 3)  # queue f(3) with priority 2\n\nWARNING: this is NOT thread safe!\n\nTODO: is a thread safe version even a sane idea without a\n      per-thread priority stack?\n\n*Method `Later.ready(self, **kwargs)`*:\nAwful name.\nReturn a context manager to block until the `Later` provides a timeslot.\n\n*Method `Later.state(self, new_state, *a)`*:\nUpdate the state of this Later.\n\n*Method `Later.submit(self, func, priority=None, delay=None, when=None, name=None, pfx=None, LF=None, retry_delay=None)`*:\nSubmit the callable `func` for later dispatch.\nReturn the corresponding `LateFunction` for result collection.\n\nIf the parameter `priority` is not None then use it as the priority\notherwise use the default priority.\n\nIf the parameter `delay` is not None, delay consideration of\nthis function until `delay` seconds from now.\n\nIf the parameter `when` is not None, delay consideration of\nthis function until the time `when`.\nIt is an error to specify both `when` and `delay`.\n\nIf the parameter `name` is not None, use it to name the `LateFunction`.\n\nIf the parameter `pfx` is not None, submit pfx.partial(func);\n  see the cs.logutils.Pfx.partial method for details.\n\nIf the parameter `LF` is not None, construct a new `LateFunction` to\n  track function completion.\n\n*Method `Later.wait(self, timeout=None)`*:\nWait for the `Later` to be finished.\nReturn the result of `self.finished_event.wait(timeout)`.\n\n*Method `Later.wait_outstanding(self, until_idle=False)`*:\nWrapper for complete(), to collect and discard completed `LateFunction`s.\n\n*Method `Later.warning(self, *a, **kw)`*:\nIssue a warning message with `later_name` in `'extra'`.\n\n*Method `Later.with_result_of(self, callable1, func, *a, **kw)`*:\nDefer `callable1`, then append its result to the arguments for\n`func` and defer `func`.\nReturn the `LateFunction` for `func`.\n\n## Function `retry(retry_interval, func, *a, **kw)`\n\nCall the callable `func` with the supplied arguments.\n\nIf it raises `RetryError`,\nrun `time.sleep(retry_interval)`\nand then call again until it does not raise `RetryError`.\n\n## Class `RetryError(builtins.Exception)`\n\nException raised by functions which should be resubmitted to the queue.\n\n## Class `SubLater`\n\nA class for managing a group of deferred tasks using an existing `Later`.\n\n*Method `SubLater.__init__(self, *, later: Optional[cs.later.Later] = <function <lambda> at 0x10a2b0160>)`*:\nInitialise the `SubLater` with its parent `Later`.\n\nTODO: accept `discard=False` param to suppress the queue and\nassociated checks.\n\n*Method `SubLater.__iter__(self)`*:\nIteration over the `SubLater`\niterates over the queue of completed `LateFUnction`s.\n\n*Method `SubLater.close(self)`*:\nClose the SubLater.\n\nThis prevents further deferrals.\n\n*Method `SubLater.defer(self, func, *a, **kw)`*:\nDefer a function, return its `LateFunction`.\n\nThe resulting `LateFunction` will queue itself for collection\non completion.\n\n*Method `SubLater.reaper(self, handler=None)`*:\nDispatch a `Thread` to collect completed `LateFunction`s.\nReturn the `Thread`.\n\n`handler`: an optional callable to be passed each `LateFunction`\nas it completes.\n\n# Release Log\n\n\n\n*Release 20240412*:\nLater: new optional default=False parameter, set to true to have the Later push itself as the default for HasThreadStates.\n\n*Release 20240305*:\nLater: new thread_states=True parameter to propagate all HasThreadStates to the LateFUnction Threads; adjust LateFunction to match.\n\n*Release 20230612*:\nUpdates stemming from cs.threads changes.\n\n*Release 20230212.1*:\nBugfix LateFunction.__init__: the thread must run self.run_func(self.func) in order to collect the result/exception.\n\n*Release 20230212*:\n* SubLater.reaper: use HasThreadState.Thread to prepare the reap Thread.\n* Some finalisation fixes etc.\n\n*Release 20230125*:\nLater: use HasThreadState mixin, provide @uses_later decorator.\n\n*Release 20221228*:\n* Later: replace submittable checks with decorator accepting a force=True override.\n* Later.defer_iterable: implement greedy vs nongreedy.\n\n*Release 20220918*:\n* Later.wait: new optional timeout, replaces hardwired 5s timeout; return the Event.finished return.\n* Later: expose the finished Event as .finished_event.\n* Later.finished_event logic fixes.\n\n*Release 20220805*:\nUpdate for recent changes to Result.\n\n*Release 20220605*:\n* Later: replace the default = _ThreadLocal with a default = ThreadState(current=None).\n* Later: fold startup/shutdown/__enter__/__exit__ into the startup_shutdown context manager, fixes MultiOpenMixin misbehaviour.\n\n*Release 20201021*:\n* Later: subclass MultiOpenMixin.\n* Later._defer: make a shallow copy of the keyword parameters as we do for the positional parameters.\n\n*Release 20191007*:\nDrop pipeline functionality, moved to new cs.pipeline module.\n\n*Release 20181231*:\n* New SubLater class to provide a grouping for deferred functions and an iteration to collect them as they complete.\n* Drop WorkerThreadPool (leaks idle Threads, brings little benefit).\n* Later: drop worker queue thread and semaphore, just try a dispatch on submit or complete.\n* Later: drop tracking code. Drop capacity context manager, never used.\n\n*Release 20181109*:\n* Updates for cs.asynchron renamed to cs.result.\n* Later: no longer subclass MultiOpenMixin, users now call close to end submission, shutdown to terminate activity and wait to await finalisation.\n* Clean lint, add docstrings, minor bugfixes.\n\n*Release 20160828*:\n* Use \"install_requires\" instead of \"requires\" in DISTINFO.\n* Add LatePool, a context manager after the flavour of subprocess.Pool.\n* Python 2 fix.\n* Rename NestingOpenCloseMixin to MultiOpenMixin - easier to type, say and remember, not to mention being more accurate.\n* Add RetryError exception for use by Later.retriable.\n* LateFunction: support RetryError exception from function, causing requeue.\n* LateFunction: accept retry_delay parameter, used to delay function retry.\n* Later.defer_iterable: accept `test_ready` callable to support deferring iteration until the callable returns truthiness.\n* New function retry(retry_interval, func, *a, **kw) to call func until it does not raise RetryError.\n* Later: wrap several methods in @MultiOpenMixin.is_opened.\n* Assorted bugfixes and improvements.\n\n*Release 20150115*:\nFirst PyPI release.\n\n",
    "bugtrack_url": null,
    "license": "GNU General Public License v3 or later (GPLv3+)",
    "summary": "Queue functions for execution later in priority and time order.",
    "version": "20240412",
    "project_urls": {
        "URL": "https://bitbucket.org/cameron_simpson/css/commits/all"
    },
    "split_keywords": [
        "python3"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "eb6e6a7f27b4efa5d0bed724cbb6b46821a118e641653bb018553d77bd0320be",
                "md5": "f6a46eeaebd291e0b8a8f94e89ec8b95",
                "sha256": "8546aeae3630a8fe8e16c13ee33dd4b7957438b782ccf5fa98292925dc918208"
            },
            "downloads": -1,
            "filename": "cs.later-20240412-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "f6a46eeaebd291e0b8a8f94e89ec8b95",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": null,
            "size": 17851,
            "upload_time": "2024-04-12T05:13:32",
            "upload_time_iso_8601": "2024-04-12T05:13:32.117336Z",
            "url": "https://files.pythonhosted.org/packages/eb/6e/6a7f27b4efa5d0bed724cbb6b46821a118e641653bb018553d77bd0320be/cs.later-20240412-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "6a7282869b453784eb77244ae81b5e5945e17b4b766a84ab873ac94584781af4",
                "md5": "17332f6b9501d43c79bf99416b6a3bb7",
                "sha256": "bcf10b664826d7918460d7b0e7040b1186cb84b178dd61bfb0c588d5b7ec969a"
            },
            "downloads": -1,
            "filename": "cs.later-20240412.tar.gz",
            "has_sig": false,
            "md5_digest": "17332f6b9501d43c79bf99416b6a3bb7",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": null,
            "size": 20759,
            "upload_time": "2024-04-12T05:13:34",
            "upload_time_iso_8601": "2024-04-12T05:13:34.914278Z",
            "url": "https://files.pythonhosted.org/packages/6a/72/82869b453784eb77244ae81b5e5945e17b4b766a84ab873ac94584781af4/cs.later-20240412.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-12 05:13:34",
    "github": false,
    "gitlab": false,
    "bitbucket": true,
    "codeberg": false,
    "bitbucket_user": "cameron_simpson",
    "bitbucket_project": "css",
    "lcname": "cs.later"
}
        
Elapsed time: 0.25610s