pykafka
=======

:Name: pykafka
:Version: 2.8.0
:Home page: https://github.com/Parsely/pykafka
:Summary: Full-Featured Pure-Python Kafka Client
:Upload time: 2018-09-24 23:17:50
:Author: Keith Bourgoin and Emmett Butler
:License: Apache License 2.0
:Keywords: apache kafka client driver
:Requirements: No requirements were recorded.

.. image:: https://travis-ci.org/Parsely/pykafka.svg?branch=master
    :target: https://travis-ci.org/Parsely/pykafka
.. image:: https://codecov.io/github/Parsely/pykafka/coverage.svg?branch=master
    :target: https://codecov.io/github/Parsely/pykafka?branch=master

PyKafka
=======

.. image:: http://i.imgur.com/ztYl4lG.jpg

PyKafka is a programmer-friendly Kafka client for Python. It includes Python
implementations of Kafka producers and consumers, which are optionally backed
by a C extension built on `librdkafka`_. It runs under Python 2.7+, Python 3.4+,
and PyPy, and supports versions of Kafka 0.8.2 and newer.

.. _librdkafka: https://github.com/edenhill/librdkafka

PyKafka's primary goal is to provide a similar level of abstraction to the
`JVM Kafka client`_ using idioms familiar to Python programmers and exposing
the most Pythonic API possible.

You can install PyKafka from PyPI with

::

    $ pip install pykafka

or from conda-forge with

::

    $ conda install -c conda-forge pykafka

Full documentation and usage examples for PyKafka can be found on `readthedocs`_.

You can install PyKafka for local development and testing by cloning this repository and
running

::

    $ python setup.py develop

.. _JVM Kafka client: https://github.com/apache/kafka/tree/0.8.2/clients/src/main/java/org/apache/kafka
.. _readthedocs: http://pykafka.readthedocs.org/en/latest/

Getting Started
---------------

Assuming you have at least one Kafka instance running on localhost, you can use PyKafka
to connect to it.

.. sourcecode:: python

    >>> from pykafka import KafkaClient
    >>> client = KafkaClient(hosts="127.0.0.1:9092,127.0.0.1:9093,...")

Or, for a TLS connection, you might write the following (see the ``SslConfig``
docs for further details):

.. sourcecode:: python

    >>> from pykafka import KafkaClient, SslConfig
    >>> config = SslConfig(cafile='/your/ca.cert',
    ...                    certfile='/your/client.cert',  # optional
    ...                    keyfile='/your/client.key',  # optional
    ...                    password='unlock my client key please')  # optional
    >>> client = KafkaClient(hosts="127.0.0.1:<ssl-port>,...",
    ...                      ssl_config=config)

If the cluster you've connected to has any topics defined on it, you can list
them with:

.. sourcecode:: python

    >>> client.topics
    >>> topic = client.topics['my.test']
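
To see just the topic names, you can iterate the mapping (a minimal sketch;
note that on Python 3 pykafka generally hands topic names back as byte strings):

.. sourcecode:: python

    >>> list(client.topics)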

Once you've got a `Topic`, you can create a `Producer` for it and start
producing messages.

.. sourcecode:: python

    >>> with topic.get_sync_producer() as producer:
    ...     for i in range(4):
    ...         # messages must be byte strings, hence the encode()
    ...         producer.produce(('test message ' + str(i ** 2)).encode())

The example above produces to Kafka synchronously: the call only
returns after we have confirmation that the message made it to the cluster.

To achieve higher throughput, we recommend using the ``Producer`` in
asynchronous mode, so that ``produce()`` calls return immediately and the
producer may opt to send messages in larger batches. The ``Producer`` collects
produced messages in an internal queue for ``linger_ms`` before sending each batch.
This delay can be removed or changed at the expense of efficiency with ``linger_ms``,
``min_queued_messages``, and other keyword arguments (see `readthedocs`_ and the
tuning sketch below). You can still obtain delivery confirmation for messages
through a queue interface, which can be enabled by setting
``delivery_reports=True``. Here's a rough usage example:

.. sourcecode:: python

    >>> from queue import Empty  # on Python 2: from Queue import Empty
    >>> with topic.get_producer(delivery_reports=True) as producer:
    ...     count = 0
    ...     while True:
    ...         count += 1
    ...         # messages and partition keys must be byte strings
    ...         producer.produce(b'test msg', partition_key=str(count).encode())
    ...         if count % 10 ** 5 == 0:  # adjust this or bring lots of RAM ;)
    ...             while True:
    ...                 try:
    ...                     msg, exc = producer.get_delivery_report(block=False)
    ...                     if exc is not None:
    ...                         print('Failed to deliver msg {}: {}'.format(
    ...                             msg.partition_key, repr(exc)))
    ...                     else:
    ...                         print('Successfully delivered msg {}'.format(
    ...                             msg.partition_key))
    ...                 except Empty:
    ...                     break

Note that the delivery report queue is thread-local: it will only serve reports
for messages which were produced from the current thread. Also, if you're using
``delivery_reports=True``, failing to consume the delivery report queue will cause
PyKafka's memory usage to grow unbounded.
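
As a minimal tuning sketch (the values here are illustrative, not
recommendations), you might trade latency for throughput like so:

.. sourcecode:: python

    >>> producer = topic.get_producer(
    ...     linger_ms=1000,            # wait up to 1s for a batch to fill
    ...     min_queued_messages=1000)  # or send once 1000 messages are queued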

You can also consume messages from this topic using a `Consumer` instance.

.. sourcecode:: python

    >>> consumer = topic.get_simple_consumer()
    >>> for message in consumer:
    ...     if message is not None:
    ...         print(message.offset, message.value)
    0 test message 0
    1 test message 1
    2 test message 4
    3 test message 9

This `SimpleConsumer` doesn't scale: if you have two `SimpleConsumers`
consuming the same topic, they will receive duplicate messages. To get around
this, you can use the `BalancedConsumer`.

.. sourcecode:: python

    >>> balanced_consumer = topic.get_balanced_consumer(
    ...     consumer_group='testgroup',
    ...     auto_commit_enable=True,
    ...     zookeeper_connect='myZkClusterNode1.com:2181,myZkClusterNode2.com:2181/myZkChroot'
    ... )

You can have as many `BalancedConsumer` instances consuming a topic as that
topic has partitions. If they are all connected to the same zookeeper instance,
they will communicate with it to automatically balance the partitions between
themselves. The partition assignment strategy used by the `BalancedConsumer` is
the "range" strategy by default. The strategy is switchable via the `membership_protocol`
keyword argument, and can be either an object exposed by `pykafka.membershipprotocol` or
a custom instance of `pykafka.membershipprotocol.GroupMembershipProtocol`.
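
For example, switching to the round-robin strategy might look like the sketch
below (assuming `pykafka.membershipprotocol` exposes a `RoundRobinProtocol`
object, as described above):

.. sourcecode:: python

    >>> from pykafka.membershipprotocol import RoundRobinProtocol
    >>> balanced_consumer = topic.get_balanced_consumer(
    ...     consumer_group='testgroup',
    ...     membership_protocol=RoundRobinProtocol,
    ...     zookeeper_connect='myZkClusterNode1.com:2181')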

You can also use the Kafka 0.9 Group Membership API with the ``managed``
keyword argument on ``get_balanced_consumer``.
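
A minimal sketch of that mode (with ``managed=True`` the consumer coordinates
through the Kafka brokers themselves, so no ``zookeeper_connect`` is needed):

.. sourcecode:: python

    >>> balanced_consumer = topic.get_balanced_consumer(
    ...     consumer_group='testgroup',
    ...     managed=True)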

Using the librdkafka extension
------------------------------

PyKafka includes a C extension that makes use of librdkafka to speed up producer
and consumer operation. To use the librdkafka extension, you need to make sure the
header files and shared library are somewhere Python can find them, both when you
build the extension (which is taken care of by ``setup.py develop``) and at run time.
Typically, this means that you need to either install librdkafka in a place
conventional for your system, or declare ``C_INCLUDE_PATH``, ``LIBRARY_PATH``,
and ``LD_LIBRARY_PATH`` in your shell environment to point to the installation
location of the librdkafka shared objects. You can find this location with
``locate librdkafka.so``.
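
For example, if librdkafka were installed under a hypothetical prefix
``/opt/librdkafka``, you might declare:

.. sourcecode:: sh

    $ export C_INCLUDE_PATH=/opt/librdkafka/include
    $ export LIBRARY_PATH=/opt/librdkafka/lib
    $ export LD_LIBRARY_PATH=/opt/librdkafka/lib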

After that, all you need to do is pass an extra parameter
``use_rdkafka=True`` to ``topic.get_producer()``,
``topic.get_simple_consumer()``, or ``topic.get_balanced_consumer()``. Note
that some configuration options may have different optimal values; it may be
worthwhile to consult librdkafka's `configuration notes`_ for this.
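
For instance, assuming the extension built successfully:

.. sourcecode:: python

    >>> producer = topic.get_producer(use_rdkafka=True)
    >>> consumer = topic.get_simple_consumer(use_rdkafka=True)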

.. _configuration notes: https://github.com/edenhill/librdkafka/blob/0.9.1/CONFIGURATION.md

Operational Tools
-----------------

PyKafka includes a small collection of `CLI tools`_ that can help with common tasks
related to the administration of a Kafka cluster, including offset and lag monitoring and
topic inspection. The full, up-to-date interface for these tools can be found by running

.. sourcecode:: sh

    $ python cli/kafka_tools.py --help

or after installing PyKafka via setuptools or pip:

.. sourcecode:: sh

    $ kafka-tools --help

.. _CLI tools: https://github.com/Parsely/pykafka/blob/master/pykafka/cli/kafka_tools.py

PyKafka or kafka-python?
------------------------

These are two different projects.
See `the discussion here <https://github.com/Parsely/pykafka/issues/334>`_ for comparisons
between the two projects.

Contributing
------------

If you're interested in contributing code to PyKafka, a good place to start is the
`"help wanted"`_ issue tag. We also recommend taking a look at the `contribution guide`_.

.. _"help wanted": https://github.com/Parsely/pykafka/issues?q=is%3Aopen+is%3Aissue+label%3A%22help+wanted%22

Support
-------

If you need help using PyKafka, there are a bunch of resources available.
For usage questions or common recipes, check out the `StackOverflow tag`_.
The `Google Group`_ can be useful for more in-depth questions or inquiries
you'd like to send directly to the PyKafka maintainers. If you believe you've
found a bug in PyKafka, please open a `github issue`_ after reading the
`contribution guide`_.

.. _StackOverflow tag: https://stackoverflow.com/questions/tagged/pykafka
.. _github issue: https://github.com/Parsely/pykafka/issues
.. _Google Group: https://groups.google.com/forum/#!forum/pykafka-user
.. _contribution guide: https://github.com/Parsely/pykafka/blob/master/CONTRIBUTING.rst
            
