# influxdb-client-python
<!-- marker-index-start -->
[![CircleCI](https://circleci.com/gh/influxdata/influxdb-client-python.svg?style=svg)](https://circleci.com/gh/influxdata/influxdb-client-python)
[![codecov](https://codecov.io/gh/influxdata/influxdb-client-python/branch/master/graph/badge.svg)](https://codecov.io/gh/influxdata/influxdb-client-python)
[![CI status](https://img.shields.io/circleci/project/github/influxdata/influxdb-client-python/master.svg)](https://circleci.com/gh/influxdata/influxdb-client-python)
[![PyPI package](https://img.shields.io/pypi/v/influxdb-client.svg)](https://pypi.org/project/influxdb-client/)
[![Anaconda.org package](https://anaconda.org/influxdata/influxdb_client/badges/version.svg)](https://anaconda.org/influxdata/influxdb_client)
[![Supported Python versions](https://img.shields.io/pypi/pyversions/influxdb-client.svg)](https://pypi.python.org/pypi/influxdb-client)
[![Documentation status](https://readthedocs.org/projects/influxdb-client/badge/?version=stable)](https://influxdb-client.readthedocs.io/en/stable/)
[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://www.influxdata.com/slack)
This repository contains the Python client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight [v3 client library](https://github.com/InfluxCommunity/influxdb3-python).
InfluxDB 1.x users should use the [v1 client library](https://github.com/influxdata/influxdb-python).
For ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the [v1 client library](https://github.com/influxdata/influxdb-python).
The API of **influxdb-client-python** is not backwards-compatible with the old one - **influxdb-python**.
## Documentation
This section contains links to the client library documentation.
- [Product documentation](https://docs.influxdata.com/influxdb/v2.0/tools/client-libraries/), [Getting Started](#getting-started)
- [Examples](https://github.com/influxdata/influxdb-client-python/tree/master/examples)
- [API Reference](https://influxdb-client.readthedocs.io/en/stable/api.html)
- [Changelog](https://github.com/influxdata/influxdb-client-python/blob/master/CHANGELOG.md)
## InfluxDB 2.0 client features
- Querying data
  - using the Flux language
  - into csv, raw data, [flux_table](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L33) structure, [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
  - [How to query](#queries)
- Writing data using
  - [Line Protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol)
  - [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L47)
  - [RxPY](https://rxpy.readthedocs.io/en/latest/) Observable
  - [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
  - [How to write](#writes)
- [InfluxDB 2.0 API](https://github.com/influxdata/influxdb/blob/master/http/swagger.yml) client for management
  - the client is generated from the [swagger](https://github.com/influxdata/influxdb/blob/master/http/swagger.yml) by using the [openapi-generator](https://github.com/OpenAPITools/openapi-generator)
  - organizations & users management
  - buckets management
  - tasks management
  - authorizations
  - health check
  - ...
- [InfluxDB 1.8 API compatibility](#influxdb-18-api-compatibility)
- Examples
  - [Connect to InfluxDB Cloud](#connect-to-influxdb-cloud)
  - [How to efficiently import a large dataset](#how-to-efficiently-import-a-large-dataset)
  - [Efficiently write data from an IoT sensor](#efficiently-write-data-from-an-iot-sensor)
  - [How to use Jupyter + Pandas + InfluxDB 2](#how-to-use-jupyter--pandas--influxdb-2)
- [Advanced Usage](#advanced-usage)
  - [Gzip support](#gzip-support)
  - [Proxy configuration](#proxy-configuration)
  - [Nanosecond precision](#nanosecond-precision)
  - [Delete data](#delete-data)
  - [Handling Errors](#handling-errors)
  - [Logging](#logging)
## Installation
The InfluxDB Python library uses [RxPY](https://github.com/ReactiveX/RxPY) - the Reactive Extensions for Python.
**Python 3.7** or later is required.
:warning:
> It is recommended to use `ciso8601` with the client for parsing dates. `ciso8601` is much faster than the built-in Python datetime parsing. Since it's written as a `C` module, the best way is to build it from source:
**Windows**:
You have to install [Visual C++ Build Tools 2015](http://go.microsoft.com/fwlink/?LinkId=691126&fixForIE=.exe) to build `ciso8601` by `pip`.
**conda**:
Install from sources: `conda install -c conda-forge/label/cf202003 ciso8601`.
### pip install
The Python package is hosted on [PyPI](https://pypi.org/project/influxdb-client/); you can install the latest version directly:
``` sh
pip install 'influxdb-client[ciso]'
```
Then import the package:
``` python
import influxdb_client
```
If your application uses async/await in Python you can install with the `async` extra:
``` sh
$ pip install 'influxdb-client[async]'
```
For more info see [How to use Asyncio](#how-to-use-asyncio).
### Setuptools
Install via [Setuptools](http://pypi.python.org/pypi/setuptools).
``` sh
python setup.py install --user
```
(or `sudo python setup.py install` to install the package for all users)
## Getting Started
Please follow the [Installation](#installation) and then run the following:
<!-- marker-query-start -->
``` python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
bucket = "my-bucket"
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
write_api.write(bucket=bucket, record=p)
## using Table structure
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
    print(table)
    for row in table.records:
        print(row.values)
## using csv library
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)')
val_count = 0
for row in csv_result:
    for cell in row:
        val_count += 1
```
<!-- marker-query-end -->
## Client configuration
### Via File
A client can be configured via a `*.ini` file, using the `influx2` segment.
The following options are supported:
- `url` - the url to connect to InfluxDB
- `org` - default destination organization for writes and queries
- `token` - the token to use for authorization
- `timeout` - socket timeout in ms (default value is 10000)
- `verify_ssl` - set this to false to skip verifying the SSL certificate when calling the API over https
- `ssl_ca_cert` - set this to customize the certificate file used to verify the peer
- `cert_file` - path to the certificate that will be used for mTLS authentication
- `cert_key_file` - path to the file containing the private key for the mTLS certificate
- `cert_key_password` - string or function which returns the password for decrypting the mTLS private key
- `connection_pool_maxsize` - set the number of connections to save that can be reused by urllib3
- `auth_basic` - enable HTTP basic authentication when talking to an InfluxDB 1.8.x instance that does not use authentication but is accessed via a reverse proxy with basic authentication (defaults to false)
- `profilers` - set the list of enabled [Flux profilers](https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/)
``` python
self.client = InfluxDBClient.from_config_file("config.ini")
```
``` ini
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
verify_ssl=False
```
### Via Environment Properties
A client can be configured via environment properties.
Supported properties are:
- `INFLUXDB_V2_URL` - the url to connect to InfluxDB
- `INFLUXDB_V2_ORG` - default destination organization for writes and queries
- `INFLUXDB_V2_TOKEN` - the token to use for authorization
- `INFLUXDB_V2_TIMEOUT` - socket timeout in ms (default value is 10000)
- `INFLUXDB_V2_VERIFY_SSL` - set this to false to skip verifying the SSL certificate when calling the API over https
- `INFLUXDB_V2_SSL_CA_CERT` - set this to customize the certificate file used to verify the peer
- `INFLUXDB_V2_CERT_FILE` - path to the certificate that will be used for mTLS authentication
- `INFLUXDB_V2_CERT_KEY_FILE` - path to the file containing the private key for the mTLS certificate
- `INFLUXDB_V2_CERT_KEY_PASSWORD` - string or function which returns the password for decrypting the mTLS private key
- `INFLUXDB_V2_CONNECTION_POOL_MAXSIZE` - set the number of connections to save that can be reused by urllib3
- `INFLUXDB_V2_AUTH_BASIC` - enable HTTP basic authentication when talking to an InfluxDB 1.8.x instance that does not use authentication but is accessed via a reverse proxy with basic authentication (defaults to false)
- `INFLUXDB_V2_PROFILERS` - set the list of enabled [Flux profilers](https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/)
``` python
self.client = InfluxDBClient.from_env_properties()
```
### Profile query
The [Flux Profiler package](https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/) provides performance profiling tools for Flux queries and operations.
You can enable printing profiler information for a Flux query in the client library by:

- setting `QueryOptions.profilers` in `QueryApi`,
- setting the `INFLUXDB_V2_PROFILERS` environment variable,
- setting the `profilers` option in the configuration file.
When the profiler is enabled, the result of the Flux query contains additional "profiler/*" tables. In order to have consistent behaviour whether the profiler is enabled or disabled, `FluxCSVParser` excludes "profiler/*" measurements from the result.

Example of how to enable profilers using the API:
``` python
q = '''
    from(bucket: stringParam)
        |> range(start: -5m, stop: now())
        |> filter(fn: (r) => r._measurement == "mem")
        |> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
        |> aggregateWindow(every: 1m, fn: mean)
        |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
'''
p = {
    "stringParam": "my-bucket",
}
query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"]))
csv_result = query_api.query(query=q, params=p)
```
Example of a profiler output:
``` text
===============
Profiler: query
===============
from(bucket: stringParam)
|> range(start: -5m, stop: now())
|> filter(fn: (r) => r._measurement == "mem")
|> filter(fn: (r) => r._field == "available" or r._field == "free" or r._field == "used")
|> aggregateWindow(every: 1m, fn: mean)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
========================
Profiler: profiler/query
========================
result : _profiler
table : 0
_measurement : profiler/query
TotalDuration : 8924700
CompileDuration : 350900
QueueDuration : 33800
PlanDuration : 0
RequeueDuration : 0
ExecuteDuration : 8486500
Concurrency : 0
MaxAllocated : 2072
TotalAllocated : 0
flux/query-plan :
digraph {
ReadWindowAggregateByTime11
// every = 1m, aggregates = [mean], createEmpty = true, timeColumn = "_stop"
pivot8
generated_yield
ReadWindowAggregateByTime11 -> pivot8
pivot8 -> generated_yield
}
influxdb/scanned-bytes: 0
influxdb/scanned-values: 0
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *universe.pivotTransformation
Label : pivot8
Count : 3
MinDuration : 32600
MaxDuration : 126200
DurationSum : 193400
MeanDuration : 64466.666666666664
===========================
Profiler: profiler/operator
===========================
result : _profiler
table : 1
_measurement : profiler/operator
Type : *influxdb.readWindowAggregateSource
Label : ReadWindowAggregateByTime11
Count : 1
MinDuration : 940500
MaxDuration : 940500
DurationSum : 940500
MeanDuration : 940500.0
```
You can also use a callback function to receive the profiler output. The records passed to this callback are of type `FluxRecord`.

Example of how to use profilers with a callback:
``` python
class ProfilersCallback(object):
    def __init__(self):
        self.records = []

    def __call__(self, flux_record):
        self.records.append(flux_record.values)


callback = ProfilersCallback()

query_api = client.query_api(query_options=QueryOptions(profilers=["query", "operator"], profiler_callback=callback))
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')

for profiler in callback.records:
```
Example output of this callback:
``` text
Custom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\r\n ReadRange2\r\n generated_yield\r\n\r\n ReadRange2 -> generated_yield\r\n}\r\n\r\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}
Custom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}
```
<!-- marker-index-end -->
## How to use
### Writes
<!-- marker-writes-start -->
The [WriteApi](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write_api.py) supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as an [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/) string, a [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py) or an Observable stream.
:warning:
> The `WriteApi` in batching mode (the default mode) is supposed to run as a singleton. To flush all your data you should wrap the execution in a `with client.write_api(...) as write_api:` statement or call `write_api.close()` at the end of your script.

*The default instance of `WriteApi` uses batching.*
#### The data could be written as
1. `string` or `bytes` that is formatted as InfluxDB's line protocol
2. [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16) structure
3. Dictionary-style mapping with keys: `measurement`, `tags`, `fields` and `time`, or a custom structure
4. [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple)
5. [Data Classes](https://docs.python.org/3/library/dataclasses.html)
6. [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
7. List of the above items
8. A `batching` type of write also supports an `Observable` that produces one of the above items (see the sketch below)
You can find write examples at GitHub: [influxdb-client-python/examples](https://github.com/influxdata/influxdb-client-python/tree/master/examples#writes).
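As a quick illustration of items 4 and 5 above, the following is a minimal sketch of writing a `NamedTuple` and a Data Class; the mapping parameters (`record_measurement_name`, `record_tag_keys`, `record_field_keys`, `record_time_key`) are assumed here to be accepted by `WriteApi.write`, and all names and values are illustrative:

``` python
from collections import namedtuple
from dataclasses import dataclass

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# NamedTuple with one tag, one field and a timestamp (illustrative shape)
Temperature = namedtuple('Temperature', ['location', 'value', 'time'])


# Data Class with the same shape
@dataclass
class Humidity:
    location: str
    value: float
    time: int


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    with client.write_api(write_options=SYNCHRONOUS) as write_api:
        # Map the NamedTuple attributes onto measurement, tags, fields and time
        write_api.write(bucket="my-bucket",
                        record=Temperature(location="Prague", value=25.3, time=1),
                        record_measurement_name="temperature",
                        record_tag_keys=["location"],
                        record_field_keys=["value"],
                        record_time_key="time")
        # The same mapping options apply to a Data Class instance
        write_api.write(bucket="my-bucket",
                        record=Humidity(location="Prague", value=0.57, time=1),
                        record_measurement_name="humidity",
                        record_tag_keys=["location"],
                        record_field_keys=["value"],
                        record_time_key="time")
```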
#### Batching
The batching is configurable by `write_options`:
| Property | Description | Default Value |
|----------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------|
| **batch_size** | the number of data points to collect in a batch | `1000` |
| **flush_interval** | the number of milliseconds before the batch is written | `1000` |
| **jitter_interval** | the number of milliseconds to increase the batch flush interval by a random amount | `0` |
| **retry_interval** | the number of milliseconds to wait before retrying the first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify the "Retry-After" header. | `5000` |
| **max_retry_time** | maximum total retry timeout in milliseconds. | `180_000` |
| **max_retries** | the number of max retries when write fails | `5` |
| **max_retry_delay** | the maximum delay between each retry attempt in milliseconds | `125_000` |
| **max_close_wait** | the maximum amount of time to wait for batches to flush when `.close()` is called | `300_000` |
| **exponential_base** | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)`. For example, with `retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`, the retry delays are randomly distributed values within the ranges `[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]` | `2` |
``` python
from datetime import datetime, timedelta, timezone
import pandas as pd
import reactivex as rx
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, Point, WriteOptions
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as _client:

    with _client.write_api(write_options=WriteOptions(batch_size=500,
                                                      flush_interval=10_000,
                                                      jitter_interval=2_000,
                                                      retry_interval=5_000,
                                                      max_retries=5,
                                                      max_retry_delay=30_000,
                                                      max_close_wait=300_000,
                                                      exponential_base=2)) as _write_client:

        """
        Write Line Protocol formatted as string
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1")
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2",
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3"])

        """
        Write Line Protocol formatted as byte array
        """
        _write_client.write("my-bucket", "my-org", "h2o_feet,location=coyote_creek water_level=1.0 1".encode())
        _write_client.write("my-bucket", "my-org", ["h2o_feet,location=coyote_creek water_level=2.0 2".encode(),
                                                    "h2o_feet,location=coyote_creek water_level=3.0 3".encode()])

        """
        Write Dictionary-style object
        """
        _write_client.write("my-bucket", "my-org", {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                    "fields": {"water_level": 1.0}, "time": 1})
        _write_client.write("my-bucket", "my-org", [{"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 2.0}, "time": 2},
                                                    {"measurement": "h2o_feet", "tags": {"location": "coyote_creek"},
                                                     "fields": {"water_level": 3.0}, "time": 3}])

        """
        Write Data Point
        """
        _write_client.write("my-bucket", "my-org",
                            Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 4.0).time(4))
        _write_client.write("my-bucket", "my-org",
                            [Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 5.0).time(5),
                             Point("h2o_feet").tag("location", "coyote_creek").field("water_level", 6.0).time(6)])

        """
        Write Observable stream
        """
        _data = rx \
            .range(7, 11) \
            .pipe(ops.map(lambda i: "h2o_feet,location=coyote_creek water_level={0}.0 {0}".format(i)))

        _write_client.write("my-bucket", "my-org", _data)

        """
        Write Pandas DataFrame
        """
        _now = datetime.now(tz=timezone.utc)
        _data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                                   index=[_now, _now + timedelta(hours=1)],
                                   columns=["location", "water_level"])

        _write_client.write("my-bucket", "my-org", record=_data_frame, data_frame_measurement_name='h2o_feet',
                            data_frame_tag_columns=['location'])
```
#### Default Tags
Sometimes it is useful to store the same information in every measurement, e.g. `hostname`, `location`, `customer`. The client is able to use a static value or an environment property as a tag value.
The expressions:
- `California Miner` - static value
- `${env.hostname}` - environment property
##### Via API
``` python
point_settings = PointSettings()
point_settings.add_default_tag("id", "132-987-655")
point_settings.add_default_tag("customer", "California Miner")
point_settings.add_default_tag("data_center", "${env.data_center}")
self.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)
```
``` python
self.write_client = self.client.write_api(write_options=SYNCHRONOUS,
point_settings=PointSettings(**{"id": "132-987-655",
"customer": "California Miner"}))
```
##### Via Configuration file
In an [ini](https://docs.python.org/3/library/configparser.html) configuration file you are able to specify default tags via the `tags` segment.
``` python
self.client = InfluxDBClient.from_config_file("config.ini")
```
``` ini
[influx2]
url=http://localhost:8086
org=my-org
token=my-token
timeout=6000
[tags]
id = 132-987-655
customer = California Miner
data_center = ${env.data_center}
```
You can also use a [TOML](https://toml.io/en/) or a [JSON](https://www.json.org/json-en.html) format for the configuration file.
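For instance, an equivalent TOML configuration might look like this (a sketch assuming the same `influx2` and `tags` segment names as in the `ini` file above):

``` toml
[influx2]
url = "http://localhost:8086"
org = "my-org"
token = "my-token"
timeout = 6000

[tags]
id = "132-987-655"
customer = "California Miner"
data_center = "${env.data_center}"
```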
##### Via Environment Properties
You are able to specify default tags via environment properties with the prefix `INFLUXDB_V2_TAG_`.
Examples:
- `INFLUXDB_V2_TAG_ID`
- `INFLUXDB_V2_TAG_HOSTNAME`
``` python
self.client = InfluxDBClient.from_env_properties()
```
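For instance, a minimal sketch (the connection properties and tag values here are illustrative):

``` python
import os

from influxdb_client import InfluxDBClient

# Connection properties (see "Via Environment Properties" above)
os.environ["INFLUXDB_V2_URL"] = "http://localhost:8086"
os.environ["INFLUXDB_V2_ORG"] = "my-org"
os.environ["INFLUXDB_V2_TOKEN"] = "my-token"

# Default tags applied to every written point (illustrative values)
os.environ["INFLUXDB_V2_TAG_ID"] = "132-987-655"
os.environ["INFLUXDB_V2_TAG_HOSTNAME"] = "${env.hostname}"

client = InfluxDBClient.from_env_properties()
```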
#### Synchronous client
Data is written in a synchronous HTTP request.
``` python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])
client.close()
```
<!-- marker-writes-end -->
### Queries
The result retrieved by [QueryApi](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py) can be formatted as:
1. Flux data structure: [FluxTable](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5), [FluxColumn](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22) and [FluxRecord](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31)
2. `influxdb_client.client.flux_table.CSVIterator` which will iterate over CSV lines
3. Raw unprocessed results as a `str` iterator
4. [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
The API also supports streaming `FluxRecord`s via [query_stream](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77), see the example below:
``` python
import datetime

from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])
"""
Query: using Table structure
"""
tables = query_api.query('from(bucket:"my-bucket") |> range(start: -10m)')
for table in tables:
    print(table)
    for record in table.records:
        print(record.values)
print()
"""
Query: using Bind parameters
"""
p = {"_start": datetime.timedelta(hours=-1),
     "_location": "Prague",
     "_desc": True,
     "_floatParam": 25.1,
     "_every": datetime.timedelta(minutes=5)
     }
tables = query_api.query('''
    from(bucket:"my-bucket") |> range(start: _start)
        |> filter(fn: (r) => r["_measurement"] == "my_measurement")
        |> filter(fn: (r) => r["_field"] == "temperature")
        |> filter(fn: (r) => r["location"] == _location and r["_value"] > _floatParam)
        |> aggregateWindow(every: _every, fn: mean, createEmpty: true)
        |> sort(columns: ["_time"], desc: _desc)
''', params=p)
for table in tables:
    print(table)
    for record in table.records:
        print(str(record["_time"]) + " - " + record["location"] + ": " + str(record["_value"]))
print()
print()
"""
Query: using Stream
"""
records = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -10m)')
for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]}')
"""
Interrupt a stream after retrieve a required data
"""
large_stream = query_api.query_stream('from(bucket:"my-bucket") |> range(start: -100d)')
for record in large_stream:
    if record["location"] == "New York":
        print(f'New York temperature: {record["_value"]}')
        break
large_stream.close()
print()
print()
"""
Query: using csv library
"""
csv_result = query_api.query_csv('from(bucket:"my-bucket") |> range(start: -10m)',
                                 dialect=Dialect(header=False, delimiter=",", comment_prefix="#", annotations=[],
                                                 date_time_format="RFC3339"))
for csv_line in csv_result:
    if not len(csv_line) == 0:
        print(f'Temperature in {csv_line[9]} is {csv_line[6]}')
"""
Close client
"""
client.close()
```
#### Pandas DataFrame
<!-- marker-pandas-start -->
:warning:
> For DataFrame querying you should install the Pandas dependency via `pip install 'influxdb-client[extra]'`.

:warning:
> Note that if a query returns more than one table, the client generates a `DataFrame` for each of them.

The `client` is able to retrieve data in [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) format through `query_data_frame`:
``` python
from influxdb_client import InfluxDBClient, Point, Dialect
from influxdb_client.client.write_api import SYNCHRONOUS
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
"""
Prepare data
"""
_point1 = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3)
_point2 = Point("my_measurement").tag("location", "New York").field("temperature", 24.3)
write_api.write(bucket="my-bucket", record=[_point1, _point2])
"""
Query: using Pandas DataFrame
"""
data_frame = query_api.query_data_frame('from(bucket:"my-bucket") '
                                        '|> range(start: -10m) '
                                        '|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") '
                                        '|> keep(columns: ["location", "temperature"])')
print(data_frame.to_string())
"""
Close client
"""
client.close()
```
Output:
``` text
    result table  location  temperature
0  _result     0  New York         24.3
1  _result     1    Prague         25.3
```
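If the Flux query produces more than one table, `query_data_frame` returns a list of `DataFrame`s rather than a single one. A minimal sketch of handling both cases:

``` python
from influxdb_client import InfluxDBClient

with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    result = client.query_api().query_data_frame('from(bucket:"my-bucket") |> range(start: -10m)')

    # Normalize to a list: a query with several result tables yields a list of DataFrames
    data_frames = result if isinstance(result, list) else [result]
    for data_frame in data_frames:
        print(data_frame.to_string())
```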
<!-- marker-pandas-end -->
### Examples
<!-- marker-examples-start -->
#### How to efficiently import large dataset
The following example shows how to import a dataset of a dozen megabytes. If you would like to import gigabytes of data, then
use our multiprocessing example: [import_data_set_multiprocessing.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set_multiprocessing.py) to use the full capability of your hardware.
- sources - [import_data_set.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set.py)
``` python
"""
Import VIX - CBOE Volatility Index - from "vix-daily.csv" file into InfluxDB 2.0
https://datahub.io/core/finance-vix#data
"""
from collections import OrderedDict
from csv import DictReader
import reactivex as rx
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, Point, WriteOptions
def parse_row(row: OrderedDict):
    """Parse row of CSV file into Point with structure:

    financial-analysis,type=ily close=18.47,high=19.82,low=18.28,open=19.82 1198195200000000000

    CSV format:
    Date,VIX Open,VIX High,VIX Low,VIX Close\n
    2004-01-02,17.96,18.68,17.54,18.22\n
    2004-01-05,18.45,18.49,17.44,17.49\n
    2004-01-06,17.66,17.67,16.19,16.73\n
    2004-01-07,16.72,16.75,15.5,15.5\n
    2004-01-08,15.42,15.68,15.32,15.61\n
    2004-01-09,16.15,16.88,15.57,16.75\n
    ...

    :param row: the row of the CSV file
    :return: parsed CSV row as a [Point]
    """

    """
    For better performance it is sometimes useful to create the line protocol directly
    to avoid unnecessary escaping overhead:
    """
    # from datetime import timezone
    # import ciso8601
    # from influxdb_client.client.write.point import EPOCH
    #
    # time = (ciso8601.parse_datetime(row["Date"]).replace(tzinfo=timezone.utc) - EPOCH).total_seconds() * 1e9
    # return f"financial-analysis,type=vix-daily" \
    #        f" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} " \
    #        f" {int(time)}"

    return Point("financial-analysis") \
        .tag("type", "vix-daily") \
        .field("open", float(row['VIX Open'])) \
        .field("high", float(row['VIX High'])) \
        .field("low", float(row['VIX Low'])) \
        .field("close", float(row['VIX Close'])) \
        .time(row['Date'])
"""
Converts vix-daily.csv into a sequence of data points
"""
data = rx \
    .from_iterable(DictReader(open('vix-daily.csv', 'r'))) \
    .pipe(ops.map(lambda row: parse_row(row)))
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
"""
Create client that writes data in batches with 50_000 items.
"""
write_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
"""
Write data into InfluxDB
"""
write_api.write(bucket="my-bucket", record=data)
write_api.close()
"""
Querying max value of CBOE Volatility Index
"""
query = 'from(bucket:"my-bucket")' \
        ' |> range(start: 0, stop: now())' \
        ' |> filter(fn: (r) => r._measurement == "financial-analysis")' \
        ' |> max()'
result = client.query_api().query(query=query)
"""
Processing results
"""
print()
print("=== results ===")
print()
for table in result:
    for record in table.records:
        print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))
"""
Close client
"""
client.close()
```
#### Efficiently write data from an IoT sensor
- sources - [iot_sensor.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/iot_sensor.py)
``` python
"""
Efficiently write data from an IoT sensor - write the temperature every minute when it changes
"""
import atexit
import platform
from datetime import timedelta
import psutil as psutil
import reactivex as rx
from reactivex import operators as ops
from influxdb_client import InfluxDBClient, WriteApi, WriteOptions
def on_exit(db_client: InfluxDBClient, write_api: WriteApi):
    """Close clients when the script terminates.

    :param db_client: InfluxDB client
    :param write_api: WriteApi
    :return: nothing
    """
    write_api.close()
    db_client.close()
def sensor_temperature():
    """Read the CPU temperature. [psutil] doesn't support macOS, so we use [sysctl] there.

    :return: actual CPU temperature
    """
    os_name = platform.system()
    if os_name == 'Darwin':
        from subprocess import check_output
        output = check_output(["sysctl", "machdep.xcpm.cpu_thermal_level"])
        import re
        return re.findall(r'\d+', str(output))[0]
    else:
        return psutil.sensors_temperatures()["coretemp"][0]
def line_protocol(temperature):
    """Create an InfluxDB line protocol record with structure:

    iot_sensor,hostname=mine_sensor_12,type=temperature value=68

    :param temperature: the sensor temperature
    :return: line protocol to write into InfluxDB
    """
    import socket
    return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)
"""
Read the temperature every minute; distinct_until_changed - produce a value only when the temperature changes
"""
data = rx \
    .interval(period=timedelta(seconds=60)) \
    .pipe(ops.map(lambda t: sensor_temperature()),
          ops.distinct_until_changed(),
          ops.map(lambda temperature: line_protocol(temperature)))
_db_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", debug=True)
"""
Create client that writes data into InfluxDB
"""
_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))
_write_api.write(bucket="my-bucket", record=data)
"""
Call on_exit after the script terminates
"""
atexit.register(on_exit, _db_client, _write_api)
input()
```
#### Connect to InfluxDB Cloud
The following example demonstrates the simplest way to write and query data with InfluxDB Cloud.
First you should create an authentication token as described [here](https://v2.docs.influxdata.com/v2.0/security/tokens/create-token/).
After that you should configure the properties `influx_cloud_url`, `influx_cloud_token`, `bucket` and `org` in the `influx_cloud.py` example.
The last step is to run the Python script via: `python3 influx_cloud.py`.
- sources - [influx_cloud.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/influx_cloud.py)
``` python
"""
Connect to InfluxDB 2.0 - write data and query them
"""
from datetime import datetime, timezone
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
"""
Configure credentials
"""
influx_cloud_url = 'https://us-west-2-1.aws.cloud2.influxdata.com'
influx_cloud_token = '...'
bucket = '...'
org = '...'
client = InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token)
try:
    kind = 'temperature'
    host = 'host1'
    device = 'opt-123'

    """
    Write data by Point structure
    """
    point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.now(tz=timezone.utc))

    print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...')

    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket=bucket, org=org, record=point)

    print()
    print('success')
    print()
    print()

    """
    Query written data
    """
    query = f'from(bucket: "{bucket}") |> range(start: -1d) |> filter(fn: (r) => r._measurement == "{kind}")'
    print(f'Querying from InfluxDB cloud: "{query}" ...')
    print()

    query_api = client.query_api()
    tables = query_api.query(query=query, org=org)

    for table in tables:
        for row in table.records:
            print(f'{row.values["_time"]}: host={row.values["host"]},device={row.values["device"]} '
                  f'{row.values["_value"]} °C')

    print()
    print('success')

except Exception as e:
    print(e)

finally:
    client.close()
```
#### How to use Jupyter + Pandas + InfluxDB 2
The first example shows how to use client capabilities to predict stock prices via [Keras](https://keras.io), [TensorFlow](https://www.tensorflow.org) and [sklearn](https://scikit-learn.org/stable/).
The example is taken from [Kaggle](https://www.kaggle.com/chaitanyacc4/predicting-stock-prices-of-apple-inc).
- sources - [stock-predictions.ipynb](notebooks/stock-predictions.ipynb)
![image](https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction.gif)
Result:
![image](https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction-results.png)
The second example shows how to use client capabilities for real-time visualization via [hvPlot](https://hvplot.pyviz.org), [Streamz](https://streamz.readthedocs.io/en/latest/) and [RxPY](https://rxpy.readthedocs.io/en/latest/):
- sources - [realtime-stream.ipynb](notebooks/realtime-stream.ipynb)
![image](https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif)
#### Other examples
You can find all examples at GitHub: [influxdb-client-python/examples](https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples).
<!-- marker-examples-end -->
## Advanced Usage
### Gzip support
<!-- marker-gzip-start -->
`InfluxDBClient` does not enable gzip compression for HTTP requests by default. If you want to enable gzip to reduce the size of transferred data, you can call:
``` python
from influxdb_client import InfluxDBClient
_db_client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", enable_gzip=True)
```
<!-- marker-gzip-end -->
### Authenticate to the InfluxDB
<!-- marker-authenticate-start -->
`InfluxDBClient` supports three ways to authorize a connection:
- _Token_
- _Username & Password_
- _HTTP Basic_
#### Token
Use the `token` to authenticate to the InfluxDB API. In your API requests, an _Authorization_ header will be sent. For the header value, provide the word _Token_ followed by a space and an InfluxDB API token. The word _Token_ is case-sensitive.
``` python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", token="my-token") as client:
    ...
```
:warning:
> Note that this is the preferred way to authenticate to the InfluxDB API.
#### Username & Password
Authenticates via username and password credentials. If successful, creates a new session for the user.
``` python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", username="my-user", password="my-password") as client:
    ...
```
:warning:
> The `username/password` auth is based on the HTTP "Basic" authentication. The authorization expires when the [time-to-live (TTL)](https://docs.influxdata.com/influxdb/latest/reference/config-options/#session-length) (default 60 minutes) is reached, and the client then produces an `unauthorized exception`.
#### HTTP Basic
Use this to enable basic authentication when talking to an InfluxDB 1.8.x instance that does not have authentication enabled but is protected by a reverse proxy with basic authentication.
``` python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", auth_basic=True, token="my-proxy-secret") as client:
    ...
```
:warning:
> Don't use this when directly talking to InfluxDB 2.
<!-- marker-authenticate-end -->
### Proxy configuration
<!-- marker-proxy-start -->
You can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:
- `proxy` - Set this to configure the http proxy to be used, ex. `http://localhost:3128`
- `proxy_headers` - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
``` python
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086",
                    token="my-token",
                    org="my-org",
                    proxy="http://localhost:3128") as client:
    ...
```
If your proxy notifies the client of a permanent redirect (`HTTP 301`) to a **different host**, the client removes the `Authorization` header, because otherwise its contents would be sent to a third party, which is a security vulnerability.
You can change this behaviour by:
``` python
from urllib3 import Retry
Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT = frozenset()
Retry.DEFAULT.remove_headers_on_redirect = Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT
```
<!-- marker-proxy-end -->
### Delete data
<!-- marker-delete-start -->
The [delete_api.py](influxdb_client/client/delete_api.py) supports deleting [points](https://v2.docs.influxdata.com/v2.0/reference/glossary/#point) from an InfluxDB bucket.
``` python
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="my-token")
delete_api = client.delete_api()
"""
Delete Data
"""
start = "1970-01-01T00:00:00Z"
stop = "2021-02-01T00:00:00Z"
delete_api.delete(start, stop, '_measurement="my_measurement"', bucket='my-bucket', org='my-org')
"""
Close client
"""
client.close()
```
<!-- marker-delete-end -->
### InfluxDB 1.8 API compatibility
[InfluxDB 1.8.0 introduced forward compatibility APIs](https://docs.influxdata.com/influxdb/v1.8/tools/api/#influxdb-2-0-api-compatibility-endpoints) for InfluxDB 2.0. This allows you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.
The following forward compatible APIs are available:
| API | Endpoint | Description |
|-----------------------------------------------------|------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| [query_api.py](influxdb_client/client/query_api.py) | [/api/v2/query](https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2query-http-endpoint) | Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and [Flux](https://docs.influxdata.com/flux/latest/) (endpoint should be enabled by [flux-enabled option](https://docs.influxdata.com/influxdb/v1.8/administration/config/#flux-enabled-false)) |
| [write_api.py](influxdb_client/client/write_api.py) | [/api/v2/write](https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2write-http-endpoint) | Write data to InfluxDB 1.8.0+ using the InfluxDB 2.0 API |
| [ping()](influxdb_client/client/influxdb_client.py) | [/ping](https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint) | Check the status of your InfluxDB instance |
For more details, see the [InfluxDB 1.8 example](examples/influxdb_18_example.py), and the sketch below.
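A minimal sketch of connecting to InfluxDB 1.8 through these endpoints, following the conventions from the linked example (the `username:password` pair is passed as the `token`, `database/retention_policy` as the `bucket`, and the values are illustrative):

``` python
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

username = 'username'
password = 'password'

database = 'telegraf'
retention_policy = 'autogen'

# InfluxDB 1.8 compatibility: token is "username:password", bucket is "database/retention_policy"
bucket = f'{database}/{retention_policy}'

with InfluxDBClient(url='http://localhost:8086', token=f'{username}:{password}', org='-') as client:
    with client.write_api(write_options=SYNCHRONOUS) as write_api:
        write_api.write(bucket=bucket, record=Point("mem").tag("host", "host1").field("used_percent", 25.4))
```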
### Handling Errors
<!-- marker-handling-errors-start -->
Errors happen, and it's important that your code is prepared for them. All client-related exceptions are derived from `InfluxDBError`.
If an exception cannot be recovered in the client, it is returned to the application. These exceptions are left for the developer to handle.

Almost all APIs directly return unrecoverable exceptions to be handled this way:
``` python
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
from influxdb_client.client.write_api import SYNCHRONOUS
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    try:
        client.write_api(write_options=SYNCHRONOUS).write("my-bucket", record="mem,tag=a value=86")
    except InfluxDBError as e:
        if e.response.status == 401:
            raise Exception(f"Insufficient write permissions to 'my-bucket'.") from e
        raise
```
The only exception is the **batching** `WriteAPI` (for more info see [Batching](#batching)), where you need to register custom callbacks to handle batch events.
This is because this API runs in the background in a separate thread and it isn't possible to directly return the underlying exceptions.
``` python
from influxdb_client import InfluxDBClient
from influxdb_client.client.exceptions import InfluxDBError
class BatchingCallback(object):

    def success(self, conf: (str, str, str), data: str):
        print(f"Written batch: {conf}, data: {data}")

    def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")

    def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")


with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    callback = BatchingCallback()
    with client.write_api(success_callback=callback.success,
                          error_callback=callback.error,
                          retry_callback=callback.retry) as write_api:
        pass
```
#### HTTP Retry Strategy
By default, the client uses a retry strategy only for batching writes (for more info see [Batching](#batching)).
For other HTTP requests there is no default retry strategy, but one can be configured via the `retries` parameter of `InfluxDBClient`.
For more info about how to configure HTTP retries, see the [urllib3 documentation](https://urllib3.readthedocs.io/en/latest/reference/index.html?highlight=retry#urllib3.Retry).
``` python
from urllib3 import Retry
from influxdb_client import InfluxDBClient
retries = Retry(connect=5, read=2, redirect=5)
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org", retries=retries)
```
<!-- marker-handling-errors-end -->
### Nanosecond precision
<!-- marker-nanosecond-start -->
Python's [datetime](https://docs.python.org/3/library/datetime.html) doesn't support nanosecond precision, so during writes and queries the library ignores everything after microseconds.
If you would like to use `datetime` with nanosecond precision, you should use [pandas.Timestamp](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html#pandas.Timestamp), which is a replacement for the Python `datetime.datetime` object, and you should also set a proper `DateTimeHelper` on the client.
- sources - [nanosecond_precision.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/nanosecond_precision.py)
``` python
from influxdb_client import Point, InfluxDBClient
from influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper
from influxdb_client.client.write_api import SYNCHRONOUS
"""
Set PandasDate helper which supports nanoseconds.
"""
import influxdb_client.client.util.date_utils as date_utils
date_utils.date_helper = PandasDateTimeHelper()
"""
Prepare client.
"""
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
"""
Prepare data
"""
point = Point("h2o_feet") \
.field("water_level", 10) \
.tag("location", "pacific") \
.time('1996-02-25T21:20:00.001001231Z')
print(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')
print()
write_api.write(bucket="my-bucket", record=point)
"""
Query: using Stream
"""
query = '''
    from(bucket:"my-bucket")
        |> range(start: 0, stop: now())
        |> filter(fn: (r) => r._measurement == "h2o_feet")
'''
records = query_api.query_stream(query)
for record in records:
    print(f'Temperature in {record["location"]} is {record["_value"]} at time: {record["_time"]}')
"""
Close client
"""
client.close()
```
<!-- marker-nanosecond-end -->
### How to use Asyncio
<!-- marker-asyncio-start -->
Starting from version 1.27.0 for Python 3.7+ the `influxdb-client` package supports `async/await` based on [asyncio](https://docs.python.org/3/library/asyncio.html), [aiohttp](https://docs.aiohttp.org) and [aiocsv](https://pypi.org/project/aiocsv/).
You can install `aiohttp` and `aiocsv` directly:
> ``` bash
> $ python -m pip install influxdb-client aiohttp aiocsv
> ```
or use the `[async]` extra:
> ``` bash
> $ python -m pip install influxdb-client[async]
> ```
:warning:
> The `InfluxDBClientAsync` should be initialised inside an `async` coroutine, otherwise there can be unexpected behaviour. For more info see: [Why is creating a ClientSession outside of an event loop dangerous?](https://docs.aiohttp.org/en/stable/faq.html#why-is-creating-a-clientsession-outside-of-an-event-loop-dangerous).
#### Async APIs
All async APIs are available via `influxdb_client.client.influxdb_client_async.InfluxDBClientAsync`. The `async` version of the client supports the following asynchronous APIs:
- `influxdb_client.client.write_api_async.WriteApiAsync`
- `influxdb_client.client.query_api_async.QueryApiAsync`
- `influxdb_client.client.delete_api_async.DeleteApiAsync`
- Management services in `influxdb_client.service` support async operation

and also checking the readiness of the InfluxDB server via the `/ping` endpoint:
The `InfluxDBClientAsync` constructor accepts a number of __configuration properties__. Most useful among these are:
* `connection_pool_maxsize` - The total number of simultaneous connections. Defaults to `multiprocessing.cpu_count() * 5`.
* `enable_gzip` - enable gzip compression during `write` and `query` calls. Defaults to `false`.
* `proxy` - URL of an HTTP proxy to be used.
* `timeout` - The maximum number of milliseconds for handling HTTP requests from initial handshake to handling response data. This is passed directly to the underlying transport library. If large amounts of data are anticipated, for example from `query_api.query_stream(...)`, this should be increased to avoid `TimeoutError` or `CancelledError`. Defaults to 10_000 ms.
> ``` python
> import asyncio
>
> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
>
>
> async def main():
>     async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
>         ready = await client.ping()
>         print(f"InfluxDB: {ready}")
>
>
> if __name__ == "__main__":
>     asyncio.run(main())
> ```
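For example, a sketch passing some of the configuration properties listed above to the constructor (the values are illustrative):

> ``` python
> import asyncio
>
> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
>
>
> async def main():
>     # A larger timeout (in ms) and gzip compression for bigger query results
>     async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org",
>                                    timeout=30_000, enable_gzip=True) as client:
>         print(f"InfluxDB: {await client.ping()}")
>
>
> if __name__ == "__main__":
>     asyncio.run(main())
> ```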
#### Async Write API
The `influxdb_client.client.write_api_async.WriteApiAsync` supports ingesting data as:
- `string` or `bytes` that is formatted as InfluxDB's line protocol
- [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16) structure
- Dictionary style mapping with keys: `measurement`, `tags`, `fields` and `time` or custom structure
- [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple)
- [Data Classes](https://docs.python.org/3/library/dataclasses.html)
- [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
- List of above items
> ``` python
> import asyncio
>
> from influxdb_client import Point
> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
>
>
> async def main():
>     async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
>
>         write_api = client.write_api()
>
>         _point1 = Point("async_m").tag("location", "Prague").field("temperature", 25.3)
>         _point2 = Point("async_m").tag("location", "New York").field("temperature", 24.3)
>
>         successfully = await write_api.write(bucket="my-bucket", record=[_point1, _point2])
>
>         print(f" > successfully: {successfully}")
>
>
> if __name__ == "__main__":
>     asyncio.run(main())
> ```
#### Async Query API
The `influxdb_client.client.query_api_async.QueryApiAsync` supports retrieving data as:
- List of `influxdb_client.client.flux_table.FluxTable`
- Stream of `influxdb_client.client.flux_table.FluxRecord` via `typing.AsyncGenerator`
- [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)
- Stream of [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) via `typing.AsyncGenerator`
- Raw `str` output
> ``` python
> import asyncio
>
> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
>
>
> async def main():
>     async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
>         # Stream of FluxRecords
>         query_api = client.query_api()
>         records = await query_api.query_stream('from(bucket:"my-bucket") '
>                                                '|> range(start: -10m) '
>                                                '|> filter(fn: (r) => r["_measurement"] == "async_m")')
>         async for record in records:
>             print(record)
>
>
> if __name__ == "__main__":
>     asyncio.run(main())
> ```
#### Async Delete API
> ``` python
> import asyncio
> from datetime import datetime
>
> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
>
>
> async def main():
>     async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
>         start = datetime.fromtimestamp(0)
>         stop = datetime.now()
>         # Delete data with location = 'Prague'
>         successfully = await client.delete_api().delete(start=start, stop=stop, bucket="my-bucket",
>                                                         predicate="location = \"Prague\"")
>         print(f" > successfully: {successfully}")
>
>
> if __name__ == "__main__":
>     asyncio.run(main())
> ```
#### Management API
> ``` python
> import asyncio
>
> from influxdb_client import OrganizationsService
> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
>
>
> async def main():
>     async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:
>         # Initialize async OrganizationsService
>         organizations_service = OrganizationsService(api_client=client.api_client)
>
>         # Find organization with name 'my-org'
>         organizations = await organizations_service.get_orgs(org='my-org')
>         for organization in organizations.orgs:
>             print(f'name: {organization.name}, id: {organization.id}')
>
>
> if __name__ == "__main__":
>     asyncio.run(main())
> ```
#### Proxy and redirects
You can configure the client to tunnel requests through an HTTP proxy.
The following proxy options are supported:
- `proxy` - Set this to configure the http proxy to be used, ex. `http://localhost:3128`
- `proxy_headers` - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.
``` python
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
async with InfluxDBClientAsync(url="http://localhost:8086",
                               token="my-token",
                               org="my-org",
                               proxy="http://localhost:3128") as client:
    ...
```
If your proxy notifies the client of a permanent redirect (`HTTP 301`) to a **different host**, the client removes the `Authorization` header, because otherwise its contents would be sent to a third party, which is a security vulnerability.
Client automatically follows HTTP redirects. The default redirect policy is to follow up to `10` consecutive requests.
The redirects can be configured via:
- `allow_redirects` - If set to `False`, do not follow HTTP redirects.
`True` by default.
- `max_redirects` - Maximum number of HTTP redirects to follow. `10`
by default.
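A minimal sketch of tightening the redirect policy, assuming these options are accepted as keyword arguments by the `InfluxDBClientAsync` constructor:

``` python
import asyncio

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def main():
    # Follow at most 3 redirects; set allow_redirects=False to disable redirects entirely
    # (assumed keyword arguments, per the options listed above)
    async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org",
                                   max_redirects=3) as client:
        print(f"InfluxDB: {await client.ping()}")


if __name__ == "__main__":
    asyncio.run(main())
```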
<!-- marker-asyncio-end -->
### Logging
<!-- marker-logging-start -->
The client uses Python's [logging](https://docs.python.org/3/library/logging.html) facility for logging the library activity. The following logger categories are
exposed:
- `influxdb_client.client.influxdb_client`
- `influxdb_client.client.influxdb_client_async`
- `influxdb_client.client.write_api`
- `influxdb_client.client.write_api_async`
- `influxdb_client.client.write.retry`
- `influxdb_client.client.write.dataframe_serializer`
- `influxdb_client.client.util.multiprocessing_helper`
- `influxdb_client.client.http`
- `influxdb_client.client.exceptions`
The default logging level is `warning`, with no logger output configured. You can use the standard logger interface to change the log level and handler:
``` python
import logging
import sys
from influxdb_client import InfluxDBClient
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
    for _, logger in client.conf.loggers.items():
        logger.setLevel(logging.DEBUG)
        logger.addHandler(logging.StreamHandler(sys.stdout))
```
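You can also configure a single category directly through the standard `logging` module; a short sketch using one of the categories listed above:

``` python
import logging
import sys

# Enable debug output only for the write path
logger = logging.getLogger('influxdb_client.client.write_api')
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
```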
#### Debugging
For debugging purposes you can enable verbose logging of HTTP requests and set the `debug` level on all the client's logger categories:
``` python
client = InfluxDBClient(url="http://localhost:8086", token="my-token", debug=True)
```
Both HTTP request headers and body will be logged to standard output.
<!-- marker-logging-end -->
## Local tests
``` console
# start/restart InfluxDB2 on local machine using docker
./scripts/influxdb-restart.sh
# install requirements
pip install -e . --user
pip install -e .\[extra\] --user
pip install -e .\[test\] --user
# run unit & integration tests
pytest tests
```
## Contributing
Bug reports and pull requests are welcome on GitHub at <https://github.com/influxdata/influxdb-client-python>.
## License
This library is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).
Raw data
{
"_id": null,
"home_page": "https://github.com/influxdata/influxdb-client-python",
"name": "influxdb-client",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.7",
"maintainer_email": null,
"keywords": "InfluxDB, InfluxDB Python Client",
"author": null,
"author_email": null,
"download_url": "https://files.pythonhosted.org/packages/11/47/b756380917cb4b968bd871fc006128e2cc9897fb1ab4bcf7d108f9601e78/influxdb_client-1.48.0.tar.gz",
"platform": null,
"description": "# influxdb-client-python\n\n<!-- marker-index-start -->\n\n[![CircleCI](https://circleci.com/gh/influxdata/influxdb-client-python.svg?style=svg)](https://circleci.com/gh/influxdata/influxdb-client-python)\n[![codecov](https://codecov.io/gh/influxdata/influxdb-client-python/branch/master/graph/badge.svg)](https://codecov.io/gh/influxdata/influxdb-client-python)\n[![CI status](https://img.shields.io/circleci/project/github/influxdata/influxdb-client-python/master.svg)](https://circleci.com/gh/influxdata/influxdb-client-python)\n[![PyPI package](https://img.shields.io/pypi/v/influxdb-client.svg)](https://pypi.org/project/influxdb-client/)\n[![Anaconda.org package](https://anaconda.org/influxdata/influxdb_client/badges/version.svg)](https://anaconda.org/influxdata/influxdb_client)\n[![Supported Python versions](https://img.shields.io/pypi/pyversions/influxdb-client.svg)](https://pypi.python.org/pypi/influxdb-client)\n[![Documentation status](https://readthedocs.org/projects/influxdb-client/badge/?version=stable)](https://influxdb-client.readthedocs.io/en/stable/)\n[![Slack Status](https://img.shields.io/badge/slack-join_chat-white.svg?logo=slack&style=social)](https://www.influxdata.com/slack)\n\nThis repository contains the Python client library for use with InfluxDB 2.x and Flux. InfluxDB 3.x users should instead use the lightweight [v3 client library](https://github.com/InfluxCommunity/influxdb3-python).\nInfluxDB 1.x users should use the [v1 client library](https://github.com/influxdata/influxdb-python).\n\nFor ease of migration and a consistent query and write experience, v2 users should consider using InfluxQL and the [v1 client library](https://github.com/influxdata/influxdb-python).\n\nThe API of the **influxdb-client-python** is not the backwards-compatible with the old one - **influxdb-python**.\n\n## Documentation\n\nThis section contains links to the client library documentation.\n\n- [Product documentation](https://docs.influxdata.com/influxdb/v2.0/tools/client-libraries/), [Getting Started](#getting-started)\n- [Examples](https://github.com/influxdata/influxdb-client-python/tree/master/examples)\n- [API Reference](https://influxdb-client.readthedocs.io/en/stable/api.html)\n- [Changelog](https://github.com/influxdata/influxdb-client-python/blob/master/CHANGELOG.md)\n\n## InfluxDB 2.0 client features\n\n- Querying data\n - using the Flux language\n - into csv, raw data, [flux_table](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L33) structure, [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n - [How to query](#queries)\n- Writing data using\n - [Line Protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol)\n - [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L47)\n - [RxPY](https://rxpy.readthedocs.io/en/latest/) Observable\n - [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n - [How to write](#writes)\n- [InfluxDB 2.0 API](https://github.com/influxdata/influxdb/blob/master/http/swagger.yml) client for management\n - the client is generated from the [swagger](https://github.com/influxdata/influxdb/blob/master/http/swagger.yml) by using the [openapi-generator](https://github.com/OpenAPITools/openapi-generator)\n - organizations & users management\n - buckets management\n - tasks management\n - authorizations\n - 
health check\n - ...\n- [InfluxDB 1.8 API compatibility](#influxdb-18-api-compatibility)\n- Examples\n - [Connect to InfluxDB Cloud](#connect-to-influxdb-cloud)\n - [How to efficiently import a large dataset](#how-to-efficiently-import-a-large-dataset)\n - [Efficiently write data from an IoT sensor](#efficiently-write-data-from-an-iot-sensor)\n - [How to use Jupyter + Pandas + InfluxDB 2](#how-to-use-jupyter--pandas--influxdb-2)\n- [Advanced Usage](#advanced-usage)\n - [Gzip support](#gzip-support)\n - [Proxy configuration](#proxy-configuration)\n - [Nanosecond precision](#nanosecond-precision)\n - [Delete data](#delete-data)\n - [Handling Errors](#handling-errors)\n - [Logging](#logging)\n\n## Installation\n\nThe InfluxDB Python library uses [RxPY](https://github.com/ReactiveX/RxPY), the Reactive Extensions for Python.\n\n**Python 3.7** or later is required.\n\n:warning:\n> It is recommended to use `ciso8601` with the client for parsing dates. `ciso8601` is much faster than the built-in Python datetime parsing. Since it is written as a `C` module, the best way is to build it from source:\n\n**Windows**:\n\nYou have to install [Visual C++ Build Tools 2015](http://go.microsoft.com/fwlink/?LinkId=691126&fixForIE=.exe) to build `ciso8601` with `pip`.\n\n**conda**:\n\nInstall from source: `conda install -c conda-forge/label/cf202003 ciso8601`.\n\n### pip install\n\nThe Python package is hosted on [PyPI](https://pypi.org/project/influxdb-client/); you can install the latest version directly:\n\n``` sh\npip install 'influxdb-client[ciso]'\n```\n\nThen import the package:\n\n``` python\nimport influxdb_client\n```\n\nIf your application uses async/await in Python, you can install it with the `async` extra:\n\n``` sh\npip install 'influxdb-client[async]'\n```\n\nFor more info see [How to use Asyncio](#how-to-use-asyncio).\n\n### Setuptools\n\nInstall via [Setuptools](http://pypi.python.org/pypi/setuptools).\n\n``` sh\npython setup.py install --user\n```\n\n(or `sudo python setup.py install` to install the package for all users)\n\n## Getting Started\n\nPlease follow the [Installation](#installation) instructions and then run the following:\n\n<!-- marker-query-start -->\n\n``` python\nfrom influxdb_client import InfluxDBClient, Point\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\nbucket = \"my-bucket\"\n\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\")\n\nwrite_api = client.write_api(write_options=SYNCHRONOUS)\nquery_api = client.query_api()\n\np = Point(\"my_measurement\").tag(\"location\", \"Prague\").field(\"temperature\", 25.3)\n\nwrite_api.write(bucket=bucket, record=p)\n\n## using Table structure\ntables = query_api.query('from(bucket:\"my-bucket\") |> range(start: -10m)')\n\nfor table in tables:\n print(table)\n for row in table.records:\n print(row.values)\n\n\n## using csv library\ncsv_result = query_api.query_csv('from(bucket:\"my-bucket\") |> range(start: -10m)')\nval_count = 0\nfor row in csv_result:\n for cell in row:\n val_count += 1\n```\n\n<!-- marker-query-end -->\n\n
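`InfluxDBClient` can also be used as a context manager, which closes the underlying HTTP resources for you instead of relying on an explicit `client.close()`; a minimal sketch of the same write as above:\n\n``` python\nfrom influxdb_client import InfluxDBClient, Point\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\n# the context manager closes the client on exit\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n client.write_api(write_options=SYNCHRONOUS).write(bucket=\"my-bucket\", record=Point(\"my_measurement\").field(\"temperature\", 25.3))\n```\n\n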
## Client configuration\n\n### Via File\n\nA client can be configured via a `*.ini` file, using the `influx2` segment.\n\nThe following options are supported:\n\n- `url` - the URL to connect to InfluxDB\n- `org` - default destination organization for writes and queries\n- `token` - the token to use for authorization\n- `timeout` - socket timeout in ms (default value is 10000)\n- `verify_ssl` - set this to `false` to skip verifying the SSL certificate when calling the API from an HTTPS server\n- `ssl_ca_cert` - set this to customize the certificate file used to verify the peer\n- `cert_file` - path to the certificate that will be used for mTLS authentication\n- `cert_key_file` - path to the file containing the private key for the mTLS certificate\n- `cert_key_password` - a string or function which returns the password for decrypting the mTLS private key\n- `connection_pool_maxsize` - the number of connections to save that can be reused by urllib3\n- `auth_basic` - enable HTTP basic authentication when talking to an InfluxDB 1.8.x server that has no authentication enabled but is accessed via a reverse proxy with basic authentication (defaults to `false`)\n- `profilers` - set the list of enabled [Flux profilers](https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/)\n\n``` python\nself.client = InfluxDBClient.from_config_file(\"config.ini\")\n```\n\n``` ini\n[influx2]\nurl=http://localhost:8086\norg=my-org\ntoken=my-token\ntimeout=6000\nverify_ssl=False\n```\n\n### Via Environment Properties\n\nA client can be configured via environment properties.\n\nSupported properties are:\n\n- `INFLUXDB_V2_URL` - the URL to connect to InfluxDB\n- `INFLUXDB_V2_ORG` - default destination organization for writes and queries\n- `INFLUXDB_V2_TOKEN` - the token to use for authorization\n- `INFLUXDB_V2_TIMEOUT` - socket timeout in ms (default value is 10000)\n- `INFLUXDB_V2_VERIFY_SSL` - set this to `false` to skip verifying the SSL certificate when calling the API from an HTTPS server\n- `INFLUXDB_V2_SSL_CA_CERT` - set this to customize the certificate file used to verify the peer\n- `INFLUXDB_V2_CERT_FILE` - path to the certificate that will be used for mTLS authentication\n- `INFLUXDB_V2_CERT_KEY_FILE` - path to the file containing the private key for the mTLS certificate\n- `INFLUXDB_V2_CERT_KEY_PASSWORD` - a string or function which returns the password for decrypting the mTLS private key\n- `INFLUXDB_V2_CONNECTION_POOL_MAXSIZE` - the number of connections to save that can be reused by urllib3\n- `INFLUXDB_V2_AUTH_BASIC` - enable HTTP basic authentication when talking to an InfluxDB 1.8.x server that has no authentication enabled but is accessed via a reverse proxy with basic authentication (defaults to `false`)\n- `INFLUXDB_V2_PROFILERS` - set the list of enabled [Flux profilers](https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/)\n\n``` python\nself.client = InfluxDBClient.from_env_properties()\n```\n\n### Profile query\n\nThe [Flux Profiler package](https://docs.influxdata.com/influxdb/v2.0/reference/flux/stdlib/profiler/) provides performance profiling tools for Flux queries and operations.\n\nYou can enable printing of the Flux query's profiler information in the client library by:\n\n- setting `QueryOptions.profilers` in `QueryApi`,\n- setting the `INFLUXDB_V2_PROFILERS` environment variable (see the sketch below),\n- setting the `profilers` option in the configuration file.\n\n
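For instance, the environment-variable route can be combined with `from_env_properties()`; a minimal sketch, assuming the profiler list is passed as a comma-separated value just like the `profilers` option (the connection values are the usual placeholders):\n\n``` python\nimport os\n\nfrom influxdb_client import InfluxDBClient\n\n# enable the 'query' and 'operator' profilers for clients created from env properties\nos.environ['INFLUXDB_V2_URL'] = 'http://localhost:8086'\nos.environ['INFLUXDB_V2_ORG'] = 'my-org'\nos.environ['INFLUXDB_V2_TOKEN'] = 'my-token'\nos.environ['INFLUXDB_V2_PROFILERS'] = 'query,operator'\n\nclient = InfluxDBClient.from_env_properties()\n```\n\n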
When the profiler is enabled, the result of the Flux query contains additional \"profiler/*\" tables. In order to have consistent behaviour whether the profiler is enabled or disabled, `FluxCSVParser` excludes the \"profiler/*\" measurements from the result.\n\nExample of how to enable profilers using the API:\n\n``` python\nfrom influxdb_client.client.query_api import QueryOptions\n\nq = '''\n from(bucket: stringParam)\n |> range(start: -5m, stop: now())\n |> filter(fn: (r) => r._measurement == \"mem\")\n |> filter(fn: (r) => r._field == \"available\" or r._field == \"free\" or r._field == \"used\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n'''\np = {\n \"stringParam\": \"my-bucket\",\n}\n\nquery_api = client.query_api(query_options=QueryOptions(profilers=[\"query\", \"operator\"]))\ntables = query_api.query(query=q, params=p)\n```\n\nExample of a profiler output:\n\n``` text\n===============\nProfiler: query\n===============\n\nfrom(bucket: stringParam)\n |> range(start: -5m, stop: now())\n |> filter(fn: (r) => r._measurement == \"mem\")\n |> filter(fn: (r) => r._field == \"available\" or r._field == \"free\" or r._field == \"used\")\n |> aggregateWindow(every: 1m, fn: mean)\n |> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")\n\n========================\nProfiler: profiler/query\n========================\nresult : _profiler\ntable : 0\n_measurement : profiler/query\nTotalDuration : 8924700\nCompileDuration : 350900\nQueueDuration : 33800\nPlanDuration : 0\nRequeueDuration : 0\nExecuteDuration : 8486500\nConcurrency : 0\nMaxAllocated : 2072\nTotalAllocated : 0\nflux/query-plan :\n\ndigraph {\n ReadWindowAggregateByTime11\n // every = 1m, aggregates = [mean], createEmpty = true, timeColumn = \"_stop\"\n pivot8\n generated_yield\n\n ReadWindowAggregateByTime11 -> pivot8\n pivot8 -> generated_yield\n}\n\n\ninfluxdb/scanned-bytes: 0\ninfluxdb/scanned-values: 0\n\n===========================\nProfiler: profiler/operator\n===========================\nresult : _profiler\ntable : 1\n_measurement : profiler/operator\nType : *universe.pivotTransformation\nLabel : pivot8\nCount : 3\nMinDuration : 32600\nMaxDuration : 126200\nDurationSum : 193400\nMeanDuration : 64466.666666666664\n\n===========================\nProfiler: profiler/operator\n===========================\nresult : _profiler\ntable : 1\n_measurement : profiler/operator\nType : *influxdb.readWindowAggregateSource\nLabel : ReadWindowAggregateByTime11\nCount : 1\nMinDuration : 940500\nMaxDuration : 940500\nDurationSum : 940500\nMeanDuration : 940500.0\n```\n\n
You can also use a callback function to receive the profiler output; each value passed to the callback is a `FluxRecord`.\n\nExample of how to use profilers with a callback:\n\n``` python\nclass ProfilersCallback(object):\n def __init__(self):\n self.records = []\n\n def __call__(self, flux_record):\n self.records.append(flux_record.values)\n\ncallback = ProfilersCallback()\n\nquery_api = client.query_api(query_options=QueryOptions(profilers=[\"query\", \"operator\"], profiler_callback=callback))\ntables = query_api.query('from(bucket:\"my-bucket\") |> range(start: -10m)')\n\nfor profiler in callback.records:\n print(f'Custom processing of profiler result: {profiler}')\n```\n\nExample output of this callback:\n\n``` text\nCustom processing of profiler result: {'result': '_profiler', 'table': 0, '_measurement': 'profiler/query', 'TotalDuration': 18843792, 'CompileDuration': 1078666, 'QueueDuration': 93375, 'PlanDuration': 0, 'RequeueDuration': 0, 'ExecuteDuration': 17371000, 'Concurrency': 0, 'MaxAllocated': 448, 'TotalAllocated': 0, 'RuntimeErrors': None, 'flux/query-plan': 'digraph {\\r\\n ReadRange2\\r\\n generated_yield\\r\\n\\r\\n ReadRange2 -> generated_yield\\r\\n}\\r\\n\\r\\n', 'influxdb/scanned-bytes': 0, 'influxdb/scanned-values': 0}\nCustom processing of profiler result: {'result': '_profiler', 'table': 1, '_measurement': 'profiler/operator', 'Type': '*influxdb.readFilterSource', 'Label': 'ReadRange2', 'Count': 1, 'MinDuration': 3274084, 'MaxDuration': 3274084, 'DurationSum': 3274084, 'MeanDuration': 3274084.0}\n```\n\n<!-- marker-index-end -->\n\n## How to use\n\n### Writes\n\n<!-- marker-writes-start -->\n\nThe [WriteApi](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write_api.py) supports synchronous, asynchronous and batching writes into InfluxDB 2.0. The data should be passed as [InfluxDB Line Protocol](https://docs.influxdata.com/influxdb/latest/write_protocols/line_protocol_tutorial/), a [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py) or an Observable stream.\n\n:warning:\n\n> The `WriteApi` in batching mode (the default mode) is supposed to run as a singleton. To flush all your data you should wrap the execution in a `with client.write_api(...) as write_api:` statement or call `write_api.close()` at the end of your script.\n\n*The default instance of `WriteApi` uses batching.*\n\n#### The data could be written as\n\n1. `string` or `bytes` that is formatted as InfluxDB's line protocol\n2. [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16) structure\n3. Dictionary-style mapping with keys: `measurement`, `tags`, `fields` and `time`, or a custom structure\n4. [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple) (see the sketch after this list)\n5. [Data Classes](https://docs.python.org/3/library/dataclasses.html)\n6. [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n7. List of the above items\n8. A `batching` type of write also supports an `Observable` that produces any of the above items\n\n
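As a sketch of the NamedTuple case: the write API needs to be told which attributes map to the measurement, tags and fields. The keyword arguments `record_measurement_name`, `record_tag_keys` and `record_field_keys` used below are an assumption to verify against your client version; the record type itself is purely illustrative.\n\n``` python\nfrom collections import namedtuple\n\nfrom influxdb_client import InfluxDBClient\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\n# illustrative record type\nSensor = namedtuple('Sensor', ['location', 'temperature'])\n\nwith InfluxDBClient(url='http://localhost:8086', token='my-token', org='my-org') as client:\n write_api = client.write_api(write_options=SYNCHRONOUS)\n # map the 'location' attribute to a tag and 'temperature' to a field\n write_api.write(bucket='my-bucket',\n record=Sensor(location='Prague', temperature=25.3),\n record_measurement_name='my_measurement',\n record_tag_keys=['location'],\n record_field_keys=['temperature'])\n```\n\n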
You can find write examples at GitHub: [influxdb-client-python/examples](https://github.com/influxdata/influxdb-client-python/tree/master/examples#writes).\n\n#### Batching\n\nBatching is configurable via `write_options`:\n\n| Property | Description | Default Value |\n|----------------------|-------------|---------------|\n| **batch_size** | the number of data points to collect in a batch | `1000` |\n| **flush_interval** | the number of milliseconds before the batch is written | `1000` |\n| **jitter_interval** | the number of milliseconds to increase the batch flush interval by a random amount | `0` |\n| **retry_interval** | the number of milliseconds to wait before retrying the first unsuccessful write. The next retry delay is computed using exponential random backoff. The retry interval is used when the InfluxDB server does not specify a \"Retry-After\" header. | `5000` |\n| **max_retry_time** | maximum total retry timeout in milliseconds | `180_000` |\n| **max_retries** | the maximum number of retries when a write fails | `5` |\n| **max_retry_delay** | the maximum delay between each retry attempt in milliseconds | `125_000` |\n| **max_close_wait** | the maximum amount of time to wait for batches to flush when `.close()` is called | `300_000` |\n| **exponential_base** | the base for the exponential retry delay; the next delay is computed using random exponential backoff as a random value within the interval `retry_interval * exponential_base^(attempts-1)` and `retry_interval * exponential_base^(attempts)`.
 For example, for `retry_interval=5_000, exponential_base=2, max_retry_delay=125_000, total=5`, the retry delays are randomly distributed values within the ranges `[5_000-10_000, 10_000-20_000, 20_000-40_000, 40_000-80_000, 80_000-125_000]`. | `2` |\n\n``` python\nfrom datetime import datetime, timedelta, timezone\n\nimport pandas as pd\nimport reactivex as rx\nfrom reactivex import operators as ops\n\nfrom influxdb_client import InfluxDBClient, Point, WriteOptions\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as _client:\n\n with _client.write_api(write_options=WriteOptions(batch_size=500,\n flush_interval=10_000,\n jitter_interval=2_000,\n retry_interval=5_000,\n max_retries=5,\n max_retry_delay=30_000,\n max_close_wait=300_000,\n exponential_base=2)) as _write_client:\n\n \"\"\"\n Write Line Protocol formatted as string\n \"\"\"\n _write_client.write(\"my-bucket\", \"my-org\", \"h2o_feet,location=coyote_creek water_level=1.0 1\")\n _write_client.write(\"my-bucket\", \"my-org\", [\"h2o_feet,location=coyote_creek water_level=2.0 2\",\n \"h2o_feet,location=coyote_creek water_level=3.0 3\"])\n\n \"\"\"\n Write Line Protocol formatted as byte array\n \"\"\"\n _write_client.write(\"my-bucket\", \"my-org\", \"h2o_feet,location=coyote_creek water_level=1.0 1\".encode())\n _write_client.write(\"my-bucket\", \"my-org\", [\"h2o_feet,location=coyote_creek water_level=2.0 2\".encode(),\n \"h2o_feet,location=coyote_creek water_level=3.0 3\".encode()])\n\n \"\"\"\n Write Dictionary-style object\n \"\"\"\n _write_client.write(\"my-bucket\", \"my-org\", {\"measurement\": \"h2o_feet\", \"tags\": {\"location\": \"coyote_creek\"},\n \"fields\": {\"water_level\": 1.0}, \"time\": 1})\n _write_client.write(\"my-bucket\", \"my-org\", [{\"measurement\": \"h2o_feet\", \"tags\": {\"location\": \"coyote_creek\"},\n \"fields\": {\"water_level\": 2.0}, \"time\": 2},\n {\"measurement\": \"h2o_feet\", \"tags\": {\"location\": \"coyote_creek\"},\n \"fields\": {\"water_level\": 3.0}, \"time\": 3}])\n\n \"\"\"\n Write Data Point\n \"\"\"\n _write_client.write(\"my-bucket\", \"my-org\",\n Point(\"h2o_feet\").tag(\"location\", \"coyote_creek\").field(\"water_level\", 4.0).time(4))\n _write_client.write(\"my-bucket\", \"my-org\",\n [Point(\"h2o_feet\").tag(\"location\", \"coyote_creek\").field(\"water_level\", 5.0).time(5),\n Point(\"h2o_feet\").tag(\"location\", \"coyote_creek\").field(\"water_level\", 6.0).time(6)])\n\n \"\"\"\n Write Observable stream\n \"\"\"\n _data = rx \\\n .range(7, 11) \\\n .pipe(ops.map(lambda i: \"h2o_feet,location=coyote_creek water_level={0}.0 {0}\".format(i)))\n\n _write_client.write(\"my-bucket\", \"my-org\", _data)\n\n \"\"\"\n Write Pandas DataFrame\n \"\"\"\n _now = datetime.now(tz=timezone.utc)\n _data_frame = pd.DataFrame(data=[[\"coyote_creek\", 1.0], [\"coyote_creek\", 2.0]],\n index=[_now, _now + timedelta(hours=1)],\n columns=[\"location\", \"water_level\"])\n\n _write_client.write(\"my-bucket\", \"my-org\", record=_data_frame, data_frame_measurement_name='h2o_feet',\n data_frame_tag_columns=['location'])\n```\n\n#### Default Tags\n\nSometimes it is useful to store the same information in every measurement, e.g. `hostname`, `location`, `customer`.
 The client can use a static value or an environment property as a tag value.\n\nThe supported expressions:\n\n- `California Miner` - static value\n- `${env.hostname}` - environment property\n\n##### Via API\n\n``` python\npoint_settings = PointSettings()\npoint_settings.add_default_tag(\"id\", \"132-987-655\")\npoint_settings.add_default_tag(\"customer\", \"California Miner\")\npoint_settings.add_default_tag(\"data_center\", \"${env.data_center}\")\n\nself.write_client = self.client.write_api(write_options=SYNCHRONOUS, point_settings=point_settings)\n```\n\n``` python\nself.write_client = self.client.write_api(write_options=SYNCHRONOUS,\n point_settings=PointSettings(**{\"id\": \"132-987-655\",\n \"customer\": \"California Miner\"}))\n```\n\n##### Via Configuration file\n\nIn an [ini](https://docs.python.org/3/library/configparser.html) configuration file you can specify default tags in the `tags` segment.\n\n``` python\nself.client = InfluxDBClient.from_config_file(\"config.ini\")\n```\n\n``` ini\n[influx2]\nurl=http://localhost:8086\norg=my-org\ntoken=my-token\ntimeout=6000\n\n[tags]\nid = 132-987-655\ncustomer = California Miner\ndata_center = ${env.data_center}\n```\n\nYou can also use a [TOML](https://toml.io/en/) or a [JSON](https://www.json.org/json-en.html) format for the configuration file.\n\n##### Via Environment Properties\n\nYou can specify default tags via environment properties with the prefix `INFLUXDB_V2_TAG_`.\n\nExamples:\n\n- `INFLUXDB_V2_TAG_ID`\n- `INFLUXDB_V2_TAG_HOSTNAME`\n\n``` python\nself.client = InfluxDBClient.from_env_properties()\n```\n\n#### Synchronous client\n\nData is written in a synchronous HTTP request.\n\n``` python\nfrom influxdb_client import InfluxDBClient, Point\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\")\nwrite_api = client.write_api(write_options=SYNCHRONOUS)\n\n_point1 = Point(\"my_measurement\").tag(\"location\", \"Prague\").field(\"temperature\", 25.3)\n_point2 = Point(\"my_measurement\").tag(\"location\", \"New York\").field(\"temperature\", 24.3)\n\nwrite_api.write(bucket=\"my-bucket\", record=[_point1, _point2])\n\nclient.close()\n```\n\n
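Between the synchronous and batching modes there is also an `ASYNCHRONOUS` write mode, which runs the HTTP request in the background and returns a future-like result; a minimal sketch, assuming `get()` blocks until the write has been processed:\n\n``` python\nfrom influxdb_client import InfluxDBClient, Point\nfrom influxdb_client.client.write_api import ASYNCHRONOUS\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n with client.write_api(write_options=ASYNCHRONOUS) as write_api:\n async_result = write_api.write(bucket=\"my-bucket\", record=Point(\"my_measurement\").field(\"temperature\", 25.3))\n async_result.get() # block until the write has been processed\n```\n\n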
<!-- marker-writes-end -->\n\n### Queries\n\nThe result retrieved by [QueryApi](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py) could be formatted as:\n\n1. Flux data structure: [FluxTable](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L5), [FluxColumn](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L22) and [FluxRecord](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/flux_table.py#L31)\n2. `influxdb_client.client.flux_table.CSVIterator` which will iterate over CSV lines\n3. Raw unprocessed results as a `str` iterator\n4. [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n\nThe API also supports streaming `FluxRecord` via [query_stream](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/query_api.py#L77); see the example below:\n\n``` python\nimport datetime\n\nfrom influxdb_client import InfluxDBClient, Point, Dialect\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\")\n\nwrite_api = client.write_api(write_options=SYNCHRONOUS)\nquery_api = client.query_api()\n\n\"\"\"\nPrepare data\n\"\"\"\n\n_point1 = Point(\"my_measurement\").tag(\"location\", \"Prague\").field(\"temperature\", 25.3)\n_point2 = Point(\"my_measurement\").tag(\"location\", \"New York\").field(\"temperature\", 24.3)\n\nwrite_api.write(bucket=\"my-bucket\", record=[_point1, _point2])\n\n\"\"\"\nQuery: using Table structure\n\"\"\"\ntables = query_api.query('from(bucket:\"my-bucket\") |> range(start: -10m)')\n\nfor table in tables:\n print(table)\n for record in table.records:\n print(record.values)\n\nprint()\nprint()\n\n\"\"\"\nQuery: using Bind parameters\n\"\"\"\n\np = {\"_start\": datetime.timedelta(hours=-1),\n \"_location\": \"Prague\",\n \"_desc\": True,\n \"_floatParam\": 25.1,\n \"_every\": datetime.timedelta(minutes=5)\n }\n\ntables = query_api.query('''\n from(bucket:\"my-bucket\") |> range(start: _start)\n |> filter(fn: (r) => r[\"_measurement\"] == \"my_measurement\")\n |> filter(fn: (r) => r[\"_field\"] == \"temperature\")\n |> filter(fn: (r) => r[\"location\"] == _location and r[\"_value\"] > _floatParam)\n |> aggregateWindow(every: _every, fn: mean, createEmpty: true)\n |> sort(columns: [\"_time\"], desc: _desc)\n''', params=p)\n\nfor table in tables:\n print(table)\n for record in table.records:\n print(str(record[\"_time\"]) + \" - \" + record[\"location\"] + \": \" + str(record[\"_value\"]))\n\nprint()\nprint()\n\n\"\"\"\nQuery: using Stream\n\"\"\"\nrecords = query_api.query_stream('from(bucket:\"my-bucket\") |> range(start: -10m)')\n\nfor record in records:\n print(f'Temperature in {record[\"location\"]} is {record[\"_value\"]}')\n\n\"\"\"\nInterrupt a stream after retrieving the required data\n\"\"\"\nlarge_stream = query_api.query_stream('from(bucket:\"my-bucket\") |> range(start: -100d)')\nfor record in large_stream:\n if record[\"location\"] == \"New York\":\n print(f'New York temperature: {record[\"_value\"]}')\n break\n\nlarge_stream.close()\n\nprint()\nprint()\n\n\"\"\"\nQuery: using csv library\n\"\"\"\ncsv_result = query_api.query_csv('from(bucket:\"my-bucket\") |> range(start: -10m)',\n dialect=Dialect(header=False, delimiter=\",\", comment_prefix=\"#\", annotations=[],\n date_time_format=\"RFC3339\"))\nfor csv_line in csv_result:\n if not len(csv_line) == 0:\n print(f'Temperature in {csv_line[9]} is {csv_line[6]}')\n\n\"\"\"\nClose client\n\"\"\"\nclient.close()\n```\n\n
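The list of tables returned by `query()` can also be serialized without walking the records manually; recent client versions expose helpers on the result for this (treat the helper names `to_values` and `to_json` as an assumption to verify against your client version):\n\n``` python\n# assumes `query_api` from the example above\ntables = query_api.query('from(bucket:\"my-bucket\") |> range(start: -10m)')\n\n# flatten selected columns into plain Python lists\nprint(tables.to_values(columns=['location', '_time', '_value']))\n\n# or serialize the whole result to JSON\nprint(tables.to_json(indent=2))\n```\n\n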
#### Pandas DataFrame\n\n<!-- marker-pandas-start -->\n\n:warning:\n\n> For DataFrame querying you should install the Pandas dependency via `pip install 'influxdb-client[extra]'`.\n\n:warning:\n\n> Note that if a query returns more than one table, the client generates a `DataFrame` for each of them.\n\nThe `client` can retrieve data in [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) format through `query_data_frame`:\n\n``` python\nfrom influxdb_client import InfluxDBClient, Point\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\")\n\nwrite_api = client.write_api(write_options=SYNCHRONOUS)\nquery_api = client.query_api()\n\n\"\"\"\nPrepare data\n\"\"\"\n\n_point1 = Point(\"my_measurement\").tag(\"location\", \"Prague\").field(\"temperature\", 25.3)\n_point2 = Point(\"my_measurement\").tag(\"location\", \"New York\").field(\"temperature\", 24.3)\n\nwrite_api.write(bucket=\"my-bucket\", record=[_point1, _point2])\n\n\"\"\"\nQuery: using Pandas DataFrame\n\"\"\"\ndata_frame = query_api.query_data_frame('from(bucket:\"my-bucket\") '\n '|> range(start: -10m) '\n '|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\") '\n '|> keep(columns: [\"location\", \"temperature\"])')\nprint(data_frame.to_string())\n\n\"\"\"\nClose client\n\"\"\"\nclient.close()\n```\n\nOutput:\n\n``` text\nresult table location temperature\n0 _result 0 New York 24.3\n1 _result 1 Prague 25.3\n```\n\n<!-- marker-pandas-end -->\n\n### Examples\n\n<!-- marker-examples-start -->\n\n#### How to efficiently import a large dataset\n\nThe following example shows how to import a dataset of a dozen megabytes. If you would like to import gigabytes of data, then use our multiprocessing example: [import_data_set_multiprocessing.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set_multiprocessing.py) to use the full capability of your hardware.\n\n- sources - [import_data_set.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set.py)\n\n``` python\n\"\"\"\nImport VIX - CBOE Volatility Index - from \"vix-daily.csv\" file into InfluxDB 2.0\n\nhttps://datahub.io/core/finance-vix#data\n\"\"\"\n\nfrom collections import OrderedDict\nfrom csv import DictReader\n\nimport reactivex as rx\nfrom reactivex import operators as ops\n\nfrom influxdb_client import InfluxDBClient, Point, WriteOptions\n\ndef parse_row(row: OrderedDict):\n \"\"\"Parse row of CSV file into Point with structure:\n\n financial-analysis,type=vix-daily close=18.47,high=19.82,low=18.28,open=19.82 1198195200000000000\n\n CSV format:\n Date,VIX Open,VIX High,VIX Low,VIX Close\\n\n 2004-01-02,17.96,18.68,17.54,18.22\\n\n 2004-01-05,18.45,18.49,17.44,17.49\\n\n 2004-01-06,17.66,17.67,16.19,16.73\\n\n 2004-01-07,16.72,16.75,15.5,15.5\\n\n 2004-01-08,15.42,15.68,15.32,15.61\\n\n 2004-01-09,16.15,16.88,15.57,16.75\\n\n ...\n\n :param row: the row of CSV file\n :return: the CSV row parsed into a [Point]\n \"\"\"\n\n \"\"\"\n For better performance it is sometimes useful to directly create Line Protocol to avoid unnecessary escaping overhead:\n \"\"\"\n # from datetime import timezone\n # import ciso8601\n # from influxdb_client.client.write.point import EPOCH\n #\n # time = (ciso8601.parse_datetime(row[\"Date\"]).replace(tzinfo=timezone.utc) - EPOCH).total_seconds() * 1e9\n # return f\"financial-analysis,type=vix-daily\" \\\n # f\" close={float(row['VIX Close'])},high={float(row['VIX High'])},low={float(row['VIX Low'])},open={float(row['VIX Open'])} \" \\\n # f\" {int(time)}\"\n\n return Point(\"financial-analysis\") \\\n .tag(\"type\", \"vix-daily\") \\\n .field(\"open\", float(row['VIX Open'])) \\\n .field(\"high\", float(row['VIX High'])) \\\n .field(\"low\", float(row['VIX Low'])) \\\n .field(\"close\", float(row['VIX Close'])) \\\n .time(row['Date'])\n\n\n\"\"\"\nConverts vix-daily.csv into a sequence of data points\n\"\"\"\ndata = rx \\\n .from_iterable(DictReader(open('vix-daily.csv', 'r'))) \\\n
 .pipe(ops.map(lambda row: parse_row(row)))\n\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\", debug=True)\n\n\"\"\"\nCreate a write API that writes data in batches of 50_000 items.\n\"\"\"\nwrite_api = client.write_api(write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))\n\n\"\"\"\nWrite data into InfluxDB\n\"\"\"\nwrite_api.write(bucket=\"my-bucket\", record=data)\nwrite_api.close()\n\n\"\"\"\nQuerying max value of CBOE Volatility Index\n\"\"\"\nquery = 'from(bucket:\"my-bucket\")' \\\n ' |> range(start: 0, stop: now())' \\\n ' |> filter(fn: (r) => r._measurement == \"financial-analysis\")' \\\n ' |> max()'\nresult = client.query_api().query(query=query)\n\n\"\"\"\nProcessing results\n\"\"\"\nprint()\nprint(\"=== results ===\")\nprint()\nfor table in result:\n for record in table.records:\n print('max {0:5} = {1}'.format(record.get_field(), record.get_value()))\n\n\"\"\"\nClose client\n\"\"\"\nclient.close()\n```\n\n#### Efficiently write data from an IoT sensor\n\n- sources - [iot_sensor.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/iot_sensor.py)\n\n``` python\n\"\"\"\nEfficiently write data from an IoT sensor - write the temperature every minute, only when it changes\n\"\"\"\nimport atexit\nimport platform\nfrom datetime import timedelta\n\nimport psutil as psutil\nimport reactivex as rx\nfrom reactivex import operators as ops\n\nfrom influxdb_client import InfluxDBClient, WriteApi, WriteOptions\n\ndef on_exit(db_client: InfluxDBClient, write_api: WriteApi):\n \"\"\"Close clients after the script terminates.\n\n :param db_client: InfluxDB client\n :param write_api: WriteApi\n :return: nothing\n \"\"\"\n write_api.close()\n db_client.close()\n\n\ndef sensor_temperature():\n \"\"\"Read a CPU temperature.
 [psutil] doesn't support temperature sensors on macOS, so we use [sysctl] there.\n\n :return: actual CPU temperature\n \"\"\"\n os_name = platform.system()\n if os_name == 'Darwin':\n from subprocess import check_output\n output = check_output([\"sysctl\", \"machdep.xcpm.cpu_thermal_level\"])\n import re\n return re.findall(r'\\d+', str(output))[0]\n else:\n return psutil.sensors_temperatures()[\"coretemp\"][0]\n\n\ndef line_protocol(temperature):\n \"\"\"Create an InfluxDB line protocol record with the structure:\n\n iot_sensor,hostname=mine_sensor_12,type=temperature value=68\n\n :param temperature: the sensor temperature\n :return: Line protocol to write into InfluxDB\n \"\"\"\n\n import socket\n return 'iot_sensor,hostname={},type=temperature value={}'.format(socket.gethostname(), temperature)\n\n\n\"\"\"\nRead the temperature every minute; distinct_until_changed - produce only when the temperature changes\n\"\"\"\ndata = rx\\\n .interval(period=timedelta(seconds=60))\\\n .pipe(ops.map(lambda t: sensor_temperature()),\n ops.distinct_until_changed(),\n ops.map(lambda temperature: line_protocol(temperature)))\n\n_db_client = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\", debug=True)\n\n\"\"\"\nCreate client that writes data into InfluxDB\n\"\"\"\n_write_api = _db_client.write_api(write_options=WriteOptions(batch_size=1))\n_write_api.write(bucket=\"my-bucket\", record=data)\n\n\n\"\"\"\nCall after the script terminates\n\"\"\"\natexit.register(on_exit, _db_client, _write_api)\n\ninput()\n```\n\n#### Connect to InfluxDB Cloud\n\nThe following example demonstrates the simplest way to write and query data with InfluxDB Cloud.\n\nFirst, create an authentication token as described [here](https://v2.docs.influxdata.com/v2.0/security/tokens/create-token/).\n\nThen configure the properties `influx_cloud_url`, `influx_cloud_token`, `bucket` and `org` in the `influx_cloud.py` example.\n\nThe last step is to run the Python script via `python3 influx_cloud.py`.\n\n- sources - [influx_cloud.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/influx_cloud.py)\n\n``` python\n\"\"\"\nConnect to InfluxDB 2.0 - write data and query it\n\"\"\"\n\nfrom datetime import datetime, timezone\n\nfrom influxdb_client import Point, InfluxDBClient\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\n\"\"\"\nConfigure credentials\n\"\"\"\ninflux_cloud_url = 'https://us-west-2-1.aws.cloud2.influxdata.com'\ninflux_cloud_token = '...'\nbucket = '...'\norg = '...'\n\nclient = InfluxDBClient(url=influx_cloud_url, token=influx_cloud_token)\ntry:\n kind = 'temperature'\n host = 'host1'\n device = 'opt-123'\n\n \"\"\"\n Write data by Point structure\n \"\"\"\n point = Point(kind).tag('host', host).tag('device', device).field('value', 25.3).time(time=datetime.now(tz=timezone.utc))\n\n print(f'Writing to InfluxDB cloud: {point.to_line_protocol()} ...')\n\n write_api = client.write_api(write_options=SYNCHRONOUS)\n write_api.write(bucket=bucket, org=org, record=point)\n\n print()\n print('success')\n print()\n print()\n\n \"\"\"\n Query written data\n \"\"\"\n query = f'from(bucket: \"{bucket}\") |> range(start: -1d) |> filter(fn: (r) => r._measurement == \"{kind}\")'\n print(f'Querying from InfluxDB cloud: \"{query}\" ...')\n print()\n\n query_api = client.query_api()\n tables = query_api.query(query=query, org=org)\n\n for table in tables:\n for row in table.records:\n print(f'{row.values[\"_time\"]}: host={row.values[\"host\"]},device={row.values[\"device\"]} '\n
 f'{row.values[\"_value\"]} \u00b0C')\n\n print()\n print('success')\n\nexcept Exception as e:\n print(e)\nfinally:\n client.close()\n```\n\n#### How to use Jupyter + Pandas + InfluxDB 2\n\nThe first example shows how to use the client's capabilities to predict stock prices via [Keras](https://keras.io), [TensorFlow](https://www.tensorflow.org) and [sklearn](https://scikit-learn.org/stable/). The example is taken from [Kaggle](https://www.kaggle.com/chaitanyacc4/predicting-stock-prices-of-apple-inc).\n\n- sources - [stock-predictions.ipynb](notebooks/stock-predictions.ipynb)\n\n![image](https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction.gif)\n\nResult:\n\n![image](https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/stock-price-prediction-results.png)\n\nThe second example shows how to use the client's capabilities for realtime visualization via [hvPlot](https://hvplot.pyviz.org), [Streamz](https://streamz.readthedocs.io/en/latest/) and [RxPY](https://rxpy.readthedocs.io/en/latest/):\n\n- sources - [realtime-stream.ipynb](notebooks/realtime-stream.ipynb)\n\n![image](https://raw.githubusercontent.com/influxdata/influxdb-client-python/master/docs/images/realtime-result.gif)\n\n#### Other examples\n\nYou can find all examples at GitHub: [influxdb-client-python/examples](https://github.com/influxdata/influxdb-client-python/tree/master/examples#examples).\n\n<!-- marker-examples-end -->\n\n## Advanced Usage\n\n### Gzip support\n\n<!-- marker-gzip-start -->\n\n`InfluxDBClient` does not enable gzip compression for HTTP requests by default. If you want to enable gzip to reduce the size of transferred data, you can call:\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\n_db_client = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\", enable_gzip=True)\n```\n<!-- marker-gzip-end -->\n\n### Authenticate to InfluxDB\n\n<!-- marker-authenticate-start -->\n\n`InfluxDBClient` supports three ways to authorize a connection:\n\n- _Token_\n- _Username & Password_\n- _HTTP Basic_\n\n#### Token\n\nUse the `token` to authenticate to the InfluxDB API. In your API requests, an _Authorization_ header will be sent; its value is the word _Token_, followed by a space and an InfluxDB API token. The word _Token_ is case-sensitive.\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\") as client:\n ...\n```\n\n:warning:\n\n> Note that this is the preferred way to authenticate to the InfluxDB API.\n\n#### Username & Password\n\nAuthenticates via username and password credentials. If successful, creates a new session for the user.\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\nwith InfluxDBClient(url=\"http://localhost:8086\", username=\"my-user\", password=\"my-password\") as client:\n ...\n```\n\n:warning:\n\n> The `username/password` auth is based on the HTTP \"Basic\" authentication. The authorization expires when the [time-to-live (TTL)](https://docs.influxdata.com/influxdb/latest/reference/config-options/#session-length) (default 60 minutes) is reached, and the client then raises an unauthorized exception.\n\n
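You can verify connectivity to the configured server with the client's `ping()` health check; note that it only checks that the server is reachable, not that the token is valid. A minimal sketch:\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n # ping() returns True when the server responds; it does not validate credentials\n if not client.ping():\n raise ConnectionError('InfluxDB is not reachable')\n```\n\n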
#### HTTP Basic\n\nUse this to enable basic authentication when talking to an InfluxDB 1.8.x server that has no authentication enabled but is protected by a reverse proxy with basic authentication.\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\nwith InfluxDBClient(url=\"http://localhost:8086\", auth_basic=True, token=\"my-proxy-secret\") as client:\n ...\n```\n\n:warning:\n\n> Don't use this when talking directly to InfluxDB 2.\n\n<!-- marker-authenticate-end -->\n\n### Proxy configuration\n\n<!-- marker-proxy-start -->\n\nYou can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:\n\n- `proxy` - Set this to configure the HTTP proxy to be used, e.g. `http://localhost:3128`\n- `proxy_headers` - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\nwith InfluxDBClient(url=\"http://localhost:8086\",\n token=\"my-token\",\n org=\"my-org\",\n proxy=\"http://localhost:3128\") as client:\n ...\n```\n\nIf your proxy notifies the client of a permanent redirect (`HTTP 301`) to a **different host**, the client removes the `Authorization` header, because otherwise its contents would be sent to third parties, which is a security vulnerability.\n\nYou can change this behaviour by:\n\n``` python\nfrom urllib3 import Retry\nRetry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT = frozenset()\nRetry.DEFAULT.remove_headers_on_redirect = Retry.DEFAULT_REMOVE_HEADERS_ON_REDIRECT\n```\n\n
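If the proxy itself requires authentication, the `proxy_headers` option can carry the credentials. A minimal sketch using urllib3's `make_headers` helper (the `user:password` value is a placeholder):\n\n``` python\nfrom urllib3 import make_headers\n\nfrom influxdb_client import InfluxDBClient\n\n# build a Proxy-Authorization header for a proxy that requires basic auth\nwith InfluxDBClient(url=\"http://localhost:8086\",\n token=\"my-token\",\n org=\"my-org\",\n proxy=\"http://localhost:3128\",\n proxy_headers=make_headers(proxy_basic_auth='user:password')) as client:\n ...\n```\n\n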
<!-- marker-proxy-end -->\n\n### Delete data\n\n<!-- marker-delete-start -->\n\nThe [delete_api.py](influxdb_client/client/delete_api.py) supports deleting [points](https://v2.docs.influxdata.com/v2.0/reference/glossary/#point) from an InfluxDB bucket.\n\n``` python\nfrom influxdb_client import InfluxDBClient\n\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\")\n\ndelete_api = client.delete_api()\n\n\"\"\"\nDelete Data\n\"\"\"\nstart = \"1970-01-01T00:00:00Z\"\nstop = \"2021-02-01T00:00:00Z\"\ndelete_api.delete(start, stop, '_measurement=\"my_measurement\"', bucket='my-bucket', org='my-org')\n\n\"\"\"\nClose client\n\"\"\"\nclient.close()\n```\n<!-- marker-delete-end -->\n\n### InfluxDB 1.8 API compatibility\n\n[InfluxDB 1.8.0 introduced forward compatibility APIs](https://docs.influxdata.com/influxdb/v1.8/tools/api/#influxdb-2-0-api-compatibility-endpoints) for InfluxDB 2.0. This allows you to easily move from InfluxDB 1.x to InfluxDB 2.0 Cloud or open source.\n\nThe following forward compatible APIs are available:\n\n| API | Endpoint | Description |\n|-----|----------|-------------|\n| [query_api.py](influxdb_client/client/query_api.py) | [/api/v2/query](https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2query-http-endpoint) | Query data in InfluxDB 1.8.0+ using the InfluxDB 2.0 API and [Flux](https://docs.influxdata.com/flux/latest/) (the endpoint should be enabled via the [flux-enabled option](https://docs.influxdata.com/influxdb/v1.8/administration/config/#flux-enabled-false)) |\n| [write_api.py](influxdb_client/client/write_api.py) | [/api/v2/write](https://docs.influxdata.com/influxdb/v1.8/tools/api/#apiv2write-http-endpoint) | Write data to InfluxDB 1.8.0+ using the InfluxDB 2.0 API |\n| [ping()](influxdb_client/client/influxdb_client.py) | [/ping](https://docs.influxdata.com/influxdb/v1.8/tools/api/#ping-http-endpoint) | Check the status of your InfluxDB instance |\n\nFor detailed info see the [InfluxDB 1.8 example](examples/influxdb_18_example.py).\n\n### Handling Errors\n\n<!-- marker-handling-errors-start -->\n\nErrors happen, and it's important that your code is prepared for them. All client-related exceptions are derived from `InfluxDBError`. If an exception cannot be recovered in the client, it is returned to the application. These exceptions are left for the developer to handle.\n\nAlmost all APIs directly return unrecoverable exceptions to be handled this way:\n\n``` python\nfrom influxdb_client import InfluxDBClient\nfrom influxdb_client.client.exceptions import InfluxDBError\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n try:\n client.write_api(write_options=SYNCHRONOUS).write(\"my-bucket\", record=\"mem,tag=a value=86\")\n except InfluxDBError as e:\n if e.response.status == 401:\n raise Exception(\"Insufficient write permissions to 'my-bucket'.\") from e\n raise\n```\n\nThe only exception is the **batching** `WriteAPI` (for more info see [Batching](#batching)), where you need to register custom callbacks to handle batch events. This is because this API runs in the background in a separate thread and it isn't possible to directly return the underlying exceptions.\n\n
``` python\nfrom influxdb_client import InfluxDBClient\nfrom influxdb_client.client.exceptions import InfluxDBError\n\n\nclass BatchingCallback(object):\n\n def success(self, conf: (str, str, str), data: str):\n print(f\"Written batch: {conf}, data: {data}\")\n\n def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):\n print(f\"Cannot write batch: {conf}, data: {data} due: {exception}\")\n\n def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):\n print(f\"Retryable error occurred for batch: {conf}, data: {data} retry: {exception}\")\n\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n callback = BatchingCallback()\n with client.write_api(success_callback=callback.success,\n error_callback=callback.error,\n retry_callback=callback.retry) as write_api:\n pass\n```\n\n#### HTTP Retry Strategy\n\nBy default, the client uses a retry strategy only for batching writes (for more info see [Batching](#batching)). For other HTTP requests there is no retry strategy by default, but one can be configured via the `retries` parameter of `InfluxDBClient`.\n\nFor more info about how to configure HTTP retries, see the [urllib3 documentation](https://urllib3.readthedocs.io/en/latest/reference/index.html?highlight=retry#urllib3.Retry).\n\n``` python\nfrom urllib3 import Retry\n\nfrom influxdb_client import InfluxDBClient\n\nretries = Retry(connect=5, read=2, redirect=5)\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\", retries=retries)\n```\n\n<!-- marker-handling-errors-end -->\n\n### Nanosecond precision\n\n<!-- marker-nanosecond-start -->\n\nPython's [datetime](https://docs.python.org/3/library/datetime.html) doesn't support nanosecond precision, so during writes and queries the library ignores everything after microseconds.\n\nIf you would like to use `datetime` with nanosecond precision, you should use [pandas.Timestamp](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Timestamp.html#pandas.Timestamp), which is a replacement for the Python `datetime.datetime` object, and you should also set a proper `DateTimeHelper` on the client.\n\n- sources - [nanosecond_precision.py](https://github.com/influxdata/influxdb-client-python/blob/master/examples/nanosecond_precision.py)\n\n``` python\nfrom influxdb_client import Point, InfluxDBClient\nfrom influxdb_client.client.util.date_utils_pandas import PandasDateTimeHelper\nfrom influxdb_client.client.write_api import SYNCHRONOUS\n\n\"\"\"\nSet the Pandas date helper which supports nanoseconds.\n\"\"\"\nimport influxdb_client.client.util.date_utils as date_utils\n\ndate_utils.date_helper = PandasDateTimeHelper()\n\n\"\"\"\nPrepare client.\n\"\"\"\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\")\n\nwrite_api = client.write_api(write_options=SYNCHRONOUS)\nquery_api = client.query_api()\n\n\"\"\"\nPrepare data\n\"\"\"\n\npoint = Point(\"h2o_feet\") \\\n .field(\"water_level\", 10) \\\n .tag(\"location\", \"pacific\") \\\n .time('1996-02-25T21:20:00.001001231Z')\n\nprint(f'Time serialized with nanosecond precision: {point.to_line_protocol()}')\nprint()\n\nwrite_api.write(bucket=\"my-bucket\", record=point)\n\n\"\"\"\nQuery: using Stream\n\"\"\"\nquery = '''\nfrom(bucket:\"my-bucket\")\n |> range(start: 0, stop: now())\n |> filter(fn: (r) => r._measurement
== \"h2o_feet\")\n'''\nrecords = query_api.query_stream(query)\n\nfor record in records:\n print(f'Temperature in {record[\"location\"]} is {record[\"_value\"]} at time: {record[\"_time\"]}')\n\n\"\"\"\nClose client\n\"\"\"\nclient.close()\n```\n<!-- marker-nanosecond-end -->\n\n### How to use Asyncio\n\n<!-- marker-asyncio-start -->\n\nStarting from version 1.27.0 for Python 3.7+ the `influxdb-client` package supports `async/await` based on [asyncio](https://docs.python.org/3/library/asyncio.html), [aiohttp](https://docs.aiohttp.org) and [aiocsv](https://pypi.org/project/aiocsv/). \nYou can install `aiohttp` and `aiocsv` directly:\n\n> ``` bash\n> $ python -m pip install influxdb-client aiohttp aiocsv\n> ```\n\nor use the `[async]` extra:\n\n> ``` bash\n> $ python -m pip install influxdb-client[async]\n> ```\n\n:warning:\n\n> The `InfluxDBClientAsync` should be initialised inside `async coroutine` otherwise there can be unexpected behaviour. For more info see: [Why is creating a ClientSession outside an event loop dangerous?](https://docs.aiohttp.org/en/stable/faq.html#why-is-creating-a-clientsession-outside-of-an-event-loop-dangerous).\n\n#### Async APIs\n\nAll async APIs are available via `influxdb_client.client.influxdb_client_async.InfluxDBClientAsync`. The `async` version of the client supports following asynchronous APIs:\n\n- `influxdb_client.client.write_api_async.WriteApiAsync`\n- `influxdb_client.client.query_api_async.QueryApiAsync`\n- `influxdb_client.client.delete_api_async.DeleteApiAsync`\n- Management services into `influxdb_client.service` supports async\n operation\n\nand also check to readiness of the InfluxDB via `/ping` endpoint:\n\nThe `InfluxDBClientAsync` constructor accepts a number of __configuration properties__. Most useful among these are:\n\n* `connection_pool_maxsize` - The total number of simultaneous connections. Defaults to `multiprocessing.cpu_count() * 5`.\n* `enable_gzip` - enable gzip compression during `write` and `query` calls. Defaults to `false`.\n* `proxy` - URL of an HTTP proxy to be used.\n* `timeout` - The maximum number of milliseconds for handling HTTP requests from initial handshake to handling response data. This is passed directly to the underlying transport library. If large amounts of data are anticipated, for example from `query_api.query_stream(...)`, this should be increased to avoid `TimeoutError` or `CancelledError`. 
The client can also check the readiness of InfluxDB via the `/ping` endpoint:\n\n> ``` python\n> import asyncio\n>\n> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync\n>\n>\n> async def main():\n> async with InfluxDBClientAsync(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n> ready = await client.ping()\n> print(f\"InfluxDB: {ready}\")\n>\n>\n> if __name__ == \"__main__\":\n> asyncio.run(main())\n> ```\n\n#### Async Write API\n\nThe `influxdb_client.client.write_api_async.WriteApiAsync` supports ingesting data as:\n\n- `string` or `bytes` that is formatted as InfluxDB's line protocol\n- [Data Point](https://github.com/influxdata/influxdb-client-python/blob/master/influxdb_client/client/write/point.py#L16) structure\n- Dictionary-style mapping with keys: `measurement`, `tags`, `fields` and `time`, or a custom structure\n- [NamedTuple](https://docs.python.org/3/library/collections.html#collections.namedtuple)\n- [Data Classes](https://docs.python.org/3/library/dataclasses.html)\n- [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n- List of the above items\n\n> ``` python\n> import asyncio\n>\n> from influxdb_client import Point\n> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync\n>\n>\n> async def main():\n> async with InfluxDBClientAsync(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n>\n> write_api = client.write_api()\n>\n> _point1 = Point(\"async_m\").tag(\"location\", \"Prague\").field(\"temperature\", 25.3)\n> _point2 = Point(\"async_m\").tag(\"location\", \"New York\").field(\"temperature\", 24.3)\n>\n> successfully = await write_api.write(bucket=\"my-bucket\", record=[_point1, _point2])\n>\n> print(f\" > successfully: {successfully}\")\n>\n>\n> if __name__ == \"__main__\":\n> asyncio.run(main())\n> ```\n\n#### Async Query API\n\nThe `influxdb_client.client.query_api_async.QueryApiAsync` supports retrieving data as:\n\n- List of `influxdb_client.client.flux_table.FluxTable`\n- Stream of `influxdb_client.client.flux_table.FluxRecord` via `typing.AsyncGenerator`\n- [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html)\n- Stream of [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.html) via `typing.AsyncGenerator`\n- Raw `str` output\n\n> ``` python\n> import asyncio\n>\n> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync\n>\n>\n> async def main():\n> async with InfluxDBClientAsync(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n> # Stream of FluxRecords\n> query_api = client.query_api()\n> records = await query_api.query_stream('from(bucket:\"my-bucket\") '\n> '|> range(start: -10m) '\n> '|> filter(fn: (r) => r[\"_measurement\"] == \"async_m\")')\n> async for record in records:\n> print(record)\n>\n>\n> if __name__ == \"__main__\":\n> asyncio.run(main())\n> ```\n\n#### Async Delete API\n\n> ``` python\n> import asyncio\n> from datetime import datetime\n>\n> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync\n>\n>\n> async def main():\n> async with InfluxDBClientAsync(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n> start = datetime.fromtimestamp(0)\n> stop = datetime.now()\n> # Delete data with location = 'Prague'\n> successfully = await client.delete_api().delete(start=start, stop=stop, bucket=\"my-bucket\",\n> predicate=\"location = \\\"Prague\\\"\")\n> print(f\" >
 successfully: {successfully}\")\n>\n>\n> if __name__ == \"__main__\":\n> asyncio.run(main())\n> ```\n\n#### Management API\n\n> ``` python\n> import asyncio\n>\n> from influxdb_client import OrganizationsService\n> from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync\n>\n>\n> async def main():\n> async with InfluxDBClientAsync(url='http://localhost:8086', token='my-token', org='my-org') as client:\n> # Initialize async OrganizationsService\n> organizations_service = OrganizationsService(api_client=client.api_client)\n>\n> # Find organization with name 'my-org'\n> organizations = await organizations_service.get_orgs(org='my-org')\n> for organization in organizations.orgs:\n> print(f'name: {organization.name}, id: {organization.id}')\n>\n>\n> if __name__ == \"__main__\":\n> asyncio.run(main())\n> ```\n\n#### Proxy and redirects\n\nYou can configure the client to tunnel requests through an HTTP proxy. The following proxy options are supported:\n\n- `proxy` - Set this to configure the HTTP proxy to be used, e.g. `http://localhost:3128`\n- `proxy_headers` - A dictionary containing headers that will be sent to the proxy. Could be used for proxy authentication.\n\n``` python\nfrom influxdb_client.client.influxdb_client_async import InfluxDBClientAsync\n\n\nasync with InfluxDBClientAsync(url=\"http://localhost:8086\",\n token=\"my-token\",\n org=\"my-org\",\n proxy=\"http://localhost:3128\") as client:\n ...\n```\n\nIf your proxy notifies the client of a permanent redirect (`HTTP 301`) to a **different host**, the client removes the `Authorization` header, because otherwise its contents would be sent to third parties, which is a security vulnerability.\n\nThe client automatically follows HTTP redirects. The default redirect policy is to follow up to `10` consecutive requests. The redirects can be configured via:\n\n- `allow_redirects` - If set to `False`, do not follow HTTP redirects. `True` by default.\n- `max_redirects` - Maximum number of HTTP redirects to follow. `10` by default.\n\n<!-- marker-asyncio-end -->\n\n### Logging\n\n<!-- marker-logging-start -->\n\nThe client uses Python's [logging](https://docs.python.org/3/library/logging.html) facility for logging the library activity. The following logger categories are exposed:\n\n- `influxdb_client.client.influxdb_client`\n- `influxdb_client.client.influxdb_client_async`\n- `influxdb_client.client.write_api`\n- `influxdb_client.client.write_api_async`\n- `influxdb_client.client.write.retry`\n- `influxdb_client.client.write.dataframe_serializer`\n- `influxdb_client.client.util.multiprocessing_helper`\n- `influxdb_client.client.http`\n- `influxdb_client.client.exceptions`\n\n
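These categories are ordinary Python loggers, so they can also be tuned individually; a minimal sketch that silences only the retry logger while leaving the other categories untouched:\n\n``` python\nimport logging\n\n# quiet only the write-retry messages; other categories keep their configured level\nlogging.getLogger('influxdb_client.client.write.retry').setLevel(logging.ERROR)\n```\n\n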
The default logging level is `warning` when no logger output is configured. You can use the standard logging interface to change the log level and handler:\n\n``` python\nimport logging\nimport sys\n\nfrom influxdb_client import InfluxDBClient\n\nwith InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", org=\"my-org\") as client:\n for _, logger in client.conf.loggers.items():\n logger.setLevel(logging.DEBUG)\n logger.addHandler(logging.StreamHandler(sys.stdout))\n```\n\n#### Debugging\n\nFor debugging purposes you can enable verbose logging of HTTP requests and set the `debug` level on all of the client's logger categories:\n\n``` python\nclient = InfluxDBClient(url=\"http://localhost:8086\", token=\"my-token\", debug=True)\n```\n\nBoth HTTP request headers and body will be logged to standard output.\n\n<!-- marker-logging-end -->\n\n## Local tests\n\n``` console\n# start/restart InfluxDB2 on local machine using docker\n./scripts/influxdb-restart.sh\n\n# install requirements\npip install -e . --user\npip install -e .\\[extra\\] --user\npip install -e .\\[test\\] --user\n\n# run unit & integration tests\npytest tests\n```\n\n## Contributing\n\nBug reports and pull requests are welcome on GitHub at <https://github.com/influxdata/influxdb-client-python>.\n\n## License\n\nThe library is available as open source under the terms of the [MIT License](https://opensource.org/licenses/MIT).\n",
"bugtrack_url": null,
"license": null,
"summary": "InfluxDB 2.0 Python client library",
"version": "1.48.0",
"project_urls": {
"Homepage": "https://github.com/influxdata/influxdb-client-python"
},
"split_keywords": [
"influxdb",
" influxdb python client"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "5cb31edc89584b8d1bc5226cf508b67ab64da3ba83041cab348861e6f4392326",
"md5": "c88affdc7daedb823c159edea5fda2eb",
"sha256": "410db15db761df7ea98adb333c7a03f05bcc2ceef4830cefb7071b888be2b827"
},
"downloads": -1,
"filename": "influxdb_client-1.48.0-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c88affdc7daedb823c159edea5fda2eb",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.7",
"size": 746177,
"upload_time": "2024-11-27T08:26:30",
"upload_time_iso_8601": "2024-11-27T08:26:30.438819Z",
"url": "https://files.pythonhosted.org/packages/5c/b3/1edc89584b8d1bc5226cf508b67ab64da3ba83041cab348861e6f4392326/influxdb_client-1.48.0-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "1147b756380917cb4b968bd871fc006128e2cc9897fb1ab4bcf7d108f9601e78",
"md5": "7e018fde2836e1dbfb44e22062aee28b",
"sha256": "414d5b5eff7d2b6b453f33e2826ea9872ea04a11996ba9c8604b0c1df57c8559"
},
"downloads": -1,
"filename": "influxdb_client-1.48.0.tar.gz",
"has_sig": false,
"md5_digest": "7e018fde2836e1dbfb44e22062aee28b",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.7",
"size": 386415,
"upload_time": "2024-11-27T08:26:32",
"upload_time_iso_8601": "2024-11-27T08:26:32.909856Z",
"url": "https://files.pythonhosted.org/packages/11/47/b756380917cb4b968bd871fc006128e2cc9897fb1ab4bcf7d108f9601e78/influxdb_client-1.48.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-27 08:26:32",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "influxdata",
"github_project": "influxdb-client-python",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"circle": true,
"lcname": "influxdb-client"
}