# threadingpg
Control PostgreSQL using thread(s).
## Initialize Controller
```python
import threadingpg
controller = threadingpg.Controller()
controller.connect(dbname='database_name', user='user_name', password='password', port=5432)
# ...
controller.close()
```
## Initialize Pool
```python
import threadingpg
controller = threadingpg.Pool(dbname='database_name', user='user_name', password='password', port=5432)
# ...
controller.close()
```
## Table and Row
### Table
```python
import threadingpg
class MyTable(threadingpg.Table):
table_name="mytable"
index = threadingpg.Column(data_type=threadingpg.types.serial)
name = threadingpg.Column(data_type=threadingpg.types.varchar())
# or
class MyTable(threadingpg.Table):
def __init__(self) -> None:
self.index = threadingpg.Column(data_type=threadingpg.types.serial)
self.name = threadingpg.Column(data_type=threadingpg.types.varchar())
super().__init__("mytable") # important position
```
#### Create/Drop Table
```python
mytable = MyTable()
controller.create_table(mytable)
controller.drop_table(mytable)
```
### Row
```python
class MyRow(threadingpg.Row):
def __init__(self, name:str=None) -> None:
self.name = name
```
#### Insert Row
```python
mytable = MyTable()
myrow = MyRow("my_row")
controller.insert_row(mytable, myrow)
# or
controller.insert_dict(mytable, {"name":"my_row"})
```
#### Select Row
```python
mytable = MyTable()
column_name_list, rows = controller.select(mytable)
for row in rows:
myrow = MyRow()
myrow.set_data(column_name_list, row)
print(f"output: {myrow.name}") # output: my_row
```
#### Update Row
```python
mytable = MyTable()
myrow = MyRow("update_my_row")
condition_equal_0 = threadingpg.condition.Equal(mytable.index, 0)
controller.update_row(mytable, myrow, condition_equal_0)
```
#### Delete Row
```python
mytable = MyTable()
delete_condition = threadingpg.condition.Equal(mytable.index, 5)
controller.delete_row(mytable, delete_condition)
```
### Conditions
#### Where
```python
mytable = MyTable()
condition_equal_1 = threadingpg.condition.Equal(mytable.index, 1)
condition_equal_2 = threadingpg.condition.Equal(mytable.index, 2)
condition_equal_3 = threadingpg.condition.Equal(mytable.index, 3)
conditions = threadingpg.condition.Or(condition_equal_1, condition_equal_2, condition_equal_3)
column_name_list, rows = controller.select(mytable, where=conditions)
```
#### OrderBy
```python
mytable = MyTable()
orderby_index = threadingpg.condition.OrderBy(mytable.index)
orderby_name = threadingpg.condition.OrderBy(mytable.name, True)
orderby_conditions = threadingpg.condition.And(orderby_index, orderby_name)
column_name_list, rows = controller.select(mytable, order_by=orderby_conditions)
```
## Trigger
Need delay each function.
```python
mytable = MyTable()
channel_name = "mych"
trigger_name = "mytr"
function_name = "myfn"
listner = threadingpg.TriggerListner()
# implement 'notify = listner.notify_queue.get()'
listner.connect(dbname=dbname, user=user, password=password, port=5432)
listner.create_function(function_name, channel_name)
listner.create_trigger(mytable, trigger_name, function_name)
listner.start_listening()
listner.listen_channel(channel_name)
# ...
listner.unlisten_channel(channel_name)
listner.stop_listening()
```
Raw data
{
"_id": null,
"home_page": "",
"name": "threadingpg",
"maintainer": "",
"docs_url": null,
"requires_python": ">=3.10",
"maintainer_email": "",
"keywords": "postgres,postgresql,thread,thread postgresql,threading postgresql",
"author": "",
"author_email": "Chor <chorong8883@gmail.com>",
"download_url": "https://files.pythonhosted.org/packages/d6/f3/b044ee3f6f753843ff7e812a5645d1e22325d5492d9c51a0cec4a16f0145/threadingpg-0.0.6.tar.gz",
"platform": null,
"description": "# threadingpg\nControl PostgreSQL using thread(s).\n\n## Initialize Controller \n```python \nimport threadingpg\ncontroller = threadingpg.Controller()\ncontroller.connect(dbname='database_name', user='user_name', password='password', port=5432)\n# ...\ncontroller.close()\n```\n\n## Initialize Pool \n```python \nimport threadingpg\ncontroller = threadingpg.Pool(dbname='database_name', user='user_name', password='password', port=5432)\n# ...\ncontroller.close()\n```\n\n## Table and Row\n### Table\n```python \nimport threadingpg\nclass MyTable(threadingpg.Table):\n table_name=\"mytable\"\n index = threadingpg.Column(data_type=threadingpg.types.serial)\n name = threadingpg.Column(data_type=threadingpg.types.varchar())\n# or \nclass MyTable(threadingpg.Table):\n def __init__(self) -> None:\n self.index = threadingpg.Column(data_type=threadingpg.types.serial)\n self.name = threadingpg.Column(data_type=threadingpg.types.varchar())\n super().__init__(\"mytable\") # important position\n```\n#### Create/Drop Table\n```python \nmytable = MyTable()\ncontroller.create_table(mytable)\ncontroller.drop_table(mytable)\n```\n\n### Row\n```python\nclass MyRow(threadingpg.Row):\n def __init__(self, name:str=None) -> None:\n self.name = name\n```\n#### Insert Row\n```python\nmytable = MyTable()\nmyrow = MyRow(\"my_row\")\ncontroller.insert_row(mytable, myrow)\n# or\ncontroller.insert_dict(mytable, {\"name\":\"my_row\"})\n```\n#### Select Row\n```python\nmytable = MyTable()\ncolumn_name_list, rows = controller.select(mytable)\nfor row in rows:\n myrow = MyRow()\n myrow.set_data(column_name_list, row)\n print(f\"output: {myrow.name}\") # output: my_row\n```\n#### Update Row\n```python\nmytable = MyTable()\nmyrow = MyRow(\"update_my_row\")\ncondition_equal_0 = threadingpg.condition.Equal(mytable.index, 0)\ncontroller.update_row(mytable, myrow, condition_equal_0)\n```\n#### Delete Row\n```python\nmytable = MyTable()\ndelete_condition = threadingpg.condition.Equal(mytable.index, 5)\ncontroller.delete_row(mytable, delete_condition)\n```\n\n### Conditions\n#### Where \n```python\nmytable = MyTable()\ncondition_equal_1 = threadingpg.condition.Equal(mytable.index, 1)\ncondition_equal_2 = threadingpg.condition.Equal(mytable.index, 2)\ncondition_equal_3 = threadingpg.condition.Equal(mytable.index, 3)\nconditions = threadingpg.condition.Or(condition_equal_1, condition_equal_2, condition_equal_3)\ncolumn_name_list, rows = controller.select(mytable, where=conditions)\n```\n#### OrderBy\n```python\nmytable = MyTable()\norderby_index = threadingpg.condition.OrderBy(mytable.index)\norderby_name = threadingpg.condition.OrderBy(mytable.name, True)\norderby_conditions = threadingpg.condition.And(orderby_index, orderby_name)\ncolumn_name_list, rows = controller.select(mytable, order_by=orderby_conditions)\n```\n\n## Trigger\nNeed delay each function.\n```python\nmytable = MyTable()\nchannel_name = \"mych\"\ntrigger_name = \"mytr\"\nfunction_name = \"myfn\"\n\nlistner = threadingpg.TriggerListner()\n# implement 'notify = listner.notify_queue.get()'\n\nlistner.connect(dbname=dbname, user=user, password=password, port=5432)\nlistner.create_function(function_name, channel_name)\nlistner.create_trigger(mytable, trigger_name, function_name)\n\nlistner.start_listening()\nlistner.listen_channel(channel_name)\n# ...\nlistner.unlisten_channel(channel_name)\nlistner.stop_listening()\n```",
"bugtrack_url": null,
"license": "",
"summary": "Simple control 'psycopg2'(PostgreSQL).",
"version": "0.0.6",
"project_urls": {
"Homepage": "https://github.com/chorong8883/threadingpg"
},
"split_keywords": [
"postgres",
"postgresql",
"thread",
"thread postgresql",
"threading postgresql"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "9f3eb9409af915c45f664dc9b3c9e11dbcee06898de0fefb7f419195bb6e18f0",
"md5": "bf7a55990d01383c0f9938504796c13b",
"sha256": "f3f22d6987542c80c1308da4a7de5652ae0a7351ed7a6ae099df4a3b55589402"
},
"downloads": -1,
"filename": "threadingpg-0.0.6-py3-none-any.whl",
"has_sig": false,
"md5_digest": "bf7a55990d01383c0f9938504796c13b",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.10",
"size": 13160,
"upload_time": "2023-12-27T02:09:18",
"upload_time_iso_8601": "2023-12-27T02:09:18.035079Z",
"url": "https://files.pythonhosted.org/packages/9f/3e/b9409af915c45f664dc9b3c9e11dbcee06898de0fefb7f419195bb6e18f0/threadingpg-0.0.6-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "d6f3b044ee3f6f753843ff7e812a5645d1e22325d5492d9c51a0cec4a16f0145",
"md5": "dab278f849beee8ea4551969b8975711",
"sha256": "0d3b45fe7b8a1dbe4ff4f811fb015856e66cbc2a11c39c02958b92d1ce707f14"
},
"downloads": -1,
"filename": "threadingpg-0.0.6.tar.gz",
"has_sig": false,
"md5_digest": "dab278f849beee8ea4551969b8975711",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.10",
"size": 12537,
"upload_time": "2023-12-27T02:09:19",
"upload_time_iso_8601": "2023-12-27T02:09:19.963978Z",
"url": "https://files.pythonhosted.org/packages/d6/f3/b044ee3f6f753843ff7e812a5645d1e22325d5492d9c51a0cec4a16f0145/threadingpg-0.0.6.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2023-12-27 02:09:19",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "chorong8883",
"github_project": "threadingpg",
"travis_ci": false,
"coveralls": false,
"github_actions": false,
"lcname": "threadingpg"
}