airflow-spark-on-k8s-job-builder

- Name: airflow-spark-on-k8s-job-builder
- Version: 0.4.4
- Home page: https://github.com/yourusername/my_library
- Summary: A library to limit the amount of unnecessary boilerplate code required when launching Spark Jobs against k8s Spark-Operator in Airflow
- Author: Stroeer Labs Data Engineering
- Requires Python: >=3.6
- Upload time: 2025-07-17 13:35:39

# Airflow Spark-on-k8s Job Builder

This library aims to avoid excessive boilerplate code duplication when building a DAG that submits a Spark job to the Spark-Operator in a Kubernetes cluster.

## Using this library

This library has only been tested with Airflow versions `2.7.2` and `2.10.3`, running on AWS EKS.
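
Since the package is published on PyPI, installing it into the Airflow image or environment should be as simple as:

```shell
pip install airflow-spark-on-k8s-job-builder
```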

### airflow version 2.7.2
In version `2.7.2` we had to use the SparkSensor in order to follow progress and infer when a Spark job had completed successfully. For that, we pass `use_sensor=True` to the `SparkK8sJobBuilder` constructor.

Here is an example of how to use this library:
```python

from airflow import DAG
from datetime import datetime, timedelta

from airflow_spark_on_k8s_job_builder.core import SparkK8sJobBuilder

default_args = {
    "owner": "data-engineering-team",
    "retries": 3,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=2),
}

DAG_ID = "spark_pi"
TASK_ID = "spark-pi-submit"
TASK_NAMESPACE = "airflow"

builder = SparkK8sJobBuilder(
    task_id=TASK_ID,
    job_name="test-spark-on-k8s",
    main_class="org.apache.spark.examples.SparkPi",
    main_application_file="local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar",
    service_account=TASK_NAMESPACE,
    namespace=TASK_NAMESPACE,
    docker_img="apache/spark",
    docker_img_tag="3.4.1-scala2.12-java11-ubuntu",
    use_sensor=True,
)

dag_params = builder.build_dag_params(
    extra_params={
        "env": "dev",
    },
)

with DAG(
        DAG_ID,
        default_args=default_args,
        description="An example spark k8s task to exemplify how one can build "
                    "spark airflow coordinates apps on top of k8s",
        params=dag_params,
        schedule_interval=timedelta(days=1),
        catchup=False,
        doc_md=__doc__,
        start_date=datetime(2024, 11, 21),
        dagrun_timeout=timedelta(hours=3),
        max_active_runs=1,
        template_searchpath=[
            # we need this to instruct airflow where it can find the spark yaml file
            "/home/airflow/.local/lib/python3.9/site-packages/airflow_spark_on_k8s_job_builder/",
            "/opt/airflow/dags/",
        ],
        tags=["tutorials", "hello-world"],
) as dag:
    spark_tasks = builder \
        .set_dag(dag) \
        .set_executor_cores(2) \
        .set_executor_instances(2) \
        .set_executor_memory("4g") \
        .build()
    spark_tasks[0] >> spark_tasks[1]
```


### airflow version 2.10.3
In version `2.10.3` the sensor was no longer needed, but we had to [implement a workaround](https://github.com/apache/airflow/issues/39184) for `SparkKubernetesOperator` to be able to detect that the Spark job had completed successfully.
To use that fix, you can either call the `.setup_xcom_sidecar_container()` method on the builder object or, alternatively, pass the option `update_xcom_sidecar_container=True` directly to the `SparkK8sJobBuilder` constructor.

Here is an example of how to use this library:

```python

from datetime import datetime, timedelta

from airflow import DAG

from airflow_spark_on_k8s_job_builder.core import SparkK8sJobBuilder

default_args = {
    "owner": "data-engineering-team",
    "retries": 3,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=2),
}

DAG_ID = "spark_pi"
TASK_ID = "spark-pi-submit"
TASK_NAMESPACE = "airflow"

builder = SparkK8sJobBuilder(
    task_id=TASK_ID,
    job_name="test-spark-on-k8s",
    main_class="org.apache.spark.examples.SparkPi",
    main_application_file="local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar",
    service_account=TASK_NAMESPACE,
    namespace=TASK_NAMESPACE,
    docker_img="apache/spark",
    docker_img_tag="3.4.1-scala2.12-java11-ubuntu",
    use_sensor=False,
)

dag_params = builder.build_dag_params(
    extra_params={
        "env": "dev",
    },
)

with DAG(
        DAG_ID,
        default_args=default_args,
        description="An example spark k8s task to exemplify how one can build "
                    "spark airflow coordinates apps on top of k8s",
        params=dag_params,
        schedule_interval=timedelta(days=1),
        catchup=False,
        doc_md=__doc__,
        start_date=datetime(2024, 11, 21),
        dagrun_timeout=timedelta(hours=3),
        max_active_runs=1,
        template_searchpath=[
            # we need this to instruct airflow where it can find the spark yaml file
            "/home/airflow/.local/lib/python3.9/site-packages/airflow_spark_on_k8s_job_builder/",
            "/opt/airflow/dags/",
        ],
        tags=["tutorials", "hello-world"],
) as dag:
    builder \
        .set_dag(dag) \
        .set_executor_cores(2) \
        .set_executor_instances(2) \
        .set_executor_memory("4g") \
        .setup_xcom_sidecar_container() \
        .build()
```
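
Alternatively, instead of calling `.setup_xcom_sidecar_container()` in the chain, the same workaround can be enabled at construction time. A minimal sketch of that variant, reusing the same constants as the example above:

```python
builder = SparkK8sJobBuilder(
    task_id=TASK_ID,
    job_name="test-spark-on-k8s",
    main_class="org.apache.spark.examples.SparkPi",
    main_application_file="local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar",
    service_account=TASK_NAMESPACE,
    namespace=TASK_NAMESPACE,
    docker_img="apache/spark",
    docker_img_tag="3.4.1-scala2.12-java11-ubuntu",
    use_sensor=False,
    # same effect as calling .setup_xcom_sidecar_container() on the builder
    update_xcom_sidecar_container=True,
)
```
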
Note that the library also ships the YAML template for the Spark job, which `SparkK8sJobBuilder` uses to build the DAG. Depending on where the package is installed, you will need to reference that installation path in the `template_searchpath` option.
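
If you prefer not to hard-code the site-packages location, one option (a sketch, assuming the YAML template is bundled inside the installed package directory) is to resolve the path at DAG-parse time:

```python
import os

import airflow_spark_on_k8s_job_builder

# directory where the package (and its bundled spark job yaml template) is installed
SPARK_TEMPLATE_DIR = os.path.dirname(airflow_spark_on_k8s_job_builder.__file__)

# ... and then reference it instead of a hard-coded path, e.g.:
# template_searchpath=[SPARK_TEMPLATE_DIR, "/opt/airflow/dags/"]
```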


## Development

This project uses python 3.9 for development, and pip-compile for dependency
management.

The following setup is a suggestion that uses pyenv to manage both the python
version and the python virtual environment.

```shell

# install current project python version
pyenv install $(cat .python-version)
# confirm your pyenv is using this python version
pyenv which python
pyenv which pip

# create a virtualenv
pyenv virtualenv $(cat .python-version) airflowsparkk8sbuilder

# activate the local virtualenv
pyenv activate airflowsparkk8sbuilder

# make sure pip is up to date
pip install --upgrade pip

# install pip-tools for pip-compile and pip-sync features
pip install pip-tools

# install wheel
pip install wheel

# run pip-sync to install pip-compile generated requirements and dev requirements
pip-sync requirements.txt requirements-dev.txt

```


### Adding Dependencies
The `requirements.txt` and `requirements-dev.txt` files are generated using [pip-compile](https://github.com/jazzband/pip-tools) and should **not** be edited manually. To add new dependencies, simply add them to the respective `requirements.in` or `requirements-dev.in` files and update the `.txt` files by running:

```shell
pip-compile requirements.in --output-file requirements.txt
pip-compile requirements-dev.in --output-file requirements-dev.txt
```

To make sure your environment is up to date with the latest changes you added, run the `pip-sync` command:
```shell
pip-sync requirements.txt requirements-dev.txt
```

*Note: The dev requirements are constrained by any dependencies in the requirements file.*
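
This is the usual pip-tools layering: `requirements-dev.in` constrains itself against the compiled runtime pins. As an illustration only (the actual dev dependencies of this project may differ), such a file typically starts with:

```
# keep dev dependencies compatible with the pinned runtime requirements
-c requirements.txt

pytest
```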

### Releasing

#### Releasing to test pypi

Update the library's version in `setup.py`. The commands below clean up any previous build artifacts, build the package into the `./dist` directory, and upload it to test PyPI:
```shell
# activate venv
pyenv activate airflowsparkk8sbuilder
# clean up previous builds
python setup.py clean --all && rm -rf dist && rm -rf airflow_spark_on_k8s_job_builder.egg-info
# build a package
python -m build
# upload to test pypi
twine upload --repository testpypi dist/*
```
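
To sanity-check the upload, the package can then be installed from test PyPI into a scratch environment (the `--extra-index-url` lets transitive dependencies still resolve from the regular index):

```shell
pip install \
  --index-url https://test.pypi.org/simple/ \
  --extra-index-url https://pypi.org/simple/ \
  airflow-spark-on-k8s-job-builder==0.4.4
```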

            
