MomentumX


NameMomentumX JSON
Version 2.7.0 PyPI version JSON
download
home_pagehttps://github.com/captivationsoftware/MomentumX
SummaryZero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets
upload_time2023-05-09 16:27:03
maintainer
docs_urlNone
authorCaptivation Software, LLC
requires_python>=3.6
license
keywords shm shared memory zero-copy numpy big data scipy pubsub pipeline
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # MomentumX

<p align="center">
    <img src="https://github.com/captivationsoftware/MomentumX/blob/main/Logo.png?raw=true" title="MomentumX Logo" />
    <br/>
    <span>
        <strong>MomentumX</strong> is a <strong>zero-copy shared memory IPC</strong> library for building complex <strong>streaming data pipelines</strong> capable of processing <strong>large datasets</strong> using <strong>Python</strong>. 
    </span>
</p>

<br />

### Key Features:
- High-Throughput, Low Latency
- Supports **streaming and synchronous** modes for use within a wide variety of use cases. 
- Bring your own encoding, or use **raw binary** data.
- Sane **data protections** to ensure **reliability of data** in a cooperative computing environment. 
- Pairs with other high-performance libraries, such as **numpy** and **scipy**, to support parallel processing of memory-intensive scientific data.
- Works on most modern versions of **Linux** using shared memory (via `/dev/shm`).
- Seamlessly integrates into a **Docker** environment with minimal configuration, and readily enables lightweight container-to-container data sharing. 

### Examples:
Below are some simplified use cases for common MomentumX workflows. Consult the examples in the `examples/` directory for additional details and implementation guidance.

#### Stream Mode
```python
# Producer Process
import momentumx as mx

# Create a stream with a total capacity of 10MB (1MB x 10)
stream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)

# Obtain the next available buffer for writing
buffer = stream.next_to_send()
buffer.write(b'1') 

buffer.send()
# NOTE: buffer.send() can also be passed an explicit number of bytes as well. 
# Otherwise an internally managed cursor will be used.
```

```python
# Consumer Process(es)
import momentumx as mx

stream = mx.Consumer('my_stream')

# Receive from my_stream as long as the stream has not ended OR there are unread buffers 
while stream.has_next:

    # Block while waiting to receive buffer 
    # NOTE: Non-blocking receive is possible using blocking=False keyword argument
    buffer = stream.receive()
    
    # If we are here, either the stream ended OR we have a buffer, so check...
    if buffer is not None:

        # We have buffer containing data, so print the entire contents
        print(buffer.read(buffer.data_size))
    
        # See also "Implicit versus Explicit Buffer Release" section below.
```

#### Sync Mode
```python
# Producer Process
import momentumx as mx
import threading
import signal

cancel_event = threading.Event()
signal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))

# Create a stream with a total capacity of 10MB
stream = mx.Producer(
    'my_stream', 
    buffer_size=int(1e6), 
    buffer_count=10, 
    sync=True
) # NOTE: sync set to True

min_subscribers = 1

while stream.subscriber_count < min_subscribers:
    print("waiting for subscriber(s)")
    if cancel_event.wait(0.5):
        break

print("All expected subscribers are ready")

# Write the series 0-999 to a consumer 
for n in range(0, 1000):
    if stream.subscriber_count == 0:
        cancel_event.wait(0.5)

    # Note: sending strings directly is possible via the send_string call
    elif stream.send_string(str(n)):
        print(f"Sent: {n}")

```

```python
# Consumer Process(es)
import momentumx as mx

stream = mx.Consumer('my_stream')

while stream.has_next:
    data = stream.receive_string() 

    if data is not None: 
        # Note: receiving strings is possible as well via the receive_string call
        print(f"Received: {data}")

```

#### Iterator Syntax
Working with buffers is even easier using `iter()` builtin:
```python
import momentumx as mx

stream = mx.Consumer(STREAM)

# Iterate over buffers in the stream until stream.receive() returns None
for buffer in iter(stream.receive, None):     
    # Now, buffer is guaranteed to be valid, so no check required -  
    # go ahead and print all the contents again, this time using 
    # the index and slice operators!
    print(buffer[0])                    # print first byte
    print(buffer[1:buffer.data_size])   # print remaining bytes

```


