# Introduction
The `PyWarp` module provides functions that ease interaction with the [Warp 10](https://warp10.io/) Time Series Platform.
These functions can be used to fetch data from a Warp 10 instance into a [Pandas](https://pandas.pydata.org) [dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or a [Spark](https://spark.apache.org) [dataframe](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes). A function is also provided for loading data from [HFiles](https://blog.senx.io/introducing-hfiles-cloud-native-infinite-storage-for-time-series-data/) into a Spark dataframe.
A function also allows the conversion of native Warp 10 *wrappers* into a Pandas dataframe.
An `exec` function allows the execution of WarpScript on a Warp 10 instance and the retrieval of the result.
# Installation
```
pip3 install warp10-pywarp
```
# Installation from source
In this folder, run the command:
```
pip3 install -e .
```
# Fetching data options
Data points in the Warp 10 platform follow a Geo Time Series data model (geo location information is optional).
The PyWarp library provides various functions to fetch and represent these data points using [Pandas](https://pandas.pydata.org) [dataframes](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), or [Spark](https://spark.apache.org) [dataframes](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes):
[Pandas](https://pandas.pydata.org) integration:
- **`pywarp.fetch`**: returns a single dataframe where each row represents a single data point.
- **`pywarp.sfetch`**: returns a list of dataframes, with each dataframe representing a distinct (geo) time series.
- **`pywarp.ffetch`**: returns a single dataframe, resulting from the fusion of multiple (geo) time series dataframes.
[Spark](https://spark.apache.org) integration:
- **`pywarp.spark.fetch`**: reads wrappers directly from a Warp 10 instance and loads them into a Spark dataframe.
- **`pywarp.spark.hfileread`**: reads data from HFiles and loads the extracted wrappers into a Spark dataframe.
- **`pywarp.spark.wrappers2df`**: converts a dataframe containing wrappers into a dataframe of data points.
[WarpScript](https://warp10.io/content/03_Documentation/04_WarpScript) integration:
- **`pywarp.exec`**: outputs the parsed JSON result of a WarpScript query.
A notebook example for each dataframe schema option is provided in `test/`.
# Data Frame Schemas
### 1. Data Point Stream Data Frame
Returned by `pywarp.fetch` and `pywarp.spark.wrappers2df`, this format gathers all fetched data points in a single Pandas dataframe, where each row represents a distinct data point.
| Column Name | Data Type | Description | Optional |
|------------|-----------|-------------|----------|
| classname | str | Classname of the series the data point belongs to | No |
| labels | dict | Labels of the series the data point belongs to | No |
| attributes | dict | Attributes of the series the data point belongs to | No |
| ts | int | Timestamp of the data point in time units since Epoch | No |
| lat | float | Latitude of the data point | No |
| lon | float | Longitude of the data point | No |
| elev | int | Elevation of the data point | No |
| l_value | int | `LONG` value of the data point | No |
| d_value | float | `DOUBLE` value of the data point | No |
| b_value | bool | `BOOLEAN` value of the data point | No |
| s_value | str | `STRING` value of the data point | No |
| bin_value | binary | `BYTES` value of the data point | No |
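Since a Warp 10 data point carries a single value, only the typed value column matching that type is expected to be populated on a given row, the others holding nulls. Below is a minimal sketch (a hand-built sample, not actual `pywarp.fetch` output) showing how the typed value columns of such a dataframe could be coalesced into a single column:
```
import pandas as pd

# Hand-built sample shaped like the Data Point Stream schema,
# with one typed value column set per row.
df = pd.DataFrame({
    'classname': ['room.temp', 'room.occupied'],
    'labels': [{'room': '42'}, {'room': '42'}],
    'attributes': [{}, {}],
    'ts': [1700000000000000, 1700000001000000],
    'l_value': [None, None],
    'd_value': [21.5, None],
    'b_value': [None, True],
    's_value': [None, None],
    'bin_value': [None, None],
})

# Coalesce the typed value columns into a single 'value' column.
value_cols = ['l_value', 'd_value', 'b_value', 's_value', 'bin_value']
df['value'] = df[value_cols].apply(
    lambda row: next((v for v in row if pd.notna(v)), None), axis=1)

# On a microsecond platform (warp.timeunits=us), ts converts like this:
df['datetime'] = pd.to_datetime(df['ts'], unit='us')
print(df[['classname', 'datetime', 'value']])
```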
### 2. GTS Data Frame List
Returned by `pywarp.sfetch`, this format gives a list of individual Pandas dataframes, each representing a unique Geo Time Series.
| Column Name | Data Type | Description | Optional |
|------------|-----------|-------------|----------|
| ts or *index* | int | Timestamp in time units since Epoch | No |
| lat | float | Latitude | Yes |
| lon | float | Longitude | Yes |
| elev | int | Elevation | Yes |
| `<classname>` | various | Value | No |
Each DataFrame's `.attrs` dict contains:
- **warp10classname**: Classname of the Geo Time Series (str).
- **warp10labels**: Labels associated with the time series (dict).
- **warp10attributes**: Attributes of the time series (dict).
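As an illustration, this metadata can be read back from each dataframe's `.attrs`; a minimal sketch, where `HOST`, `PORT` and `TOKEN` are placeholders as in the examples below:
```
import pywarp

# Sketch: iterate over the dataframes returned by pywarp.sfetch,
# one per Geo Time Series.
for gts in pywarp.sfetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100):
    print(gts.attrs['warp10classname'], gts.attrs['warp10labels'], len(gts))
```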
### 3. Fused GTS Data Frames
Returned by `pywarp.ffetch`, this format amalgamates data from all fetched Geo Time Series into columns of a single Pandas dataframe.
| Column Name/Prefix | Data Type | Description | Optional |
|-------------------------|-----------|-----------------------------------------|----------|
| *index* | int | Timestamp in time units since Epoch | No |
| l:`<label key>` | str | One column for each unique label key | Yes |
| a:`<attribute key>` | str | One column for each unique attribute key | Yes |
| lat:`<classname>` | float | Latitude, one column for each unique classname | Yes |
| lon:`<classname>` | float | Longitude, one column for each unique classname | Yes |
| elev:`<classname>` | int | Elevation, one column for each unique classname | Yes |
| val:`<classname>` | various | Value, one column for each unique classname | No |
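Because related columns share a prefix, they can be selected by simple string matching; a minimal sketch, assuming `df` was returned by `pywarp.ffetch` as in the examples below:
```
# Sketch: select the per-classname value columns of a fused dataframe.
value_cols = [c for c in df.columns if c.startswith('val:')]
print(df[value_cols].describe())

# Labels are exposed as 'l:<key>' columns.
label_cols = [c for c in df.columns if c.startswith('l:')]
print(df[label_cols].drop_duplicates())
```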
### 4. WarpScript JSON Output
`pywarp.exec` returns the parsed JSON output of a WarpScript query obtained against the Warp 10 `/exec` endpoint.
This is the most flexible way to retrieve data in a customizable format.
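For instance, a script that leaves a single value on the stack comes back as a one-element list; a minimal sketch, where `HOST` and `PORT` are placeholders as in the examples below:
```
import pywarp

# Sketch: /exec returns the final WarpScript stack serialized as JSON,
# which pywarp.exec parses into Python objects.
result = pywarp.exec('https://HOST:PORT/api/v0/exec', '1 2 +', False)
print(result)  # expected: [3]
```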
# Examples
## Reading data from a Warp 10 instance
See also: [notebook examples](https://github.com/senx/pywarp/blob/master/test/).
```
import pywarp
df = pywarp.fetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100)
# Or using another dataframe schema:
# df = pywarp.sfetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100, indexedByTimestamp=True)
# df = pywarp.ffetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100, indexedByTimestamp=True)
print(df)
```
## Reading data from a Warp 10 instance via Spark
```
import pywarp.spark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
builder = SparkSession.builder.appName("PyWarp Test")
spark = builder.getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
df = pywarp.spark.fetch(sc, 'https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -1440)
df = pywarp.spark.wrappers2df(sc, df, 'wrapper')
df.show()
```
Spark jobs making use of the HFStore extension must be launched using:
```
spark-submit --packages io.warp10:warp10-spark:3.0.2,io.senx:warp10-ext-hfstore:2.0.0 \
  --repositories https://maven.senx.io/repository/senx-public \
  --properties-file spark.conf \
  --files warp10.conf
```
where `spark.conf` contains the following definitions:
```
##
## Executor specific options
##
spark.executor.extraJavaOptions=-Dwarp10.config=warp10.conf -Ddisable.logging=true
##
## Driver specific options
##
spark.driver.extraJavaOptions=-Dwarp10.config=warp10.conf -Ddisable.logging=true
```
and the `warp10.conf` file contains, at a minimum:
```
##
## Use microseconds as the time unit
##
warp.timeunits=us
##
## Load the Spark extension
##
warpscript.extensions=io.warp10.spark.SparkWarpScriptExtension
##
## Load the Debug extension so STDOUT is available
##
warpscript.extension.debug=io.warp10.script.ext.debug.DebugWarpScriptExtension
```
Alternatively, if you do not want to use `spark-submit`, you can add the following to your script between the lines `builder = ....` and `spark = builder.getOrCreate()`:
```
conf = {}
conf['spark.master'] = 'local'
conf['spark.submit.deployMode'] = 'client'
conf['spark.executor.instances'] = '1'
conf['spark.executor.cores'] = '2'
conf['spark.driver.memory'] = '1g'
conf['spark.executor.memory'] = '1g'
conf['spark.executor.extraJavaOptions'] = '-Dwarp10.config=warp10.conf -Ddisable.logging=true'
conf['spark.driver.extraJavaOptions'] = '-Dwarp10.config=warp10.conf -Ddisable.logging=true'
conf['spark.driver.bindAddress'] = '0.0.0.0'
conf['spark.jars.packages'] = 'io.warp10:warp10-spark:3.0.2,io.senx:warp10-ext-hfstore:2.0.0'
conf['spark.jars.repositories'] = 'https://maven.senx.io/repository/senx-public'
conf['spark.files'] = 'warp10.conf'
for (k, v) in conf.items():
    builder = builder.config(key=k, value=v)
```
and simply launch it using `python3`.
## Reading data from HFiles in Spark
```
import pywarp.spark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.appName("PyWarp Test").getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)
df = pywarp.spark.hfileread(sc, '/path/to/file.hfile', selector='SELECTOR{}', end=1081244481160.000, start=1081244472361.000)
df = pywarp.spark.wrappers2df(sc, df, 'wrapper')
df.show(n=1000, truncate=False)
```
## Executing WarpScript on a Warp 10 instance
```
import pywarp
x = pywarp.exec('https://sandbox.senx.io/api/v0/exec',
"""
REV REV REV "UTF-8" ->BYTES 42 42.0 F 6 ->LIST
#->PICKLE ->B64
""",
False # Set to True if your code returns base64-encoded pickled content (uncomment the line that uses ->PICKLE above)
)
print(x)
```
## Executing WarpScript in Spark
```
import pywarp.spark

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PyWarp Test").getOrCreate()
sc = spark.sparkContext
df = ....
# Register a function 'foo' which returns a STRING and takes 2 parameters
pywarp.spark.register(df.sql_ctx, 'foo', 2, '')
# Create a temp view
df.createOrReplaceTempView('DF')
# Call WarpScript which converts column _1 to a STRING and returns it
df = df.sql_ctx.sql("SELECT foo(' TOSTRING', _1) AS str FROM DF")
df.show()
```