timeplus


Name: timeplus
Version: 1.4.1
Home page: https://github.com/timeplus-io/gluon/tree/develop/python
Summary: Timeplus python SDK
Upload time: 2024-03-19 21:36:17
Author: Gang Tao
Requires Python: >=3
# Timeplus Python Client

[Timeplus](https://www.timeplus.com/) is a real-time streaming data analytics platform. The Timeplus Python client provides basic functionality for interacting with Timeplus Cloud and managing streaming analytics workloads.


## Installation

The Timeplus Python client currently supports the following versions of Python: 3.8, 3.9, and 3.10.
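
The version list above can be checked against the running interpreter before installing. The following is a minimal sketch; `SUPPORTED_VERSIONS` and `is_supported` are hypothetical names used for illustration and are not part of the `timeplus` package. Note that the package metadata only declares `>=3`, so other Python 3 versions may still install even though they are not listed here.

```python
import sys

# (major, minor) pairs taken from the list of supported versions above.
SUPPORTED_VERSIONS = {(3, 8), (3, 9), (3, 10)}


def is_supported(version_info=sys.version_info):
    """Return True if the (major, minor) pair is one of the listed versions."""
    return (version_info[0], version_info[1]) in SUPPORTED_VERSIONS


if __name__ == "__main__":
    major, minor = sys.version_info[:2]
    status = "listed" if is_supported() else "not listed"
    print(f"Python {major}.{minor} is {status} as a supported version")
```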

### Installing with pip
We recommend creating a virtual environment when installing Python dependencies. For more information on setting up a virtual environment, see the [Python documentation](https://docs.python.org/3.9/tutorial/venv.html).


```bash
pip install timeplus
```

## 3 APIs
The `timeplus` Python library supports three APIs:

* [DB API](#db-api) as defined in [Python Database API Specification v2.0](https://peps.python.org/pep-0249/).
* [SQLAlchemy](#sqlachamy)
* [REST API](#rest-api)

### DB API

The Timeplus Python client supports the DB API defined in the [Python Database API Specification v2.0](https://peps.python.org/pep-0249/).


```python
from timeplus.dbapi import connect

api_key = "your_timeplus_apikey"
api_address = "us.timeplus.cloud"
workspace = "your_timeplus_workspace_id"

# create a connection using host/password/workspace
conn = connect(host=api_address, password=api_key, path=workspace)

# run a streaming query
cursor = conn.execute("select * from car_live_data")

# get the first result from the query
next_result = cursor.next()

# get the next result from the query
row1 = cursor.fetchone()

# get the next three results
rows = cursor.fetchmany(3)
```

Note: because the result of a streaming query is unbounded, the cursor never reaches an end, and a fetch call blocks while there is no new query result.
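
Because the cursor never ends on its own, the caller has to decide when to stop reading. The sketch below shows one way to cap the number of rows read through `fetchone()`. `take_rows` is a hypothetical helper, and `FakeStreamingCursor` is a stand-in used only so that the example runs without a Timeplus connection; neither is part of the `timeplus` package.

```python
class FakeStreamingCursor:
    """Stand-in for a streaming cursor; serves rows from a fixed list."""

    def __init__(self, rows):
        self._rows = iter(rows)

    def fetchone(self):
        # A real streaming cursor would block here until a row arrives.
        return next(self._rows, None)


def take_rows(cursor, limit):
    """Read at most `limit` rows via fetchone(), then stop."""
    rows = []
    for _ in range(limit):
        row = cursor.fetchone()
        if row is None:  # only reachable when the result is bounded
            break
        rows.append(row)
    return rows


cursor = FakeStreamingCursor([("c00001", 42.0), ("c00002", 17.5), ("c00003", 63.1)])
first_two = take_rows(cursor, 2)
print(first_two)
```

A cap like this bounds how many rows are read, but not how long each read waits: on a quiet stream the call can still block until the next row arrives.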


### <a id="sqlachamy"></a>SQLAlchemy

The Timeplus Python client implements a [SQLAlchemy](https://www.sqlalchemy.org/) dialect for running queries, so it can be used through the SQLAlchemy API.

```python
import os
from sqlalchemy import create_engine, text, select, MetaData, Table
from sqlalchemy.dialects import registry

# register timeplus dialect
registry.register("timeplus", "timeplus.sqlalchemy", "TimeplusDialect")

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = "dev.timeplus.cloud"
port = 443
workspace = os.environ.get("TIMEPLUS_WORKSPACE") or "tp-demo"

# create a new engine
engine = create_engine(
    f"timeplus://:{api_key}@{api_address}:{port}/{workspace}")

# execute streaming sql
with engine.connect() as connection:
    result = connection.execute(text("select * from car_live_data"))
    count = 0
    max_rows = 10
    for row in result:
        print(f"got one row : {row}")
        count += 1
        if count >= max_rows:
            break

# execute statement using table from metadata
# table to reflect; here the same stream that is queried above
table_name = "car_live_data"
metadata_obj = MetaData()
car_table = Table(table_name, metadata_obj, autoload_with=engine)
print(f"reflected table is {car_table}")
print(f"cols is {[(c.name, c.type) for c in car_table.columns]}")

stmt = select(car_table).where(car_table.c.cid == "c00001")
print(stmt)
with engine.connect() as conn:
    count = 0
    max_rows = 3
    for row in conn.execute(stmt):
        print(f"got one row from query {row}")
        count += 1
        if count >= max_rows:
            break
```
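
The engine URL in the example above is assembled with an f-string, which can produce an unparsable URL if the API key contains characters such as `@` or `/`. The sketch below percent-encodes the key with the standard library before building the URL. `build_timeplus_url` is a hypothetical helper, and it is an assumption that the Timeplus dialect receives the decoded password from SQLAlchemy's URL parser in the usual way.

```python
from urllib.parse import quote_plus


def build_timeplus_url(api_key, host, port, workspace):
    """Assemble a URL in the timeplus://:<key>@<host>:<port>/<workspace> form."""
    # Percent-encode the key so characters like "@" or "/" do not break parsing.
    return f"timeplus://:{quote_plus(api_key)}@{host}:{port}/{workspace}"


url = build_timeplus_url("k@y/1", "us.timeplus.cloud", 443, "tp-demo")
print(url)
```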

### REST API

The Timeplus Python client also provides resource wrappers that can be used to call the [Timeplus REST API](https://docs.timeplus.com/rest.html) through Python objects.

The following resources are supported:

| Resource                                              | Supported Methods                     |
|-------------------------------------------------------|---------------------------------------|
| [Stream](https://docs.timeplus.com/working-with-streams) | create, list, get, delete, ingest, exist |
| [Query](https://docs.timeplus.com/stream-query)       | create, list, get, delete, cancel, analyze |
| [Source](https://docs.timeplus.com/source)            | create, list, get, delete, start, stop |
| [Sink](https://docs.timeplus.com/destination)         | create, list, get, delete, start, stop |
| [View](https://docs.timeplus.com/view)                | create, list, get, delete, exist      |
| [UDF](https://docs.timeplus.com/udf)                  | list                                  |
| [Alert](https://docs.timeplus.com/alert)              | list                                  |
| [Dashboard](https://docs.timeplus.com/viz#dashboard)  | list                                  |


#### query

Run a streaming query and fetch its results together with the query metrics.

```python
import os
import traceback
import json
from pprint import pprint

from timeplus import Query, Environment

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
workspace = os.environ.get("TIMEPLUS_WORKSPACE")

# Configure API key and address
env = Environment().address(api_address).workspace(workspace).apikey(api_key)

try:
    # list all queries
    query_list = Query(env=env).list()
    pprint(f"there are {len(query_list)} queries ")

    # create a new query
    query = (
        Query(env=env).sql(query="SELECT * FROM car_live_data")
        # .batching_policy(1000, 1000)
        .create()
    )

    pprint(f"query with metadata {json.dumps(query.metadata())}")

    # query header is the column definitions of the query result table
    # it is a list of name/type pairs
    # for example : [{'name': 'in_use', 'type': 'bool'}, {'name': 'speed', 'type': 'float32'}]
    query_header = query.header()
    pprint(f"query with header {query_header}")

    # iterate query result
    limit = 3
    count = 0

    # query.result() is an iterator that pulls the query result in small batches
    # the iterator continuously pulls the query result
    # for a streaming query, the iterator does not end until the user cancels the query
    for event in query.result():
        # a metrics event returns query metrics
        # a sample metrics event:
        # {'count': 117, 'eps': 75, 'processing_time': 1560,
        # 'last_event_time': 1686237113265, 'response_time': 861,
        # 'scanned_rows': 117, 'scanned_bytes': 7605}
        if event.event == "metrics":
            pprint(json.loads(event.data))

        # message event contains query result which is an array of arrays
        # representing multiple query result rows
        # a sample message event:
        # [[True,-73.857],[False, 84.1]]
        if event.event == "message":
            pprint(json.loads(event.data))
        count += 1
        if count >= limit:
            break

    query.cancel()
    query.delete()

except Exception as e:
    pprint(e)
    traceback.print_exc()
```
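
The comments in the example above describe two shapes: the header is a list of `name`/`type` entries, and each message event carries an array of rows. The sketch below combines them into one dictionary per row, which is often easier to work with. `rows_to_dicts` is a hypothetical helper, and the sample values are the ones quoted in the comments rather than real query output.

```python
import json


def rows_to_dicts(header, message_data):
    """Pair each value in a message event with its column name."""
    names = [column["name"] for column in header]
    return [dict(zip(names, row)) for row in json.loads(message_data)]


# Header and message payload in the shapes described in the comments above.
header = [{"name": "in_use", "type": "bool"}, {"name": "speed", "type": "float32"}]
message_data = "[[true, -73.857], [false, 84.1]]"

records = rows_to_dicts(header, message_data)
print(records)
```

In the loop above, `header` would come from `query.header()` and `message_data` from `event.data` of a `message` event.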

#### stream

Create, list, get, and delete streams.

```python
import os
import traceback
import json
from pprint import pprint

from timeplus import Stream, Environment

api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
workspace = os.environ.get("TIMEPLUS_WORKSPACE")

# Configure API key and address
env = Environment().address(api_address).apikey(api_key).workspace(workspace)

try:
    # list all streams
    stream_list = Stream(env=env).list()
    pprint(f"there are {len(stream_list)} streams ")

    # create a new stream
    stream = (
        Stream(env=env)
        .name("test")
        .column("time", "datetime64(3)")
        .column("data", "string")
        .create()
    )

    stream_list = Stream(env=env).list()
    pprint(f"there are {len(stream_list)} streams after create")
    pprint(f"created stream is {stream.metadata()}; type is {type(stream.metadata())}")

    a_stream = Stream(env=env).name("test").get()
    pprint(f"get stream is {a_stream.metadata()} ; type is {type(a_stream.metadata())}")

    stream.delete()
    stream_list = Stream(env=env).list()
    pprint(f"there are {len(stream_list)} streams after delete")
except Exception as e:
    pprint(e)
    traceback.print_exc()

```
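
The REST examples read their settings from environment variables, and `os.environ.get` silently returns `None` for a missing or misspelled name. The sketch below fails early with a clear message instead. `load_settings` and `REQUIRED_VARS` are hypothetical names; the variable names are the ones used in the examples above.

```python
import os

# Environment variable names used by the REST API examples above.
REQUIRED_VARS = ("TIMEPLUS_API_KEY", "TIMEPLUS_HOST", "TIMEPLUS_WORKSPACE")


def load_settings(environ=None):
    """Return the required settings, or raise if any are missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise ValueError(f"missing environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in REQUIRED_VARS}


# Example with an explicit mapping instead of the real environment.
settings = load_settings(
    {
        "TIMEPLUS_API_KEY": "your_timeplus_apikey",
        "TIMEPLUS_HOST": "us.timeplus.cloud",
        "TIMEPLUS_WORKSPACE": "your_timeplus_workspace_id",
    }
)
print(sorted(settings))
```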

#### ingest

Ingest data into streams.

##### default ingest

```python
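import datetime

# Stream and env are set up as in the stream example above
stream = (
    Stream(env=env)
    .name("test_ingest")
    .column("time", "datetime64(3)")
    .column("data", "string")
    .create()
)

# pass the column names first, then a list of rows
stream.ingest(["time", "data"], [[datetime.datetime.now(), "abcd"]])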

```
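
The default ingest call above takes a list of column names followed by a list of rows. When the data starts out as a list of dictionaries, it has to be split into those two parts first. The sketch below shows one way to do that; `to_columns_and_rows` is a hypothetical helper, and it assumes every record has the same keys as the first one.

```python
import datetime


def to_columns_and_rows(records):
    """Split a list of dicts into column names and row lists."""
    if not records:
        return [], []
    # Column order follows the key order of the first record.
    columns = list(records[0].keys())
    rows = [[record[name] for name in columns] for record in records]
    return columns, rows


records = [
    {"time": datetime.datetime(2024, 3, 19, 21, 36, 17), "data": "abcd"},
    {"time": datetime.datetime(2024, 3, 19, 21, 36, 18), "data": "efgh"},
]
columns, rows = to_columns_and_rows(records)
print(columns)
print(rows)

# With a stream created as in the example above, the call would then be:
# stream.ingest(columns, rows)
```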

##### ingest json streams

```python
stream = (
    Stream(env=env)
    .name("test_ingest")
    .column("a", "integer")
    .column("b", "string")
    .create()
)

payload = """
{"a":2,"b":"hello"}
{"a":1,"b":"world"}
"""

stream.ingest(payload=payload, format="streaming")

```

##### ingest one raw event with multiple lines

```python
stream = Stream(env=env).name("test_ingest_raw").column("raw", "string").create()

payload = """
first line
second line
"""

stream.ingest(payload=payload, format="raw")

```

##### ingest multiple lines json

```python
stream = Stream(env=env).name("test_ingest_lines").column("raw", "string").create()
payload = '{"a":1,"b":"world"}\n{"a":2,"b":"hello"}'

stream.ingest(payload=payload, format="lines")

```
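
The JSON payloads in the examples above are written by hand as newline-separated JSON objects. The sketch below builds the same kind of payload from Python dictionaries with the standard `json` module. `to_json_lines` is a hypothetical helper and not part of the `timeplus` package.

```python
import json


def to_json_lines(records):
    """Serialize each record as compact JSON, one object per line."""
    return "\n".join(json.dumps(record, separators=(",", ":")) for record in records)


payload = to_json_lines([{"a": 1, "b": "world"}, {"a": 2, "b": "hello"}])
print(payload)

# With a stream created as in the example above, the call would then be:
# stream.ingest(payload=payload, format="lines")
```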

## Examples

More sample code can be found [here](../examples/helloworld/).

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/timeplus-io/gluon/tree/develop/python",
    "name": "timeplus",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3",
    "maintainer_email": "",
    "keywords": "",
    "author": "Gang Tao",
    "author_email": "gang@timeplus.io",
    "download_url": "https://files.pythonhosted.org/packages/37/d8/68a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b/timeplus-1.4.1.tar.gz",
    "platform": null,
    "bugtrack_url": null,
    "license": "",
    "summary": "Timeplus python SDK",
    "version": "1.4.1",
    "project_urls": {
        "Bug Tracker": "https://github.com/timeplus-io/gluon/issues",
        "Homepage": "https://github.com/timeplus-io/gluon/tree/develop/python"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "764136cde8204e431f98de520ffb96836af0edd3915c8b66472b454b45c9c66f",
                "md5": "87fa73c1491d060c10b96f9118bc190a",
                "sha256": "34961e1d9789d49e9a60caa42205b2b9d4d2acede4724037316c73c39c8d0268"
            },
            "downloads": -1,
            "filename": "timeplus-1.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "87fa73c1491d060c10b96f9118bc190a",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3",
            "size": 290340,
            "upload_time": "2024-03-19T21:36:15",
            "upload_time_iso_8601": "2024-03-19T21:36:15.015090Z",
            "url": "https://files.pythonhosted.org/packages/76/41/36cde8204e431f98de520ffb96836af0edd3915c8b66472b454b45c9c66f/timeplus-1.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "37d868a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b",
                "md5": "ef51276314486b59966dc9ec57701bee",
                "sha256": "36db4c76e9898dd83dc64fe8520d772bd51e98286292aa0c62d7f4cde49662c7"
            },
            "downloads": -1,
            "filename": "timeplus-1.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "ef51276314486b59966dc9ec57701bee",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3",
            "size": 104181,
            "upload_time": "2024-03-19T21:36:17",
            "upload_time_iso_8601": "2024-03-19T21:36:17.655934Z",
            "url": "https://files.pythonhosted.org/packages/37/d8/68a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b/timeplus-1.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-03-19 21:36:17",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "timeplus-io",
    "github_project": "gluon",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "lcname": "timeplus"
}
        