# Mr. FIFO
[![made-with-python](https://img.shields.io/badge/Made%20with-Python-1f425f.svg)](https://www.python.org/)
[![PyPI](https://badge.fury.io/py/mrfifo.svg)](https://badge.fury.io/py/mrfifo)
![PyPI - License](https://img.shields.io/pypi/l/mrfifo)
[![Coverage Status](https://coveralls.io/repos/github/marvin-jens/mrfifo/badge.svg?branch=main)](https://coveralls.io/github/marvin-jens/mrfifo?branch=main)
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
Mr. FIFO is a python/cython package that builds on top of the `multiprocessing` module, providing high throughput and very low-overhead, directional, point-to-point interprocess communication. It essentially provides _Map-Reduce parallelism over FIFOs_ (aka named pipes).
# Abstract
Some problems can very nicely be implemented in python and are easily parallelizable, but require high throughput. The python multiprocessing module primitives (Queue, Pipe, etc.) however suffer from significant overhead (pickling/un-pickling etc.) and can become a severe bottleneck.
Mr. FIFO provides few very low overhead function primitives (implemented in Cython) that exploit the time-tested and extremely optimized inter-process communication framework known as *named pipes* or FIFOs (first-in first-out). The development is heavily geared towards bioinformatics tasks (processing large BAM files for instance), but is in no way restricted to that.
# Install
```
pip install mrfifo
```
should do the trick.
# Example workflow
Here's an instructive example that shows off several of the main features. The following code decompresses a `BAM` file, distributes the SAM lines to four separate worker processes running in parallel, and then re-assembles a new BAM with the exact same content.
The workers execute the function `pass_through` which iterates over its input, counts the number of lines (SAM records) it encounters, and writes the line to an output. After exhausting the input, the function returns the count and exits.
The magic happens before and after the calls to `pass_through`. For each of the calls defining the workflow (called "parts" of the workflow), a separate `multiprocessing.Process` is started. In fact, in reverse order, so starting with the `.funnel()` and then working its way up to the `.BAM_reader()`. However, before any processes get started, the workflow object figures out how many named pipes it needs to create (in a safe temp directory), by way of the `mf.FIFO` objects being created and passed into each part. Unless, explicitly specified (using `manage_pipes=False`), these pipes are additionally managed by mrfifo, to avoid deadlocks. For instance, `pass_through` is already called with two opened file objects, one for reading and one for writing, which have been created by mrfifo to connect it with the `.distribute` part on the input side and the `.collect` part on the output side.
Note that return values and exceptions (including tracebacks) are automatically captured by the workflow object to simplify your program and to avoid hanging processes waiting for a pipe to be closed.
A little extra is the BAM header. This gets detected and separated from the bulk of the data by the `.distribute` part and is sent to its own FIFO named `header`. The header thus bypasses the workers and is merged with their outputs only at the `.collect` part. However, the header could alternatively have been broadcast to all workers as well. See the documentation for more details.
```python
def pass_through(input, output):
i = 0
for line in input:
i += 1
output.write(line)
return i
def is_header(line):
return line.startswith("@")
def test_bam_reconstruct(chunk_size=1, n=4):
import mrfifo as mf
w = (
mf.Workflow("BAM_reconstruct")
.BAM_reader(input="test_data/tiny_test.bam")
.distribute(
input=mf.FIFO("input_sam", "rt"),
outputs=mf.FIFO("dist{n}", "wt", n=n),
chunk_size=chunk_size,
header_detect_func=is_header,
header_broadcast=False,
header_fifo=mf.FIFO("header", "wt"),
)
.workers(
input=mf.FIFO("dist{n}", "rt"),
output=mf.FIFO("out{n}", "wt"),
func=pass_through,
n=n,
)
.collect(
inputs=mf.FIFO("out{n}", "rt", n=n),
header_fifo=mf.FIFO("header", "rt"),
output=mf.FIFO("out_sam", "wt"),
chunk_size=chunk_size,
)
.funnel(
input=mf.FIFO("out_sam", "rt"),
output="test_data/reconstruct.bam",
_manage_fifos=False,
func=mf.parts.bam_writer,
)
.run()
)
```
Please note that we assign the arguments of `pass_through` to `mf.FIFO` instances. This tells mrfifo that input and output are considered *internal plumbing* between the various parts executed in parallel. As such, Mr.FIFO will actually create named pipes in a temporary directory for each connection between parts. Unless explicitly turned of with `manage_pipes=False`, Mr. FIFO will even open and close the named pipes to avoid deadlocks in case of any exceptions.
The example above actually creates a number of sub-processes:
* `BAM_reconstruct.bam_reader0`: reads a BAM file from `test_data/tiny_test.bam` and decompresses to the named pipe `input_sam`
* `BAM_reconstruct.dist0`: reads line-by-line from `input_sam` and distributes in a round-robin fashion to
four named pipes `['dist0', 'dist1', 'dist2', 'dist3']`. The BAM header is separately sent to its own FIFO named `header`.
* `BAM_reconstruct.worker{i}`: four processes, each reading from `dist{i}` and writing to `out{i}` where `i` goes from 0 to 3.
* `BAM_reconstruct.collect0`: reads line-by-line, round-robin from `['counted0', ..., 'counted3']` and writes the line to `out_sam`. But not before passing on the contents of the `header`!
* `BAM_reconstruct.funnel0`: sends the content of `out_sam` to a `samtools view` process which re-creates a compressed BAM file with the identical original context.
# API
Under the hood, the workflow creates `mrfifo.Job` instances for each process. These are thin wrappers around `mutliprocessing.Process`, which allow to keep track of the inputs and outputs, collect return values and unhandled exceptions. `func` is a callable that must not especially be prepared for mrfifo. In general, it should probably have an argument that can be iterated over (line-wise) so that we can connect it to a FIFO. The API to compose the workflow consists of useful shorthands to create meaningfully different jobs. But there may be different ways to combine code with the offered primitives. The currently proposed logic is as follows:
## .reader
Readers sequentially process one or more input files and write to a single output. In the above example we use a as input, but we could also have provided `/dev/stdin`. Typically, the entry-point to your workflow will be defined by one or more `.reader()` calls.
## .distribute
A distributor splits an input into multiple outputs by cycling through the outputs as it iterates over chunks of input. The default distributor is implemented in cython, tries to allocate large pipe buffers and has low overhead.
For more complex input streams, it offers detection and separate treatment of a header. The header can either be broadcast to each of the outputs, or sent to a dedicated output that is exclusively used for the header (see the documentation). The code also supports very basic pattern matching and an output lookup mechanism which allows, for example, to split BAM records by the first few bases of the Unique Molecular Identifier (UMI) or Cell Barcode (CB). This is very useful if the downstream workers, *running in parallel* need to ensure that records with the same UMI or CB are always sent to the same worker.
## .workers
This is the only API function that creates multiple jobs from one call. inputs and outputs are expanded for each worker using the `n` variable in the inputs and outputs argument. Note that the input FIFO contains a wildcard that will be replaced with the appropriate number for each worker. This allows to route multiple inputs (from multiple distributors) into workers while keeping the input streams in sync.
## .collect
Collectors do the opposite of distributors. They read round-robin from multiple inputs and write to a single output. This output can be a file and may represent the output of your workflow. It can of course also be a FIFO.
## .funnel
A funnel is just a job with a single output, but possibly multiple inputs. Any kind of processing may fit in here. Here, for example, we re-compress already collected output.
## w.results_dict
Note that after a workflow is complete, you can find the return values of the functions assigned to subprocesses in the dictionary `w.results_dict[<job-name>]`. Similarly, exception tracebacks can be retrieved from `w.exc_dict`.
Raw data
{
"_id": null,
"home_page": "https://github.com/marvin-jens/mrfifo",
"name": "mrfifo",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": null,
"keywords": null,
"author": "Marvin Jens",
"author_email": "marvin.jens@charite.de",
"download_url": "https://files.pythonhosted.org/packages/d3/7c/234e668df60c89638fd00607e4a1bbc1b09b148965f0d862e5fecae03588/mrfifo-0.7.tar.gz",
"platform": null,
"description": "# Mr. FIFO\n\n[![made-with-python](https://img.shields.io/badge/Made%20with-Python-1f425f.svg)](https://www.python.org/)\n[![PyPI](https://badge.fury.io/py/mrfifo.svg)](https://badge.fury.io/py/mrfifo)\n![PyPI - License](https://img.shields.io/pypi/l/mrfifo)\n[![Coverage Status](https://coveralls.io/repos/github/marvin-jens/mrfifo/badge.svg?branch=main)](https://coveralls.io/github/marvin-jens/mrfifo?branch=main)\n[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)\n\nMr. FIFO is a python/cython package that builds on top of the `multiprocessing` module, providing high throughput and very low-overhead, directional, point-to-point interprocess communication. It essentially provides _Map-Reduce parallelism over FIFOs_ (aka named pipes).\n\n# Abstract\n\nSome problems can very nicely be implemented in python and are easily parallelizable, but require high throughput. The python multiprocessing module primitives (Queue, Pipe, etc.) however suffer from significant overhead (pickling/un-pickling etc.) and can become a severe bottleneck.\nMr. FIFO provides few very low overhead function primitives (implemented in Cython) that exploit the time-tested and extremely optimized inter-process communication framework known as *named pipes* or FIFOs (first-in first-out). The development is heavily geared towards bioinformatics tasks (processing large BAM files for instance), but is in no way restricted to that.\n\n# Install\n\n```\n pip install mrfifo\n```\n\nshould do the trick. \n\n# Example workflow\n\nHere's an instructive example that shows off several of the main features. The following code decompresses a `BAM` file, distributes the SAM lines to four separate worker processes running in parallel, and then re-assembles a new BAM with the exact same content.\nThe workers execute the function `pass_through` which iterates over its input, counts the number of lines (SAM records) it encounters, and writes the line to an output. After exhausting the input, the function returns the count and exits. \nThe magic happens before and after the calls to `pass_through`. For each of the calls defining the workflow (called \"parts\" of the workflow), a separate `multiprocessing.Process` is started. In fact, in reverse order, so starting with the `.funnel()` and then working its way up to the `.BAM_reader()`. However, before any processes get started, the workflow object figures out how many named pipes it needs to create (in a safe temp directory), by way of the `mf.FIFO` objects being created and passed into each part. Unless, explicitly specified (using `manage_pipes=False`), these pipes are additionally managed by mrfifo, to avoid deadlocks. For instance, `pass_through` is already called with two opened file objects, one for reading and one for writing, which have been created by mrfifo to connect it with the `.distribute` part on the input side and the `.collect` part on the output side.\nNote that return values and exceptions (including tracebacks) are automatically captured by the workflow object to simplify your program and to avoid hanging processes waiting for a pipe to be closed.\nA little extra is the BAM header. This gets detected and separated from the bulk of the data by the `.distribute` part and is sent to its own FIFO named `header`. The header thus bypasses the workers and is merged with their outputs only at the `.collect` part. However, the header could alternatively have been broadcast to all workers as well. See the documentation for more details.\n\n\n```python\n\n def pass_through(input, output):\n i = 0\n for line in input:\n i += 1\n output.write(line)\n\n return i\n\n def is_header(line):\n return line.startswith(\"@\")\n\n def test_bam_reconstruct(chunk_size=1, n=4):\n import mrfifo as mf\n w = (\n mf.Workflow(\"BAM_reconstruct\")\n .BAM_reader(input=\"test_data/tiny_test.bam\")\n .distribute(\n input=mf.FIFO(\"input_sam\", \"rt\"),\n outputs=mf.FIFO(\"dist{n}\", \"wt\", n=n),\n chunk_size=chunk_size,\n header_detect_func=is_header,\n header_broadcast=False,\n header_fifo=mf.FIFO(\"header\", \"wt\"),\n )\n .workers(\n input=mf.FIFO(\"dist{n}\", \"rt\"),\n output=mf.FIFO(\"out{n}\", \"wt\"),\n func=pass_through,\n n=n,\n )\n .collect(\n inputs=mf.FIFO(\"out{n}\", \"rt\", n=n),\n header_fifo=mf.FIFO(\"header\", \"rt\"),\n output=mf.FIFO(\"out_sam\", \"wt\"),\n chunk_size=chunk_size,\n )\n .funnel(\n input=mf.FIFO(\"out_sam\", \"rt\"),\n output=\"test_data/reconstruct.bam\",\n _manage_fifos=False,\n func=mf.parts.bam_writer,\n )\n .run()\n )\n```\n\nPlease note that we assign the arguments of `pass_through` to `mf.FIFO` instances. This tells mrfifo that input and output are considered *internal plumbing* between the various parts executed in parallel. As such, Mr.FIFO will actually create named pipes in a temporary directory for each connection between parts. Unless explicitly turned of with `manage_pipes=False`, Mr. FIFO will even open and close the named pipes to avoid deadlocks in case of any exceptions.\n\nThe example above actually creates a number of sub-processes:\n\n * `BAM_reconstruct.bam_reader0`: reads a BAM file from `test_data/tiny_test.bam` and decompresses to the named pipe `input_sam`\n * `BAM_reconstruct.dist0`: reads line-by-line from `input_sam` and distributes in a round-robin fashion to\n four named pipes `['dist0', 'dist1', 'dist2', 'dist3']`. The BAM header is separately sent to its own FIFO named `header`.\n * `BAM_reconstruct.worker{i}`: four processes, each reading from `dist{i}` and writing to `out{i}` where `i` goes from 0 to 3.\n * `BAM_reconstruct.collect0`: reads line-by-line, round-robin from `['counted0', ..., 'counted3']` and writes the line to `out_sam`. But not before passing on the contents of the `header`!\n * `BAM_reconstruct.funnel0`: sends the content of `out_sam` to a `samtools view` process which re-creates a compressed BAM file with the identical original context.\n\n # API\n\n Under the hood, the workflow creates `mrfifo.Job` instances for each process. These are thin wrappers around `mutliprocessing.Process`, which allow to keep track of the inputs and outputs, collect return values and unhandled exceptions. `func` is a callable that must not especially be prepared for mrfifo. In general, it should probably have an argument that can be iterated over (line-wise) so that we can connect it to a FIFO. The API to compose the workflow consists of useful shorthands to create meaningfully different jobs. But there may be different ways to combine code with the offered primitives. The currently proposed logic is as follows:\n\n ## .reader\n\n Readers sequentially process one or more input files and write to a single output. In the above example we use a as input, but we could also have provided `/dev/stdin`. Typically, the entry-point to your workflow will be defined by one or more `.reader()` calls.\n\n ## .distribute\n\nA distributor splits an input into multiple outputs by cycling through the outputs as it iterates over chunks of input. The default distributor is implemented in cython, tries to allocate large pipe buffers and has low overhead.\nFor more complex input streams, it offers detection and separate treatment of a header. The header can either be broadcast to each of the outputs, or sent to a dedicated output that is exclusively used for the header (see the documentation). The code also supports very basic pattern matching and an output lookup mechanism which allows, for example, to split BAM records by the first few bases of the Unique Molecular Identifier (UMI) or Cell Barcode (CB). This is very useful if the downstream workers, *running in parallel* need to ensure that records with the same UMI or CB are always sent to the same worker.\n\n## .workers\n\nThis is the only API function that creates multiple jobs from one call. inputs and outputs are expanded for each worker using the `n` variable in the inputs and outputs argument. Note that the input FIFO contains a wildcard that will be replaced with the appropriate number for each worker. This allows to route multiple inputs (from multiple distributors) into workers while keeping the input streams in sync.\n\n## .collect\n\nCollectors do the opposite of distributors. They read round-robin from multiple inputs and write to a single output. This output can be a file and may represent the output of your workflow. It can of course also be a FIFO.\n\n## .funnel\n\nA funnel is just a job with a single output, but possibly multiple inputs. Any kind of processing may fit in here. Here, for example, we re-compress already collected output.\n\n## w.results_dict\n\nNote that after a workflow is complete, you can find the return values of the functions assigned to subprocesses in the dictionary `w.results_dict[<job-name>]`. Similarly, exception tracebacks can be retrieved from `w.exc_dict`.\n\n\n\n\n\n\n",
"bugtrack_url": null,
"license": "MIT",
"summary": "Map-Reduce parallelism over FIFOs (named pipes)",
"version": "0.7",
"project_urls": {
"Bug Tracker": "https://github.com/marvin-jens/mrfifo/issues",
"Homepage": "https://github.com/marvin-jens/mrfifo"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "d37c234e668df60c89638fd00607e4a1bbc1b09b148965f0d862e5fecae03588",
"md5": "0f8fe55e56b7993aebb9b753176815b7",
"sha256": "d1c3a49033582e7208f24578bbdcd48bab1b22cab2b60e6d0b2f6dc47a38639e"
},
"downloads": -1,
"filename": "mrfifo-0.7.tar.gz",
"has_sig": false,
"md5_digest": "0f8fe55e56b7993aebb9b753176815b7",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 99535,
"upload_time": "2024-05-06T09:37:16",
"upload_time_iso_8601": "2024-05-06T09:37:16.001578Z",
"url": "https://files.pythonhosted.org/packages/d3/7c/234e668df60c89638fd00607e4a1bbc1b09b148965f0d862e5fecae03588/mrfifo-0.7.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-05-06 09:37:16",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "marvin-jens",
"github_project": "mrfifo",
"travis_ci": false,
"coveralls": true,
"github_actions": true,
"lcname": "mrfifo"
}