# MomentumX
<p align="center">
<img src="https://github.com/captivationsoftware/MomentumX/blob/main/Logo.png?raw=true" title="MomentumX Logo" />
<br/>
<span>
<strong>MomentumX</strong> is a <strong>zero-copy shared memory IPC</strong> library for building complex <strong>streaming data pipelines</strong> capable of processing <strong>large datasets</strong> using <strong>Python</strong>.
</span>
</p>
<br />
### Key Features:
- High-Throughput, Low Latency
- Supports **streaming and synchronous** modes to cover a wide variety of use cases.
- Bring your own encoding, or use **raw binary** data.
- Sane **data protections** to ensure **reliability of data** in a cooperative computing environment.
- Pairs with other high-performance libraries, such as **numpy** and **scipy**, to support parallel processing of memory-intensive scientific data.
- Works on most modern versions of **Linux** using shared memory (via `/dev/shm`).
- Seamlessly integrates into a **Docker** environment with minimal configuration, and readily enables lightweight container-to-container data sharing.
### Examples:
Below are some simplified use cases for common MomentumX workflows. Consult the examples in the `examples/` directory for additional details and implementation guidance.
#### Stream Mode
```python
# Producer Process
import momentumx as mx
# Create a stream with a total capacity of 10MB (1MB x 10)
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)
# Obtain the next available buffer for writing
buffer = stream.next_to_send()
buffer.write(b'1')
buffer.send()
# NOTE: buffer.send() can also be passed an explicit number of bytes.
# Otherwise, an internally managed cursor will be used.
```
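The note about the internally managed cursor can be pictured with a toy model. The sketch below is not MomentumX's implementation: `ToyBuffer` and its fields are hypothetical stand-ins that only mimic the behavior described in the comments above, where `write` advances a cursor and `send` uses the cursor position unless an explicit byte count is passed.

```python
class ToyBuffer:
    """Hypothetical stand-in for a fixed-size buffer (not the MomentumX class)."""

    def __init__(self, size):
        self._mem = bytearray(size)  # pre-allocated backing memory
        self._cursor = 0             # position of the next write
        self.data_size = 0           # number of bytes marked valid by send()

    def write(self, data):
        end = self._cursor + len(data)
        if end > len(self._mem):
            raise ValueError("write exceeds buffer size")
        self._mem[self._cursor:end] = data
        self._cursor = end

    def send(self, nbytes=None):
        # Fall back to the cursor position when no explicit size is given
        self.data_size = self._cursor if nbytes is None else nbytes
        return bytes(self._mem[:self.data_size])


buf = ToyBuffer(16)
buf.write(b"ab")
buf.write(b"cd")
implicit = buf.send()   # size taken from the cursor
explicit = buf.send(2)  # size given explicitly
```

Here `implicit` covers everything written so far, while `explicit` covers only the first two bytes.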
```python
# Consumer Process(es)
import momentumx as mx
stream = mx.Consumer('my_stream')
# Receive from my_stream as long as the stream has not ended OR there are unread buffers
while stream.has_next:
    # Block while waiting to receive buffer
    # NOTE: Non-blocking receive is possible using blocking=False keyword argument
    buffer = stream.receive()
    # If we are here, either the stream ended OR we have a buffer, so check...
    if buffer is not None:
        # We have buffer containing data, so print the entire contents
        print(buffer.read(buffer.data_size))
        # See also "Implicit versus Explicit Buffer Release" section below.
```
#### Sync Mode
```python
# Producer Process
import momentumx as mx
import threading
import signal
cancel_event = threading.Event()
signal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))
# Create a stream with a total capacity of 10MB
stream = mx.Producer(
    'my_stream',
    buffer_size=int(1e6),
    buffer_count=10,
    sync=True
) # NOTE: sync set to True
min_subscribers = 1
while stream.subscriber_count < min_subscribers:
    print("waiting for subscriber(s)")
    if cancel_event.wait(0.5):
        break
print("All expected subscribers are ready")
# Write the series 0-999 to a consumer
for n in range(0, 1000):
    if stream.subscriber_count == 0:
        cancel_event.wait(0.5)
    # Note: sending strings directly is possible via the send_string call
    elif stream.send_string(str(n)):
        print(f"Sent: {n}")
```
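The subscriber wait loop above can be factored into a helper that also reports whether the wait was cancelled, so the caller can avoid sending after an interrupt. This is a sketch under assumptions: `wait_for_subscribers` and the two stand-in classes are hypothetical, and only the `subscriber_count` attribute comes from the example above.

```python
import threading


def wait_for_subscribers(stream, min_subscribers, cancel_event, poll_interval=0.5):
    """Return True once enough subscribers are attached, False if cancelled."""
    while stream.subscriber_count < min_subscribers:
        if cancel_event.wait(poll_interval):
            return False
    return True


class _GrowingProducer:
    """Hypothetical stand-in whose subscriber count grows by one per read."""

    def __init__(self):
        self._reads = 0

    @property
    def subscriber_count(self):
        self._reads += 1
        return self._reads - 1  # 0, 1, 2, ...


class _NoSubscribers:
    """Hypothetical stand-in that never gains a subscriber."""

    subscriber_count = 0


ready = wait_for_subscribers(_GrowingProducer(), 2, threading.Event(), poll_interval=0.01)

cancelled = threading.Event()
cancelled.set()
aborted = wait_for_subscribers(_NoSubscribers(), 1, cancelled)
```

A producer could then guard its send loop with `if wait_for_subscribers(stream, min_subscribers, cancel_event):`.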
```python
# Consumer Process(es)
import momentumx as mx
stream = mx.Consumer('my_stream')
while stream.has_next:
    data = stream.receive_string()
    if data is not None:
        # Note: receiving strings is possible as well via the receive_string call
        print(f"Received: {data}")
```
#### Iterator Syntax
Working with buffers is even easier using the `iter()` builtin:
```python
import momentumx as mx
stream = mx.Consumer('my_stream')
# Iterate over buffers in the stream until stream.receive() returns None
for buffer in iter(stream.receive, None):
    # Now, buffer is guaranteed to be valid, so no check required -
    # go ahead and print all the contents again, this time using
    # the index and slice operators!
    print(buffer[0]) # print first byte
    print(buffer[1:buffer.data_size]) # print remaining bytes
```
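The loop above uses the two-argument form of `iter()`, which calls a function repeatedly until it returns a sentinel value. The standalone sketch below shows the same mechanism with standard-library objects only; the `receive` function and the `pending` queue are hypothetical stand-ins for `stream.receive` and a live stream.

```python
from collections import deque

pending = deque([b"first", b"second"])


def receive():
    """Stand-in for stream.receive(): returns None once nothing is left."""
    return pending.popleft() if pending else None


# iter(callable, sentinel) keeps calling receive() until it returns None
received = [item for item in iter(receive, None)]
```

Because the sentinel ends the iteration, the loop body never sees `None`, which is why no explicit check is needed.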
#### Numpy Integration
```python
import momentumx as mx
import numpy as np
# Create a stream
stream = mx.Consumer('numpy_stream')
# Receive the next buffer (or if a producer, obtain the next_to_send buffer)
buffer = stream.receive()
# Create a numpy array directly from the memory without any copying
np_buff = np.frombuffer(buffer, dtype=np.uint8)
```
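`np.frombuffer` builds its array on top of an object that exposes the Python buffer protocol, which is what makes the copy-free view possible. The same idea can be illustrated with the standard library alone. In this sketch a `bytearray` is a hypothetical stand-in for a shared-memory buffer, and `memoryview.cast` reinterprets it as 16-bit unsigned integers without copying.

```python
import sys

backing = bytearray(8)                # stand-in for a shared-memory buffer
view = memoryview(backing).cast("H")  # reinterpret as unsigned 16-bit, no copy

view[0] = 0x0102                      # write through the view...
first_two = bytes(backing[:2])        # ...and observe it in the backing memory

backing[2] = 0xFF                     # write to the backing memory...
backing[3] = 0xFF
second = view[1]                      # ...and observe it through the view

# The byte order of first_two depends on the platform (sys.byteorder)
expected_first_two = (0x0102).to_bytes(2, sys.byteorder)
```

Changes made on either side are visible on the other because both names refer to the same memory.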
#### Implicit versus Explicit Buffer Release
By default, MomentumX Consumers automatically release a buffer once all references to it are destroyed. This promotes both usability and data integrity. However, there may be cases where the developer wants a different strategy and needs to explicitly control when buffers are released to the pool of available buffers.
```python
import momentumx as mx
stream = mx.Consumer('my_stream')
buffer = stream.receive()
# Access to buffer is safe!
buffer.read(10)
# Buffer is being returned to the available buffer pool.
# Be sure you are truly done with your data!
buffer.release()
# DANGER: DO NOT DO THIS!
# All operations on a buffer after calling `release` are considered unsafe!
# All safeguards have been removed and the memory is volatile!
buffer.read(10)
```
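One way to keep explicit release from leaking past its intended scope is to wrap it in a context manager, so the buffer is released exactly once and is not touched afterwards. This is a sketch, not part of MomentumX: the `released` helper and the `_FakeBuffer` class are hypothetical, and only `read()` and `release()` are taken from the example above.

```python
from contextlib import contextmanager


@contextmanager
def released(buffer):
    """Yield the buffer, then release it even if the body raises."""
    try:
        yield buffer
    finally:
        buffer.release()


class _FakeBuffer:
    """Hypothetical stand-in exposing only read() and release()."""

    def __init__(self, payload):
        self._payload = payload
        self.is_released = False

    def read(self, n):
        return self._payload[:n]

    def release(self):
        self.is_released = True


fake = _FakeBuffer(b"0123456789abcdef")
with released(fake) as buf:
    head = buf.read(10)  # take what is needed while access is still safe
```

After the `with` block, only `head` should be used; the buffer itself has gone back to the pool.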
#### Isolated Contexts
MomentumX can place streams outside of `/dev/shm` (the default location). To create an isolated context, pass the `context` kwarg, pointing to a directory on the filesystem, to the `Producer` and to every `Consumer` instance.
This option is useful if access to `/dev/shm` is unsuitable.
```python
import momentumx as mx
# Create a producer attached to the context path /my/path
stream = mx.Producer('my_stream', ..., context='/my/path/')
...
# Create Consumer elsewhere attached to the same context of /my/path
stream = mx.Consumer('my_stream', context='/my/path/')
```
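Before passing a directory as `context`, it may help to confirm that it exists and is writable by the current process. The helper below is a hypothetical sketch using only the standard library. Whether MomentumX creates the directory itself, or requires the trailing slash used in the example above, is not stated here, so both are treated as assumptions.

```python
import os
import tempfile


def prepare_context(path):
    """Create the context directory if needed and confirm it is writable."""
    os.makedirs(path, exist_ok=True)
    if not os.access(path, os.W_OK):
        raise PermissionError(f"context directory is not writable: {path}")
    # Assumption: mirror the trailing separator used in the example above
    return os.path.join(path, "")


with tempfile.TemporaryDirectory() as root:
    context = prepare_context(os.path.join(root, "mx_context"))
    context_exists = os.path.isdir(context)
```

The returned string could then be passed as `context=` to both the `Producer` and each `Consumer`.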
### License
Captivation Software, LLC offers **MomentumX** under an **Unlimited Use License to the United States Government**, with **all other parties subject to the GPL-3.0 License**.
### Inquiries / Requests
All inquiries and requests may be sent to <a href="mailto:opensource@captivation.us">opensource@captivation.us</a>.
<sub><sup>
Copyright © 2022-2023 - <a href="https://captivation.us" target="_blank">Captivation Software, LLC</a>.
</sup></sub>