# LangChain Callback Parquet Logger
A high-performance callback handler for logging LangChain LLM interactions to Parquet files with automatic partitioning, buffering, and batch processing support.
## Features
- 📊 **Parquet Format**: Efficient columnar storage for analytics
- 🚀 **Buffered Writing**: Configurable buffer size for optimal performance
- 📅 **Partitioning**: Optional daily partitioning for better organization
- 🏷️ **Custom Tracking**: Add custom IDs and metadata to your logs
- 🔄 **Batch Processing**: Simple helper for DataFrame batch operations
- 🔒 **Thread-Safe**: Safe for concurrent LLM calls
## Installation
```bash
pip install langchain-callback-parquet-logger
```
## Quick Start
```python
from langchain_callback_parquet_logger import ParquetLogger
from langchain_openai import ChatOpenAI
# Simple usage
llm = ChatOpenAI(model="gpt-4o-mini")
llm.callbacks = [ParquetLogger("./logs")]
response = llm.invoke("What is 2+2?")
```
## Core Features
### 1. Basic Logging
```python
# With context manager (recommended for notebooks)
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])
    response = llm.invoke("Hello!")
# Logs automatically flushed on exit
```
### 2. Custom IDs and Metadata
```python
from langchain_callback_parquet_logger import ParquetLogger, with_tags
# Logger-level metadata (included in all logs)
logger = ParquetLogger(
    log_dir="./logs",
    logger_metadata={
        "environment": "production",
        "service": "api-gateway",
        "version": "2.1.0"
    }
)
# Request-level tracking with custom IDs
llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])
response = llm.invoke(
    "What is quantum computing?",
    config=with_tags(
        custom_id="user-123-req-456",
        tags=["production", "high-priority"]
    )
)
```
### 3. Batch Processing (v0.3.0+)
Process DataFrames through LLMs with minimal code:
```python
import pandas as pd
from langchain_callback_parquet_logger import batch_run, with_tags, ParquetLogger
# Prepare your data
df = pd.DataFrame({
    'id': ['001', '002', '003'],
    'question': ['What is AI?', 'Explain quantum computing', 'What is blockchain?']
})
# Add required columns
df['prompt'] = df['question']
df['config'] = df['id'].apply(lambda x: with_tags(custom_id=x))
# Configure LLM with advanced features
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(
        model="gpt-4o-mini",
        service_tier="flex",  # Optional: optimize costs
        model_kwargs={"background": True},  # Optional: background processing
        callbacks=[logger]
    )

    # Run batch processing
    results = await batch_run(df, llm, max_concurrency=10, show_progress=True)
    df['answer'] = results
```
See [examples/batch_processing.py](examples/batch_processing.py) for advanced usage with structured outputs, web search tools, and more.
#### Memory-Efficient Mode for Huge DataFrames
For massive DataFrames, use `return_results=False` to avoid keeping results in memory:
```python
# Process huge DataFrame without memory overhead
with ParquetLogger('./logs') as logger:
    llm = ChatOpenAI(model="gpt-4o-mini", callbacks=[logger])

    # Results saved to parquet only, not kept in memory
    await batch_run(huge_df, llm, return_results=False)

# Read results later from parquet files
df_logs = pd.read_parquet('./logs')
results = df_logs[df_logs['event_type'] == 'llm_end']
```
## Configuration Options
### ParquetLogger Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `log_dir` | str | "./llm_logs" | Directory for log files |
| `buffer_size` | int | 100 | Entries before auto-flush |
| `provider` | str | "openai" | LLM provider name |
| `logger_metadata` | dict | {} | Metadata for all logs |
| `partition_on` | str/None | "date" | "date" or None for no partitioning |
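The `buffer_size` parameter controls the buffer-then-flush pattern: entries accumulate in memory and are written out as a Parquet file once the buffer fills (or on `flush()`/exit). A toy illustration of the idea, not the library's actual implementation:

```python
# Minimal sketch of buffered logging: entries accumulate until
# buffer_size is reached, then are flushed in one batch. The real
# logger writes the flushed batch to a Parquet file instead of a list.
class TinyBuffer:
    def __init__(self, buffer_size=100):
        self.buffer_size = buffer_size
        self.buffer = []   # pending entries
        self.written = []  # stand-in for data persisted to disk

    def log(self, entry):
        self.buffer.append(entry)
        if len(self.buffer) >= self.buffer_size:
            self.flush()

    def flush(self):
        self.written.extend(self.buffer)
        self.buffer.clear()

b = TinyBuffer(buffer_size=2)
b.log("a")        # buffered, nothing written yet
b.log("b")        # buffer full -> auto-flush
print(b.written)  # ['a', 'b']
```

This is why notebooks need a context manager or explicit `flush()`: with the default `buffer_size=100`, a handful of calls may never trigger an automatic write.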
### batch_run Parameters
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `df` | DataFrame | required | DataFrame with data |
| `llm` | LangChain LLM | required | Configured LLM instance |
| `prompt_col` | str | "prompt" | Column with prompts |
| `config_col` | str | "config" | Column with config dicts |
| `tools_col` | str/None | "tools" | Column with tools lists |
| `max_concurrency` | int | 10 | Max parallel requests |
| `show_progress` | bool | True | Show progress bar |
| `return_results` | bool | True | If False, don't keep results in memory |
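The effect of `max_concurrency` can be pictured with a semaphore: at most N requests are in flight at once while the rest wait. `batch_run` handles this internally; the sketch below is a toy version that "calls" an LLM by sleeping:

```python
import asyncio

# Illustration of the idea behind max_concurrency: a semaphore caps how
# many coroutines run concurrently. This is not the library's code.
async def run_all(prompts, max_concurrency=10):
    sem = asyncio.Semaphore(max_concurrency)

    async def one(prompt):
        async with sem:
            await asyncio.sleep(0.01)  # stand-in for an LLM call
            return f"answer:{prompt}"

    # gather preserves input order, like batch_run's returned results
    return await asyncio.gather(*(one(p) for p in prompts))

results = asyncio.run(run_all(["a", "b", "c"], max_concurrency=2))
print(results)  # ['answer:a', 'answer:b', 'answer:c']
```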
## Reading Logs
### With Pandas
```python
import pandas as pd
import json
df = pd.read_parquet("./logs")
df['data'] = df['payload'].apply(json.loads)
# Analyze by custom ID
custom_requests = df[df['logger_custom_id'] != '']
print(f"Found {len(custom_requests)} tagged requests")
```
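The same payload-parsing approach supports aggregations, e.g. summing tokens per custom ID. The toy frame below mirrors the log schema; the payload shape (a `usage.total_tokens` field, matching the DuckDB query in the next section) is an assumption about provider output:

```python
import json
import pandas as pd

# Toy frame shaped like the log schema (normally from pd.read_parquet)
df = pd.DataFrame({
    "logger_custom_id": ["user-001", "user-001", ""],
    "event_type": ["llm_end", "llm_end", "llm_start"],
    "payload": [
        json.dumps({"usage": {"total_tokens": 120}}),
        json.dumps({"usage": {"total_tokens": 80}}),
        json.dumps({}),
    ],
})

# Only llm_end events carry usage; extract tokens and sum per custom ID
ends = df[df["event_type"] == "llm_end"].copy()
ends["tokens"] = ends["payload"].apply(
    lambda p: json.loads(p).get("usage", {}).get("total_tokens", 0)
)
per_id = ends.groupby("logger_custom_id")["tokens"].sum()
print(per_id)  # user-001 -> 200
```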
### With DuckDB
```python
import duckdb
conn = duckdb.connect()
df = conn.execute("""
    SELECT
        logger_custom_id,
        event_type,
        timestamp,
        json_extract_string(payload, '$.usage.total_tokens') as tokens
    FROM read_parquet('./logs/**/*.parquet')
    WHERE logger_custom_id != ''
    ORDER BY timestamp DESC
""").df()
```
## Log Schema
| Column | Type | Description |
|--------|------|-------------|
| `timestamp` | timestamp | Event time (UTC) |
| `run_id` | string | Unique run ID |
| `logger_custom_id` | string | Your custom ID |
| `event_type` | string | llm_start/end/error |
| `provider` | string | LLM provider |
| `logger_metadata` | string | JSON metadata |
| `payload` | string | JSON event data |
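Since `logger_metadata` and `payload` are stored as JSON strings, each row decodes with `json.loads`. A minimal sketch on a hand-built row (the payload contents below are illustrative, not a guaranteed shape):

```python
import json

# Hypothetical row shaped like the schema above; actual payload
# structure varies by provider and event_type.
row = {
    "timestamp": "2024-01-15T14:30:22Z",
    "run_id": "abc-123",
    "logger_custom_id": "user-123-req-456",
    "event_type": "llm_end",
    "provider": "openai",
    "logger_metadata": json.dumps({"environment": "production"}),
    "logger_custom_id": "user-123-req-456",
    "payload": json.dumps({"usage": {"total_tokens": 42}}),
}

metadata = json.loads(row["logger_metadata"])
payload = json.loads(row["payload"])
tokens = payload.get("usage", {}).get("total_tokens")
print(metadata["environment"], tokens)  # production 42
```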
## Important Notes
### Notebook Usage
In Jupyter/Colab, use one of these approaches for immediate writes:
- **Context manager** (recommended): `with ParquetLogger() as logger:`
- **Small buffer**: `ParquetLogger(buffer_size=1)`
- **Manual flush**: `logger.flush()`
### File Organization
```
logs/
├── date=2024-01-15/              # With partitioning (default)
│   └── logs_143022_123456.parquet
└── date=2024-01-16/
    └── logs_090122_345678.parquet
```
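The `date=YYYY-MM-DD` directory names follow the Hive-partitioning convention, which is why `pd.read_parquet('./logs')` and DuckDB's `read_parquet` can filter on the `date` key. A sketch of how such a partition path is derived (the logger computes this internally; the helper name is made up for illustration):

```python
from datetime import datetime, timezone

# Hypothetical helper showing how a daily Hive-style partition
# directory is named; file names inside it are the logger's concern.
def partition_dir(log_dir: str, ts: datetime) -> str:
    return f"{log_dir}/date={ts.strftime('%Y-%m-%d')}"

p = partition_dir("logs", datetime(2024, 1, 15, tzinfo=timezone.utc))
print(p)  # logs/date=2024-01-15
```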
## Background Response Retrieval (v0.4.0+)
Retrieve completed responses from OpenAI's background/async requests:
```python
import pandas as pd
import openai
from langchain_callback_parquet_logger import retrieve_background_responses, ParquetLogger
# DataFrame with response IDs from background requests
df = pd.DataFrame({
    'response_id': ['resp_123...', 'resp_456...'],
    'logger_custom_id': ['user-001', 'user-002']
})
# Retrieve and log responses
client = openai.AsyncClient()
with ParquetLogger('./retrieval_logs') as logger:
    results = await retrieve_background_responses(
        df,
        client,
        logger=logger,
        show_progress=True,
        checkpoint_file='./checkpoint.parquet'  # Resume capability
    )
```
### Features
- **Automatic rate limiting** with exponential backoff
- **Checkpoint/resume** for interrupted retrievals
- **Memory-efficient mode** with `return_results=False`
- **Progress tracking** with tqdm
- **Structured logging** of attempts, completions, and errors
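"Exponential backoff" generally means retry delays that double per attempt, capped and jittered to avoid thundering herds. The library's exact retry schedule is internal; this is only the general shape of the technique:

```python
import random

# Sketch of capped exponential backoff with jitter (not the library's
# actual schedule): delay_i = min(cap, base * 2**i) scaled by a random
# factor in [0.5, 1.0].
def backoff_delays(attempts, base=1.0, cap=60.0, seed=0):
    rng = random.Random(seed)  # seeded only for reproducibility here
    return [
        min(cap, base * 2 ** i) * rng.uniform(0.5, 1.0)
        for i in range(attempts)
    ]

delays = backoff_delays(4)
print(delays)  # four delays, roughly doubling, each capped at 60s
```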
See [examples/retrieve_background_responses.py](examples/retrieve_background_responses.py) for detailed usage.
## Examples
- [`basic_usage.py`](examples/basic_usage.py) - Simple logging example
- [`batch_processing.py`](examples/batch_processing.py) - Advanced batch processing with all features
- [`simple_batch_example.py`](examples/simple_batch_example.py) - Before/after batch processing comparison
- [`memory_efficient_batch.py`](examples/memory_efficient_batch.py) - Memory-efficient processing for huge DataFrames
- [`partitioning_example.py`](examples/partitioning_example.py) - Partitioning strategies
- [`retrieve_background_responses.py`](examples/retrieve_background_responses.py) - Background response retrieval
## License
MIT License - see LICENSE file
## Contributing
Contributions welcome! Please submit a Pull Request.
## Support
For issues and questions, use [GitHub issues](https://github.com/turbo3136/langchain-callback-parquet-logger/issues).