# genai-processors-pydantic
A Pydantic validator processor for Google's [genai-processors](https://github.com/google-gemini/genai-processors) framework.
**Note:** This is an independent contrib processor that extends the genai-processors ecosystem.
## ⚠️ Important: Current Limitations & Roadmap
This processor was developed based on feedback from the genai-processors maintainers. While functional and tested, it has known limitations in certain scenarios. See [MAINTAINER_FEEDBACK.md](MAINTAINER_FEEDBACK.md) for detailed analysis and our roadmap to address these challenges:
* **Streaming**: Currently works best with complete JSON in single Parts
* **Tool Integration**: Planned support for `genai_types.ToolResponse` Parts
* **Multi-Model Validation**: Single-model design; multi-model support planned
* **MIME Type Independence**: ✅ Already handles unmarked JSON Parts
We're committed to addressing these limitations while maintaining a stable API.
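Until streaming support lands, one possible workaround for the first limitation is to reassemble fragments upstream so that the validator receives a complete JSON document in a single Part. The sketch below uses plain strings as stand-ins for Part text; `join_json_chunks` is a hypothetical helper and not part of this package.

```python
import json
from collections.abc import Iterable, Iterator


def join_json_chunks(chunks: Iterable[str]) -> Iterator[str]:
    """Buffer text fragments and yield the buffer once it parses as JSON."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        try:
            json.loads(buffer)
        except json.JSONDecodeError:
            continue  # Still incomplete; keep accumulating.
        yield buffer
        buffer = ""


# A single JSON object split across three fragments.
fragments = ['{"user_id": 1', '01, "event_name"', ': "login"}']
complete = list(join_json_chunks(fragments))
print(complete)
```

This approach suits object or array payloads. A bare scalar such as `12` parses as soon as its first digits arrive, so it could be emitted before the rest of the number is received.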
## PydanticValidator
The `PydanticValidator` is a `PartProcessor` that validates the JSON content of a `ProcessorPart` against a specified [Pydantic](https://docs.pydantic.dev/latest/) model. It provides a simple, declarative way to enforce data schemas and improve the robustness of your AI pipelines.
## Motivation
In many AI applications, processors ingest data from external sources like user inputs or API calls. This data can be unpredictable or malformed. The PydanticValidator solves this by:
* **Preventing Errors:** It catches invalid data early, before it can cause errors in downstream processors like a GenaiModel or a database writer.
* **Ensuring Structure:** It guarantees that any data moving forward in the pipeline conforms to a reliable, expected structure.
* **Simplifying Logic:** It allows other processors to focus on their core tasks without being cluttered with boilerplate data validation code.
## Installation
Install the package from PyPI:
```bash
pip install genai-processors-pydantic
```
Or with uv:
```bash
uv add genai-processors-pydantic
```
This will automatically install the required dependencies:
* `genai-processors>=1.0.4`
* `pydantic>=2.0`
## Configuration
You can customize the validator's behavior by passing a `ValidationConfig` object during initialization.
```python
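from genai_processors_pydantic import PydanticValidator, ValidationConfig
from pydantic import BaseModel

class MyModel(BaseModel):  # Placeholder schema; substitute your own model.
    name: str

config = ValidationConfig(fail_on_error=True, strict_mode=True)
validator = PydanticValidator(MyModel, config=config)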
```
### ValidationConfig Parameters
* `fail_on_error` (bool, default: `False`):
  * If `False`, the processor will catch `ValidationError`s, add error details to the part's metadata, and allow the stream to continue.
  * If `True`, the processor will re-raise the `ValidationError`, stopping the stream immediately. This is useful for "fail-fast" scenarios.
* `strict_mode` (bool, default: `False`):
  * If `False`, Pydantic will attempt to coerce types where possible (e.g., converting the string `"123"` to the integer `123`).
  * If `True`, Pydantic will enforce strict type matching and will not perform type coercion.
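To see the difference between the two `strict_mode` settings in isolation, the sketch below calls Pydantic directly rather than going through the processor. The `Order` model is hypothetical, and how `PydanticValidator` forwards the setting to Pydantic internally is an assumption; only the Pydantic behavior itself is shown here.

```python
from pydantic import BaseModel, ValidationError


class Order(BaseModel):  # Hypothetical model, for illustration only.
    quantity: int


payload = '{"quantity": "123"}'

# Lax mode (Pydantic's default): the numeric string is coerced to an int.
lax = Order.model_validate_json(payload)
print(lax.quantity)

# Strict mode: the same payload is rejected instead of being coerced.
try:
    Order.model_validate_json(payload, strict=True)
    strict_rejected = False
except ValidationError as exc:
    strict_rejected = True
    print(f"Rejected with {exc.error_count()} error(s)")
```

Strict mode is usually the safer choice when a string in an integer field indicates an upstream bug rather than a formatting quirk.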
## Behavior and Metadata
The PydanticValidator processes parts that contain valid JSON in their text field. For each part it processes, it yields one or more new parts:
1. **The Result Part:** The original part, now with added metadata.
2. **A Status Part:** A message sent to the STATUS_STREAM indicating the outcome.
### On Successful Validation
* The yielded part's `metadata['validation_status']` is set to `'success'`.
* `metadata['validated_data']` contains the serialized dictionary representation of the validated data (ensuring that `ProcessorPart`s remain serializable).
* The part's text is updated to the formatted JSON representation of the validated data.
* A `processor.status()` message such as "✅ Successfully validated..." is yielded.
### On Failed Validation
* The yielded part's `metadata['validation_status']` is set to `'failure'`.
* `metadata['validation_errors']` contains a structured list of validation errors.
* `metadata['original_data']` contains the raw data that failed validation.
* A `processor.status()` message such as "❌ Validation failed..." is yielded.
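The metadata keys above are enough to decide where a part should go next. As a rough sketch, the hypothetical `route` helper below works on plain dictionaries standing in for `part.metadata`. The example dictionaries are hand-written, and the shape of each entry in `validation_errors` is an assumption rather than the package's documented format.

```python
from typing import Any


def route(metadata: dict[str, Any]) -> str:
    """Pick a destination from the validation metadata described above."""
    status = metadata.get("validation_status")
    if status == "success":
        return "valid"
    if status == "failure":
        return "invalid"
    # Status messages and parts the validator skipped carry no status key.
    return "unvalidated"


# Hand-written stand-ins for part.metadata (error entry shape is assumed).
ok = {
    "validation_status": "success",
    "validated_data": {"user_id": 101, "event_name": "login"},
}
bad = {
    "validation_status": "failure",
    "validation_errors": [{"loc": ["event_name"], "msg": "too short"}],
    "original_data": {"user_id": 103, "event_name": "up"},
}

destinations = [route(m) for m in (ok, bad, {})]
print(destinations)
```

Using `.get()` rather than indexing keeps the helper safe for parts that never passed through validation.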
## Practical Demonstration: Building a Robust Pipeline
A common use case is to validate a stream of user data and route valid and invalid items to different downstream processors.
This example shows how to create a filter to separate the stream after validation.
### Example
```python
import asyncio
import json

from genai_processors import processor, streams
from genai_processors_pydantic import PydanticValidator
from pydantic import BaseModel, Field


# 1. Define the data schema.
class UserEvent(BaseModel):
    user_id: int
    event_name: str = Field(min_length=3)


# 2. Create the validator.
validator = PydanticValidator(model=UserEvent)


# 3. Define downstream processors for success and failure cases.
class DatabaseWriter(processor.PartProcessor):
    async def call(self, part: processor.ProcessorPart):
        validated_data = part.metadata['validated_data']
        print(
            f"DATABASE: Writing event '{validated_data['event_name']}' "
            f"for user {validated_data['user_id']}"
        )
        yield part


class ErrorLogger(processor.PartProcessor):
    async def call(self, part: processor.ProcessorPart):
        errors = part.metadata['validation_errors']
        print(f"ERROR_LOG: Found {len(errors)} validation errors.")
        yield part


# 4. Create a stream with mixed-quality data.
input_stream = streams.stream_content([
    # Valid example
    processor.ProcessorPart(json.dumps({"user_id": 101, "event_name": "login"})),
    # Invalid user_id: a non-numeric string cannot be coerced to int,
    # even in the default (non-strict) mode. A value like "102" would pass.
    processor.ProcessorPart(json.dumps({"user_id": "not-a-number", "event_name": "logout"})),
    # Invalid event_name (shorter than min_length=3)
    processor.ProcessorPart(json.dumps({"user_id": 103, "event_name": "up"})),
    # Ignore this part
    processor.ProcessorPart("This is not a JSON part and will be ignored."),
])


# 5. Build and run the pipeline.
async def main():
    print("--- Running Validation Pipeline ---")

    # Run each input part through the validator and sort the results by
    # validation status. For simplicity, the results are collected in lists.
    valid_parts = []
    invalid_parts = []

    async for input_part in input_stream:
        async for validated_part in validator(input_part):
            # Filter based on validation status (skip status messages)
            status = validated_part.metadata.get("validation_status")
            if status == "success":
                valid_parts.append(validated_part)
            elif status == "failure":
                invalid_parts.append(validated_part)

    # Create streams from the filtered parts
    valid_stream = streams.stream_content(valid_parts)
    invalid_stream = streams.stream_content(invalid_parts)

    # Create processor instances
    db_writer = DatabaseWriter()
    error_logger = ErrorLogger()

    # Define one consumer per stream
    async def process_valid():
        async for part in valid_stream:
            async for _ in db_writer(part):
                pass  # Results are printed in the processor

    async def process_invalid():
        async for part in invalid_stream:
            async for _ in error_logger(part):
                pass  # Results are printed in the processor

    # Run both processing pipelines concurrently
    await asyncio.gather(process_valid(), process_invalid())
    print("--- Pipeline Finished ---")


if __name__ == "__main__":
    asyncio.run(main())


# Expected Output:
# --- Running Validation Pipeline ---
# DATABASE: Writing event 'login' for user 101
# ERROR_LOG: Found 1 validation errors.
# ERROR_LOG: Found 1 validation errors.
# --- Pipeline Finished ---
```