# faststream-compressors
A middleware for the FastStream framework to support message compression.
## ⚠️ Note: RPC Limitation
Due to a bug in FastStream, middleware does not run when a response arrives from the broker, so RPC replies cannot be
decompressed. I've reported this issue to the FastStream developers, and we're hoping for a fix soon.
In the meantime, you can register separate routers for RPC and Pub/Sub:
- **RPC Router**: Enable only the decompression middleware (`NatsDecompressionMiddleware` for NATS).
- **Pub/Sub Router**: Enable both the decompression middleware and `CompressionMiddleware`.
## Example
```python
from faststream.nats import NatsBroker

from faststream_compressors.compressors import GzipCompressor, GzipDecompressor
from faststream_compressors.middlewares import CompressionMiddleware
from faststream_compressors.middlewares.nats import NatsDecompressionMiddleware


broker = NatsBroker(
    middlewares=(
        # Compression methods used for compressing messages.
        # The order in which compressors are specified matters.
        CompressionMiddleware.make_middleware(compressors=GzipCompressor()),

        # Your other middlewares here

        # Compression methods used for decompressing messages.
        # The order does not matter here.
        NatsDecompressionMiddleware.make_middleware(decompressors=GzipDecompressor()),
    )
)
```
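To get a feel for what the gzip pair in the example does to a message body, here is a standalone sketch that uses only the standard library `gzip` module. It does not import faststream-compressors and makes no claim about how the library wraps gzip internally; the `payload` and `tiny` values are made up for illustration.

```python
import gzip

# A made-up, repetitive message body (not taken from the library).
payload = b'{"event": "ping", "data": "' + b"x" * 500 + b'"}'

compressed = gzip.compress(payload)
restored = gzip.decompress(compressed)

# The round trip must return the original bytes unchanged.
assert restored == payload
print(f"original: {len(payload)} bytes, compressed: {len(compressed)} bytes")

# Very small bodies can grow, because gzip adds a fixed header and trailer.
tiny = b"hi"
print(f"tiny: {len(tiny)} bytes, compressed: {len(gzip.compress(tiny))} bytes")
```

Compression pays off for larger or repetitive payloads; for very short messages the gzip framing can outweigh the savings.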
| Broker | Is Supported? | Middleware |
|--------|---------------|-----------------------------------------------------------------------|
| NATS | ✅ | `faststream_compressors.middlewares.nats.NatsDecompressionMiddleware` |
| Other | ❌ | |
You can submit a pull request to add a decompression middleware for your broker. I expect that FastStream
will update its middleware API soon, allowing us to create a single universal middleware that works with every broker.
For now, only NATS is supported.
| Compression Method | Is Supported? | Compressor | Extra Dependency |
|--------------------|---------------|---------------------------------------------------------------------------------------------------------------------|-------------------------------|
| gzip | ✅ | `faststream_compressors.compressors.GzipCompressor`<br/>`faststream_compressors.compressors.GzipDecompressor` | |
| lz4 | ✅ | `faststream_compressors.compressors.lz4.Lz4Compressor`<br/>`faststream_compressors.compressors.lz4.Lz4Decompressor` | `faststream-compressors[lz4]` |
| Other | ❌ | | |
You can submit a pull request to add support for your compression method, or use a custom algorithm that implements
the `BaseCompressor` interface.
```python
from faststream import FastStream, Header
from faststream.nats import NatsBroker

from faststream_compressors.compressors import BaseCompressor
from faststream_compressors.middlewares import CompressionMiddleware
from faststream_compressors.middlewares.nats import NatsDecompressionMiddleware


class MyCompressor(BaseCompressor):
    ENCODING = "xor1"

    def __call__(self, data: bytes) -> bytes:
        # XOR with 1 is its own inverse, so the same class
        # serves as both compressor and decompressor.
        return bytes(byte ^ 1 for byte in data)


broker = NatsBroker(
    middlewares=(
        CompressionMiddleware.make_middleware(compressors=MyCompressor()),
        NatsDecompressionMiddleware.make_middleware(decompressors=MyCompressor()),
    )
)
app = FastStream(broker)


@broker.subscriber("my-subject")
async def my_handler(data: str, encoding: str = Header("content-encoding")):
    print(data, encoding)


@app.after_startup
async def ping():
    await broker.publish("My secret message", "my-subject")
```
```
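The XOR example above can reuse one class in both roles only because XOR is self-inverse. A real algorithm usually needs a separate compressor and decompressor. The sketch below shows that shape with the standard library `zlib` module. `ZlibCompressor`, `ZlibDecompressor`, the `level` parameter and the `"deflate"` encoding name are hypothetical and not part of faststream-compressors. The classes are plain Python so the snippet runs on its own; in real use they would presumably subclass `BaseCompressor`, and sharing one `ENCODING` value between the pair is an assumption based on the XOR example.

```python
import zlib


class ZlibCompressor:
    """Hypothetical compressor; in real use it would subclass BaseCompressor."""

    ENCODING = "deflate"  # assumed name; must match the decompressor

    def __init__(self, level: int = 6) -> None:
        self.level = level

    def __call__(self, data: bytes) -> bytes:
        # Same contract as the XOR example: bytes in, bytes out.
        return zlib.compress(data, self.level)


class ZlibDecompressor:
    """Hypothetical counterpart that reverses ZlibCompressor."""

    ENCODING = "deflate"

    def __call__(self, data: bytes) -> bytes:
        return zlib.decompress(data)


# Check the pair outside of any broker before wiring it into middleware.
message = b"My secret message" * 10
packed = ZlibCompressor()(message)
assert ZlibDecompressor()(packed) == message
```

Checking the round trip in isolation like this makes it easier to tell a broken algorithm apart from a middleware wiring problem.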