# NucliaDB Telemetry
OpenTelemetry-compatible plugin that propagates trace IDs across FastAPI, NATS and gRPC services running on asyncio.
Environment variables:
```
JAEGER_ENABLED = True
JAEGER_HOST = "127.0.0.1"
JAEGER_PORT = server.port
```
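As a rough illustration of how a service might read these three variables before starting telemetry, here is a minimal sketch. The `JaegerSettings` class, the `load_jaeger_settings` helper, and the fallback defaults (including port 6831, the usual Jaeger agent UDP port) are assumptions made for this example; they are not part of the library, which has its own settings handling.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class JaegerSettings:
    """Hypothetical container for the three variables listed above."""

    enabled: bool
    host: str
    port: int


def load_jaeger_settings(env: Optional[Mapping[str, str]] = None) -> JaegerSettings:
    """Read JAEGER_* values from a mapping (os.environ by default)."""
    env = os.environ if env is None else env
    # Treat common truthy spellings as enabled; anything else disables tracing.
    enabled = env.get("JAEGER_ENABLED", "False").strip().lower() in {"1", "true", "yes"}
    host = env.get("JAEGER_HOST", "127.0.0.1")
    # Assumed default: 6831 is the conventional Jaeger agent port.
    port = int(env.get("JAEGER_PORT", "6831"))
    return JaegerSettings(enabled=enabled, host=host, port=port)
```

Passing a plain dictionary instead of `os.environ` keeps the helper easy to exercise in tests.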
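On FastAPI you should add:
```python
from fastapi import FastAPI
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagators.b3 import B3MultiFormat

tracer_provider = get_telemetry("HTTP_SERVICE")
app = FastAPI(title="Test API")  # type: ignore
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)

# Propagate the trace context using B3 multi-header format
set_global_textmap(B3MultiFormat())
FastAPIInstrumentor.instrument_app(app, tracer_provider=tracer_provider)

# ...
await init_telemetry(tracer_provider)  # To start asyncio task
# ...
```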
On a gRPC server you should add:
```python
tracer_provider = get_telemetry("GRPC_SERVER_SERVICE")
# Use the same service name for the provider and the gRPC wrapper
telemetry_grpc = GRPCTelemetry("GRPC_SERVER_SERVICE", tracer_provider)
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)

set_global_textmap(B3MultiFormat())
server = telemetry_grpc.init_server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(SERVICER, server)

# ...
await init_telemetry(tracer_provider)  # To start asyncio task
# ...
```
On a gRPC client you should add:
```python
tracer_provider = get_telemetry("GRPC_CLIENT_SERVICE")
telemetry_grpc = GRPCTelemetry("GRPC_CLIENT_SERVICE", tracer_provider)
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)

set_global_textmap(B3MultiFormat())
# The returned channel carries the trace context on every call
channel = telemetry_grpc.init_client(f"localhost:{grpc_service}")
stub = helloworld_pb2_grpc.GreeterStub(channel)

# ...
await init_telemetry(tracer_provider)  # To start asyncio task
# ...
```
On a NATS JetStream push subscriber you should add:
```python
nc = await nats.connect(servers=[self.natsd])
js = nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
# Wrap the JetStream context so subscriptions pick up the trace context
jsotel = JetStreamContextTelemetry(
    js, "NATS_SERVICE", tracer_provider
)

subscription = await jsotel.subscribe(
    subject="testing.telemetry",
    stream="testing",
    cb=handler,
)
```
On a NATS JetStream publisher you should add:
```python
nc = await nats.connect(servers=[self.natsd])
js = nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)

set_global_textmap(B3MultiFormat())
jsotel = JetStreamContextTelemetry(
    js, "NATS_SERVICE", tracer_provider
)

# Publish through the wrapper so the trace context travels with the message
await jsotel.publish("testing.telemetry", request.name.encode())
```
On a NATS JetStream pull subscription the pattern depends on whether you want to
fetch a single message and exit or keep pulling several. For a single message:
```python
from nats import errors

nc = await nats.connect(servers=[self.natsd])
js = nc.jetstream()
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
jsotel = JetStreamContextTelemetry(
    js, "NATS_SERVICE", tracer_provider
)

# You can use either pull_subscribe or pull_subscribe_bind
subscription = await jsotel.pull_subscribe(
    subject="testing.telemetry",
    durable="consumer_name",
    stream="testing",
)

async def callback(message):
    # Do something with your message
    # and optionally return something
    return True

try:
    result = await jsotel.pull_one(subscription, callback)
except errors.TimeoutError:
    # No message arrived before the timeout
    pass
```
For multiple messages, wrap the call in a loop:
```python
while True:
    try:
        result = await jsotel.pull_one(subscription, callback)
    except errors.TimeoutError:
        # Nothing to pull yet; try again
        pass
```
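The loop above never exits and retries on every timeout. When a bounded run is preferable, one option is a small wrapper that stops after a number of messages or after several consecutive timeouts. The sketch below is not part of the library: `pull_many` and its parameters are hypothetical, and the timeout exception type is passed in explicitly so the helper does not depend on a particular client.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Type


async def pull_many(
    pull_one: Callable[[], Awaitable[Any]],
    max_messages: int,
    max_timeouts: int = 3,
    timeout_error: Type[BaseException] = asyncio.TimeoutError,
) -> List[Any]:
    """Call pull_one() until enough results arrive or too many timeouts occur in a row."""
    results: List[Any] = []
    consecutive_timeouts = 0
    while len(results) < max_messages and consecutive_timeouts < max_timeouts:
        try:
            results.append(await pull_one())
            consecutive_timeouts = 0  # a message arrived, so reset the counter
        except timeout_error:
            consecutive_timeouts += 1
    return results
```

With the objects from the earlier snippet, a call could look like `await pull_many(lambda: jsotel.pull_one(subscription, callback), max_messages=10, timeout_error=errors.TimeoutError)`.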
On a plain NATS client (no JetStream) publisher you should add:
```python
nc = await nats.connect(servers=[self.natsd])
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)

set_global_textmap(B3MultiFormat())
# Wrap the NATS client itself; no JetStream context is needed here
ncotel = NatsClientTelemetry(
    nc, "NATS_SERVICE", tracer_provider
)

await ncotel.publish("testing.telemetry", request.name.encode())
```
On a plain NATS client (no JetStream) subscriber you should add:
```python
nc = await nats.connect(servers=[self.natsd])
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)
set_global_textmap(B3MultiFormat())
# Same wrapper as the plain NATS publisher, built from the client
ncotel = NatsClientTelemetry(
    nc, "NATS_SERVICE", tracer_provider
)

subscription = await ncotel.subscribe(
    subject="testing.telemetry",
    queue="queue_name",
    cb=handler,
)
```
On a plain NATS client (no JetStream) request you should add:
```python
nc = await nats.connect(servers=[self.natsd])
tracer_provider = get_telemetry("NATS_SERVICE")
if not tracer_provider.initialized:
    await init_telemetry(tracer_provider)

set_global_textmap(B3MultiFormat())
ncotel = NatsClientTelemetry(
    nc, "NATS_SERVICE", tracer_provider
)

# Send the request and wait for the reply
response = await ncotel.request("testing.telemetry", request.name.encode())
```
To handle requests on the other side, use the same pattern as the plain NATS client
subscriber and call `msg.respond()` in the handler once the work is done.
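A responding handler could look like the sketch below. The greeting payload is an assumption chosen to match the request example, which sends a name. `FakeMsg` is a hypothetical stand-in for the message object that the NATS client passes to the callback, included only so the handler can be run without a server.

```python
import asyncio
from typing import List


async def handler(msg) -> None:
    """Process a request and reply on the message's reply subject."""
    name = msg.data.decode()
    # Reply only after the work is done, as described above.
    await msg.respond(f"Hello, {name}".encode())


class FakeMsg:
    """Hypothetical stand-in for a NATS message; records replies instead of sending them."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.replies: List[bytes] = []

    async def respond(self, data: bytes) -> None:
        self.replies.append(data)


if __name__ == "__main__":
    fake = FakeMsg(b"world")
    asyncio.run(handler(fake))
    print(fake.replies)
```

In a real service the same `handler` would be passed as `cb=handler` to the subscriber shown earlier.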
Raw data
{
"_id": null,
"home_page": "https://github.com/nuclia/nucliadb",
"name": "nucliadb-telemetry",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9",
"maintainer_email": null,
"keywords": null,
"author": "nucliadb Authors",
"author_email": "nucliadb@nuclia.com",
"download_url": null,
"platform": null,
"bugtrack_url": null,
"license": "MIT",
"summary": "NucliaDB Telemetry Library Python process",
"version": "4.0.0.post515",
"project_urls": {
"Homepage": "https://github.com/nuclia/nucliadb"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "b2dfe4e17feb0256a8dcc0dbc87c2a27220bd7b10f2f7a9cdc82cdb5898d4042",
"md5": "1faee139eb8b2401dde15f95041a5b9d",
"sha256": "89f5c0792031ebdbefcb757fd44c314cfb89d1c9a822bbe4a3e0c4196efec6f1"
},
"downloads": -1,
"filename": "nucliadb_telemetry-4.0.0.post515-py3-none-any.whl",
"has_sig": false,
"md5_digest": "1faee139eb8b2401dde15f95041a5b9d",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.9",
"size": 73950,
"upload_time": "2024-05-17T14:24:40",
"upload_time_iso_8601": "2024-05-17T14:24:40.998059Z",
"url": "https://files.pythonhosted.org/packages/b2/df/e4e17feb0256a8dcc0dbc87c2a27220bd7b10f2f7a9cdc82cdb5898d4042/nucliadb_telemetry-4.0.0.post515-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-05-17 14:24:40",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "nuclia",
"github_project": "nucliadb",
"travis_ci": false,
"coveralls": true,
"github_actions": true,
"lcname": "nucliadb-telemetry"
}