# RisingWave Python UDF SDK
This library provides a Python SDK for creating user-defined functions (UDFs) in [RisingWave](https://www.risingwave.com/).
For a detailed guide on using Python UDFs in RisingWave, see [the Python UDF documentation](https://docs.risingwave.com/docs/current/udf-python/).
## Introduction
RisingWave supports user-defined functions implemented as external functions.
With the RisingWave Python UDF SDK, users can define custom UDFs using Python and start a Python process as a UDF server.
RisingWave can then remotely access the UDF server to execute the defined functions.
## Installation
```sh
pip install risingwave
```
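To confirm the package is importable from the interpreter you will use to run the server, a quick check like the one below can help. It is a sketch that relies only on the standard library's `importlib.util.find_spec`; the `sdk_installed` helper is hypothetical and not part of the SDK.

```python
import importlib.util


def sdk_installed(module: str = "risingwave.udf") -> bool:
    """Return True if `module` can be found by the current interpreter."""
    try:
        return importlib.util.find_spec(module) is not None
    except ModuleNotFoundError:
        # find_spec imports parent packages first and raises if they are missing.
        return False


if __name__ == "__main__":
    print("risingwave.udf available:", sdk_installed())
```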
## Usage
Define functions in a Python file:
```python
# udf.py
import socket
import struct

from risingwave.udf import udf, udtf, UdfServer


# Define a scalar function
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
    # Euclid's algorithm
    while y != 0:
        (x, y) = (y, x % y)
    return x


# Define a scalar function that returns multiple values (within a struct).
# The input is expected to be an IPv4 packet with a 20-byte header (no options)
# followed by the TCP header: the IP addresses sit at bytes 12..20 and the
# TCP ports at bytes 20..24. Ports are unsigned 16-bit values (0-65535) and can
# overflow a signed SMALLINT, so INT is used for them here.
@udf(input_types=['BYTEA'], result_type='STRUCT<VARCHAR, VARCHAR, INT, INT>')
def extract_tcp_info(tcp_packet: bytes):
    src_addr, dst_addr = struct.unpack('!4s4s', tcp_packet[12:20])
    src_port, dst_port = struct.unpack('!HH', tcp_packet[20:24])
    src_addr = socket.inet_ntoa(src_addr)
    dst_addr = socket.inet_ntoa(dst_addr)
    return src_addr, dst_addr, src_port, dst_port


# Define a table function
@udtf(input_types='INT', result_types='INT')
def series(n):
    for i in range(n):
        yield i


# Start a UDF server and register every function defined above
if __name__ == '__main__':
    server = UdfServer(location="0.0.0.0:8815")
    server.add_function(gcd)
    server.add_function(extract_tcp_info)
    server.add_function(series)
    server.serve()
```
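Because each decorated function wraps ordinary Python logic, one option is to exercise that logic directly before starting the server. The sketch below assumes you copy the function bodies into undecorated helpers (the `*_impl` and `make_packet` names are hypothetical), so it runs without the SDK or RisingWave. `make_packet` builds a minimal byte string with the layout `extract_tcp_info` reads: a 20-byte IPv4 header with no options, followed by the two TCP port fields.

```python
import socket
import struct


def gcd_impl(x: int, y: int) -> int:
    # Same Euclid loop as the `gcd` UDF, without the decorator.
    while y != 0:
        x, y = y, x % y
    return x


def extract_tcp_info_impl(packet: bytes):
    # Addresses live at bytes 12..20 of the IPv4 header; the TCP ports follow
    # at bytes 20..24, assuming a 20-byte header with no IP options.
    src_addr, dst_addr = struct.unpack('!4s4s', packet[12:20])
    src_port, dst_port = struct.unpack('!HH', packet[20:24])
    return socket.inet_ntoa(src_addr), socket.inet_ntoa(dst_addr), src_port, dst_port


def series_impl(n: int):
    for i in range(n):
        yield i


def make_packet(src: str, dst: str, sport: int, dport: int) -> bytes:
    # 12 zero bytes stand in for the leading IPv4 fields (version, length, TTL, ...).
    return (bytes(12) + socket.inet_aton(src) + socket.inet_aton(dst)
            + struct.pack('!HH', sport, dport))


if __name__ == '__main__':
    print(gcd_impl(25, 15))  # 5
    print(extract_tcp_info_impl(make_packet('10.0.0.1', '10.0.0.2', 51234, 443)))
    print(list(series_impl(3)))  # [0, 1, 2]
```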
Start the UDF server:
```sh
python3 udf.py
```
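If RisingWave later fails to reach the server, a plain TCP connection test can help rule out networking problems. A minimal sketch using only the standard library; `is_listening` is a hypothetical helper, and `localhost:8815` simply matches the address used in this README's examples.

```python
import socket


def is_listening(host: str = "localhost", port: int = 8815, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unresolvable hosts.
        return False


if __name__ == "__main__":
    print("UDF server reachable:", is_listening())
```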
To create functions in RisingWave, use the following syntax:
```sql
create function <name> ( <arg_type>[, ...] )
  [ returns <ret_type> | returns table ( <column_name> <column_type> [, ...] ) ]
  as <name_defined_in_server> using link '<udf_server_address>';
```
- The `as` clause specifies the name of the function as defined in the UDF server.
- The `link` clause specifies the address of the UDF server.
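The grammar above can also be generated programmatically, for example when registering many functions from a script. The following is a hypothetical helper (`create_function_sql` is not part of the SDK) that assembles the statement from its parts. It does no quoting or escaping, so it is only suitable for trusted, hard-coded names.

```python
from typing import Optional, Sequence, Tuple


def create_function_sql(
    name: str,
    arg_types: Sequence[str],
    link: str,
    returns: Optional[str] = None,
    returns_table: Optional[Sequence[Tuple[str, str]]] = None,
    server_name: Optional[str] = None,
) -> str:
    """Build a `create function` statement for a scalar or table function."""
    if (returns is None) == (returns_table is None):
        raise ValueError("specify exactly one of returns or returns_table")
    if returns is not None:
        ret = f"returns {returns}"
    else:
        # Table functions declare their output columns as (name, type) pairs.
        cols = ", ".join(f"{col} {typ}" for col, typ in returns_table)
        ret = f"returns table ({cols})"
    args = ", ".join(arg_types)
    # Default the server-side name to the SQL name, as in the examples below.
    return f"create function {name}({args}) {ret} as {server_name or name} using link '{link}';"


if __name__ == "__main__":
    print(create_function_sql("gcd", ["int", "int"], "http://localhost:8815", returns="int"))
```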
For example:
```sql
create function gcd(int, int) returns int
as gcd using link 'http://localhost:8815';
create function series(int) returns table (x int)
as series using link 'http://localhost:8815';
select gcd(25, 15);
select * from series(10);
```
## Data Types
The RisingWave Python UDF SDK supports the following data types:
| SQL Type | Python Type | Notes |
| ---------------- | ----------------------------- | ------------------ |
| BOOLEAN | bool | |
| SMALLINT | int | |
| INT | int | |
| BIGINT | int | |
| REAL | float | |
| DOUBLE PRECISION | float | |
| DECIMAL | decimal.Decimal | |
| DATE | datetime.date | |
| TIME | datetime.time | |
| TIMESTAMP | datetime.datetime | |
| INTERVAL | MonthDayNano / (int, int, int) | Fields are available as `months`, `days`, and `nanoseconds` on `MonthDayNano` |
| VARCHAR | str | |
| BYTEA | bytes | |
| JSONB | any | |
| T[] | list[T] | |
| STRUCT<> | tuple | |
| ...others | | Not supported yet. |
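The mapping above can be read as a lookup table. The sketch below encodes it as a dictionary plus a hypothetical `matches` helper that checks whether a Python value has the type listed for a given SQL type, which may be handy in unit tests for UDF logic. It is not part of the SDK; it accepts INTERVAL only in its `(int, int, int)` form (assumed here to be months, days, nanoseconds) and does not check STRUCT field types.

```python
import datetime
import decimal

# Python types from the table above, keyed by SQL type name.
SCALAR_TYPES = {
    "BOOLEAN": bool,
    "SMALLINT": int,
    "INT": int,
    "BIGINT": int,
    "REAL": float,
    "DOUBLE PRECISION": float,
    "DECIMAL": decimal.Decimal,
    "DATE": datetime.date,
    "TIME": datetime.time,
    "TIMESTAMP": datetime.datetime,
    "VARCHAR": str,
    "BYTEA": bytes,
}


def matches(sql_type: str, value) -> bool:
    """Return True if `value` has the Python type the table lists for `sql_type`."""
    sql_type = sql_type.strip().upper()
    if sql_type.endswith("[]"):
        # T[] maps to list[T]: check every element against T.
        return isinstance(value, list) and all(matches(sql_type[:-2], v) for v in value)
    if sql_type.startswith("STRUCT<"):
        return isinstance(value, tuple)  # field types are not checked in this sketch
    if sql_type == "INTERVAL":
        # Accept the plain (months, days, nanoseconds) tuple form.
        return isinstance(value, tuple) and len(value) == 3 and all(type(v) is int for v in value)
    if sql_type == "JSONB":
        return True  # any value
    expected = SCALAR_TYPES.get(sql_type)
    if expected is None:
        raise ValueError(f"unsupported SQL type: {sql_type}")
    if expected is int:
        return type(value) is int  # bool subclasses int, so reject it explicitly
    if expected is datetime.date:
        return type(value) is datetime.date  # datetime.datetime subclasses date
    return isinstance(value, expected)
```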