# Getting Started: td-pyspark
[Treasure Data](https://treasuredata.com) extension for using [pyspark](https://spark.apache.org/docs/latest/api/python/index.html).
## Installation
You can install td-pyspark from PyPI by using `pip` as follows:
```sh
$ pip install td-pyspark
```
If you want to install PySpark via PyPI as well, you can install it as follows:
```sh
$ pip install td-pyspark[spark]
```
## Introduction
First, contact [support@treasure-data.com](mailto:support@treasure-data.com) to enable the td-spark feature; it is disabled by default.
td-pyspark is a library that enables Python to access tables in Treasure Data.
The features of td_pyspark include:
- Reading tables in Treasure Data as DataFrames
- Writing DataFrames to Treasure Data
- Submitting Presto queries and reading the query results as DataFrames
For more details, see also [td-spark FAQs](https://docs.treasuredata.com/display/public/PD/Apache+Spark+Driver+%28td-spark%29+FAQs).
### Quick Start with Docker
You can try td_pyspark using Docker without installing Spark or Python.
First, create a __td-spark.conf__ file and set your TD API key and site (us, jp, eu01, ap02) configuration:
__td-spark.conf__
```
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.pyspark.enabled true
```
Launch the pyspark Docker image, which comes with the td_pyspark library pre-installed:
```shell
$ docker run -it -e TD_SPARK_CONF=td-spark.conf -v $(pwd):/opt/spark/work devtd/td-spark-pyspark:latest_spark3.1.1
Python 3.9.2 (default, Feb 19 2021, 17:33:48)
[GCC 10.2.1 20201203] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/05/10 09:04:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/
Using Python version 3.9.2 (default, Feb 19 2021 17:33:48)
SparkSession available as 'spark'.
2021-05-10 09:04:53.268Z debug [spark] Loading com.treasuredata.spark package - (package.scala:23)
...
>>>
```
Try reading a sample table by specifying a time range:
```python
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
2021-05-10 09:07:40.233Z info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-06 00:00:00Z) - (PartitionScanner.scala:29)
2021-05-10 09:07:42.262Z info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user| host| path| referer|code| agent|size|method| time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|192.225.229.196| /category/software| -| 200|Mozilla/5.0 (Maci...| 117| GET|1412382292|
|null|120.168.215.131| /category/software| -| 200|Mozilla/5.0 (comp...| 53| GET|1412382284|
|null|180.198.173.136|/category/electro...| /category/computers| 200|Mozilla/5.0 (Wind...| 106| GET|1412382275|
|null| 140.168.145.49| /item/garden/2832| /item/toys/230| 200|Mozilla/5.0 (Maci...| 122| GET|1412382267|
|null| 52.168.78.222|/category/electro...| /item/games/2532| 200|Mozilla/5.0 (comp...| 73| GET|1412382259|
|null| 32.42.160.165| /category/cameras|/category/cameras...| 200|Mozilla/5.0 (Wind...| 117| GET|1412382251|
|null| 48.204.59.23| /category/software|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...| 52| GET|1412382243|
|null|136.207.150.227|/category/electro...| -| 200|Mozilla/5.0 (iPad...| 120| GET|1412382234|
|null| 204.21.174.187| /category/jewelry| /item/office/3462| 200|Mozilla/5.0 (Wind...| 59| GET|1412382226|
|null| 224.198.88.93| /category/office| /category/music| 200|Mozilla/4.0 (comp...| 46| GET|1412382218|
|null| 96.54.24.116| /category/games| -| 200|Mozilla/5.0 (Wind...| 40| GET|1412382210|
|null| 184.42.224.210| /category/computers| -| 200|Mozilla/5.0 (Wind...| 95| GET|1412382201|
|null| 144.72.47.212|/item/giftcards/4684| /item/books/1031| 200|Mozilla/5.0 (Wind...| 65| GET|1412382193|
|null| 40.213.111.170| /item/toys/1085| /category/cameras| 200|Mozilla/5.0 (Wind...| 65| GET|1412382185|
|null| 132.54.226.209|/item/electronics...| /category/software| 200|Mozilla/5.0 (comp...| 121| GET|1412382177|
|null| 108.219.68.64|/category/cameras...| -| 200|Mozilla/5.0 (Maci...| 54| GET|1412382168|
|null| 168.66.149.218| /item/software/4343| /category/software| 200|Mozilla/4.0 (comp...| 139| GET|1412382160|
|null| 80.66.118.103| /category/software| -| 200|Mozilla/4.0 (comp...| 92| GET|1412382152|
|null|140.171.147.207| /category/music| /category/jewelry| 200|Mozilla/5.0 (Wind...| 119| GET|1412382144|
|null| 84.132.164.204| /item/software/4783|/category/electro...| 200|Mozilla/5.0 (Wind...| 137| GET|1412382135|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
>>>
```
## Usage
_TDSparkContext_ is the entry point for accessing td_pyspark's functionality. To create a TDSparkContext, pass your SparkSession (`spark`) to it:
```python
import td_pyspark

td = td_pyspark.TDSparkContext(spark)
```
### Reading Tables as DataFrames
To read a table, pass its name to `td.table(...)`:
```python
df = td.table("sample_datasets.www_access").df()
df.show()
```
To change the context database, use `td.use(database_name)`:
```python
td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()
```
Calling `.df()` reads your table data as a Spark DataFrame.
The DataFrame is used in the same way as in standard PySpark. See also the [PySpark DataFrame documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).
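For example, once the table is loaded, standard PySpark operations such as filtering and aggregation work as usual. A minimal sketch using the `sample_datasets.www_access` table shown above:
```python
from pyspark.sql import functions as F

# Read the table as a regular Spark DataFrame
df = td.table("sample_datasets.www_access").df()

# Standard PySpark operations: filter, aggregate, sort
top_paths = (df
    .filter(F.col("method") == "GET")
    .groupBy("path")
    .count()
    .orderBy(F.desc("count"))
    .limit(10))
top_paths.show()
```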
#### Specifying Time Ranges
Treasure Data is a time-series database, so specifying a time range to read only recent data is important for reducing the amount of data to be processed.
The `.within(...)` function specifies a target time range in a concise syntax.
`within` accepts the same syntax used by the [TD_INTERVAL function in Presto](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL).
For example, to read the last 1 hour of data, use `within("-1h")`:
```python
td.table("tbl").within("-1h").df()
```
You can also read the last day's data:
```python
td.table("tbl").within("-1d").df()
```
You can also specify an _offset_ for the relative time range. This example reads one day of data starting from 7 days ago:
```python
td.table("tbl").within("-1d/-7d").df()
```
If you know an exact time range, `within("(start time)/(end time)")` is useful:
```python
>>> df = td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
>>> df.show()
2021-05-10 09:10:02.366Z info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-05 00:00:00Z) - (PartitionScanner.scala:29)
...
```
See [this doc](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL) for more examples of interval strings.
#### Submitting Presto Queries
If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult.
In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data processed in PySpark:
```python
>>> q = td.presto("select code, count(*) cnt from sample_datasets.www_access group by 1")
>>> q.show()
2019-06-13 20:09:13.245Z info [TDPrestoJDBCRDD] - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500| 2|
| 404| 17|
+----+----+
```
The query result is represented as a DataFrame.
To run non-query statements (e.g., INSERT INTO, CREATE TABLE), use `execute_presto(sql)`:
```python
td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
```
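For example, you can combine `execute_presto` with `td.presto` to create a table and then inspect it. The table name `mydb.events` below is illustrative; this is a sketch, not output from an actual run:
```python
# Create an empty table with a DDL statement (no result is returned)
td.execute_presto("CREATE TABLE IF NOT EXISTS mydb.events (time bigint, id varchar)")

# Query statements go through td.presto, which returns a DataFrame
td.presto("SELECT count(*) AS cnt FROM mydb.events").show()
```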
#### Using SparkSQL
To use tables in Treasure Data inside Spark SQL, create a view with `df.createOrReplaceTempView(...)`:
```python
# Read TD table as a DataFrame
df = td.table("mydb.test1").df()
# Register the DataFrame as a view
df.createOrReplaceTempView("test1")
spark.sql("SELECT * FROM test1").show()
```
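Multiple TD tables can be registered as views and joined with plain Spark SQL in the same way. A sketch with illustrative table and column names:
```python
# Register two TD tables as temporary views
td.table("mydb.users").df().createOrReplaceTempView("users")
td.table("mydb.events").df().createOrReplaceTempView("events")

# Join the views with Spark SQL
spark.sql("""
    SELECT u.id, count(*) AS event_count
    FROM users u JOIN events e ON u.id = e.user_id
    GROUP BY u.id
""").show()
```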
### Create or Drop Databases and Tables
Create a new database or table:
```python
td.create_database_if_not_exists("mydb")
td.create_table_if_not_exists("mydb.test1")
```
Delete unnecessary tables or databases:
```python
td.drop_table_if_exists("mydb.test1")
td.drop_database_if_exists("mydb")
```
You can also check the presence of a table:
```python
td.table("mydb.test1").exists() # True if the table exists
```
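`exists()` can be combined with the helpers above, for example to reset a table before loading fresh data. A minimal sketch with an illustrative table name:
```python
# Drop and recreate mydb.test1 only if it already exists
if td.table("mydb.test1").exists():
    td.drop_table_if_exists("mydb.test1")
td.create_table_if_not_exists("mydb.test1")
```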
### Create User-Defined Partition Tables
User-defined partitioning ([UDP](https://docs.treasuredata.com/display/public/PD/Defining+Partitioning+for+Presto)) is useful when a column in the table holds unique identifiers (e.g., IDs, category values).
You can create a UDP table partitioned by `id` (a string-type column) as follows:
```python
td.create_udp_s("mydb.user_list", "id")
```
To create a UDP table partitioned by a long (bigint) type column, use `td.create_udp_l`:
```python
td.create_udp_l("mydb.departments", "dept_id")
```
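A typical workflow is to create the UDP table first and then load a DataFrame whose partition column matches. A sketch, assuming an illustrative DataFrame with a string `id` column and using `td.insert_into` (described below):
```python
# Illustrative DataFrame with a string "id" column
user_df = spark.createDataFrame(
    [("u001", "Alice"), ("u002", "Bob")],
    schema="id string, name string")

# Create the UDP table partitioned by the string column "id"
td.create_udp_s("mydb.user_list", "id")

# Load the DataFrame into the UDP table
td.insert_into(user_df, "mydb.user_list")
```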
### Swapping Table Contents
You can swap the contents of two tables. The input tables must be in the same database:
```python
# Swap the contents of two tables
td.swap_tables("mydb.tbl1", "mydb.tbl2")
# Another way to swap tables
td.table("mydb.tbl1").swap_table_with("tbl2")
```
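Swapping is handy for atomically replacing a table: build the new contents in a staging table, then swap it into place. A sketch with illustrative table names, using `td.create_or_replace` (described in the next section):
```python
# Build the new contents in a staging table (here, a subset of the original as a stand-in)
new_df = td.table("mydb.tbl1").df().limit(1000)
td.create_or_replace(new_df, "mydb.tbl1_staging")

# Atomically swap the staging table into place
td.swap_tables("mydb.tbl1", "mydb.tbl1_staging")

# The staging table now holds the previous contents and can be dropped
td.drop_table_if_exists("mydb.tbl1_staging")
```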
### Uploading DataFrames to Treasure Data
To save a local DataFrame as a table, use `td.insert_into(df, table)` or `td.create_or_replace(df, table)`:
```python
# Insert the records in the input DataFrame to the target table:
td.insert_into(df, "mydb.tbl1")
# Create or replace the target table with the content of the input DataFrame:
td.create_or_replace(df, "mydb.tbl2")
```
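For example, a small DataFrame built locally can be uploaded like this. A sketch; the schema and table name are illustrative:
```python
# Build a small local DataFrame
records = [(1, "blue"), (2, "red"), (3, "green")]
local_df = spark.createDataFrame(records, schema="id long, color string")

# Append the records to an existing table
td.insert_into(local_df, "mydb.colors")

# Or overwrite the table with exactly this content
td.create_or_replace(local_df, "mydb.colors")
```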
## Using multiple TD accounts
To use an API key other than the one configured in td-spark.conf, use `td.with_apikey(apikey)`:
```python
# Returns a new TDSparkContext with the specified key
td2 = td.with_apikey("key2")
```
For reading tables or uploading DataFrames with the new key, use `td2`:
```python
# Read a table with key2
df = td2.table("sample_datasets.www_access").df()
...
# Insert the records with key2
td2.insert_into(df, "mydb.tbl1")
```
### Running PySpark jobs with spark-submit
To submit your PySpark script to a Spark cluster, you will need the following files:
- __td-spark.conf__ file that contains your TD API key and `spark.td.site` (see above)
- __td_pyspark.py__
  - Check the file location using `pip show -f td-pyspark`, and copy td_pyspark.py to a location of your choice
- __td-spark-assembly-latest_xxxx.jar__
  - Get the latest version from the [Download](https://treasure-data.github.io/td-spark/release_notes.html#download) page
- Pre-built Spark
  - [Download Spark 3.4.2](https://spark.apache.org/downloads.html) with Hadoop 3.3 (built for Scala 2.12)
  - Extract the downloaded archive. This folder location will be your `$SPARK_HOME`.
Here is an example PySpark application:
__my_app.py__
```python
import td_pyspark
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession \
    .builder \
    .appName("myapp") \
    .getOrCreate()

# Create TDSparkContext
td = td_pyspark.TDSparkContext(spark)

# Read the table data within the -1d (yesterday) range as a DataFrame
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()
```
To run `my_app.py`, use spark-submit and specify the files listed above:
```bash
# Launching PySpark with the local mode
$ ${SPARK_HOME}/bin/spark-submit --master "local[4]" \
  --driver-class-path td-spark-assembly.jar \
  --properties-file=td-spark.conf \
  --py-files td_pyspark.py \
  my_app.py
```
`local[4]` means running Spark locally using 4 threads.
To use a remote Spark cluster, specify the master address, e.g., `--master=spark://(master node IP address):7077`.
### Using the td-spark assembly included in the PyPI package
The package contains a pre-built binary of td-spark so that you can add it to the classpath by default.
`TDSparkContextBuilder.default_jar_path()` returns the path to the bundled td-spark-assembly.jar file.
Passing this path to the `jars` method of TDSparkContextBuilder builds a SparkSession that includes the default jar.
```python
import td_pyspark
from pyspark.sql import SparkSession
builder = SparkSession \
    .builder \
    .appName("td-pyspark-app")

td = td_pyspark.TDSparkContextBuilder(builder) \
    .apikey("XXXXXXXXXXXXXX") \
    .jars(td_pyspark.TDSparkContextBuilder.default_jar_path()) \
    .build()
```
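The returned `td` context can then be used in the same way as shown above, for example:
```python
# Read a table through the context built above
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()
```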