# congruity
[![GitHub Actions Build](https://github.com/databricks/congruity/actions/workflows/main.yml/badge.svg)](https://github.com/databricks/congruity/actions/workflows/main.yml)
[![PyPI Downloads](https://static.pepy.tech/personalized-badge/spark-congruity?period=month&units=international_system&left_color=black&right_color=orange&left_text=PyPI%20downloads)](https://pypi.org/project/spark-congruity/)
Migrating from classic Spark applications, with their full power and
flexibility, to the Spark Connect compatible DataFrame API can be
challenging in many ways.
The goal of this library is to provide a compatibility layer that makes it easier to
adopt Spark Connect. The library is designed to be simply imported into your application;
it then monkey-patches the existing API to provide the legacy functionality.
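Conceptually, the monkey-patching technique works by attaching a shim for a missing legacy method onto an existing class at import time. The sketch below illustrates the pattern in plain Python; the class and method names (`ConnectSession`, `create_dataframe`) are illustrative stand-ins, not congruity's actual implementation:

```python
class ConnectSession:
    """Stand-in for a session object that lacks a legacy method."""

    def create_dataframe(self, data):
        return list(data)


def _legacy_parallelize(self, data):
    # Shim: route the legacy call to an API that is still available.
    return self.create_dataframe(data)


# Patch the shim onto the class at import time, so existing code that
# calls the legacy method keeps working unchanged.
ConnectSession.parallelize = _legacy_parallelize

session = ConnectSession()
result = session.parallelize([1, 2, 3])  # legacy call now works
```

Because the patch happens as a side effect of the import, calling code only needs the single `import` statement and no further changes.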
## Non-Goals
This library is not intended to be a long-term solution. The goal is to provide a
compatibility layer that becomes obsolete over time. In addition, we do not aim to
provide compatibility for all methods and features but only a select subset. Lastly,
we do not aim to achieve the same performance as using some of the native RDD APIs.
## Usage
Spark JVM & Spark Connect compatibility library.
```shell
pip install spark-congruity
```
```python
import congruity
```
### Example
Here is code that works on Spark JVM:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
spark.sparkContext.parallelize(data).toDF()
```
This code doesn't work with Spark Connect, because `sparkContext` and the RDD API are not
available there. The congruity library rewrites the call under the hood, so the old syntax
works on Spark Connect clusters as well:
```python
import congruity # noqa: F401
from pyspark.sql import SparkSession
spark = SparkSession.builder.remote("sc://localhost").getOrCreate()
data = [("Java", "20000"), ("Python", "100000"), ("Scala", "3000")]
spark.sparkContext.parallelize(data).toDF()
```
## Contributing
We very much welcome contributions to this project. The easiest way to start is to pick one of
the RDD or SparkContext methods below and implement the compatibility layer. Once you have done
that, open a pull request and we will review it.
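A typical contribution expresses a legacy operation in terms of one that is already supported. As a hedged sketch (the class and method names here are hypothetical, not congruity's internals), `first` can be implemented on top of `take`:

```python
class RDDShim:
    """Illustrative stand-in for an RDD-like compatibility wrapper."""

    def __init__(self, rows):
        self._rows = list(rows)

    def take(self, num):
        # Already-supported operation: return up to `num` elements.
        return self._rows[:num]

    def first(self):
        # New compatibility method, built on take(): return the first
        # element, or raise if the collection is empty (mirroring the
        # legacy RDD.first contract).
        rows = self.take(1)
        if not rows:
            raise ValueError("RDD is empty")
        return rows[0]


print(RDDShim(["a", "b"]).first())  # -> "a"
```

Building new methods on top of already-patched ones keeps each contribution small and self-contained.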
## What's supported?
### RDD
| Method | Supported | Comment |
|-----------------------------------|--------------------|-------------------------------------------------------------------|
| aggregate | :white_check_mark: | |
| aggregateByKey | :x: | |
| barrier | :x: | |
| cache | :x: | |
| cartesian | :x: | |
| checkpoint | :x: | |
| cleanShuffleDependencies | :x: | |
| coalesce | :x: | |
| cogroup | :x: | |
| collect | :white_check_mark: | |
| collectAsMap | :x: | |
| collectWithJobGroup | :x: | |
| combineByKey | :x: | |
| count | :white_check_mark: | |
| countApprox | :x: | |
| countByKey | :x: | |
| countByValue | :x: | |
| distinct | :x: | |
| filter | :white_check_mark: | |
| first | :white_check_mark: | |
| flatMap | :x: | |
| fold | :white_check_mark: | First version. |
| foreach | :x: | |
| foreachPartition | :x: | |
| fullOuterJoin | :x: | |
| getCheckpointFile | :x: | |
| getNumPartitions | :x: | |
| getResourceProfile | :x: | |
| getStorageLevel | :x: | |
| glom | :white_check_mark: | |
| groupBy | :white_check_mark: | |
| groupByKey | :white_check_mark: | |
| groupWith | :x: | |
| histogram | :white_check_mark: | |
| id | :x: | |
| intersection | :x: | |
| isCheckpointed | :x: | |
| isEmpty | :x: | |
| isLocallyCheckpointed | :x: | |
| join | :x: | |
| keyBy | :white_check_mark: | |
| keys | :white_check_mark: | |
| leftOuterJoin | :x: | |
| localCheckpoint | :x: | |
| lookup | :x: | |
| map | :white_check_mark: | |
| mapPartitions | :white_check_mark: | First version, based on mapInArrow. |
| mapPartitionsWithIndex | :x: | |
| mapPartitionsWithSplit | :x: | |
| mapValues | :white_check_mark: | |
| max | :white_check_mark: | |
| mean | :white_check_mark: | |
| meanApprox | :x: | |
| min | :white_check_mark: | |
| name | :x: | |
| partitionBy | :x: | |
| persist | :x: | |
| pipe | :x: | |
| randomSplit | :x: | |
| reduce | :white_check_mark: | |
| reduceByKey | :x: | |
| repartition | :x: | |
| repartitionAndSortWithinPartitions | :x: | |
| rightOuterJoin | :x: | |
| sample | :x: | |
| sampleByKey | :x: | |
| sampleStdev | :white_check_mark: | |
| sampleVariance | :white_check_mark: | |
| saveAsHadoopDataset | :x: | |
| saveAsHadoopFile | :x: | |
| saveAsNewAPIHadoopDataset | :x: | |
| saveAsNewAPIHadoopFile | :x: | |
| saveAsPickleFile | :x: | |
| saveAsTextFile | :x: | |
| setName | :x: | |
| sortBy | :x: | |
| sortByKey | :x: | |
| stats | :white_check_mark: | |
| stdev | :white_check_mark: | |
| subtract | :x: | |
| subtractByKey | :x: | |
| sum | :white_check_mark: | First version. |
| sumApprox | :x: | |
| take | :white_check_mark: | Ordering might not be guaranteed in the same way as it is in RDD. |
| takeOrdered | :x: | |
| takeSample | :x: | |
| toDF | :white_check_mark: | |
| toDebugString | :x: | |
| toLocalIterator | :x: | |
| top | :x: | |
| treeAggregate | :x: | |
| treeReduce | :x: | |
| union | :x: | |
| unpersist | :x: | |
| values | :white_check_mark: | |
| variance | :white_check_mark: | |
| withResources | :x: | |
| zip | :x: | |
| zipWithIndex | :x: | |
| zipWithUniqueId | :x: | |
### SparkContext
| Method | Supported | Comment |
|-------------|--------------------|---------------------------------|
| parallelize | :white_check_mark: | Does not support numSlices yet. |
## Limitations
* Error handling and input validation are limited right now. We try
  to emulate the existing behavior, but this is not always possible
  because the invariants are not encoded in Python but rather in Scala.
* `numSlices` - we don't emulate this behavior for now.