# Airflow Spark-on-k8s Job Builder
This library aims to reduce the boilerplate and code duplication involved in building a DAG that submits a Spark job to the Spark Operator in a Kubernetes cluster.
## Using this library
This library has only been tested with Airflow versions `2.7.2` and `2.10.3`, running on AWS EKS.
### airflow version 2.7.2
In version `2.7.2` we had to use the SparkSensor to follow progress and infer when a Spark job had completed successfully. To enable it, pass `use_sensor=True` to the `SparkK8sJobBuilder` constructor.
Here is an example of how to use this library:
```python
from airflow import DAG
from datetime import datetime, timedelta

from airflow_spark_on_k8s_job_builder.core import SparkK8sJobBuilder

default_args = {
    "owner": "data-engineering-team",
    "retries": 3,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=2),
}

DAG_ID = "spark_pi"
TASK_ID = "spark-pi-submit"
TASK_NAMESPACE = "airflow"

builder = SparkK8sJobBuilder(
    task_id=TASK_ID,
    job_name="test-spark-on-k8s",
    main_class="org.apache.spark.examples.SparkPi",
    main_application_file="local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar",
    service_account=TASK_NAMESPACE,
    namespace=TASK_NAMESPACE,
    docker_img="apache/spark",
    docker_img_tag="3.4.1-scala2.12-java11-ubuntu",
    use_sensor=True,
)

dag_params = builder.build_dag_params(
    extra_params={
        "env": "dev",
    },
)

with DAG(
    DAG_ID,
    default_args=default_args,
    description="An example spark k8s task to exemplify how one can build "
                "spark airflow coordinated apps on top of k8s",
    params=dag_params,
    schedule_interval=timedelta(days=1),
    catchup=False,
    doc_md=__doc__,
    start_date=datetime(2024, 11, 21),
    dagrun_timeout=timedelta(hours=3),
    max_active_runs=1,
    template_searchpath=[
        # we need this to instruct airflow where it can find the spark yaml file
        "/home/airflow/.local/lib/python3.9/site-packages/airflow_spark_on_k8s_job_builder/",
        "/opt/airflow/dags/",
    ],
    tags=["tutorials", "hello-world"],
) as dag:
    spark_tasks = builder \
        .set_dag(dag) \
        .set_executor_cores(2) \
        .set_executor_instances(2) \
        .set_executor_memory("4g") \
        .build()
    # with use_sensor=True, build() returns the submit task followed by its sensor
    spark_tasks[0] >> spark_tasks[1]
```
### airflow version 2.10.3
In version `2.10.3` the sensor is no longer needed, but we had to [implement a workaround](https://github.com/apache/airflow/issues/39184) for `SparkKubernetesOperator` to be able to detect that the Spark job had completed successfully.
To use that fix, either call the `.setup_xcom_sidecar_container()` method on the builder object or pass the option `update_xcom_sidecar_container=True` directly to the `SparkK8sJobBuilder` constructor.
Here is an example of how to use this library:
```python
from datetime import datetime, timedelta

from airflow import DAG

from airflow_spark_on_k8s_job_builder.core import SparkK8sJobBuilder

default_args = {
    "owner": "data-engineering-team",
    "retries": 3,
    "email_on_retry": False,
    "retry_delay": timedelta(minutes=2),
}

DAG_ID = "spark_pi"
TASK_ID = "spark-pi-submit"
TASK_NAMESPACE = "airflow"

builder = SparkK8sJobBuilder(
    task_id=TASK_ID,
    job_name="test-spark-on-k8s",
    main_class="org.apache.spark.examples.SparkPi",
    main_application_file="local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar",
    service_account=TASK_NAMESPACE,
    namespace=TASK_NAMESPACE,
    docker_img="apache/spark",
    docker_img_tag="3.4.1-scala2.12-java11-ubuntu",
    use_sensor=False,
)

dag_params = builder.build_dag_params(
    extra_params={
        "env": "dev",
    },
)

with DAG(
    DAG_ID,
    default_args=default_args,
    description="An example spark k8s task to exemplify how one can build "
                "spark airflow coordinated apps on top of k8s",
    params=dag_params,
    schedule_interval=timedelta(days=1),
    catchup=False,
    doc_md=__doc__,
    start_date=datetime(2024, 11, 21),
    dagrun_timeout=timedelta(hours=3),
    max_active_runs=1,
    template_searchpath=[
        # we need this to instruct airflow where it can find the spark yaml file
        "/home/airflow/.local/lib/python3.9/site-packages/airflow_spark_on_k8s_job_builder/",
        "/opt/airflow/dags/",
    ],
    tags=["tutorials", "hello-world"],
) as dag:
    builder \
        .set_dag(dag) \
        .set_executor_cores(2) \
        .set_executor_instances(2) \
        .set_executor_memory("4g") \
        .setup_xcom_sidecar_container() \
        .build()
```
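If you prefer to enable the workaround at construction time instead of chaining `.setup_xcom_sidecar_container()`, a minimal sketch of the constructor-flag variant looks like this (the builder arguments simply mirror the example above):

```python
from airflow_spark_on_k8s_job_builder.core import SparkK8sJobBuilder

# Same effect as calling .setup_xcom_sidecar_container() on the builder:
# the xcom sidecar workaround is enabled directly via the constructor flag.
builder = SparkK8sJobBuilder(
    task_id="spark-pi-submit",
    job_name="test-spark-on-k8s",
    main_class="org.apache.spark.examples.SparkPi",
    main_application_file="local:///opt/spark/examples/jars/spark-examples_2.12-3.4.1.jar",
    service_account="airflow",
    namespace="airflow",
    docker_img="apache/spark",
    docker_img_tag="3.4.1-scala2.12-java11-ubuntu",
    use_sensor=False,
    update_xcom_sidecar_container=True,
)
```

Both approaches configure the same xcom sidecar workaround; pick whichever fits your DAG code better.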
Note that the library also ships the YAML template for the Spark job, which `SparkK8sJobBuilder` uses to build the task. Depending on where the package is installed, you will need to reference that installation path in the DAG's `template_searchpath` option.
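If you are unsure where the package (and therefore its YAML template) is installed on your Airflow workers, one quick way to find out is to print the package directory from within that environment; this is just a sketch and assumes the package is importable there:

```python
# Print the directory that contains the installed package and its YAML template,
# so that path can be added to the DAG's template_searchpath.
import os

import airflow_spark_on_k8s_job_builder

print(os.path.dirname(airflow_spark_on_k8s_job_builder.__file__))
```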
## Development
This project uses Python 3.9 for development and pip-compile for dependency
management.
The following setup is a suggestion that uses pyenv to manage both the Python
version and a Python virtual environment.
```shell
# install current project python version
pyenv install $(cat .python-version)
# confirm your pyenv is using this python version
pyenv which python
pyenv which pip

# create a virtualenv
pyenv virtualenv $(cat .python-version) airflowsparkk8sbuilder

# activate the local virtualenv
pyenv activate airflowsparkk8sbuilder

# make sure pip is up to date
pip install --upgrade pip

# install pip-tools for pip-compile and pip-sync features
pip install pip-tools

# install wheel
pip install wheel

# run pip-sync to install pip-compile generated requirements and dev requirements
pip-sync requirements.txt requirements-dev.txt
```
### Adding Dependencies
The `requirements.txt` and `requirements-dev.txt` files are generated with [pip-compile](https://github.com/jazzband/pip-tools) and should **not** be edited manually. To add new dependencies, add them to the respective `requirements.in` or `requirements-dev.in` file and regenerate the `.txt` files by running:
```shell
pip-compile requirements.in --output-file requirements.txt
pip-compile requirements-dev.in --output-file requirements-dev.txt
```
To make sure your environment is up to date with the latest changes you added, run the `pip-sync` command:
```shell
pip-sync requirements.txt requirements-dev.txt
```
*Note: The dev requirements are constrained by any dependencies in the requirements file.*
### Releasing
#### Releasing to test PyPI
Update the library's version in `setup.py`. The build step below writes the package artifacts to the `./dist` directory.
Then:
```shell
# activate venv
pyenv activate airflowsparkk8sbuilder
# clean up previous builds
python setup.py clean --all && rm -rf dist && rm -rf airflow_spark_on_k8s_job_builder.egg-info
# build a package
python -m build
# upload to test pypi
twine upload --repository testpypi dist/*
```