#### Numpy Integration
```python
import momentumx as mx
import numpy as np

# Create a stream
stream = mx.Consumer('numpy_stream')

# Receive the next buffer (or if a producer, obtain the next_to_send buffer)
buffer = stream.receive()

# Create a numpy array directly from the memory without any copying
np_buff = np.frombuffer(buffer, dtype=uint8)
```

#### Implicit versus Explicit Buffer Release
MomentumX Consumers will, by default, automatically release a buffer under the covers once all references are destroyed. This promotes both usability and data integrity. However, there may be cases where the developer wants to utilize a different strategy and explicity control when buffers are released to the pool of available buffers.

```python
stream = mx.Consumer('my_stream')

buffer = stream.receive()

# Access to buffer is safe!
buffer.read(10)

# Buffer is being returned back to available buffer pool. 
# Be sure you are truly done with your data!
buffer.release() 

# DANGER: DO NOT DO THIS! 
# All operations on a buffer after calling `release` are considered unsafe! 
# All safeguards have been removed and the memory is volatile!
buffer.read(10) 


```


#### Isolated Contexts
MomentumX allows for the usage of streams outside of `/dev/shm` (the default location). Pass the `context` kwarg pointing to a directory on the filesystem for both the `Producer` and all `Consumer` instances to create isolated contexts.

This option is useful if access to `/dev/shm` is unsuitable.

```python
import momentumx as mx

# Create a producer attached to the context path /my/path
stream = mx.Producer('my_stream', ..., context='/my/path/')
...

# Create Consumer elsewhere attached to the same context of /my/path
stream = mx.Consumer('my_stream', context='/my/path/')

```

### License
Captivation Software, LLC offers **MomentumX** under an **Unlimited Use License to the United States Government**, with **all other parties subject to the GPL-3.0 License**.

### Inquiries / Requests
All inquiries and requests may be sent to <a href="mailto:opensource@captivation.us">opensource@captivation.us</a>.


<sub><sup>
    Copyright &copy; 2022-2023 - <a href="https://captivation.us" target="_blank">Captivation Software, LLC</a>.
