langchain-callback-parquet-logger

Name: langchain-callback-parquet-logger
Version: 0.4.1
Summary: A Parquet-based callback handler for logging LangChain LLM interactions
Upload time: 2025-08-30 17:27:10
Homepage: https://github.com/turbo3136/langchain-callback-parquet-logger
Requires Python: >=3.8
License: MIT
Keywords: langchain, logging, parquet, llm, callback, monitoring
Requirements: pyarrow, langchain-core
# LangChain Callback Parquet Logger

A high-performance callback handler for logging LangChain LLM interactions to Parquet files with automatic partitioning, buffering, and batch processing support.

## Features

- 📊 **Parquet Format**: Efficient columnar storage for analytics
- 🚀 **Buffered Writing**: Configurable buffer size for optimal performance
- 📅 **Partitioning**: Optional daily partitioning for better organization
- 🏷️ **Custom Tracking**: Add custom IDs and metadata to your logs
- 🔄 **Batch Processing**: Simple helper for DataFrame batch operations
- 🔒 **Thread-Safe**: Safe for concurrent LLM calls

## Installation

```bash
pip install langchain-callback-parquet-logger
```

## Quick Start

```python
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

# Simple usage
llm = ChatOpenAI(model="gpt-4o-mini")
llm.callbacks = [ParquetLogger("./logs")]

response = llm.invoke("What is 2+2?")
```

## Core Features

### 1. Basic Logging

```python
# With context manager (recommended for notebooks)
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])
    response = llm.invoke("Hello!")
# Logs automatically flushed on exit
```

### 2. Custom IDs and Metadata

```python
from langchain_callback_parquet_logger import ParquetLogger, with_tags

# Logger-level metadata (included in all logs)
logger = ParquetLogger(
    log_dir="./logs",
    logger_metadata={
        "environment": "production",
        "service": "api-gateway",
        "version": "2.1.0"
    }
)

# Request-level tracking with custom IDs
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])
response = llm.invoke(
    "What is quantum computing?",
    config=with_tags(
        custom_id="user-123-req-456",
        tags=["production", "high-priority"]
    )
)
```

### 3. Batch Processing (v0.3.0+)

Process DataFrames through LLMs with minimal code:

```python
import pandas as pd
from langchain_callback_parquet_logger import batch_run, with_tags, ParquetLogger

# Prepare your data
df = pd.DataFrame({
    'id': ['001', '002', '003'],
    'question': ['What is AI?', 'Explain quantum computing', 'What is blockchain?']
})

# Add required columns
df['prompt'] = df['question']
df['config'] = df['id'].apply(lambda x: with_tags(custom_id=x))

# Configure LLM with advanced features
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        service_tier="flex",  # Optional: optimize costs
        model_kwargs={"background": True},  # Optional: background processing
        callbacks=[logger]
    )
    
    # Run batch processing (top-level await works in notebooks; use asyncio.run() in scripts)
    results = await batch_run(df, llm, max_concurrency=10, show_progress=True)
    df['answer'] = results
```

See [examples/batch_processing.py](examples/batch_processing.py) for advanced usage with structured outputs, web search tools, and more.

#### Memory-Efficient Mode for Huge DataFrames

For massive DataFrames, use `return_results=False` to avoid keeping results in memory:

```python
# Process huge DataFrame without memory overhead
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])
    
    # Results saved to parquet only, not kept in memory
    await batch_run(huge_df, llm, return_results=False)
    
# Read results later from parquet files
df_logs = pd.read_parquet('./logs')
results = df_logs[df_logs['event_type'] == 'llm_end']
```

## Configuration Options

### ParquetLogger Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `log_dir` | str | "./llm_logs" | Directory for log files |
| `buffer_size` | int | 100 | Entries before auto-flush |
| `provider` | str | "openai" | LLM provider name |
| `logger_metadata` | dict | {} | Metadata for all logs |
| `partition_on` | str/None | "date" | "date" or None for no partitioning |
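
Putting the table together, a minimal sketch of a fully configured logger (values are illustrative; all parameters are optional keyword arguments per the defaults above):

```python
from langchain_callback_parquet_logger import ParquetLogger

logger = ParquetLogger(
    log_dir="./llm_logs",                        # where Parquet files are written
    buffer_size=100,                             # flush to disk every 100 entries
    provider="openai",                           # recorded in the `provider` column
    logger_metadata={"environment": "staging"},  # attached to every log row
    partition_on=None,                           # disable the default date partitioning
)
```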

### batch_run Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `df` | DataFrame | required | DataFrame with data |
| `llm` | LangChain LLM | required | Configured LLM instance |
| `prompt_col` | str | "prompt" | Column with prompts |
| `config_col` | str | "config" | Column with config dicts |
| `tools_col` | str/None | "tools" | Column with tools lists |
| `max_concurrency` | int | 10 | Max parallel requests |
| `show_progress` | bool | True | Show progress bar |
| `return_results` | bool | True | If False, don't keep results in memory |
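
If your DataFrame uses different column names, the `*_col` parameters point `batch_run` at them. A sketch (column names are illustrative; passing `tools_col=None` is assumed to skip per-row tools, mirroring `partition_on=None` above):

```python
# Assumes df has 'question' and 'run_config' columns and no tools column.
results = await batch_run(
    df,
    llm,
    prompt_col="question",    # read prompts from df['question']
    config_col="run_config",  # read per-row configs from df['run_config']
    tools_col=None,           # no per-row tools
    max_concurrency=5,
    show_progress=False,
)
```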

## Reading Logs

### With Pandas
```python
import pandas as pd
import json

df = pd.read_parquet("./logs")
df['data'] = df['payload'].apply(json.loads)

# Analyze by custom ID
custom_requests = df[df['logger_custom_id'] != '']
print(f"Found {len(custom_requests)} tagged requests")
```

### With DuckDB
```python
import duckdb

conn = duckdb.connect()
df = conn.execute("""
    SELECT 
        logger_custom_id,
        event_type,
        timestamp,
        json_extract_string(payload, '$.usage.total_tokens') as tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE logger_custom_id != ''
    ORDER BY timestamp DESC
""").df()
```
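
The same files support aggregation; for example, total tokens per custom ID (assuming the `$.usage.total_tokens` path shown above is present in `llm_end` payloads):

```python
tokens_by_id = conn.execute("""
    SELECT
        logger_custom_id,
        COUNT(*) AS calls,
        SUM(CAST(json_extract_string(payload, '$.usage.total_tokens') AS BIGINT)) AS total_tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE event_type = 'llm_end' AND logger_custom_id != ''
    GROUP BY logger_custom_id
    ORDER BY total_tokens DESC
""").df()
```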

## Log Schema

| Column | Type | Description |
|--------|------|-------------|
| `timestamp` | timestamp | Event time (UTC) |
| `run_id` | string | Unique run ID |
| `logger_custom_id` | string | Your custom ID |
| `event_type` | string | llm_start/end/error |
| `provider` | string | LLM provider |
| `logger_metadata` | string | JSON metadata |
| `payload` | string | JSON event data |
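
Because `llm_start` and `llm_end` events share a `run_id`, you can join them to measure per-call latency. A pandas sketch (assuming each run emits a matching start/end pair):

```python
import pandas as pd

df = pd.read_parquet("./logs")
starts = df[df["event_type"] == "llm_start"][["run_id", "timestamp"]]
ends = df[df["event_type"] == "llm_end"][["run_id", "timestamp"]]

# Join start/end rows on run_id and compute elapsed seconds per call.
latency = starts.merge(ends, on="run_id", suffixes=("_start", "_end"))
latency["seconds"] = (latency["timestamp_end"] - latency["timestamp_start"]).dt.total_seconds()
print(latency["seconds"].describe())
```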

## Important Notes

### Notebook Usage
In Jupyter/Colab, use one of these approaches for immediate writes (a short sketch follows the list):
- **Context manager** (recommended): `with ParquetLogger() as logger:`
- **Small buffer**: `ParquetLogger(buffer_size=1)`
- **Manual flush**: `logger.flush()`
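
For example, a minimal sketch combining the small-buffer and manual-flush approaches:

```python
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI

logger = ParquetLogger("./logs", buffer_size=1)  # write each entry immediately
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])

response = llm.invoke("Hello!")
logger.flush()  # force any remaining buffered entries to disk
```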

### File Organization
```
logs/
├── date=2024-01-15/          # With partitioning (default)
│   └── logs_143022_123456.parquet
└── date=2024-01-16/
    └── logs_090122_345678.parquet
```

## Background Response Retrieval (v0.4.0+)

Retrieve completed responses from OpenAI's background/async requests:

```python
import pandas as pd
import openai
from langchain_callback_parquet_logger import retrieve_background_responses, ParquetLogger

# DataFrame with response IDs from background requests
df = pd.DataFrame({
    'response_id': ['resp_123...', 'resp_456...'],
    'logger_custom_id': ['user-001', 'user-002']
})

# Retrieve and log responses
client = openai.AsyncClient()
with ParquetLogger('./retrieval_logs') as logger:
    results = await retrieve_background_responses(
        df,
        client,
        logger=logger,
        show_progress=True,
        checkpoint_file='./checkpoint.parquet'  # Resume capability
    )
```

### Features
- **Automatic rate limiting** with exponential backoff
- **Checkpoint/resume** for interrupted retrievals (resume sketch below)
- **Memory-efficient mode** with `return_results=False`
- **Progress tracking** with tqdm
- **Structured logging** of attempts, completions, and errors
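
Because progress is checkpointed, re-running the same call after an interruption resumes where it left off. A sketch combining resume with memory-efficient mode (the skip-completed behavior is an assumption based on the checkpoint/resume feature above):

```python
# Re-run with the same checkpoint file to resume an interrupted retrieval.
with ParquetLogger('./retrieval_logs') as logger:
    await retrieve_background_responses(
        df,
        client,
        logger=logger,
        return_results=False,  # results go to parquet logs only, not memory
        checkpoint_file='./checkpoint.parquet',
    )
```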

See [examples/retrieve_background_responses.py](examples/retrieve_background_responses.py) for detailed usage.

## Examples

- [`basic_usage.py`](examples/basic_usage.py) - Simple logging example
- [`batch_processing.py`](examples/batch_processing.py) - Advanced batch processing with all features
- [`simple_batch_example.py`](examples/simple_batch_example.py) - Before/after batch processing comparison
- [`memory_efficient_batch.py`](examples/memory_efficient_batch.py) - Memory-efficient processing for huge DataFrames
- [`partitioning_example.py`](examples/partitioning_example.py) - Partitioning strategies
- [`retrieve_background_responses.py`](examples/retrieve_background_responses.py) - Background response retrieval

## License

MIT License - see LICENSE file

## Contributing

Contributions welcome! Please submit a Pull Request.

## Support

For issues and questions, use [GitHub issues](https://github.com/turbo3136/langchain-callback-parquet-logger/issues).