# Timeplus Python Client
[Timeplus](https://www.timeplus.com/) is a real-time streaming data analytic platform. Timeplus python client provides basic functionalities to interact with Timeplus cloud, to manage the streaming analytic work loads.
## Installation
Proton Python Driver currently supports the following versions of Python: 3.8, 3.9, and 3.10.
### Installing with pip
We recommend creating a virtual environment when installing Python dependencies. For more information on setting up a virtual environment, see the [Python documentation](https://docs.python.org/3.9/tutorial/venv.html)
```bash
pip install timeplus
```
## 3 API
The `timeplus` python libary supports 3 API:
* [DB API](#db-api) as defined in [Python Database API Specification v2.0](https://peps.python.org/pep-0249/).
* [SQLAchamy](#sqlachamy)
* [REST API](#rest-api)
### DB API
Timeplus python client support DB API defined in [Python Database API Specification v2.0](https://peps.python.org/pep-0249/).
```python
from timeplus.dbapi import connect
api_key = "your_timeplus_apikey"
api_address = "us.timeplus.cloud"
workspace = "your_timeplus_workspace_id"
# create a connection using host/password/workspace
conn = connect(host=api_address, password=api_key, path=workspace)
# run a streaming query
cursor = conn.execute("select * from car_live_data")
# get first result from the qeury
next_result = cursor.next()
# get next one result from the qeury
row1 = cursor.fetchone()
# get next three result
rows = cursor.fetchmany(3)
```
Note, as the streaming query result is unbounded, the cursor will not end, fetch will be blocked is there is no new query result.
### SQLAchamy
Timeplus python client has implemeted a [SQLAlchemy](https://www.sqlalchemy.org/) dialect to run queries, so user can use it with SQLAlchemy API.
```python
import os
from sqlalchemy import create_engine, text, select, MetaData, Table
from sqlalchemy.dialects import registry
# register timeplus dialect
registry.register("timeplus", "timeplus.sqlalchemy", "TimeplusDialect")
api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = "dev.timeplus.cloud"
port = 443
workspace = os.environ.get("TIMEPLUS_WORKSPACE") or "tp-demo"
# create a new engine
engine = create_engine(
f"timeplus://:{api_key}@{api_address}:{port}/{workspace}")
# execute streaming sql
with engine.connect() as connection:
result = connection.execute(text("select * from car_live_data"))
count = 0
max = 10
for row in result:
print(f"got one row : {row}")
count += 1
if count >= max:
break
# execute statement using table from metadata
metadata_obj = MetaData()
car_table = Table(table_name, metadata_obj, autoload_with=engine)
print(f"reflected table is {car_table}")
print(f"cols is {[ (c.name, c.type) for c in car_table.columns]}")
stmt = select(car_table).where(car_table.c.cid == "c00001")
print(stmt)
with engine.connect() as conn:
count = 0
max = 3
for row in conn.execute(stmt):
print(f"got one row from query {row}")
count += 1
if count >= max:
break
```
### REST API
Timeplus python client also provides resources wrapper which can be used to call the [Timeplus REST API](https://docs.timeplus.com/rest.html) through python object.
here is a list of all supported resources
| Resource | Supported Methods |
|-----------------------|--------------------------------------------|
| [Stream](https://docs.timeplus.com/working-with-streams)| create,list,get,delete,ingest,exist |
| [Query](https://docs.timeplus.com/stream-query)| create,list,get,delete,cancel,analyze |
| [Source](https://docs.timeplus.com/source) | create,list,get,delete,start,stop |
| [Sink](https://docs.timeplus.com/destination)| create,list,get,delete,start,stop |
| [View](https://docs.timeplus.com/view)| create,list,get,delete,exist |
| [UDF](https://docs.timeplus.com/udf)| list |
| [Alert](https://docs.timeplus.com/alert)| list |
| [Dashboard](https://docs.timeplus.com/viz#dashboard) | list |
#### query
Run streaming query and fetch the query result with query metrics.
```python
import os
import traceback
import json
from pprint import pprint
from timeplus import Query, Environment
api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
workspace = os.environ.get("TIMEPLUS_WORKSPACE")
# Configure API key and address
env = Environment().address(api_address).workspace(workspace).apikey(api_key)
try:
# list all qeuries
query_list = Query(env=env).list()
pprint(f"there are {len(query_list)} queries ")
# create a new query
query = (
Query(env=env).sql(query="SELECT * FROM car_live_data")
# .batching_policy(1000, 1000)
.create()
)
pprint(f"query with metadata {json.dumps(query.metadata())}")
# query header is the colume definitions of query result table
# it is a list of name/value pair
# for example : [{'name': 'in_use', 'type': 'bool'}, {'name': 'speed', 'type': 'float32'}]
query_header = query.header()
pprint(f"query with header {query.header()}")
# iterate query result
limit = 3
count = 0
# query.result() is an iterator which will pull all the query result in small batches
# the iterator will continously pulling query result
# for streaming query, the iterator will not end until user cancel the query
for event in query.result():
# metric event return result time query metrics
# a sample metrics event:
# {'count': 117, 'eps': 75, 'processing_time': 1560,
# 'last_event_time': 1686237113265, 'response_time': 861,
# 'scanned_rows': 117, 'scanned_bytes': 7605}
if event.event == "metrics":
pprint(json.loads(event.data))
# message event contains query result which is an array of arrays
# representing multiple query result rows
# a sample message event:
# [[True,-73.857],[False, 84.1]]
if event.event == "message":
pprint(json.loads(event.data))
count += 1
if count >= limit:
break
query.cancel()
query.delete()
except Exception as e:
pprint(e)
traceback.print_exc()
```
#### stream
Create/list/get/delete of streams
```python
import os
import traceback
import json
from pprint import pprint
from timeplus import Stream, Environment
api_key = os.environ.get("TIMEPLUS_API_KEY")
api_address = os.environ.get("TIMEPLUS_HOST")
worksapce = os.environ.get("TIMEPLUS_WORKSAPCE")
# Configure API key and address
env = Environment().address(api_address).apikey(api_key).workspace(worksapce)
try:
# list all streams
stream_list = Stream(env=env).list()
pprint(f"there are {len(stream_list)} streams ")
# create a new stream
stream = (
Stream(env=env)
.name("test")
.column("time", "datetime64(3)")
.column("data", "string")
.create()
)
stream_list = Stream(env=env).list()
pprint(f"there are {len(stream_list)} streams after create")
pprint(f"created stream is {stream.metadata()}; type is {type(stream.metadata())}")
a_stream = Stream(env=env).name("test").get()
pprint(f"get stream is {a_stream.metadata()} ; type is {type(a_stream.metadata())}")
stream.delete()
stream_list = Stream(env=env).list()
pprint(f"there are {len(stream_list)} streams after delete")
except Exception as e:
pprint(e)
traceback.print_exc()
```
#### ingest
Ingest data into streams
##### default ingest
```python
stream = (
Stream(env=env)
.name("test_ingest")
.column("time", "datetime64(3)")
.column("data", "string")
.create()
)
stream.ingest(["time", "data"], [[datetime.datetime.now(), "abcd"]])
```
##### ingest json streams
```python
stream = (
Stream(env=env)
.name("test_ingest")
.column("a", "integer")
.column("b", "string")
.create()
)
payload = """
{"a":2,"b":"hello"}
{"a":1,"b":"world"}
"""
stream.ingest(payload=payload, format="streaming")
```
##### ingest one raw event with multiple lines
```python
stream = Stream(env=env).name("test_ingest_raw").column("raw", "string").create()
payload = """
first line
second line
"""
stream.ingest(payload=payload, format="raw")
```
##### ingest multiple lines json
```python
stream = Stream(env=env).name("test_ingest_lines").column("raw", "string").create()
payload = '{"a":1,"b":"world"}\n{"a":2,"b":"hello"}'
stream.ingest(payload=payload, format="lines")
```
## Examples
More sample code can be found [here](../examples/helloworld/)
Raw data
{
"_id": null,
"home_page": "https://github.com/timeplus-io/gluon/tree/develop/python",
"name": "timeplus",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3",
"maintainer_email": "",
"keywords": "",
"author": "Gang Tao",
"author_email": "gang@timeplus.io",
"download_url": "https://files.pythonhosted.org/packages/37/d8/68a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b/timeplus-1.4.1.tar.gz",
"platform": null,
"description": "# Timeplus Python Client\n\n[Timeplus](https://www.timeplus.com/) is a real-time streaming data analytic platform. Timeplus python client provides basic functionalities to interact with Timeplus cloud, to manage the streaming analytic work loads.\n\n\n## Installation\n\nProton Python Driver currently supports the following versions of Python: 3.8, 3.9, and 3.10.\n\n### Installing with pip\nWe recommend creating a virtual environment when installing Python dependencies. For more information on setting up a virtual environment, see the [Python documentation](https://docs.python.org/3.9/tutorial/venv.html)\n\n\n```bash\npip install timeplus\n```\n\n## 3 API\nThe `timeplus` python libary supports 3 API:\n\n* [DB API](#db-api) as defined in [Python Database API Specification v2.0](https://peps.python.org/pep-0249/).\n* [SQLAchamy](#sqlachamy)\n* [REST API](#rest-api)\n\n### DB API\n\nTimeplus python client support DB API defined in [Python Database API Specification v2.0](https://peps.python.org/pep-0249/). \n\n\n```python\nfrom timeplus.dbapi import connect\n\napi_key = \"your_timeplus_apikey\"\napi_address = \"us.timeplus.cloud\"\nworkspace = \"your_timeplus_workspace_id\"\n\n# create a connection using host/password/workspace\nconn = connect(host=api_address, password=api_key, path=workspace)\n\n# run a streaming query\ncursor = conn.execute(\"select * from car_live_data\")\n\n# get first result from the qeury\nnext_result = cursor.next()\n\n# get next one result from the qeury\nrow1 = cursor.fetchone()\n\n# get next three result\nrows = cursor.fetchmany(3)\n```\n\nNote, as the streaming query result is unbounded, the cursor will not end, fetch will be blocked is there is no new query result.\n\n\n### SQLAchamy\n\nTimeplus python client has implemeted a [SQLAlchemy](https://www.sqlalchemy.org/) dialect to run queries, so user can use it with SQLAlchemy API.\n\n```python\nimport os\nfrom sqlalchemy import create_engine, text, select, MetaData, Table\nfrom sqlalchemy.dialects import registry\n\n# register timeplus dialect\nregistry.register(\"timeplus\", \"timeplus.sqlalchemy\", \"TimeplusDialect\")\n\napi_key = os.environ.get(\"TIMEPLUS_API_KEY\")\napi_address = \"dev.timeplus.cloud\"\nport = 443\nworkspace = os.environ.get(\"TIMEPLUS_WORKSPACE\") or \"tp-demo\"\n\n# create a new engine\nengine = create_engine(\n f\"timeplus://:{api_key}@{api_address}:{port}/{workspace}\")\n\n# execute streaming sql\nwith engine.connect() as connection:\n result = connection.execute(text(\"select * from car_live_data\"))\n count = 0\n max = 10\n for row in result:\n print(f\"got one row : {row}\")\n count += 1\n if count >= max:\n break\n\n# execute statement using table from metadata\nmetadata_obj = MetaData()\ncar_table = Table(table_name, metadata_obj, autoload_with=engine)\nprint(f\"reflected table is {car_table}\")\nprint(f\"cols is {[ (c.name, c.type) for c in car_table.columns]}\")\n\nstmt = select(car_table).where(car_table.c.cid == \"c00001\")\nprint(stmt)\nwith engine.connect() as conn:\n count = 0\n max = 3\n for row in conn.execute(stmt):\n print(f\"got one row from query {row}\")\n count += 1\n if count >= max:\n break\n```\n\n### REST API\n\nTimeplus python client also provides resources wrapper which can be used to call the [Timeplus REST API](https://docs.timeplus.com/rest.html) through python object.\n\nhere is a list of all supported resources\n\n| Resource | Supported Methods |\n|-----------------------|--------------------------------------------|\n| [Stream](https://docs.timeplus.com/working-with-streams)| create,list,get,delete,ingest,exist |\n| [Query](https://docs.timeplus.com/stream-query)| create,list,get,delete,cancel,analyze |\n| [Source](https://docs.timeplus.com/source) | create,list,get,delete,start,stop |\n| [Sink](https://docs.timeplus.com/destination)| create,list,get,delete,start,stop |\n| [View](https://docs.timeplus.com/view)| create,list,get,delete,exist |\n| [UDF](https://docs.timeplus.com/udf)| list |\n| [Alert](https://docs.timeplus.com/alert)| list |\n| [Dashboard](https://docs.timeplus.com/viz#dashboard) | list |\n\n\n#### query\n\nRun streaming query and fetch the query result with query metrics.\n\n```python\nimport os\nimport traceback\nimport json\nfrom pprint import pprint\n\nfrom timeplus import Query, Environment\n\napi_key = os.environ.get(\"TIMEPLUS_API_KEY\")\napi_address = os.environ.get(\"TIMEPLUS_HOST\")\nworkspace = os.environ.get(\"TIMEPLUS_WORKSPACE\")\n\n# Configure API key and address\nenv = Environment().address(api_address).workspace(workspace).apikey(api_key)\n\ntry:\n # list all qeuries\n query_list = Query(env=env).list()\n pprint(f\"there are {len(query_list)} queries \")\n\n # create a new query\n query = (\n Query(env=env).sql(query=\"SELECT * FROM car_live_data\")\n # .batching_policy(1000, 1000)\n .create()\n )\n\n pprint(f\"query with metadata {json.dumps(query.metadata())}\")\n\n # query header is the colume definitions of query result table\n # it is a list of name/value pair\n # for example : [{'name': 'in_use', 'type': 'bool'}, {'name': 'speed', 'type': 'float32'}]\n query_header = query.header()\n pprint(f\"query with header {query.header()}\")\n\n # iterate query result\n limit = 3\n count = 0\n\n # query.result() is an iterator which will pull all the query result in small batches\n # the iterator will continously pulling query result\n # for streaming query, the iterator will not end until user cancel the query\n for event in query.result():\n # metric event return result time query metrics\n # a sample metrics event:\n # {'count': 117, 'eps': 75, 'processing_time': 1560,\n # 'last_event_time': 1686237113265, 'response_time': 861,\n # 'scanned_rows': 117, 'scanned_bytes': 7605}\n if event.event == \"metrics\":\n pprint(json.loads(event.data))\n\n # message event contains query result which is an array of arrays\n # representing multiple query result rows\n # a sample message event:\n # [[True,-73.857],[False, 84.1]]\n if event.event == \"message\":\n pprint(json.loads(event.data))\n count += 1\n if count >= limit:\n break\n\n query.cancel()\n query.delete()\n\nexcept Exception as e:\n pprint(e)\n traceback.print_exc()\n```\n\n#### stream\n\nCreate/list/get/delete of streams\n\n```python\nimport os\nimport traceback\nimport json\nfrom pprint import pprint\n\nfrom timeplus import Stream, Environment\n\napi_key = os.environ.get(\"TIMEPLUS_API_KEY\")\napi_address = os.environ.get(\"TIMEPLUS_HOST\")\nworksapce = os.environ.get(\"TIMEPLUS_WORKSAPCE\")\n\n# Configure API key and address\nenv = Environment().address(api_address).apikey(api_key).workspace(worksapce)\n\ntry:\n # list all streams\n stream_list = Stream(env=env).list()\n pprint(f\"there are {len(stream_list)} streams \")\n\n # create a new stream\n stream = (\n Stream(env=env)\n .name(\"test\")\n .column(\"time\", \"datetime64(3)\")\n .column(\"data\", \"string\")\n .create()\n )\n\n stream_list = Stream(env=env).list()\n pprint(f\"there are {len(stream_list)} streams after create\")\n pprint(f\"created stream is {stream.metadata()}; type is {type(stream.metadata())}\")\n\n a_stream = Stream(env=env).name(\"test\").get()\n pprint(f\"get stream is {a_stream.metadata()} ; type is {type(a_stream.metadata())}\")\n\n stream.delete()\n stream_list = Stream(env=env).list()\n pprint(f\"there are {len(stream_list)} streams after delete\")\nexcept Exception as e:\n pprint(e)\n traceback.print_exc()\n\n```\n\n#### ingest\n\nIngest data into streams\n\n##### default ingest\n\n```python\nstream = (\n Stream(env=env)\n .name(\"test_ingest\")\n .column(\"time\", \"datetime64(3)\")\n .column(\"data\", \"string\")\n .create()\n )\n\nstream.ingest([\"time\", \"data\"], [[datetime.datetime.now(), \"abcd\"]])\n\n```\n\n##### ingest json streams\n\n```python\nstream = (\n Stream(env=env)\n .name(\"test_ingest\")\n .column(\"a\", \"integer\")\n .column(\"b\", \"string\")\n .create()\n )\n\npayload = \"\"\"\n{\"a\":2,\"b\":\"hello\"}\n{\"a\":1,\"b\":\"world\"}\n\"\"\"\n\nstream.ingest(payload=payload, format=\"streaming\")\n\n```\n\n##### ingest one raw event with multiple lines\n\n```python\nstream = Stream(env=env).name(\"test_ingest_raw\").column(\"raw\", \"string\").create()\n\npayload = \"\"\"\nfirst line\nsecond line\n\"\"\"\n\nstream.ingest(payload=payload, format=\"raw\")\n\n```\n\n##### ingest multiple lines json\n\n```python\nstream = Stream(env=env).name(\"test_ingest_lines\").column(\"raw\", \"string\").create()\npayload = '{\"a\":1,\"b\":\"world\"}\\n{\"a\":2,\"b\":\"hello\"}'\n\nstream.ingest(payload=payload, format=\"lines\")\n\n```\n\n## Examples\n\nMore sample code can be found [here](../examples/helloworld/)\n",
"bugtrack_url": null,
"license": "",
"summary": "Timeplus python SDK",
"version": "1.4.1",
"project_urls": {
"Bug Tracker": "https://github.com/timeplus-io/gluon/issues",
"Homepage": "https://github.com/timeplus-io/gluon/tree/develop/python"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "764136cde8204e431f98de520ffb96836af0edd3915c8b66472b454b45c9c66f",
"md5": "87fa73c1491d060c10b96f9118bc190a",
"sha256": "34961e1d9789d49e9a60caa42205b2b9d4d2acede4724037316c73c39c8d0268"
},
"downloads": -1,
"filename": "timeplus-1.4.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "87fa73c1491d060c10b96f9118bc190a",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3",
"size": 290340,
"upload_time": "2024-03-19T21:36:15",
"upload_time_iso_8601": "2024-03-19T21:36:15.015090Z",
"url": "https://files.pythonhosted.org/packages/76/41/36cde8204e431f98de520ffb96836af0edd3915c8b66472b454b45c9c66f/timeplus-1.4.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "37d868a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b",
"md5": "ef51276314486b59966dc9ec57701bee",
"sha256": "36db4c76e9898dd83dc64fe8520d772bd51e98286292aa0c62d7f4cde49662c7"
},
"downloads": -1,
"filename": "timeplus-1.4.1.tar.gz",
"has_sig": false,
"md5_digest": "ef51276314486b59966dc9ec57701bee",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3",
"size": 104181,
"upload_time": "2024-03-19T21:36:17",
"upload_time_iso_8601": "2024-03-19T21:36:17.655934Z",
"url": "https://files.pythonhosted.org/packages/37/d8/68a19339bea2d6f691180ae13bb40df99efd08653e3b15fb67958a6bc75b/timeplus-1.4.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-03-19 21:36:17",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "timeplus-io",
"github_project": "gluon",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "timeplus"
}