td-pyspark


- Name: td-pyspark
- Version: 24.4.1
- Home page: https://docs.treasuredata.com/display/public/INT/Data+Science+and+SQL+Tools
- Summary: Treasure Data extension for pyspark
- Upload time: 2024-04-04 05:04:47
- Author: Treasure Data
- License: Apache 2
- Keywords: spark, pyspark, treasuredata

# Getting Started: td-pyspark

[Treasure Data](https://treasuredata.com) extension for using [pyspark](https://spark.apache.org/docs/latest/api/python/index.html).

## Installation

You can install td-pyspark from PyPI by using `pip` as follows:

```sh
$ pip install td-pyspark
```

To install PySpark from PyPI at the same time, request the `spark` extra (quoted so that shells such as zsh do not expand the brackets):

```sh
$ pip install "td-pyspark[spark]"
```

## Introduction

First, contact [support@treasure-data.com](mailto:support@treasure-data.com) to enable the td-spark feature. This feature is disabled by default.

td-pyspark is a library that lets Python access tables in Treasure Data.
The features of td_pyspark include:

- Reading tables in Treasure Data as DataFrames
- Writing DataFrames to Treasure Data
- Submitting Presto queries and reading the query results as DataFrames

For more details, see also [td-spark FAQs](https://docs.treasuredata.com/display/public/PD/Apache+Spark+Driver+%28td-spark%29+FAQs).

### Quick Start with Docker

You can try td_pyspark using Docker without installing Spark or Python.

First, create a __td-spark.conf__ file and set your TD API key and site (us, jp, eu01, ap02):

__td-spark.conf__
```
spark.td.apikey (Your TD API KEY)
spark.td.site (Your site: us, jp, eu01, ap02)
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.execution.arrow.pyspark.enabled true
```
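
As a minimal sketch, the same file could be generated from Python so that the API key does not have to be typed in by hand. The helper name `render_td_spark_conf` and the site check are assumptions made for illustration; they are not part of td-pyspark.

```python
# Sites listed in this guide.
KNOWN_SITES = ("us", "jp", "eu01", "ap02")


def render_td_spark_conf(apikey, site="us"):
    """Return the text of a td-spark.conf file (hypothetical helper)."""
    if not apikey:
        raise ValueError("apikey must not be empty")
    if site not in KNOWN_SITES:
        raise ValueError(f"unknown site: {site!r}")
    lines = [
        f"spark.td.apikey {apikey}",
        f"spark.td.site {site}",
        "spark.serializer org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.pyspark.enabled true",
    ]
    return "\n".join(lines) + "\n"
```

Writing the returned text to `td-spark.conf` in the current directory gives the same file as above.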

Launch the pyspark Docker image. The td_pyspark library is pre-installed in this image:

```shell
$ docker run -it -e TD_SPARK_CONF=td-spark.conf -v $(pwd):/opt/spark/work devtd/td-spark-pyspark:latest_spark3.1.1
Python 3.9.2 (default, Feb 19 2021, 17:33:48) 
[GCC 10.2.1 20201203] on linux
Type "help", "copyright", "credits" or "license" for more information.
21/05/10 09:04:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.1
      /_/

Using Python version 3.9.2 (default, Feb 19 2021 17:33:48)
SparkSession available as 'spark'.
2021-05-10 09:04:53.268Z debug [spark] Loading com.treasuredata.spark package - (package.scala:23)
...
>>>
```

Try reading a sample table by specifying a time range:

```python
>>> df = td.table("sample_datasets.www_access").within("+2d/2014-10-04").df()
>>> df.show()
2021-05-10 09:07:40.233Z  info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-06 00:00:00Z) - (PartitionScanner.scala:29)
2021-05-10 09:07:42.262Z  info [PartitionScanner] Retrieved 2 partition entries - (PartitionScanner.scala:36)
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|user|           host|                path|             referer|code|               agent|size|method|      time|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
|null|192.225.229.196|  /category/software|                   -| 200|Mozilla/5.0 (Maci...| 117|   GET|1412382292|
|null|120.168.215.131|  /category/software|                   -| 200|Mozilla/5.0 (comp...|  53|   GET|1412382284|
|null|180.198.173.136|/category/electro...| /category/computers| 200|Mozilla/5.0 (Wind...| 106|   GET|1412382275|
|null| 140.168.145.49|   /item/garden/2832|      /item/toys/230| 200|Mozilla/5.0 (Maci...| 122|   GET|1412382267|
|null|  52.168.78.222|/category/electro...|    /item/games/2532| 200|Mozilla/5.0 (comp...|  73|   GET|1412382259|
|null|  32.42.160.165|   /category/cameras|/category/cameras...| 200|Mozilla/5.0 (Wind...| 117|   GET|1412382251|
|null|   48.204.59.23|  /category/software|/search/?c=Electr...| 200|Mozilla/5.0 (Maci...|  52|   GET|1412382243|
|null|136.207.150.227|/category/electro...|                   -| 200|Mozilla/5.0 (iPad...| 120|   GET|1412382234|
|null| 204.21.174.187|   /category/jewelry|   /item/office/3462| 200|Mozilla/5.0 (Wind...|  59|   GET|1412382226|
|null|  224.198.88.93|    /category/office|     /category/music| 200|Mozilla/4.0 (comp...|  46|   GET|1412382218|
|null|   96.54.24.116|     /category/games|                   -| 200|Mozilla/5.0 (Wind...|  40|   GET|1412382210|
|null| 184.42.224.210| /category/computers|                   -| 200|Mozilla/5.0 (Wind...|  95|   GET|1412382201|
|null|  144.72.47.212|/item/giftcards/4684|    /item/books/1031| 200|Mozilla/5.0 (Wind...|  65|   GET|1412382193|
|null| 40.213.111.170|     /item/toys/1085|   /category/cameras| 200|Mozilla/5.0 (Wind...|  65|   GET|1412382185|
|null| 132.54.226.209|/item/electronics...|  /category/software| 200|Mozilla/5.0 (comp...| 121|   GET|1412382177|
|null|  108.219.68.64|/category/cameras...|                   -| 200|Mozilla/5.0 (Maci...|  54|   GET|1412382168|
|null| 168.66.149.218| /item/software/4343|  /category/software| 200|Mozilla/4.0 (comp...| 139|   GET|1412382160|
|null|  80.66.118.103|  /category/software|                   -| 200|Mozilla/4.0 (comp...|  92|   GET|1412382152|
|null|140.171.147.207|     /category/music|   /category/jewelry| 200|Mozilla/5.0 (Wind...| 119|   GET|1412382144|
|null| 84.132.164.204| /item/software/4783|/category/electro...| 200|Mozilla/5.0 (Wind...| 137|   GET|1412382135|
+----+---------------+--------------------+--------------------+----+--------------------+----+------+----------+
only showing top 20 rows
>>>
```


## Usage

_TDSparkContext_ is the entry point to td_pyspark's functionality. To create a TDSparkContext, pass your SparkSession (`spark`) to its constructor:

```python
from td_pyspark import TDSparkContext
td = TDSparkContext(spark)
```

### Reading Tables as DataFrames

To read a table, use `td.table(table_name)`:

```python
df = td.table("sample_datasets.www_access").df()
df.show()
```

To change the context database, use `td.use(database_name)`:

```python
td.use("sample_datasets")
# Accesses sample_datasets.www_access
df = td.table("www_access").df()
```

Calling `.df()` reads your table data as a Spark DataFrame.
The DataFrame is used in the same way as any other PySpark DataFrame. See also the [PySpark DataFrame documentation](https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame).

#### Specifying Time Ranges

Treasure Data is a time series database, so restricting reads to a time range is important for reducing the amount of data to be processed.
The `.within(...)` function specifies a target time range in a concise syntax.
It accepts the same syntax as the [TD_INTERVAL function in Presto](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL).

For example, to read the last hour of data, use `within("-1h")`:

```python
td.table("tbl").within("-1h").df()
```

You can also read the last day's data:

```python
td.table("tbl").within("-1d").df()
```

You can also specify an _offset_ for the relative time range. This example reads the last day's data beginning from 7 days ago:

```python
td.table("tbl").within("-1d/-7d").df()
```

If you know an exact time range, `within("(start time)/(end time)")` is useful:

```python
>>> df = td.table("sample_datasets.www_access").within("2014-10-04/2014-10-05").df()
>>> df.show()
2021-05-10 09:10:02.366Z  info [PartitionScanner] Fetching the partition list of sample_datasets.www_access within time range:[2014-10-04 00:00:00Z,2014-10-05 00:00:00Z) - (PartitionScanner.scala:29)
...
```

See [this doc](https://docs.treasuredata.com/display/public/PD/Supported+Presto+and+TD+Functions#SupportedPrestoandTDFunctions-TD_INTERVAL) for more examples of interval strings.
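
To make the interval strings more concrete, here is a rough sketch that resolves a few of the forms used above into a half-open `[start, end)` UTC range. It is an illustration only, not the parser used by td-spark or Presto. The function name `resolve_interval` is hypothetical, and the truncation to day or hour boundaries is an assumption based on the log output shown in this guide. Relative offsets after the slash, such as `-1d/-7d`, are not handled.

```python
import re
from datetime import datetime, timedelta, timezone

_UNITS = {"d": timedelta(days=1), "h": timedelta(hours=1)}
_OFFSET = re.compile(r"^([+-])(\d+)([dh])$")


def _parse_date(text):
    # Dates are interpreted as midnight UTC.
    return datetime.strptime(text, "%Y-%m-%d").replace(tzinfo=timezone.utc)


def _truncate(ts, unit):
    # Assumption: the anchor is aligned to the start of the day or hour.
    if unit == "d":
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    return ts.replace(minute=0, second=0, microsecond=0)


def resolve_interval(interval, now):
    """Return (start, end) for '+Nd/DATE', '-Nd', '-Nh' or 'DATE/DATE'."""
    head, _, tail = interval.partition("/")
    match = _OFFSET.match(head)
    if match is None:
        # Exact range: "(start date)/(end date)"
        return _parse_date(head), _parse_date(tail)
    sign, count, unit = match.group(1), int(match.group(2)), match.group(3)
    anchor = _truncate(_parse_date(tail) if tail else now, unit)
    span = count * _UNITS[unit]
    return (anchor, anchor + span) if sign == "+" else (anchor - span, anchor)
```

With this sketch, `resolve_interval("+2d/2014-10-04", now)` yields the range from 2014-10-04 to 2014-10-06, which matches the partition scan range logged in the Docker quick start.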


#### Submitting Presto Queries

If your Spark cluster is small, reading all of the data as an in-memory DataFrame might be difficult.
In this case, you can use Presto, a distributed SQL query engine, to reduce the amount of data that PySpark has to process:

```python
>>> q = td.presto("select code, count(*) cnt from sample_datasets.www_access group by 1")
>>> q.show()
2019-06-13 20:09:13.245Z  info [TDPrestoJDBCRDD]  - (TDPrestoRelation.scala:106)
Submit Presto query:
select code, count(*) cnt from sample_datasets.www_access group by 1
+----+----+
|code| cnt|
+----+----+
| 200|4981|
| 500|   2|
| 404|  17|
+----+----+
```

The query result is represented as a DataFrame.
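
As a sketch of pushing an aggregation down to Presto, the query above could be wrapped in a small function. The name `count_by_column` is hypothetical; the only td_pyspark call it relies on is `td.presto(sql)` as shown above. Because the table and column names are interpolated into the SQL text, the sketch accepts only plain identifiers.

```python
def count_by_column(td, table, column):
    """Run a GROUP BY count in Presto and return the result DataFrame."""
    # Reject anything that is not a simple identifier before building the SQL.
    for name in (column, *table.split(".")):
        if not name.isidentifier():
            raise ValueError(f"unexpected identifier: {name!r}")
    sql = f"select {column}, count(*) cnt from {table} group by 1"
    return td.presto(sql)
```

Calling `count_by_column(td, "sample_datasets.www_access", "code")` submits the same statement as the example above, so only the aggregated rows reach Spark.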

To run non-query statements (e.g., INSERT INTO, CREATE TABLE), use `execute_presto(sql)`:

```python
td.execute_presto("CREATE TABLE IF NOT EXISTS A(time bigint, id varchar)")
```

#### Using SparkSQL

To use tables in Treasure Data inside Spark SQL, create a view with `df.createOrReplaceTempView(...)`:

```python
# Read TD table as a DataFrame
df = td.table("mydb.test1").df()
# Register the DataFrame as a view
df.createOrReplaceTempView("test1")

spark.sql("SELECT * FROM test1").show()
```

### Create or Drop Databases and Tables

Create a new table or database:

```python
td.create_database_if_not_exists("mydb")
td.create_table_if_not_exists("mydb.test1")
```

Delete unnecessary tables:

```python
td.drop_table_if_exists("mydb.test1")
td.drop_database_if_exists("mydb")
```

You can also check the presence of a table:

```python
td.table("mydb.test1").exists() # True if the table exists
```
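
The calls above can be combined into a setup step. The following is a minimal sketch; the helper name `ensure_table` is hypothetical, and it uses only the methods shown in this section.

```python
def ensure_table(td, database, table):
    """Make sure `database.table` exists; return True if it was already there."""
    full_name = f"{database}.{table}"
    td.create_database_if_not_exists(database)
    existed = td.table(full_name).exists()
    if not existed:
        td.create_table_if_not_exists(full_name)
    return existed
```

The return value lets a job decide, for example, whether it is doing an initial load or an incremental one.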

### Create User-Defined Partition Tables

User-defined partitioning ([UDP](https://docs.treasuredata.com/display/public/PD/Defining+Partitioning+for+Presto)) is useful if
you know a column in the table that has unique identifiers (e.g., IDs, category values).

You can create a UDP table partitioned by id (string type column) as follows:

```python
td.create_udp_s("mydb.user_list", "id")
```

To create a UDP table partitioned by a Long (bigint) type column, use `td.create_udp_l`:

```python
td.create_udp_l("mydb.departments", "dept_id")
```
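
If the partition column type is only known at run time, the choice between the two calls can be made in one place. This is a small sketch; the wrapper name `create_udp` and the `"string"`/`"long"` labels are assumptions for illustration.

```python
def create_udp(td, table, column, column_type):
    """Create a UDP table partitioned by `column` ("string" or "long")."""
    if column_type == "string":
        return td.create_udp_s(table, column)
    if column_type == "long":
        return td.create_udp_l(table, column)
    raise ValueError(f"unsupported partition column type: {column_type!r}")
```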

### Swapping Table Contents

You can swap the contents of two tables. The input tables must be in the same database:

```python
# Swap the contents of two tables
td.swap_tables("mydb.tbl1", "mydb.tbl2")

# Another way to swap tables
td.table("mydb.tbl1").swap_table_with("tbl2")
```

### Uploading DataFrames to Treasure Data

To save your local DataFrames as a table, use `td.insert_into(df, table)` or `td.create_or_replace(df, table)`:

```python
# Insert the records in the input DataFrame to the target table:
td.insert_into(df, "mydb.tbl1")

# Create or replace the target table with the content of the input DataFrame:
td.create_or_replace(df, "mydb.tbl2")
```
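
The two upload calls differ in whether existing records are kept. A sketch of a wrapper that makes the choice explicit is shown below; the function name `save_dataframe` and the `mode` values are hypothetical.

```python
def save_dataframe(td, df, table, mode="append"):
    """Upload `df` to `table`, either appending to it or replacing it."""
    if mode == "append":
        # Keeps existing records and adds the new ones.
        td.insert_into(df, table)
    elif mode == "overwrite":
        # Replaces the table content with the DataFrame.
        td.create_or_replace(df, table)
    else:
        raise ValueError(f"mode must be 'append' or 'overwrite', got {mode!r}")
```

Defaulting to `"append"` means the destructive path has to be requested explicitly.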

## Using Multiple TD Accounts

To use an API key other than the one configured in td-spark.conf, call `td.with_apikey(apikey)`:

```python
# Returns a new TDSparkContext with the specified key
td2 = td.with_apikey("key2")
```

For reading tables or uploading DataFrames with the new key, use `td2`:

```python
# Read a table with key2
df = td2.table("sample_datasets.www_access").df()
...
# Insert the records with key2
td2.insert_into(df, "mydb.tbl1")
```
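
Reading with one key and writing with another can be sketched as a single function. The name `copy_table` is hypothetical; it relies only on `table`, `within`, `df`, `with_apikey`, and `insert_into` as used in this guide.

```python
def copy_table(td, target_apikey, source_table, target_table, interval=None):
    """Read `source_table` with the current key and insert it using another key."""
    reader = td.table(source_table)
    if interval is not None:
        # Optionally limit the copy to a time range, e.g. "-1d".
        reader = reader.within(interval)
    df = reader.df()
    # A separate context is used for the write so the original `td` is unchanged.
    td_target = td.with_apikey(target_apikey)
    td_target.insert_into(df, target_table)
    return df
```

In practice the second key would come from configuration or an environment variable instead of being written into the script.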

### Running PySpark jobs with spark-submit

To submit your PySpark script to a Spark cluster, you will need the following files:

- __td-spark.conf__ file that describes your TD API key and `spark.td.site` (See above).
- __td_pyspark.py__
  - Check the file location using `pip show -f td-pyspark`, and copy td_pyspark.py to your favorite location
- __td-spark-assembly-latest_xxxx.jar__
  - Get the latest version from [Download](https://treasure-data.github.io/td-spark/release_notes.html#download) page.
- Pre-built Spark
  - [Download Spark 3.4.2](https://spark.apache.org/downloads.html) with Hadoop 3.3 (built for Scala 2.12)
  - Extract the downloaded archive. This folder location will be your `$SPARK_HOME`.

Here is an example PySpark application:

__my_app.py__

```python
import td_pyspark
from pyspark.sql import SparkSession

# Create a new SparkSession
spark = SparkSession\
    .builder\
    .appName("myapp")\
    .getOrCreate()

# Create TDSparkContext
td = td_pyspark.TDSparkContext(spark)

# Read the table data within -1d (yesterday) range as DataFrame
df = td.table("sample_datasets.www_access").within("-1d").df()
df.show()
```

To run `my_app.py`, use spark-submit and specify the files mentioned above:

```bash
# Launching PySpark with the local mode
$ ${SPARK_HOME}/bin/spark-submit --master "local[4]"\
  --driver-class-path td-spark-assembly.jar\
  --properties-file=td-spark.conf\
  --py-files td_pyspark.py\
  my_app.py
```

`local[4]` runs Spark locally using 4 threads.

To use a remote Spark cluster, specify the `master` address, e.g., `--master=spark://(master node IP address):7077`.
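
When the job is launched from another Python program, the command line above can be assembled as an argument list. This is a sketch only; the function name `build_spark_submit` and its default file names are assumptions, while the flags are the ones used in the command above.

```python
import os


def build_spark_submit(spark_home, app, jar,
                       conf="td-spark.conf",
                       py_files="td_pyspark.py",
                       master="local[4]"):
    """Return the spark-submit argument list for a td_pyspark application."""
    return [
        os.path.join(spark_home, "bin", "spark-submit"),
        "--master", master,
        "--driver-class-path", jar,
        "--properties-file", conf,
        "--py-files", py_files,
        app,
    ]
```

The list can be passed to `subprocess.run(..., check=True)`. Passing arguments as a list avoids shell quoting issues with values such as `local[4]`.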

### Using the td-spark Assembly Included in the PyPI Package

The package contains a pre-built binary of td-spark, so you can add it to the classpath by default.
`TDSparkContextBuilder.default_jar_path()` returns the path to the default td-spark-assembly.jar file.
Passing this path to the `jars` method of TDSparkContextBuilder builds the SparkSession with the default jar included.

```python
import td_pyspark
from pyspark.sql import SparkSession

builder = SparkSession\
    .builder\
    .appName("td-pyspark-app")

td = td_pyspark.TDSparkContextBuilder(builder)\
    .apikey("XXXXXXXXXXXXXX")\
    .jars(td_pyspark.TDSparkContextBuilder.default_jar_path())\
    .build()
```

            