</sup></sub>

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/captivationsoftware/MomentumX",
    "name": "MomentumX",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.6",
    "maintainer_email": "",
    "keywords": "shm,shared memory,zero-copy,numpy,big data,scipy,pubsub,pipeline",
    "author": "Captivation Software, LLC",
    "author_email": "",
    "download_url": "https://files.pythonhosted.org/packages/8e/72/3ba4f97ce8886acefe039a37bb9bab8c9c8c12d64565e3b87465a20d68ce/MomentumX-2.7.0.tar.gz",
    "platform": null,
    "description": "# MomentumX\n\n<p align=\"center\">\n    <img src=\"https://github.com/captivationsoftware/MomentumX/blob/main/Logo.png?raw=true\" title=\"MomentumX Logo\" />\n    <br/>\n    <span>\n        <strong>MomentumX</strong> is a <strong>zero-copy shared memory IPC</strong> library for building complex <strong>streaming data pipelines</strong> capable of processing <strong>large datasets</strong> using <strong>Python</strong>. \n    </span>\n</p>\n\n<br />\n\n### Key Features:\n- High-Throughput, Low Latency\n- Supports **streaming and synchronous** modes for use within a wide variety of use cases. \n- Bring your own encoding, or use **raw binary** data.\n- Sane **data protections** to ensure **reliability of data** in a cooperative computing environment. \n- Pairs with other high-performance libraries, such as **numpy** and **scipy**, to support parallel processing of memory-intensive scientific data.\n- Works on most modern versions of **Linux** using shared memory (via `/dev/shm`).\n- Seamlessly integrates into a **Docker** environment with minimal configuration, and readily enables lightweight container-to-container data sharing. \n\n### Examples:\nBelow are some simplified use cases for common MomentumX workflows. Consult the examples in the `examples/` directory for additional details and implementation guidance.\n\n#### Stream Mode\n```python\n# Producer Process\nimport momentumx as mx\n\n# Create a stream with a total capacity of 10MB (1MB x 10)\nstream = mx.Producer('my_stream', buffer_size=int(1e6), buffer_count=10, sync=False)\n\n# Obtain the next available buffer for writing\nbuffer = stream.next_to_send()\nbuffer.write(b'1') \n\nbuffer.send()\n# NOTE: buffer.send() can also be passed an explicit number of bytes as well. \n# Otherwise an internally managed cursor will be used.\n```\n\n```python\n# Consumer Process(es)\nimport momentumx as mx\n\nstream = mx.Consumer('my_stream')\n\n# Receive from my_stream as long as the stream has not ended OR there are unread buffers \nwhile stream.has_next:\n\n    # Block while waiting to receive buffer \n    # NOTE: Non-blocking receive is possible using blocking=False keyword argument\n    buffer = stream.receive()\n    \n    # If we are here, either the stream ended OR we have a buffer, so check...\n    if buffer is not None:\n\n        # We have buffer containing data, so print the entire contents\n        print(buffer.read(buffer.data_size))\n    \n        # See also \"Implicit versus Explicit Buffer Release\" section below.\n```\n\n#### Sync Mode\n```python\n# Producer Process\nimport momentumx as mx\nimport threading\nimport signal\n\ncancel_event = threading.Event()\nsignal.signal(signal.SIGINT, (lambda _sig, _frm: cancel_event.set()))\n\n# Create a stream with a total capacity of 10MB\nstream = mx.Producer(\n    'my_stream', \n    buffer_size=int(1e6), \n    buffer_count=10, \n    sync=True\n) # NOTE: sync set to True\n\nmin_subscribers = 1\n\nwhile stream.subscriber_count < min_subscribers:\n    print(\"waiting for subscriber(s)\")\n    if cancel_event.wait(0.5):\n        break\n\nprint(\"All expected subscribers are ready\")\n\n# Write the series 0-999 to a consumer \nfor n in range(0, 1000):\n    if stream.subscriber_count == 0:\n        cancel_event.wait(0.5)\n\n    # Note: sending strings directly is possible via the send_string call\n    elif stream.send_string(str(n)):\n        print(f\"Sent: {n}\")\n\n```\n\n```python\n# Consumer Process(es)\nimport momentumx as mx\n\nstream = mx.Consumer('my_stream')\n\nwhile stream.has_next:\n    data = stream.receive_string() \n\n    if data is not None: \n        # Note: receiving strings is possible as well via the receive_string call\n        print(f\"Received: {data}\")\n\n```\n\n#### Iterator Syntax\nWorking with buffers is even easier using `iter()` builtin:\n```python\nimport\u00a0momentumx\u00a0as\u00a0mx\n\nstream\u00a0=\u00a0mx.Consumer(STREAM)\n\n# Iterate over buffers in the stream until stream.receive() returns None\nfor\u00a0buffer\u00a0in\u00a0iter(stream.receive,\u00a0None):\u00a0\u00a0\u00a0\u00a0\u00a0\n    # Now, buffer is guaranteed to be valid, so no check required -  \n    # go ahead and print all the contents again, this time using \n    # the index and slice operators!\n    print(buffer[0])                    # print first byte\n    print(buffer[1:buffer.data_size])   # print remaining bytes\n\n```\n\n\n#### Numpy Integration\n```python\nimport momentumx as mx\nimport numpy as np\n\n# Create a stream\nstream = mx.Consumer('numpy_stream')\n\n# Receive the next buffer (or if a producer, obtain the next_to_send buffer)\nbuffer = stream.receive()\n\n# Create a numpy array directly from the memory without any copying\nnp_buff = np.frombuffer(buffer, dtype=uint8)\n```\n\n#### Implicit versus Explicit Buffer Release\nMomentumX Consumers will, by default, automatically release a buffer under the covers once all references are destroyed. This promotes both usability and data integrity. However, there may be cases where the developer wants to utilize a different strategy and explicity control when buffers are released to the pool of available buffers.\n\n```python\nstream = mx.Consumer('my_stream')\n\nbuffer = stream.receive()\n\n# Access to buffer is safe!\nbuffer.read(10)\n\n# Buffer is being returned back to available buffer pool. \n# Be sure you are truly done with your data!\nbuffer.release() \n\n# DANGER: DO NOT DO THIS! \n# All operations on a buffer after calling `release` are considered unsafe! \n# All safeguards have been removed and the memory is volatile!\nbuffer.read(10) \n\n\n```\n\n\n#### Isolated Contexts\nMomentumX allows for the usage of streams outside of `/dev/shm` (the default location). Pass the `context` kwarg pointing to a directory on the filesystem for both the `Producer` and all `Consumer` instances to create isolated contexts.\n\nThis option is useful if access to `/dev/shm` is unsuitable.\n\n```python\nimport momentumx as mx\n\n# Create a producer attached to the context path /my/path\nstream = mx.Producer('my_stream', ..., context='/my/path/')\n...\n\n# Create Consumer elsewhere attached to the same context of /my/path\nstream = mx.Consumer('my_stream', context='/my/path/')\n\n```\n\n### License\nCaptivation Software, LLC offers **MomentumX** under an **Unlimited Use License to the United States Government**, with **all other parties subject to the GPL-3.0 License**.\n\n### Inquiries / Requests\nAll inquiries and requests may be sent to <a href=\"mailto:opensource@captivation.us\">opensource@captivation.us</a>.\n\n\n<sub><sup>\n    Copyright &copy; 2022-2023 - <a href=\"https://captivation.us\" target=\"_blank\">Captivation Software, LLC</a>.\n</sup></sub>\n",
    "bugtrack_url": null,
    "license": "",
    "summary": "Zero-copy shared memory IPC library for building complex streaming data pipelines capable of processing large datasets",
    "version": "2.7.0",
    "project_urls": {
        "Homepage": "https://github.com/captivationsoftware/MomentumX"
    },
    "split_keywords": [
        "shm",
        "shared memory",
        "zero-copy",
        "numpy",
        "big data",
        "scipy",
        "pubsub",
        "pipeline"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "62ba997e834a37f3435f115a4ca2da79cdc4eb6c4fabfa8357312d8d547afe43",
                "md5": "fb1184c79469fd1d019d5cd07a088eec",
                "sha256": "8c9e2e5d083cfebe6b33b3e986c1e539aa83150add35ff09df79a99143ce9d97"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "has_sig": false,
            "md5_digest": "fb1184c79469fd1d019d5cd07a088eec",
            "packagetype": "bdist_wheel",
            "python_version": "cp310",
            "requires_python": ">=3.6",
            "size": 223260,
            "upload_time": "2023-05-09T16:26:50",
            "upload_time_iso_8601": "2023-05-09T16:26:50.540447Z",
            "url": "https://files.pythonhosted.org/packages/62/ba/997e834a37f3435f115a4ca2da79cdc4eb6c4fabfa8357312d8d547afe43/MomentumX-2.7.0-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "3cb7c503123bac75a97e2f123ba49f50695163c4aaa71368093c47e4c459e905",
                "md5": "1ff062b5c5e7615939d9cb391c3ba4bf",
                "sha256": "d66021ddf670ad2ef3146a61c37f4a279c77dd4992aae07bb60ee8995a9602aa"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "has_sig": false,
            "md5_digest": "1ff062b5c5e7615939d9cb391c3ba4bf",
            "packagetype": "bdist_wheel",
            "python_version": "cp311",
            "requires_python": ">=3.6",
            "size": 223190,
            "upload_time": "2023-05-09T16:26:53",
            "upload_time_iso_8601": "2023-05-09T16:26:53.212848Z",
            "url": "https://files.pythonhosted.org/packages/3c/b7/c503123bac75a97e2f123ba49f50695163c4aaa71368093c47e4c459e905/MomentumX-2.7.0-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "8909fa08f64444f16ffd6be3f0ad6417667015d2594a34d1c9b02456c2c012e7",
                "md5": "0b0d75dce7714ced65d65d003900fbfa",
                "sha256": "7377f197a1bb0e5b490e37ddbb6c452e3d75d96466bbf44923568349fda0db42"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "has_sig": false,
            "md5_digest": "0b0d75dce7714ced65d65d003900fbfa",
            "packagetype": "bdist_wheel",
            "python_version": "cp36",
            "requires_python": ">=3.6",
            "size": 228820,
            "upload_time": "2023-05-09T16:26:55",
            "upload_time_iso_8601": "2023-05-09T16:26:55.378507Z",
            "url": "https://files.pythonhosted.org/packages/89/09/fa08f64444f16ffd6be3f0ad6417667015d2594a34d1c9b02456c2c012e7/MomentumX-2.7.0-cp36-cp36m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "d9051b7c79916aa7a4f0da30654a036d9bfef856c360fa383ea081d6cceeb943",
                "md5": "980570c8aba278b7de124cd2737ea75e",
                "sha256": "fa4a58ade3bcb39accc31bb0a156e5264af6b6082806d99810742e9987c1fac6"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "has_sig": false,
            "md5_digest": "980570c8aba278b7de124cd2737ea75e",
            "packagetype": "bdist_wheel",
            "python_version": "cp37",
            "requires_python": ">=3.6",
            "size": 228903,
            "upload_time": "2023-05-09T16:26:57",
            "upload_time_iso_8601": "2023-05-09T16:26:57.244279Z",
            "url": "https://files.pythonhosted.org/packages/d9/05/1b7c79916aa7a4f0da30654a036d9bfef856c360fa383ea081d6cceeb943/MomentumX-2.7.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "80c24cc5ad4b38611d5105a929313827e6cd4a6f2c2746447207aa6dec019b21",
                "md5": "96cb09faf7b363a9bca97d3a92d10b30",
                "sha256": "ab85cb18fa321c73b3c811e09556fbd5f9c5a6cd94035973eb6d3f6c10e93167"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "has_sig": false,
            "md5_digest": "96cb09faf7b363a9bca97d3a92d10b30",
            "packagetype": "bdist_wheel",
            "python_version": "cp38",
            "requires_python": ">=3.6",
            "size": 222909,
            "upload_time": "2023-05-09T16:26:58",
            "upload_time_iso_8601": "2023-05-09T16:26:58.955326Z",
            "url": "https://files.pythonhosted.org/packages/80/c2/4cc5ad4b38611d5105a929313827e6cd4a6f2c2746447207aa6dec019b21/MomentumX-2.7.0-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "3f4c1c6f6c93c61a5484320dcb4bc62fe9dfe8e8b21e2db9f0584b4a52aa7831",
                "md5": "0214ae12c0337405cba9cbe8a4cc0b25",
                "sha256": "21b4de7d85d1d1fda0532099fe0ba48d2f8b945d629d1c5df6596afae5f8a8a9"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "has_sig": false,
            "md5_digest": "0214ae12c0337405cba9cbe8a4cc0b25",
            "packagetype": "bdist_wheel",
            "python_version": "cp39",
            "requires_python": ">=3.6",
            "size": 223095,
            "upload_time": "2023-05-09T16:27:00",
            "upload_time_iso_8601": "2023-05-09T16:27:00.901030Z",
            "url": "https://files.pythonhosted.org/packages/3f/4c/1c6f6c93c61a5484320dcb4bc62fe9dfe8e8b21e2db9f0584b4a52aa7831/MomentumX-2.7.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "8e723ba4f97ce8886acefe039a37bb9bab8c9c8c12d64565e3b87465a20d68ce",
                "md5": "e7db82266e566cb82fd8d5bc278bbcc4",
                "sha256": "7e86b837a65ca1de11c000d0513f5ff728e06539dd6f71bf4cb6e1dd944a188a"
            },
            "downloads": -1,
            "filename": "MomentumX-2.7.0.tar.gz",
            "has_sig": false,
            "md5_digest": "e7db82266e566cb82fd8d5bc278bbcc4",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.6",
            "size": 51266,
            "upload_time": "2023-05-09T16:27:03",
            "upload_time_iso_8601": "2023-05-09T16:27:03.766326Z",
            "url": "https://files.pythonhosted.org/packages/8e/72/3ba4f97ce8886acefe039a37bb9bab8c9c8c12d64565e3b87465a20d68ce/MomentumX-2.7.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2023-05-09 16:27:03",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "captivationsoftware",
    "github_project": "MomentumX",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "momentumx"
}
        
Elapsed time: 1.08800s