pydruid

- Name: pydruid
- Version: 0.6.6
- Home page: https://druid.apache.org
- Summary: A Python connector for Druid.
- Upload time: 2023-12-01 18:38:37
- Docs: https://pythonhosted.org/pydruid/
- Author: Druid Developers
- License: Apache License, Version 2.0
- Requirements: no requirements were recorded
# pydruid

pydruid exposes a simple API to create, execute, and analyze [Druid](http://druid.io/) queries. pydruid can parse query results into [Pandas](http://pandas.pydata.org/) DataFrame objects for subsequent data analysis -- this offers a tight integration between [Druid](http://druid.io/), the [SciPy](http://www.scipy.org/stackspec.html) stack (for scientific computing) and [scikit-learn](http://scikit-learn.org/stable/) (for machine learning). pydruid can export query results into TSV or JSON for further processing with your favorite tool, e.g., R, Julia, Matlab, Excel. It provides both synchronous and asynchronous clients.

Additionally, pydruid implements the [Python DB API 2.0](https://www.python.org/dev/peps/pep-0249/), a [SQLAlchemy dialect](http://docs.sqlalchemy.org/en/latest/dialects/), and provides a command-line interface to interact with Druid.

To install:
```bash
pip install pydruid
# or, if you intend to use the asynchronous client
pip install pydruid[async]
# or, if you intend to export query results into pandas
pip install pydruid[pandas]
# or, if you intend to do both
pip install pydruid[async,pandas]
# or, if you want to use the SQLAlchemy engine
pip install pydruid[sqlalchemy]
# or, if you want to use the CLI
pip install pydruid[cli]
```
Documentation: https://pythonhosted.org/pydruid/.

# examples

The following examples show how to execute and analyze the results of three types of queries: timeseries, topN, and groupby. We will use these queries to ask simple questions about Twitter's public data set.

## timeseries

What was the average tweet length, per day, surrounding the 2014 Sochi Olympics?

```python
from pydruid.client import PyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension
from pydruid.utils.postaggregator import Field
import matplotlib.pyplot as plt

query = PyDruid(druid_url_goes_here, 'druid/v2')

ts = query.timeseries(
    datasource='twitterstream',
    granularity='day',
    intervals='2014-02-02/p4w',
    aggregations={'length': doublesum('tweet_length'), 'count': doublesum('count')},
    post_aggregations={'avg_tweet_length': (Field('length') / Field('count'))},
    filter=Dimension('first_hashtag') == 'sochi2014'
)
df = query.export_pandas()
df['timestamp'] = df['timestamp'].map(lambda x: x.split('T')[0])
df.plot(x='timestamp', y='avg_tweet_length', ylim=(80, 140), rot=20,
        title='Sochi 2014')
plt.ylabel('avg tweet length (chars)')
plt.show()
```

![alt text](https://github.com/metamx/pydruid/raw/master/docs/figures/avg_tweet_length.png "Avg. tweet length")
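
The intro mentions TSV and JSON export; here is a minimal sketch for the same query (assuming `ts` is the object returned by `query.timeseries(...)` above — which object exposes the export helpers can vary between pydruid versions):

```python
# export the query result to TSV for further processing in R, Julia, Matlab, Excel, ...
ts.export_tsv('avg_tweet_length.tsv')
```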

## topN

Who were the top ten mentions (@user_name) during the 2014 Oscars?

```python
top = query.topn(
    datasource='twitterstream',
    granularity='all',
    intervals='2014-03-03/p1d',  # utc time of 2014 oscars
    aggregations={'count': doublesum('count')},
    dimension='user_mention_name',
    filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars') &
           (Dimension('user_time_zone') == 'Pacific Time (US & Canada)') &
           ~(Dimension('user_mention_name') == 'No Mention'),
    metric='count',
    threshold=10
)

df = query.export_pandas()
print(df)

   count                 timestamp user_mention_name
0   1303  2014-03-03T00:00:00.000Z      TheEllenShow
1     44  2014-03-03T00:00:00.000Z        TheAcademy
2     21  2014-03-03T00:00:00.000Z               MTV
3     21  2014-03-03T00:00:00.000Z         peoplemag
4     17  2014-03-03T00:00:00.000Z               THR
5     16  2014-03-03T00:00:00.000Z      ItsQueenElsa
6     16  2014-03-03T00:00:00.000Z           eonline
7     15  2014-03-03T00:00:00.000Z       PerezHilton
8     14  2014-03-03T00:00:00.000Z     realjohngreen
9     12  2014-03-03T00:00:00.000Z       KevinSpacey

```

## groupby

What does the social network of users replying to other users look like?

```python
from igraph import Graph, plot  # plotting also needs the pycairo backend
from pandas import concat

group = query.groupby(
    datasource='twitterstream',
    granularity='hour',
    intervals='2013-10-04/pt12h',
    dimensions=["user_name", "reply_to_name"],
    filter=(~(Dimension("reply_to_name") == "Not A Reply")) &
           (Dimension("user_location") == "California"),
    aggregations={"count": doublesum("count")}
)

df = query.export_pandas()

# map names to integer ids with a lookup table
names = concat([df['user_name'], df['reply_to_name']]).unique()
name_lookup = {name: i for i, name in enumerate(names)}
df['user_name_lookup'] = df['user_name'].map(name_lookup.get)
df['reply_to_name_lookup'] = df['reply_to_name'].map(name_lookup.get)

# create the graph with igraph
g = Graph(len(names), directed=False)
edges = list(zip(df['user_name_lookup'], df['reply_to_name_lookup']))
g.vs["name"] = names
g.add_edges(edges)
layout = g.layout_fruchterman_reingold()
plot(g, "tweets.png", layout=layout, vertex_size=2, bbox=(400, 400), margin=25, edge_width=1, vertex_color="blue")
```

![alt text](https://github.com/metamx/pydruid/raw/master/docs/figures/twitter_graph.png "Social Network")

# asynchronous client
`pydruid.async_client.AsyncPyDruid` implements an asynchronous client built on the asynchronous HTTP client from the `Tornado` framework. The asynchronous client is suitable for use with async frameworks such as Tornado and provides much better performance at scale: it lets you serve multiple requests at the same time, without blocking on Druid executing your queries.

## example
```python
from tornado import gen
from pydruid.async_client import AsyncPyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

client = AsyncPyDruid(url_to_druid_broker, 'druid/v2')

@gen.coroutine
def your_asynchronous_method_serving_top10_mentions_for_day(day):
    top_mentions = yield client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals="%s/p1d" % (day, ),
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars') &
               (Dimension('user_time_zone') == 'Pacific Time (US & Canada)') &
               ~(Dimension('user_mention_name') == 'No Mention'),
        metric='count',
        threshold=10)

    # asynchronously return results
    # can simply be `return top_mentions` in Python 3.x
    raise gen.Return(top_mentions)
```
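
To call this coroutine from synchronous code, you can drive it with Tornado's event loop — a minimal sketch (the date argument is illustrative):

```python
from tornado.ioloop import IOLoop

# run the coroutine to completion and collect its result
top10 = IOLoop.current().run_sync(
    lambda: your_asynchronous_method_serving_top10_mentions_for_day('2014-03-03'))
```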


# thetaSketches
Theta sketch post-aggregators are built slightly differently from normal post-aggregators, as they have different operators.
Note: you must have the `druid-datasketches` extension loaded into your Druid cluster in order to use these.
See the [Druid datasketches](http://druid.io/docs/latest/development/extensions-core/datasketches-aggregators.html) documentation for details.

```python
from pydruid.client import PyDruid
from pydruid.utils import aggregators
from pydruid.utils import filters
from pydruid.utils import postaggregator

query = PyDruid(url_to_druid_broker, 'druid/v2')
ts = query.groupby(
    datasource='test_datasource',
    granularity='all',
    intervals='2016-09-01/P1M',
    filter=filters.Dimension('product').in_(['product_A', 'product_B']),
    aggregations={
        'product_A_users': aggregators.filtered(
            filters.Dimension('product') == 'product_A',
            aggregators.thetasketch('user_id')
            ),
        'product_B_users': aggregators.filtered(
            filters.Dimension('product') == 'product_B',
            aggregators.thetasketch('user_id')
            )
    },
    post_aggregations={
        'both_A_and_B': postaggregator.ThetaSketchEstimate(
            postaggregator.ThetaSketch('product_A_users') & postaggregator.ThetaSketch('product_B_users')
            )
    }
)
```

# DB API

```python
from pydruid.db import connect

conn = connect(host='localhost', port=8082, path='/druid/v2/sql/', scheme='http')
curs = conn.cursor()
curs.execute("""
    SELECT place,
           CAST(REGEXP_EXTRACT(place, '(.*),', 1) AS FLOAT) AS lat,
           CAST(REGEXP_EXTRACT(place, ',(.*)', 1) AS FLOAT) AS lon
      FROM places
     LIMIT 10
""")
for row in curs:
    print(row)
```
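
The cursor follows the DB API 2.0 contract, so the standard accessors apply — a minimal sketch:

```python
# standard DB API 2.0 accessors on the same cursor
curs.execute("SELECT COUNT(*) AS cnt FROM places")
print(curs.description)  # column metadata for the result set
print(curs.fetchall())   # all remaining rows as a list
```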

# SQLAlchemy

```python
from sqlalchemy import *
from sqlalchemy.engine import create_engine
from sqlalchemy.schema import *

engine = create_engine('druid://localhost:8082/druid/v2/sql/')  # uses HTTP by default :(
# engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')
# engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')

places = Table('places', MetaData(bind=engine), autoload=True)
print(select([func.count('*')], from_obj=places).scalar())
```
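
Because this is a regular SQLAlchemy engine, it also plugs into tools that accept one — for example, reading results straight into pandas (a sketch, not a pydruid-specific API):

```python
import pandas as pd

# pandas issues the SQL through the Druid dialect and builds a DataFrame
df = pd.read_sql('SELECT place FROM places LIMIT 10', engine)
```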


## Column headers

In version 0.13.0, Druid SQL added support for including the column names in the
response, which can be requested via the "header" field in the request. This
helps to ensure that the cursor description is defined (which is a requirement
for SQLAlchemy query statements) regardless of whether the result set contains
any rows. Historically this was problematic for result sets that contained no
rows, as one could not infer the expected column names.

The header can be enabled via a query parameter in the SQLAlchemy URI, e.g.,

```python
engine = create_engine('druid://localhost:8082/druid/v2/sql?header=true')
```

Note that the current default is `false` to ensure backwards compatibility, but it
should be set to `true` for Druid versions >= 0.13.0.
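
The DB API connection presumably accepts the same flag as a keyword argument (an assumption — check the signature of `pydruid.db.connect` in your installed version):

```python
from pydruid.db import connect

# header=True asks Druid to include column names in the response
conn = connect(host='localhost', port=8082, path='/druid/v2/sql/',
               scheme='http', header=True)
```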


# Command line

```bash
$ pydruid http://localhost:8082/druid/v2/sql/
> SELECT COUNT(*) AS cnt FROM places
  cnt
-----
12345
> SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES;
TABLE_NAME
----------
test_table
COLUMNS
SCHEMATA
TABLES
> BYE;
GoodBye!
```

# Contributing

Contributions are of course welcome. We like to use `black` and `flake8`.

```bash
pip install -r requirements-dev.txt  # installs useful dev deps
pre-commit install  # installs useful commit hooks
```

            
