risingwave

- Name: risingwave
- Version: 0.1.1
- Summary: RisingWave Python API
- Author: RisingWave Labs
- License: Apache Software License
- Requires Python: >=3.8
- Upload time: 2023-12-07 12:04:45

# RisingWave Python UDF SDK

This library provides a Python SDK for creating user-defined functions (UDFs) in [RisingWave](https://www.risingwave.com/).

For a detailed guide on using Python UDFs in RisingWave, see the [RisingWave documentation on Python UDFs](https://docs.risingwave.com/docs/current/udf-python/).

## Introduction

RisingWave supports user-defined functions implemented as external functions.
With the RisingWave Python UDF SDK, users can define custom UDFs using Python and start a Python process as a UDF server.
RisingWave can then remotely access the UDF server to execute the defined functions.

## Installation

```sh
pip install risingwave
```

## Usage

Define functions in a Python file:

```python
# udf.py
from risingwave.udf import udf, udtf, UdfServer
import struct
import socket

# Define a scalar function
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    # Euclid's algorithm
    while y != 0:
        (x, y) = (y, x % y)
    return x

# Define a scalar function that returns multiple values (within a struct).
# The input is expected to be an IPv4 packet with a 20-byte header (no options)
# followed by a TCP header. Ports are unsigned 16-bit values (up to 65535), which
# do not fit in SMALLINT (max 32767), so they are returned as INT.
@udf(input_types=['BYTEA'], result_type='STRUCT<VARCHAR, VARCHAR, INT, INT>')
def extract_tcp_info(tcp_packet: bytes):
    # Bytes 12..19 of the IPv4 header hold the source and destination addresses
    src_addr, dst_addr = struct.unpack('!4s4s', tcp_packet[12:20])
    # The first 4 bytes of the TCP header hold the source and destination ports
    src_port, dst_port = struct.unpack('!HH', tcp_packet[20:24])
    src_addr = socket.inet_ntoa(src_addr)
    dst_addr = socket.inet_ntoa(dst_addr)
    return src_addr, dst_addr, src_port, dst_port

# Define a table function
@udtf(input_types='INT', result_types='INT')
def series(n):
    for i in range(n):
        yield i

# Start a UDF server and register every function defined above
if __name__ == '__main__':
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(extract_tcp_info)
    server.add_function(series)
    server.serve()
```
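The function bodies are ordinary Python, so their logic can be checked locally before the server is started. The sketch below copies that logic into undecorated functions (so it does not need the SDK installed) and builds a synthetic IPv4/TCP packet with `struct.pack`. The helper `build_ipv4_tcp_packet` and all addresses and ports are hypothetical test values. The packet parser here is a variant of `extract_tcp_info` that reads the IPv4 header length from the IHL field instead of assuming a 20-byte header with no options.

```python
import socket
import struct


def gcd(x: int, y: int) -> int:
    # Euclid's algorithm, same body as the UDF
    while y != 0:
        (x, y) = (y, x % y)
    return x


def series(n: int):
    # Same generator body as the table function
    for i in range(n):
        yield i


def extract_tcp_info(packet: bytes):
    # IHL (low 4 bits of byte 0) is the IPv4 header length in 32-bit words
    ihl = (packet[0] & 0x0F) * 4
    # Source and destination addresses sit at offsets 12..19 of the IPv4 header
    src_addr, dst_addr = struct.unpack('!4s4s', packet[12:20])
    # The TCP header starts right after the IPv4 header; its first 4 bytes are the ports
    src_port, dst_port = struct.unpack('!HH', packet[ihl:ihl + 4])
    return socket.inet_ntoa(src_addr), socket.inet_ntoa(dst_addr), src_port, dst_port


def build_ipv4_tcp_packet(src: str, dst: str, sport: int, dport: int) -> bytes:
    # Hypothetical test helper: minimal 20-byte IPv4 header (IHL=5, protocol 6 = TCP)
    ip_header = struct.pack(
        '!BBHHHBBH4s4s',
        0x45, 0, 40, 0, 0, 64, 6, 0,
        socket.inet_aton(src), socket.inet_aton(dst),
    )
    # Minimal 20-byte TCP header: ports, seq, ack, data offset, flags (SYN), window, checksum, urgent
    tcp_header = struct.pack('!HHIIBBHHH', sport, dport, 0, 0, 0x50, 0x02, 65535, 0, 0)
    return ip_header + tcp_header


if __name__ == '__main__':
    pkt = build_ipv4_tcp_packet('192.168.0.1', '10.0.0.2', 54321, 80)
    print(gcd(25, 15))            # 5
    print(list(series(3)))        # [0, 1, 2]
    print(extract_tcp_info(pkt))  # ('192.168.0.1', '10.0.0.2', 54321, 80)
```

Reading the IHL field keeps the parser correct for packets that carry IPv4 options; for the usual 20-byte header it gives the same result as fixed offsets.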

Start the UDF server:

```sh
python3 udf.py
```
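Before running `create function`, it can help to confirm that something is listening on the server's port. The sketch below only opens a plain TCP connection with the standard `socket` module. It does not speak the RPC protocol RisingWave uses to call the UDF server, so a successful connection shows that the port is open, not that the functions are registered. The name `is_port_open` and the 2-second default timeout are illustrative choices.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and connects; the socket is closed on exit
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts (socket.timeout is an OSError subclass), and DNS errors
        return False


if __name__ == '__main__':
    # The example server listens on 0.0.0.0:8815, so it should be reachable via localhost
    print(is_port_open('localhost', 8815))
```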

To create functions in RisingWave, use the following syntax:

```sql
create function <name> ( <arg_type>[, ...] )
    [ returns <ret_type> | returns table ( <column_name> <column_type> [, ...] ) ]
    as <name_defined_in_server> using link '<udf_server_address>';
```

- The `as` parameter specifies the function name defined in the UDF server.
- The `link` parameter specifies the address of the UDF server.

For example:

```sql
create function gcd(int, int) returns int
as gcd using link 'http://localhost:8815';

create function series(int) returns table (x int)
as series using link 'http://localhost:8815';

select gcd(25, 15);

select * from series(10);
```
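When registering several functions, the statements can be generated from their signatures instead of typed by hand. The sketch below is a hypothetical helper (`create_function_sql` is not part of the SDK) that fills in the `create function` syntax shown above. It performs no quoting or validation of names, types, or the link, so pass only trusted values.

```python
from typing import Optional, Sequence, Tuple


def create_function_sql(
    name: str,
    arg_types: Sequence[str],
    link: str,
    returns: Optional[str] = None,
    returns_table: Optional[Sequence[Tuple[str, str]]] = None,
    server_name: Optional[str] = None,
) -> str:
    """Build a CREATE FUNCTION statement for a function served by a UDF server."""
    # Exactly one of the two return forms must be given
    if (returns is None) == (returns_table is None):
        raise ValueError("specify exactly one of `returns` or `returns_table`")
    if returns is not None:
        ret = f"returns {returns}"
    else:
        cols = ", ".join(f"{col} {typ}" for col, typ in returns_table)
        ret = f"returns table ({cols})"
    # The `as` name defaults to the SQL name, as in the examples where both agree
    target = server_name or name
    return (
        f"create function {name}({', '.join(arg_types)}) {ret}\n"
        f"as {target} using link '{link}';"
    )


if __name__ == '__main__':
    link = 'http://localhost:8815'
    print(create_function_sql('gcd', ['int', 'int'], link, returns='int'))
    print(create_function_sql('series', ['int'], link, returns_table=[('x', 'int')]))
```

With these arguments the helper reproduces the two statements in the example above.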

## Data Types

The RisingWave Python UDF SDK supports the following data types:

| SQL Type         | Python Type                    | Notes              |
| ---------------- | -----------------------------  | ------------------ |
| BOOLEAN          | bool                           |                    |
| SMALLINT         | int                            |                    |
| INT              | int                            |                    |
| BIGINT           | int                            |                    |
| REAL             | float                          |                    |
| DOUBLE PRECISION | float                          |                    |
| DECIMAL          | decimal.Decimal                |                    |
| DATE             | datetime.date                  |                    |
| TIME             | datetime.time                  |                    |
| TIMESTAMP        | datetime.datetime              |                    |
| INTERVAL         | MonthDayNano / (int, int, int) | Fields can be obtained by `months()`, `days()` and `nanoseconds()` from `MonthDayNano` |
| VARCHAR          | str                            |                    |
| BYTEA            | bytes                          |                    |
| JSONB            | any                            |                    |
| T[]              | list[T]                        |                    |
| STRUCT<>         | tuple                          |                    |
| ...others        |                                | Not supported yet. |
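To illustrate the mapping, the function below is a plain-Python body for a hypothetical UDF `order_summary` that takes a `DECIMAL[]`, a `DATE`, and an `INTERVAL` and returns a `STRUCT<DECIMAL, VARCHAR, INT>`. Following the table, the array arrives as a list, decimals as `decimal.Decimal`, the date as `datetime.date`, and the interval as a `(months, days, nanoseconds)` tuple; the struct result is returned as a tuple. The decorator arguments in the comment, treating `None` list elements as SQL `NULL`s, and `MonthDayNano` unpacking like a 3-tuple are assumptions, not documented SDK behavior.

```python
import datetime
from decimal import Decimal
from typing import List, Optional, Tuple

NANOS_PER_DAY = 24 * 60 * 60 * 10**9


# Hypothetical registration; the type strings are assumptions, not checked against the SDK:
# @udf(input_types=['DECIMAL[]', 'DATE', 'INTERVAL'],
#      result_type='STRUCT<DECIMAL, VARCHAR, INT>')
def order_summary(
    prices: List[Optional[Decimal]],
    order_date: datetime.date,
    grace: Tuple[int, int, int],
) -> Tuple[Decimal, str, int]:
    # DECIMAL[] arrives as a list of Decimal; None elements are treated as SQL NULLs
    present = [p for p in prices if p is not None]
    total = sum(present, Decimal(0))
    # INTERVAL unpacks as (months, days, nanoseconds)
    months, days, nanos = grace
    if months != 0:
        # Month arithmetic depends on the calendar; this sketch does not handle it
        raise ValueError("month components are not supported in this sketch")
    # Round the interval down to whole days and add it to the DATE
    due = order_date + datetime.timedelta(days=days + nanos // NANOS_PER_DAY)
    # STRUCT<DECIMAL, VARCHAR, INT> is returned as a plain tuple
    return total, due.isoformat(), len(present)
```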
