genai-processors-pydantic

* Version: 0.1.1
* Summary: A Pydantic validator processor for Google's genai-processors framework
* Uploaded: 2025-08-04 15:35:07
* Requires Python: >=3.11
* License: Apache-2.0
* Keywords: ai, contrib, deepmind, gemini, genai, genai-processors, genai-processors-contrib, generative-ai, google, processor, processors, pydantic, schema, validation
# genai-processors-pydantic

[![PyPI version](https://img.shields.io/pypi/v/genai-processors-pydantic.svg)](https://pypi.org/project/genai-processors-pydantic/)
[![Validation](https://github.com/mbeacom/genai-processors-pydantic/actions/workflows/validate.yml/badge.svg)](https://github.com/mbeacom/genai-processors-pydantic/actions/workflows/validate.yml)
[![codecov](https://codecov.io/github/mbeacom/pydantic-gemini-processor/graph/badge.svg?token=9Ue94I4FEw)](https://codecov.io/github/mbeacom/pydantic-gemini-processor)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](LICENSE)

A Pydantic validator processor for Google's [genai-processors](https://github.com/google-gemini/genai-processors) framework.

**Note:** This is an independent contrib processor that extends the genai-processors ecosystem.

## ⚠️ Important: Current Limitations & Roadmap

This processor was developed based on feedback from the genai-processors maintainers. While functional and tested, it has known limitations in certain scenarios. See [MAINTAINER_FEEDBACK.md](MAINTAINER_FEEDBACK.md) for detailed analysis and our roadmap to address these challenges:

* **Streaming**: Currently works best with complete JSON in single Parts
* **Tool Integration**: Planned support for `genai_types.ToolResponse` Parts
* **Multi-Model Validation**: Single-model design; multi-model support planned
* **MIME Type Independence**: ✅ Already handles unmarked JSON Parts

We're committed to addressing these limitations while maintaining a stable API.

## PydanticValidator

The `PydanticValidator` is a `PartProcessor` that validates the JSON content of a `ProcessorPart` against a specified [Pydantic](https://docs.pydantic.dev/latest/) model. It provides a simple, declarative way to enforce data schemas and improve the robustness of your AI pipelines.
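
For instance, wiring one up takes only a Pydantic model and a constructor call. A minimal sketch (the `Reading` model is a hypothetical example):

```python
from genai_processors_pydantic import PydanticValidator
from pydantic import BaseModel


# A hypothetical schema, used only for illustration.
class Reading(BaseModel):
    sensor: str
    value: float


# The validator slots into a pipeline like any other part processor.
validator = PydanticValidator(model=Reading)
```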

## Motivation

In many AI applications, processors ingest data from external sources such as user input or API calls. This data can be unpredictable or malformed. The `PydanticValidator` addresses this by:

* **Preventing Errors:** It catches invalid data early, before it can cause failures in downstream processors such as a `GenaiModel` or a database writer.
* **Ensuring Structure:** It guarantees that any data moving forward in the pipeline conforms to a reliable, expected structure.
* **Simplifying Logic:** It lets other processors focus on their core tasks without boilerplate validation code cluttering them.

## Installation

Install the package from PyPI:

```bash
pip install genai-processors-pydantic
```

Or with uv:

```bash
uv add genai-processors-pydantic
```

This will automatically install the required dependencies:

* `genai-processors>=1.0.4`
* `pydantic>=2.0`

## Configuration

You can customize the validator's behavior by passing a `ValidationConfig` object during initialization.

```python
from genai_processors_pydantic import PydanticValidator, ValidationConfig

config = ValidationConfig(fail_on_error=True, strict_mode=True)
validator = PydanticValidator(MyModel, config=config)
```

### ValidationConfig Parameters

* `fail_on_error` (bool, default: `False`):
  * If `False`, the processor catches `ValidationError`s, adds error details to the part's metadata, and allows the stream to continue.
  * If `True`, the processor re-raises the `ValidationError`, stopping the stream immediately. This is useful for "fail-fast" scenarios.
* `strict_mode` (bool, default: `False`):
  * If `False`, Pydantic will attempt to coerce types where possible (e.g., converting the string `"123"` to the integer `123`).
  * If `True`, Pydantic enforces strict type matching and performs no type coercion (see the sketch below).
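
The `strict_mode` flag mirrors Pydantic's own lax-versus-strict validation. The sketch below shows that underlying distinction in plain Pydantic v2, assuming `strict_mode` maps onto Pydantic's `strict` option (the `Item` model is hypothetical):

```python
from pydantic import BaseModel, ValidationError


class Item(BaseModel):
    count: int


# Lax validation (the strict_mode=False analogue): "123" is coerced to 123.
print(Item.model_validate({"count": "123"}).count)  # -> 123

# Strict validation: the same input is rejected; no coercion is performed.
try:
    Item.model_validate({"count": "123"}, strict=True)
except ValidationError as exc:
    print(exc.errors()[0]["type"])  # -> 'int_type'
```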

## Behavior and Metadata

The `PydanticValidator` processes parts that contain valid JSON in their `text` field. For each part it processes, it yields two new parts:

1. **The Result Part:** The original part, now with added metadata.
2. **A Status Part:** A message sent to the STATUS_STREAM indicating the outcome.

### On Successful Validation

* The yielded part's `metadata['validation_status']` is set to `'success'`.
* `metadata['validated_data']` contains the serialized dictionary representation of the validated data (ensuring `ProcessorPart`s remain serializable).
* The part's `text` is updated to the formatted JSON representation of the validated data.
* A `processor.status()` message like "✅ Successfully validated..." is yielded.

### On Failed Validation

* The yielded part's `metadata['validation_status']` is set to `'failure'`.
* `metadata['validation_errors']` contains a structured list of validation errors.
* `metadata['original_data']` contains the raw data that failed validation.
* A `processor.status()` message like "❌ Validation failed..." is yielded.
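
To make this metadata concrete, here is a minimal sketch that validates a single malformed part and prints the recorded failure details. The `Point` model is hypothetical, and the single-part call style mirrors the full pipeline example below:

```python
import asyncio
import json

from genai_processors import processor
from genai_processors_pydantic import PydanticValidator
from pydantic import BaseModel


class Point(BaseModel):
    x: int
    y: float


async def demo() -> None:
    validator = PydanticValidator(model=Point)
    # "oops" cannot be coerced to a float, so validation fails.
    part = processor.ProcessorPart(json.dumps({"x": 1, "y": "oops"}))
    async for out in validator(part):
        if out.metadata.get("validation_status") == "failure":
            print(out.metadata["validation_errors"])
            print(out.metadata["original_data"])


asyncio.run(demo())
```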

## Practical Demonstration: Building a Robust Pipeline

A common use case is to validate a stream of user data and route valid and invalid items to different downstream processors.
This example shows how to create a filter to separate the stream after validation.

### Example

```python
import asyncio
import json

from genai_processors import streams, processor
from genai_processors_pydantic import PydanticValidator
from pydantic import BaseModel, Field


# 1. Define the data schema.
class UserEvent(BaseModel):
    user_id: int
    event_name: str = Field(min_length=3)


# 2. Create the validator.
validator = PydanticValidator(model=UserEvent)

# 3. Define downstream processors for success and failure cases.
class DatabaseWriter(processor.PartProcessor):
    async def call(self, part: processor.ProcessorPart):
        validated_data = part.metadata['validated_data']
        print(
            f"DATABASE: Writing event '{validated_data['event_name']}' "
            f"for user {validated_data['user_id']}"
        )
        yield part


class ErrorLogger(processor.PartProcessor):
    async def call(self, part: processor.ProcessorPart):
        errors = part.metadata['validation_errors']
        print(f"ERROR_LOG: Found {len(errors)} validation errors.")
        yield part


# 4. Create a stream with mixed-quality data.
input_stream = streams.stream_content([
    # Valid example
    processor.ProcessorPart(json.dumps({"user_id": 101, "event_name": "login"})),
    # Invalid user_id
    processor.ProcessorPart(json.dumps({"user_id": "102", "event_name": "logout"})),
    # Invalid event_name
    processor.ProcessorPart(json.dumps({"user_id": 103, "event_name": "up"})),
    # Not JSON; the validator skips this part
    processor.ProcessorPart("This is not a JSON part and will be ignored."),
])


# 5. Build and run the pipeline.
async def main():
    print("--- Running Validation Pipeline ---")

    # Process each input part through the validator as it arrives
    # This avoids buffering the entire stream in memory
    valid_parts = []
    invalid_parts = []

    async for input_part in input_stream:
        async for validated_part in validator(input_part):
            # Filter based on validation status (skip status messages)
            status = validated_part.metadata.get("validation_status")
            if status == "success":
                valid_parts.append(validated_part)
            elif status == "failure":
                invalid_parts.append(validated_part)

    # Create streams from the filtered parts
    valid_stream = streams.stream_content(valid_parts)
    invalid_stream = streams.stream_content(invalid_parts)

    # Create processor instances
    db_writer = DatabaseWriter()
    error_logger = ErrorLogger()

    # Process both streams concurrently
    async def process_valid():
        async for part in valid_stream:
            async for result in db_writer(part):
                pass  # Results are printed in the processor

    async def process_invalid():
        async for part in invalid_stream:
            async for result in error_logger(part):
                pass  # Results are printed in the processor

    # Run both processing pipelines concurrently
    await asyncio.gather(process_valid(), process_invalid())
    print("--- Pipeline Finished ---")


if __name__ == "__main__":
    asyncio.run(main())


# Expected Output:
# --- Running Validation Pipeline ---
# DATABASE: Writing event 'login' for user 101
# ERROR_LOG: Found 1 validation error(s).
# ERROR_LOG: Found 1 validation error(s).
# --- Pipeline Finished ---
```

            
