kafka-scrapy-connect


- Name: kafka-scrapy-connect
- Version: 2.5.0
- Home page: https://github.com/spicyparrot/kafka_scrapy_connect
- Summary: Integrating Scrapy with Kafka using the confluent-kafka Python client
- Upload time: 2024-03-06 18:30:44
- Keywords: kafka, scrapy, crawling, scraping
- Requirements: No requirements were recorded.

[![Python 3.9](https://img.shields.io/badge/python-3.9-blue.svg)](https://www.python.org/downloads/release/python-3918) [![Python 3.10](https://img.shields.io/badge/python-3.10-blue.svg)](https://www.python.org/downloads/release/python-31013/) [![Python 3.11](https://img.shields.io/badge/python-3.11-blue.svg)](https://www.python.org/downloads/release/python-3117/)
## 🚀 Kafka Scrapy Connect

### Overview

`kafka_scrapy_connect` is a custom Scrapy library that integrates Scrapy with Kafka.

It consists of two main components: spiders and pipelines, which interact with Kafka for message consumption and item publishing.

The library also comes with a custom extension that publishes log stats to a Kafka topic at the end of each day, so that spider performance can be analysed offline.

This project was inspired by the great work in https://github.com/dfdeshom/scrapy-kafka.

`kafka_scrapy_connect` utilises [Confluent's](https://github.com/confluentinc/confluent-kafka-python) Kafka Python client under the hood, to provide high-level producer and consumer features.

## Features

1️⃣ **Integration with Kafka** 📈
  - Enables communication between Scrapy spiders and Kafka topics for efficient data processing.
  - Through partitions and consumer groups, message processing can be parallelised across multiple spiders!
  - Reduces overhead and improves throughput by giving the user the ability to consume messages in batches.

2️⃣ **Customisable Settings** 🛠️
  - Provides flexibility through customisable configuration for both consumers and producers.

3️⃣ **Error Handling** 🚑
  - Automatically handles network errors during crawling and publishes failed URLs to a designated error topic.

4️⃣ **Serialisation Customisation** 🧬
  - Allows users to customise how Kafka messages are deserialised by overriding the `process_kafka_message` method.

## Installation

You can install `kafka_scrapy_connect` via pip:
```bash
pip install kafka_scrapy_connect
```

## Example

A full [example](https://github.com/spicyparrot/kafka_scrapy_connect?tab=readme-ov-file#example) using the `kafka_scrapy_connect` library can be found inside the repo.

The *only prerequisite for walking through the example* is the installation of **Docker** 🐳.

This is needed because a Kafka cluster is created locally using containers so that `kafka_scrapy_connect` can communicate with a broker.

Once Docker is installed, follow the steps below 👇

1. Create a virtual environment, clone the repo and install requirements:
```bash
python3 -m venv .venv
source .venv/bin/activate
git clone https://github.com/spicyparrot/kafka_scrapy_connect.git && cd kafka_scrapy_connect
pip install -r requirements.txt
```
2. Create a local kafka cluster with required topics:

```bash
bash ./examples/kafka/kafka_start.sh --input-topic ScrapyInput,1 --output-topic ScrapyOutput,1 --error-topic ScrapyError,1
```

3. Initiate the spider:
```bash
cd examples/quotes && scrapy crawl quotes
```

4. Publish a message to the input Kafka topic and watch the spider consume and process the message 🪄
   1. ☝️ This requires either some custom producer code or the UI at https://localhost:8080, which can be used to publish example messages. (*The UI was created when bringing up your local Kafka cluster*)


5. When satisfied with testing, exit the spider and clean up the local kafka cluster:
```bash
bash ./examples/kafka/kafka_stop.sh
```
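
The "custom producer code" mentioned in step 4 could look roughly like the sketch below. It uses the `confluent-kafka` client directly and assumes the example cluster's defaults (`localhost:29092`, topic `ScrapyInput`) and a JSON payload with a `url` key. The helper names `build_message` and `publish` are hypothetical and not part of `kafka_scrapy_connect`.

```python
import json


def build_message(url, **extra):
    """Serialise a crawl request as UTF-8 JSON with the URL under the `url` key."""
    return json.dumps({"url": url, **extra}).encode("utf-8")


def publish(urls, topic="ScrapyInput", bootstrap_servers="localhost:29092"):
    """Send one message per URL to the input topic and wait for delivery."""
    # Imported here so build_message() can be used without the Kafka client installed.
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": bootstrap_servers})

    def report(err, msg):
        # Invoked from poll()/flush() once the broker accepts or rejects the message.
        if err is not None:
            print(f"Delivery failed: {err}")
        else:
            print(f"Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}")

    for url in urls:
        producer.produce(topic, value=build_message(url), callback=report)
        producer.poll(0)  # serve any pending delivery callbacks
    producer.flush()  # block until every queued message has been delivered
```

With the local cluster running, calling `publish(["https://quotes.toscrape.com/"])` should hand the spider one URL to crawl; the URL here is only a placeholder chosen to match the `quotes` example spider.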

## Usage

### Custom Settings
`kafka_scrapy_connect` supports the following custom settings:

- `SCRAPY_KAFKA_HOSTS`  - A list of kafka broker hosts. (Default: `localhost:29092`)
- `SCRAPY_INPUT_TOPIC`  - Topic from which the spider(s) *consume* messages. (Default: `ScrapyInput`)
- `SCRAPY_OUTPUT_TOPIC` - Topic where scraped items are published. (Default: `ScrapyOutput`)
- `SCRAPY_ERROR_TOPIC`  - Topic for publishing URLs that failed due to *network errors*. (Default: `ScrapyError`)
- `SCRAPY_CONSUMER_CONFIG` - Additional configuration options for Kafka consumers (see [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md))
- `SCRAPY_PRODUCER_CONFIG` - Additional configuration options for Kafka producers (see [here](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md))
- `SCRAPY_KAFKA_PRODUCER` - Key used for partitioning messages in the Kafka producer. (Default: `""`, i.e. *round-robin*)
- `SCRAPY_KAFKA_PRODUCER_CALLBACKS` - Enable or disable asynchronous message delivery callbacks. (Default: `False`)
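
As a rough illustration, a `settings.py` that spells out the settings above with their documented defaults might look like this. The empty dictionaries for the consumer and producer configs are an assumption, since no default is listed for them.

```python
# settings.py (sketch): values mirror the defaults documented above.
SCRAPY_KAFKA_HOSTS = "localhost:29092"   # Kafka broker host(s)
SCRAPY_INPUT_TOPIC = "ScrapyInput"       # spiders consume URLs from here
SCRAPY_OUTPUT_TOPIC = "ScrapyOutput"     # scraped items are published here
SCRAPY_ERROR_TOPIC = "ScrapyError"       # URLs that failed with network errors

# Extra librdkafka options; empty dicts are an assumed "no overrides" value.
SCRAPY_CONSUMER_CONFIG = {}
SCRAPY_PRODUCER_CONFIG = {}

SCRAPY_KAFKA_PRODUCER = ""               # partitioning key; "" means round-robin
SCRAPY_KAFKA_PRODUCER_CALLBACKS = False  # asynchronous delivery callbacks off
```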

### Customisation

---

**Customising deserialisation** 

You can customise how Kafka messages are deserialised by overriding the `process_kafka_message` method in your spider class.

This allows for handling custom message formats or data transformations.

```python
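# Import path assumed from the package layout; adjust if your version differs.
from kafka_scrapy_connect.spiders import KafkaListeningSpider


class CustomSpider(KafkaListeningSpider):
    def process_kafka_message(self, message, meta={}, headers={}):
        # Custom deserialisation logic
        # Return URL, metadata or None if extraction fails
        pass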
```

⚠️ *By default*, if no custom `process_kafka_message` method is provided, the spider **expects** either a JSON payload or a plain string containing a valid URL. If the payload is a JSON object, it must contain a `url` key.
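
The default behaviour described above can be approximated with a small standalone helper, which may be a useful starting point for a custom `process_kafka_message`. This is a sketch and not the library's implementation: `extract_url` is a hypothetical name, and the `(url, extra_fields)` return shape is an assumption that should be checked against the base class before use.

```python
import json
from urllib.parse import urlparse


def extract_url(raw):
    """Return (url, extra_fields) for a message payload, or None if no valid URL is found."""
    text = raw.decode("utf-8", errors="replace") if isinstance(raw, bytes) else str(raw)
    text = text.strip()
    extra = {}
    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        payload = text  # not JSON: treat the payload as a bare URL string

    if isinstance(payload, dict):
        url = payload.get("url")
        # Everything besides the URL is kept as metadata.
        extra = {k: v for k, v in payload.items() if k != "url"}
    else:
        url = payload

    if not isinstance(url, str):
        return None
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https") or not parsed.netloc:
        return None
    return url, extra
```

Keeping the parsing in a plain function like this makes it easy to unit test without a running broker or spider.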

---

**Customising Producer & Consumer settings**

You can customize producer and consumer settings by providing a dictionary of configuration options in your Scrapy settings under `SCRAPY_PRODUCER_CONFIG` and `SCRAPY_CONSUMER_CONFIG`.

```python
# Example SCRAPY_PRODUCER_CONFIG
SCRAPY_PRODUCER_CONFIG = {
    'compression.type': 'gzip',
    'request.timeout.ms': 5000
}

# Example SCRAPY_CONSUMER_CONFIG
SCRAPY_CONSUMER_CONFIG = {
    'fetch.wait.max.ms': 10,
    'max.poll.interval.ms': 600000,
    'auto.offset.reset': 'latest'
}
```
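
The README does not show how these dictionaries are combined with the library's own client configuration. One plausible reading, sketched below purely as an assumption, is that user options are layered over a base configuration so that user values win on conflict. The base values (`group.id` in particular) and the `build_client_config` helper are hypothetical.

```python
def build_client_config(base, overrides):
    """Layer user-supplied librdkafka options over a base config (overrides win)."""
    return {**base, **overrides}


# Hypothetical base config; the library's real defaults may differ.
consumer_base = {
    "bootstrap.servers": "localhost:29092",
    "group.id": "scrapy-spiders",
    "auto.offset.reset": "earliest",
}

SCRAPY_CONSUMER_CONFIG = {
    "fetch.wait.max.ms": 10,
    "max.poll.interval.ms": 600000,
    "auto.offset.reset": "latest",
}

consumer_config = build_client_config(consumer_base, SCRAPY_CONSUMER_CONFIG)
print(consumer_config["auto.offset.reset"])
```

Under this assumption the user's `auto.offset.reset` replaces the base value, while untouched keys such as `bootstrap.servers` are preserved.
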
---
**Custom stats extensions**

`kafka_scrapy_connect` comes with a custom Scrapy stats extension that:
1. Logs basic scraping statistics every minute (*frequency can be configured by the Scrapy setting* `KAFKA_LOGSTATS_INTERVAL`).
2. At **end-of-day**, publishes logging statistics to a Kafka topic (specified by the Scrapy setting `SCRAPY_STATS_TOPIC`).
   1. Each summary message is published with a key specifying the summary date (e.g. `2024-02-27`) for easy identification.
3. If the spider is shut down or closed (for example, due to a deployment), also sends a summary payload to the same Kafka topic (`SCRAPY_STATS_TOPIC`).


To enable this custom extension, disable the standard LogStats extension and modify your `settings.py` to include the below:
```python
# Kafka topic for capturing stats
SCRAPY_STATS_TOPIC = 'ScrapyStats'

# Disable standard logging extension (use custom kafka_scrapy_connect extension)
EXTENSIONS = {
  "scrapy.extensions.logstats.LogStats": None,
  "kafka_scrapy_connect.extensions.KafkaLogStats": 500
}
```

An example payload sent to the statistics topic will look like:
```json
{
	"pages_crawled": 3,
	"items_scraped": 30,
	"avg pages/min": 0.4,
	"avg pages/hour": 23.78,
	"avg pages/day": 570.63,
	"avg items/min": 3.96,
	"avg items/hour": 237.76,
	"avg items/day": 5706.3,
	"successful_request_pct": 100.0,
	"http_status_counts": "200: 3",
	"max_memory": 76136448,
	"elapsed_time": 454.23
}
```
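
The averages in this payload look like simple rates derived from the counters and `elapsed_time`. The sketch below checks that reading against the sample, assuming `elapsed_time` is in seconds; this is inferred from the numbers rather than stated by the library, and the `rates` helper is hypothetical.

```python
import json
import math

SAMPLE = """
{
    "pages_crawled": 3,
    "items_scraped": 30,
    "avg pages/min": 0.4,
    "avg pages/hour": 23.78,
    "avg pages/day": 570.63,
    "avg items/min": 3.96,
    "avg items/hour": 237.76,
    "avg items/day": 5706.3,
    "elapsed_time": 454.23
}
"""


def rates(count, elapsed_seconds):
    """Convert a counter and an elapsed time in seconds into per-minute/hour/day rates."""
    per_second = count / elapsed_seconds
    return {"min": per_second * 60, "hour": per_second * 3600, "day": per_second * 86400}


stats = json.loads(SAMPLE)
page_rates = rates(stats["pages_crawled"], stats["elapsed_time"])
item_rates = rates(stats["items_scraped"], stats["elapsed_time"])

for unit in ("min", "hour", "day"):
    # Compare the recomputed rate with the value reported in the payload.
    print(unit, round(page_rates[unit], 2), stats[f"avg pages/{unit}"])
    print(unit, round(item_rates[unit], 2), stats[f"avg items/{unit}"])
```

The recomputed values agree with the reported ones to within rounding, which makes the payload easy to sanity-check when analysing the stats topic offline.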

            

        