gcn-kafka

- Name: gcn-kafka
- Version: 0.3.3
- Summary: Kafka client for NASA's General Coordinates Network (GCN)
- Authors: Leo Singer <leo.p.singer@nasa.gov>, Tom Barclay <tb@umbc.edu>, Eric Burns <ericburns@lsu.edu>
- Upload time: 2023-08-23 17:08:41
- Requires Python: >=3.7
- License: CC0-1.0
- Source: https://github.com/nasa-gcn/gcn-kafka-python
[![PyPI](https://img.shields.io/pypi/v/gcn-kafka)](https://pypi.org/project/gcn-kafka/)
[![codecov](https://codecov.io/gh/nasa-gcn/gcn-kafka-python/branch/main/graph/badge.svg?token=KSFUD0LETW)](https://codecov.io/gh/nasa-gcn/gcn-kafka-python)

# GCN Kafka Client for Python

This is the official Python client for the [General Coordinates Network (GCN)](https://gcn.nasa.gov). It is a very lightweight wrapper around [confluent-kafka-python](https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html).

## To Install

Run this command to install with [pip](https://pip.pypa.io/):

```
pip install gcn-kafka
```

or this command to install with [conda](https://docs.conda.io/):

```
conda install -c conda-forge gcn-kafka
```

## To Use

Create a consumer:

```python
from gcn_kafka import Consumer
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in')
```
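The `Consumer` constructor also accepts an optional `config` dictionary of standard confluent-kafka settings, which is passed through to the underlying client; the FAQ examples below use it to set the group ID and offset behavior.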

List all topics:

```python
print(consumer.list_topics().topics)
```
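`list_topics()` returns cluster metadata whose `topics` attribute is a mapping from topic name to topic metadata, so the listing can be filtered. A small sketch (the `gcn.classic.text.` prefix is just an illustrative choice):

```python
# Print only the topics whose names start with an example prefix.
metadata = consumer.list_topics()
for name in sorted(metadata.topics):
    if name.startswith('gcn.classic.text.'):
        print(name)
```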

Subscribe to topics and receive alerts:

```python
consumer.subscribe(['gcn.classic.text.FERMI_GBM_FIN_POS',
                    'gcn.classic.text.LVC_INITIAL'])
while True:
    for message in consumer.consume(timeout=1):
        if message.error():
            print(message.error())
            continue
        print(message.value())
```

The `timeout` argument to `consume()`, given in seconds, allows the program to
exit quickly once it has reached the end of the existing message buffer. This
is useful if you just want to recover an older message from the stream. It
also makes the `while True` loop interruptible via the standard Ctrl-C key
sequence, which `consume()` ignores while it is blocking.
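For example, a minimal sketch of a loop that drains whatever is currently buffered and then exits, relying on `consume()` returning an empty list when the timeout elapses with nothing new to read:

```python
# Read until the buffer is exhausted, then stop (a sketch, not package API).
while True:
    messages = consumer.consume(timeout=1)
    if not messages:
        break  # timed out with no new messages: end of the buffer
    for message in messages:
        if message.error():
            print(message.error())
            continue
        print(message.value())
```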

## Testing and Development Kafka Clusters

GCN has three Kafka clusters: production, testing, and an internal development deployment. Use the optional `domain` keyword argument to select which broker to connect to.

```python
# Production (default)
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# Testing
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='test.gcn.nasa.gov')

# Development (internal)
consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='dev.gcn.nasa.gov')
```

## FAQ

**How can I keep track of the last read message when restarting a client?**

A key feature of Kafka consumer clients is the ability to keep persistent track of which messages have been read. This allows a client to recover missed messages after a restart by beginning at the earliest unread message rather than at the next available message from the stream. To enable this feature, set a client group ID in the configuration dictionary passed to the `Consumer` class, and set the auto offset reset option to `'earliest'`. Every new client with the given group ID will then begin reading the specified topic at the earliest unread message.

When doing this, it is recommended to turn off the auto-commit feature, because auto commit can lose track of the last read message if the client crashes before the auto-commit interval (5 seconds by default) elapses. Manually committing messages (i.e., storing the state of the last read message) as they are read is the most robust way to track the last read message.

Example code:
```python
from gcn_kafka import Consumer

config = {'group.id': 'my group name',
          'auto.offset.reset': 'earliest',
          'enable.auto.commit': False}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.FERMI_GBM_SUBTHRESH']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        print(message.value())
        consumer.commit(message)
```
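Note that `commit()` in confluent-kafka-python is asynchronous by default; if you need to block until the offset has actually been committed, pass `asynchronous=False`.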

**How can I read messages beginning at the earliest available messages for a given stream?**

You can begin reading a topic stream from the earliest message present in the stream buffer by omitting the group ID (as in the example below) or setting it to an empty string, and setting the auto offset reset option to `'earliest'` in the configuration dictionary passed to the `Consumer` class. This lets you scan older messages for testing purposes, or recover messages that may have been missed due to a crash or network outage. Keep in mind that the stream buffers are finite in size; they currently hold messages from the past few days.

Example code:
```python
from gcn_kafka import Consumer

config = {'auto.offset.reset': 'earliest'}

consumer = Consumer(config=config,
                    client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

topics = ['gcn.classic.voevent.INTEGRAL_SPIACS']
consumer.subscribe(topics)

while True:
    for message in consumer.consume(timeout=1):
        print(message.value())
```

**How can I search for messages occurring within a given date range?**

To search for messages in a given date range, use the `offsets_for_times()` method of the `Consumer` class to get the message offsets bracketing the desired date range. You can then assign the starting offset to the consumer and read the desired number of messages. Keep in mind that the stream buffers are finite in size, so it is not possible to recover messages from before the start of the buffer; the GCN stream buffers currently hold messages from the past few days.

Example code:
```python
import datetime
from gcn_kafka import Consumer
from confluent_kafka import TopicPartition

consumer = Consumer(client_id='fill me in',
                    client_secret='fill me in',
                    domain='gcn.nasa.gov')

# Search a one-day window beginning 3 days ago.
# Kafka message timestamps are in milliseconds since the Unix epoch.
timestamp1 = int((datetime.datetime.now() - datetime.timedelta(days=3)).timestamp() * 1000)
timestamp2 = timestamp1 + 86400000  # + 1 day, in milliseconds

topic = 'gcn.classic.voevent.INTEGRAL_SPIACS'
start = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp1)])
end = consumer.offsets_for_times(
    [TopicPartition(topic, 0, timestamp2)])

consumer.assign(start)
for message in consumer.consume(end[0].offset - start[0].offset, timeout=1):
    print(message.value())
```

## Known Issues

### confluent-kafka-python
If you use confluent-kafka-python v2.1.0 or v2.1.1 with librdkafka v2.1.1, you will encounter a segmentation fault when subscribing to unavailable topics.

Refer to [the confluent-kafka-python GitHub issue](https://github.com/confluentinc/confluent-kafka-python/issues/1547) for updates on the issue.
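Until the issue is fixed upstream, one possible workaround (an assumption, not an official recommendation; check the linked issue for current status) is to avoid the affected releases when installing:

```
pip install 'confluent-kafka!=2.1.0,!=2.1.1'
```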
