# Introduction
[![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
Utility functions and classes for working with DataFrames, provisioning a SparkSession and much more.
Core features:
- Provisioning a Spark session with routine settings pre-configured, including Delta Lake configuration. You must
  have the delta-core jars on the classpath for this to work.
- Spark job argument wrappers that let you specify job inputs for `spark.read.format(...).options(...).load(...)` and
  outputs for `spark.write.format(...).save(...)` in a generic way. These are exposed as the built-in `source` and `target`
  arguments (see the example below).
Consider a simple Spark job that reads `json` data from `source` and stores it as `parquet` in `target`. This job can be
defined using `spark-utils` as follows:
```python
from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider

def main(args=None):
    """
    Job entrypoint
    :param args:
    :return:
    """
    spark_args = SparkJobArgs().parse(args)

    source_table = spark_args.source('json_source')
    target_table = spark_args.output('parquet_target')

    # Spark session and hadoop FS
    spark_session = SparkSessionProvider().get_session()
    df = spark_session.read.format(source_table.data_format).load(source_table.data_path)
    df.write.format(target_table.data_format).save(target_table.data_path)
```
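For quick local testing you can also call the entrypoint above directly, passing the same pipe-delimited `name|path|format` argument values used with `spark-submit` further below. This is a minimal sketch appended to the script above; the paths are illustrative only:
```python
# Hypothetical local invocation of the job defined above; the
# 'name|path|format' argument layout mirrors the spark-submit example below.
if __name__ == '__main__':
    main([
        '--source', 'json_source|file://tmp/test_json/*|json',
        '--output', 'parquet_target|file://tmp/test_parquet/*|parquet',
    ])
```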
You can also provision a Spark session using the Kubernetes API server as a resource manager. Use the Java options from the
example below for Java 17 installations:
```python
from spark_utils.common.spark_session_provider import SparkSessionProvider
from spark_utils.models.k8s_config import SparkKubernetesConfig

config = {
    'spark.local.dir': '/tmp',
    'spark.driver.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED",
    'spark.executor.extraJavaOptions': "-XX:+UseG1GC -XX:+UnlockDiagnosticVMOptions -XX:InitiatingHeapOccupancyPercent=35 -XX:OnOutOfMemoryError='kill -9 %p' -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED --add-opens=java.base/java.util.stream=ALL-UNNAMED",
    'spark.executor.instances': '5'
}

spc = SparkKubernetesConfig(
    application_name='test',
    k8s_namespace='my-spark-namespace',
    spark_image='myregistry.io/spark:v3.3.1',
    executor_node_affinity={
        'kubernetes.mycompany.com/sparknodetype': 'worker',
        'kubernetes.azure.com/scalesetpriority': 'spot'
    },
    executor_name_prefix='spark-k8s-test'
)

ssp = SparkSessionProvider(additional_configs=config).configure_for_k8s(
    master_url='https://my-k8s-cluster.mydomain.io',
    spark_config=spc
)

spark_session = ssp.get_session()
```
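When the job is done, stopping the session shuts down the Spark application; in Kubernetes mode this also releases the executor pods. This uses the standard PySpark `SparkSession.stop()` call:
```python
# Tear down the Spark application; with the Kubernetes resource manager this
# also terminates the executor pods created in the configured namespace.
spark_session.stop()
```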
Now we can call this job directly or with `spark-submit`. Each `--source`/`--output` value uses the pipe-delimited
`name|path|format` layout. Note that you must have `spark-utils` on the PYTHONPATH before running the script:
```commandline
spark-submit --master local[*] --deploy-mode client --name simpleJob ~/path/to/main.py --source 'json_source|file://tmp/test_json/*|json' --output 'parquet_target|file://tmp/test_parquet/*|parquet'
```
- Job argument encryption is supported. This functionality requires an encryption key to be present in the cluster
  environment variable `RUNTIME_ENCRYPTION_KEY`. The only algorithm currently supported is `fernet`. You can declare an
  argument as encrypted using the `new_encrypted_arg` function. You must then pass an encrypted value to the declared
  argument; `spark-utils` decrypts it when the job is executed and passes the plaintext to the consumer.
For example, you can pass sensitive Spark configuration (storage access keys, Hive database passwords, etc.) encrypted:
```python
import json
from spark_utils.common.spark_job_args import SparkJobArgs
from spark_utils.common.spark_session_provider import SparkSessionProvider

def main(args=None):
    spark_args = (
        SparkJobArgs()
        .new_encrypted_arg("--custom-config", type=str, default=None,
                           help="Optional spark configuration flags to pass. Will be treated as an encrypted value.")
        .parse(args)
    )

    spark_session = SparkSessionProvider(
        additional_configs=json.loads(spark_args.parsed_args.custom_config)
        if spark_args.parsed_args.custom_config else None
    ).get_session()

    ...
```
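One way to produce the encrypted value for such an argument is the Fernet implementation from the `cryptography` package. This is a minimal sketch, assuming `RUNTIME_ENCRYPTION_KEY` holds a valid Fernet key and that the same key is available on the cluster at job run time; the storage account name in the config key is hypothetical:
```python
import json
import os

from cryptography.fernet import Fernet

# Hypothetical sensitive Spark configuration to pass via --custom-config.
sensitive_config = {
    "spark.hadoop.fs.azure.account.key.myaccount.dfs.core.windows.net": "<secret>"
}

# Encrypt with the same key the cluster exposes as RUNTIME_ENCRYPTION_KEY.
token = Fernet(os.environ["RUNTIME_ENCRYPTION_KEY"].encode("utf-8")).encrypt(
    json.dumps(sensitive_config).encode("utf-8")
)

# Pass this string as the value of --custom-config when submitting the job.
print(token.decode("utf-8"))
```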
- Delta Lake utilities
  - Table publishing to the Hive Metastore.
  - Delta OSS compaction with a row count / file optimization target (a generic illustration of the underlying recipe is sketched after this list).
- Models for common data operations such as data copying. Note that the actual code for those operations will be migrated
  to this repo a bit later.
- Utility functions for common data operations, for example flattening a parent-child hierarchy, view concatenation,
  column name cleanup, etc.
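For context on the compaction item above, the generic Delta OSS small-file compaction recipe looks roughly like the sketch below. This is an illustration of the technique, not necessarily what the `spark-utils` helper does internally; the table path and file count target are hypothetical:
```python
from spark_utils.common.spark_session_provider import SparkSessionProvider

spark = SparkSessionProvider().get_session()

# Hypothetical Delta table location and target number of files.
table_path = "abfss://data@myaccount.dfs.core.windows.net/my_table"
target_file_count = 16

# Rewrite the table's data into fewer files without changing its content,
# marking the commit with dataChange=false so downstream streaming readers
# do not reprocess it.
(
    spark.read.format("delta")
    .load(table_path)
    .repartition(target_file_count)
    .write.format("delta")
    .mode("overwrite")
    .option("dataChange", "false")
    .save(table_path)
)
```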
There are so many possibilities with this project - please feel free to open an issue / PR adding new capabilities or
fixing those nasty bugs!
# Getting Started
Spark Utils must be installed on your cluster, or in the virtual environment that provides the Python interpreter Spark uses:
```commandline
pip install spark-utils
```
# Build and Test
The test pipeline runs Spark in local mode, so everything can be tested against our current runtime. Update the image used
in `build.yaml` if you need to test against a different runtime version.
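For a local run of the test suite, something like the following should work, assuming a standard `pytest`-based setup (an assumption; check the repository's CI definition for the exact commands):
```commandline
pip install -e . pytest
pytest
```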