warp10-pywarp


| Field | Value |
|-------|-------|
| Name | warp10-pywarp |
| Version | 1.0.1 |
| Home page | http://github.com/senx/pywarp |
| Summary | A set of functions to ease working with Warp 10 |
| Upload time | 2024-06-25 10:11:55 |
| Author | SenX |
| Requires Python | >=3.8 |
| License | Apache 2.0 |
| Keywords | time series warp 10 |
| Requirements | No requirements were recorded. |

# Introduction

The `PyWarp` module provides functions that simplify interaction with the [Warp 10](https://warp10.io/) Time Series Platform.

The functions it provides can be used to fetch data from a Warp 10 instance into a [Pandas](https://pandas.pydata.org) [dataframe](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html) or a [Spark](https://spark.apache.org) [dataframe](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes). A function is also provided for loading data from [HFiles](https://blog.senx.io/introducing-hfiles-cloud-native-infinite-storage-for-time-series-data/) into a Spark dataframe.

A function also allows the conversion of native Warp 10 *wrappers* into a Pandas dataframe.

An `exec` function allows the execution of WarpScript on a Warp 10 instance and the retrieval of the result.

# Installation

```
pip3 install warp10-pywarp
```


# Installation from source

From the root of the source checkout, run:
```
pip3 install -e .
```

# Fetching data options

Data points in the Warp 10 platform follow a Geo Time Series data model (geo location information is optional).

The PyWarp library provides various functions to fetch and represent these data points using [Pandas](https://pandas.pydata.org) [dataframes](https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html), or [Spark](https://spark.apache.org) [dataframes](https://spark.apache.org/docs/latest/sql-programming-guide.html#datasets-and-dataframes):

[Pandas](https://pandas.pydata.org) integration:
- **`pywarp.fetch`**: returns a single dataframe where each row represents a single data point.
- **`pywarp.sfetch`**: returns a list of dataframes, with each dataframe representing a distinct (geo) time series.
- **`pywarp.ffetch`**: returns a single dataframe, resulting from the fusion of multiple (geo) time series dataframes.

[Spark](https://spark.apache.org) integration:
- **`pywarp.spark.fetch`**: reads wrappers directly from a Warp 10 instance and loads them into a Spark dataframe.
- **`pywarp.spark.hfileread`**: reads data from HFiles and loads the extracted wrappers into a Spark dataframe.
- **`pywarp.spark.wrappers2df`**: converts a dataframe containing wrappers into a dataframe of data points.

[WarpScript](https://warp10.io/content/03_Documentation/04_WarpScript) integration:
- **`pywarp.exec`**: outputs the parsed JSON result of a WarpScript query.

A notebook example for each dataframe schema option is provided in `test/`.

# Data Frame Schemas

### 1. Data Point Stream Data Frame

Returned by `pywarp.fetch` (as a Pandas dataframe) and `pywarp.spark.wrappers2df` (as a Spark dataframe), this format streams data points within a single dataframe, where each row represents a distinct data point.

| Column Name | Data Type | Description | Optional |
|------------|-----------|-------------|----------|
| classname  | str       | Classname of the series the data point belongs to | No       |
| labels     | dict      | Labels of the series the data point belongs to | No       |
| attributes | dict      | Attributes of the series the data point belongs to | No       |
| ts         | int       | Timestamp of the data point in time units since Epoch | No       |
| lat        | float     | Latitude of the data point | No       |
| lon        | float     | Longitude of the data point | No       |
| elev       | int       | Elevation of the data point | No       |
| l_value    | int       | `LONG` value of the data point | No       |
| d_value    | float     | `DOUBLE` value of the data point | No       |
| b_value    | bool      | `BOOLEAN` value of the data point | No       |
| s_value    | str       | `STRING` value of the data point | No       |
| bin_value  | binary    | `BYTES` value of the data point | No       |
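
Since each row carries one column per Warp 10 value type, downstream code often wants a single value per data point. The sketch below works on plain dictionaries keyed by the column names above. It assumes, without confirmation from the library, that the columns not matching the point's type hold `None`; the helper name `point_value` is hypothetical and not part of PyWarp.

```python
# Column names taken from the schema table above, in the order they are checked.
VALUE_COLUMNS = ("l_value", "d_value", "b_value", "s_value", "bin_value")


def point_value(row):
    """Return the first typed value that is not None, or None if all are empty.

    Assumption: unused typed columns hold None (not confirmed by PyWarp).
    """
    for column in VALUE_COLUMNS:
        value = row.get(column)
        if value is not None:
            return value
    return None


# Hand-written sample rows shaped like the schema (not real Warp 10 output).
rows = [
    {"classname": "temp", "ts": 1_000_000, "d_value": 21.5},
    {"classname": "door", "ts": 2_000_000, "b_value": False},
    {"classname": "note", "ts": 3_000_000, "s_value": "ok"},
]
values = [point_value(r) for r in rows]
print(values)
```

The check is `is not None` rather than truthiness so that legitimate values such as `False`, `0` or an empty string are not skipped.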

### 2. GTS Data Frame List

Returned by `pywarp.sfetch`, this format gives a list of individual Pandas dataframes, each representing a unique Geo Time Series.

| Column Name | Data Type | Description | Optional |
|------------|-----------|-------------|----------|
| ts or *index* | int       | Timestamp in time units since Epoch | No       |
| lat        | float     | Latitude    | Yes      |
| lon        | float     | Longitude   | Yes      |
| elev       | int       | Elevation   | Yes      |
| `<classname>` | various   | Value       | No       |

Each DataFrame's `.attrs` dict contains:
  - **warp10classname**: Classname of the Geo Time Series (str).
  - **warp10labels**: Labels associated with the time series (dict).
  - **warp10attributes**: Attributes of the time series (dict).
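
Timestamps are expressed in the platform's time units, so turning the `ts` column or index into calendar dates requires knowing that unit. A minimal sketch, assuming the instance uses microseconds (the unit set by `warp.timeunits=us` in the configuration example of this document); `to_datetime` and `UNITS_PER_SECOND` are hypothetical names, not part of PyWarp.

```python
from datetime import datetime, timedelta, timezone

# Number of time units per second for each configurable Warp 10 time unit.
UNITS_PER_SECOND = {"ms": 1_000, "us": 1_000_000, "ns": 1_000_000_000}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_datetime(ts, unit="us"):
    """Convert a timestamp in platform time units to a UTC datetime.

    Python datetimes have microsecond resolution, so nanosecond
    timestamps are truncated to the microsecond.
    """
    per_second = UNITS_PER_SECOND[unit]
    microseconds = ts * 1_000_000 // per_second
    return EPOCH + timedelta(microseconds=microseconds)


# A timestamp in microseconds since the Unix epoch.
print(to_datetime(1_719_310_315_000_000))
```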

### 3. Fused GTS Data Frames

Returned by `pywarp.ffetch`, this format amalgamates data from all fetched Geo Time Series into columns of a single Pandas dataframe.

| Column Name/Prefix      | Data Type | Description                             | Optional |
|-------------------------|-----------|-----------------------------------------|----------|
| *index*                 | int       | Timestamp in time units since Epoch      | No       |
| l:`<label key>`         | str       | One column for each unique label key     | Yes      |
| a:`<attribute key>`     | str       | One column for each unique attribute key | Yes      |
| lat:`<classname>`       | float     | Latitude, one column for each unique classname  | Yes      |
| lon:`<classname>`       | float     | Longitude, one column for each unique classname | Yes      |
| elev:`<classname>`      | int       | Elevation, one column for each unique classname | Yes      |
| val:`<classname>`       | various   | Value, one column for each unique classname     | No        |
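
Because the fused schema encodes the role of each column in its prefix, the column names can be grouped with plain string handling. The sketch below assumes column names are literally the prefix, a colon, and the key or classname, as the table suggests; `group_columns` is a hypothetical helper, not part of PyWarp.

```python
# Per-classname prefixes taken from the schema table above.
SERIES_PREFIXES = ("lat", "lon", "elev", "val")


def group_columns(columns):
    """Split fused column names into label keys, attribute keys and per-classname fields."""
    labels, attributes, series = [], [], {}
    for name in columns:
        # Split on the first colon only, so keys containing colons stay intact.
        prefix, sep, rest = name.partition(":")
        if not sep:
            continue  # not a prefixed column
        if prefix == "l":
            labels.append(rest)
        elif prefix == "a":
            attributes.append(rest)
        elif prefix in SERIES_PREFIXES:
            series.setdefault(rest, []).append(prefix)
    return labels, attributes, series


# Hand-written column names following the naming scheme (not real output).
columns = ["l:sensor", "a:unit", "lat:temp", "lon:temp", "val:temp", "val:humidity"]
labels, attributes, series = group_columns(columns)
print(labels, attributes, series)
```

With a real dataframe, the same function could be applied to `list(df.columns)`.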

### 4. WarpScript JSON Output

`pywarp.exec` returns the parsed JSON output of a WarpScript query obtained against the Warp 10 `/exec` endpoint.

This is the most flexible way to retrieve data in a customizable format.
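
When a script leaves a Geo Time Series on the stack, the parsed JSON has to be unpacked by the caller. The sketch below assumes the usual Warp 10 JSON serialization of a GTS, an object with the keys `c` (classname), `l` (labels), `a` (attributes) and `v` (data points), where each data point is a list whose length depends on whether location and elevation are present. That layout is an assumption to verify against your instance's output, and `gts_points` is a hypothetical helper, not part of PyWarp.

```python
def gts_points(gts):
    """Yield (ts, lat, lon, elev, value) tuples from a JSON-serialized GTS.

    Assumption: each entry of "v" is [ts, value], [ts, elev, value],
    [ts, lat, lon, value] or [ts, lat, lon, elev, value].
    """
    for entry in gts.get("v", []):
        ts, value = entry[0], entry[-1]
        lat = lon = elev = None
        if len(entry) == 3:
            elev = entry[1]
        elif len(entry) == 4:
            lat, lon = entry[1], entry[2]
        elif len(entry) == 5:
            lat, lon, elev = entry[1], entry[2], entry[3]
        yield ts, lat, lon, elev, value


# Hand-written sample shaped like a stack holding one GTS (not real output).
stack = [
    {"c": "temp", "l": {"room": "a"}, "a": {}, "v": [[10, 21.5], [20, 48.0, -4.5, 22.0]]}
]
points = list(gts_points(stack[0]))
print(points)
```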

# Examples

## Reading data from a Warp 10 instance

See also: [notebook examples](https://github.com/senx/pywarp/blob/master/test/).

```
import pywarp

df = pywarp.fetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100)

# Or using another dataframe schema:
# df = pywarp.sfetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100, indexedByTimestamp=True)
# df = pywarp.ffetch('https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -100, indexedByTimestamp=True)

print(df)
```

## Reading data from a Warp 10 instance via Spark

```
import pywarp.spark

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

builder = SparkSession.builder.appName("PyWarp Test")

spark = builder.getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(sc)

df = pywarp.spark.fetch(sc, 'https://HOST:PORT/api/v0/fetch', 'TOKEN', 'SELECTOR{}', 'now', -1440)
df = pywarp.spark.wrapper2df(sc, df, 'wrapper')
df.show()
```

Spark jobs making use of the HFStore extension must be launched using:

```
spark-submit --packages io.warp10:warp10-spark:3.0.2,io.senx:warp10-ext-hfstore:2.0.0 \
  --repositories https://maven.senx.io/repository/senx-public \
  --properties-file spark.conf \
  --files warp10.conf
```

where `spark.conf` contains the following definitions:

```
##
## Executor specific options
##

spark.executor.extraJavaOptions=-Dwarp10.config=warp10.conf -Ddisable.logging=true 

##
## Driver specific options
##

spark.driver.extraJavaOptions=-Dwarp10.config=warp10.conf -Ddisable.logging=true 
```

and the `warp10.conf` file contains, at a minimum:

```
##
## Use microseconds as the time unit
##
warp.timeunits=us

##
## Load the Spark extension
##
warpscript.extensions=io.warp10.spark.SparkWarpScriptExtension

##
## Load the Debug extension so STDOUT is available
##
warpscript.extension.debug=io.warp10.script.ext.debug.DebugWarpScriptExtension
```

Alternatively, if you do not want to use `spark-submit`, you can add the following to your script between the lines `builder = ....` and `spark = builder.getOrCreate()`:

```
conf = {}
conf['spark.master'] = 'local'
conf['spark.submit.deployMode'] = 'client'
conf['spark.executor.instances'] = '1'
conf['spark.executor.cores'] = '2'
conf['spark.driver.memory'] = '1g'
conf['spark.executor.memory'] = '1g'
conf['spark.executor.extraJavaOptions'] = '-Dwarp10.config=warp10.conf -Ddisable.logging=true'
conf['spark.driver.extraJavaOptions'] = '-Dwarp10.config=warp10.conf -Ddisable.logging=true'
conf['spark.driver.bindAddress'] = '0.0.0.0'
conf['spark.jars.packages'] = 'io.warp10:warp10-spark:3.0.2,io.senx:warp10-ext-hfstore:2.0.0'
conf['spark.jars.repositories'] = 'https://maven.senx.io/repository/senx-public'
conf['spark.files'] = 'warp10.conf'

for (k,v) in conf.items():
  builder = builder.config(key=k,value=v)
```

and simply launch it using `python3`.

## Reading data from HFiles in Spark

```
import pywarp.spark

from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.appName("PyWarp Test").getOrCreate()
sc = spark.sparkContext

sqlContext = SQLContext(sc)

df = pywarp.spark.hfileread(sc, '/path/to/file.hfile', selector='SELECTOR{}', end=1081244481160.000, start=1081244472361.000)

df = pywarp.spark.wrapper2df(sc, df, 'wrapper')
df.show(n=1000,truncate=False)
```

## Executing WarpScript on a Warp 10 instance

```
import pywarp

x = pywarp.exec('https://sandbox.senx.io/api/v0/exec',
"""
REV REV REV "UTF-8" ->BYTES 42 42.0 F 6 ->LIST
#->PICKLE ->B64
""",
False  # Set to True if your code returns base64-encoded pickled content (uncomment the line that uses ->PICKLE above)
)

print(x)
```

## Executing WarpScript in Spark

```
import pywarp.spark

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PyWarp Test").getOrCreate()
sc = spark.sparkContext

df = ....

# Register a function 'foo' which returns a STRING and takes 2 parameters
pywarp.spark.register(df.sql_ctx, 'foo', 2, '')

# Create a temp view
df.createOrReplaceTempView('DF')
# Call WarpScript which converts column _1 to a STRING and returns it
df = df.sql_ctx.sql("SELECT foo(' TOSTRING', _1) AS str FROM DF")
df.show()
```
            

        