# 1. Introduction
This library is the implementation of the `data-services data source` for [Lightweight Data Provider](https://github.com/th2-net/th2-lw-data-provider) (LwDP).
See more about `data-services data source` [here](https://not_implemented_yet_relates_to_TH2-4185).
# 2. Getting started
## 2.1. Installation
- From PyPI (pip)
This package can be found on [PyPI](https://pypi.org/project/th2-data-services-lwdp/ "th2-data-services-lwdp").
```
pip install th2-data-services-lwdp
```
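After installation, a quick import check can confirm that the package is available. This is only a sanity check; the import paths are the same ones used in the example in section 3.
```python
# A quick sanity check that the package installed correctly.
from th2_data_services.data_source.lwdp.data_source import DataSource
from th2_data_services.data_source.lwdp.commands import http as commands

print(DataSource.__name__, commands.__name__)
```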
## 2.2. Releases
Each release has a separate branch, indicated by the `DataSourceMajorVersion` part of the branch name.
Available versions:
|Data Source Name| Req. provider version for DS impl | DS Impl Status | DS Impl version |DS Impl grpc version| Features |
|--|-----------------------------------------------------------------------------|----------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------|--|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|LwDP| [1.1.0](https://github.com/th2-net/th2-lw-data-provider/tree/v1.1.0) | Canceled | [1.0.1.0](https://github.com/th2-net/th2-ds-source-lwdp/tree/dev_1.0.1.0) |[1.1.0](https://github.com/th2-net/th2-grpc-data-provider/blob/756e6841a4f3923789486fd17a39a25176f50a20/src/main/proto/th2_grpc_data_provider/data_provider.proto) <br> *LwDP 1.1.0 does not use all RPCs and fields from gRPC 1.1.0 because this PROTO file is shared with RDP6. This was solved in 1.1.1*| |
|LwDP| [2.0.0](https://github.com/th2-net/th2-lw-data-provider/tree/dev-version-2) | Released | [2.x.y.z](https://github.com/th2-net/th2-ds-source-lwdp/tree/release-2.0) <br><br> [release](https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/v2.1.0.3) |Canceled <br> *We decided not to implement a gRPC version because it works more slowly than HTTP*| groups + books & pages |
|LwDP| _3.0.1_ <br> (actually 2.6.0+ with TP mode*) | Released | [3.0.1.0+](https://github.com/th2-net/th2-ds-source-lwdp/tree/release-3.0) <br><br> [release](https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/v3.0.1.0) |Not supported by Impl| Transport protocol |
|LwDP| _3.1.0_ <br> (actually 2.6.0+ with TP mode*) | Released | [3.1.0.0+](https://github.com/th2-net/th2-ds-source-lwdp/tree/v3.1.0.1) <br><br> [release](https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/v3.1.0.1) |Not supported by Impl| ds-impl 3.1.x.y appeared because of a few backward-incompatible changes: [https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/v3.1.0.0](https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/v3.1.0.0) |
|LwDP| _3.1.1_ <br> (actually 2.12.0+ with TP mode*) | Released | [3.1.1.0+](https://github.com/th2-net/th2-ds-source-lwdp/tree/v3.1.1.0) <br><br> [release](https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/v3.1.1.0) |Not supported by Impl| Added in LwDP: <br> - `/download/events` endpoint to download events as a file in JSONL format <br> - `EVENTS` resource option for the `/download` task endpoint |
|LwDP| _3.1.2_ <br> (actually 2.12.0+ with TP mode*) | Released | [3.1.2.0+](https://github.com/th2-net/th2-ds-source-lwdp/tree/3.1.2.0) <br><br> [release](https://github.com/th2-net/th2-ds-source-lwdp/releases/tag/3.1.2.0) |Not supported by Impl| Added a `gzip: bool` parameter that indicates whether to include `gzip` in the Accept-Encoding header. |
|LwDP| _3.1.3_ <br> | In progress | [3.1.3.0+](https://github.com/th2-net/th2-ds-source-lwdp/tree/master) <br><br> |Not supported by Impl| |
\* TP mode – transport protocol mode in LwDP.
## 2.3. Release versioning
Implementation versions have the following structure: `DataSourceMajorVersion`.`ImplVersion`
`DataSourceMajorVersion` - the major version of LwDP the release uses
`ImplVersion` - the version of the data source implementation, in `Major`.`Minor`.`Patch` semantic versioning style
For example, `v1.0.1.0` is a release for LwDP `v1.x.y` with implementation version `0.1.0`.
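To illustrate the scheme, a release tag can be split into these two parts. This is a minimal sketch; the helper function below is hypothetical and not part of the library.
```python
# Split a release tag such as "v3.1.2.1" into its two documented parts:
# DataSourceMajorVersion (first number) and ImplVersion (remaining three numbers).
def split_release_tag(tag: str):
    parts = tag.lstrip("v").split(".")
    data_source_major_version = parts[0]
    impl_version = ".".join(parts[1:])
    return data_source_major_version, impl_version

print(split_release_tag("v1.0.1.0"))  # ('1', '0.1.0') -> LwDP v1.x.y, impl version 0.1.0
print(split_release_tag("v3.1.2.1"))  # ('3', '1.2.1') -> LwDP v3.x.y, impl version 1.2.1
```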
# 3. Examples
<!-- start get_started_example.py -->
```python
from typing import List
from th2_data_services.event_tree import EventTreeCollection
from th2_data_services.data_source.lwdp.commands import http as commands
from th2_data_services.data_source.lwdp.data_source import DataSource
from th2_data_services.data_source.lwdp.event_tree import ETCDriver
from th2_data_services.data_source.lwdp.streams import Streams, Stream
from th2_data_services.data import Data
from datetime import datetime
from th2_data_services.data_source.lwdp import Page
# About this example
# The following document shows common features of the library.
# Read the commands' docstrings to learn more about their features.
# Initialize some variables that will be used in this example.
book_id = "demo_book_1" # demo_book_1 is an example book from host namespace
page_name = "1" # example page name from book demo_book_1
scopes = ["th2-scope"] # Event scope - similar to stream for messages.
# [0] Streams
# A stream is a string that looks like `alias:direction`.
# You can pass streams as:
# - plain aliases; in this case all directions will be requested for each stream,
# - Stream objects, to set an exact direction per alias,
# - a Streams object, to set an exact direction for all aliases,
# - or a mix of them.
# We can use a list of aliases.
streams = [
"default-message-producer-alias",
"fix-demo-server1",
"fix-demo-server2",
"fix-client2",
"fix-client1",
]
# A list of Stream objects.
streams_list_with_stream_object = [
Stream("default-message-producer-alias", direction=1),
Stream("fix-demo-server1", direction=2),
Stream("fix-demo-server2"), # Both directions.
Stream("fix-client1", direction=1),
Stream("fix-client2", direction=1),
]
# Or a Streams object, which takes a list of aliases as argument.
streams_direction1 = Streams(streams, direction=1)
groups = streams  # In this namespace groups and streams have the same names.
# Dates have to be in the UTC timezone.
START_TIME = datetime(year=2023, month=1, day=5, hour=13, minute=57, second=5, microsecond=0)
END_TIME = datetime(year=2023, month=1, day=5, hour=13, minute=57, second=6, microsecond=0)
# [1] Create data source object to connect to lightweight data provider.
provider_url_link = "http://10.100.66.105:32681"
data_source = DataSource(provider_url_link)
# [2] Getting books, pages, scopes, groups and aliases.
# [2.1] Get books.
# In the database, data is segregated by books, so books never intersect.
# To get the names of the books, use the GetBooks command, which takes no arguments.
books: List[str] = data_source.command(commands.GetBooks())
# [2.2] Get pages.
# This command returns Page class objects.
# GetPages with only book_id returns all pages.
pages_all: Data[Page] = data_source.command(commands.GetPages(book_id))
# GetPages with timestamps returns all pages within that time frame.
pages: Data[Page] = data_source.command(commands.GetPages(book_id, START_TIME, END_TIME))
# [2.3] Get scopes.
# Some events are grouped by scopes, which we can get using the GetEventScopes command.
book_scopes: List[str] = data_source.command(commands.GetEventScopes(book_id))
# [2.4] Get message aliases.
aliases: List[str] = data_source.command(commands.GetMessageAliases(book_id))
# [2.5] Get message groups.
book_groups: List[str] = data_source.command(commands.GetMessageGroups(book_id))
# [3] Getting events and messages.
# [3.1] Get events/messages by ID.
# These commands will raise an exception if the event/message is not found.
# If you don't want an exception, use the `use_stub=True` command parameter.
# In that case you will get an event/message stub instead.
# [3.1.1] Get events by id.
single_event: dict = data_source.command(
commands.GetEventById(
"demo_book_1:th2-scope:20221226140719671764000:9c59694b-8526-11ed-8311-df33e1b504e4"
)
)
multiple_events: List[dict] = data_source.command(
commands.GetEventsById(
[
"demo_book_1:th2-scope:20221226140719671764000:9c59694b-8526-11ed-8311-df33e1b504e4",
"demo_book_1:th2-scope:20221226140723967243000:9ee8edcc-8526-11ed-8311-df33e1b504e4",
"demo_book_1:th2-scope:20221226140724065522000:9ef7e1ed-8526-11ed-8311-df33e1b504e4",
]
)
)
# [3.1.2] Get messages by id.
single_message: dict = data_source.command(
commands.GetMessageById("case3:arfq02fix30:2:20221111165012889502000:1668182272676097251")
)
multiple_messages: List[dict] = data_source.command(
commands.GetMessagesById(
[
"case3:arfq02fix30:2:20221111165012889502000:1668182272676097251",
"case3:arfq02fix30:2:20221111165252889876000:1668182272676097315",
]
)
)
# [3.2] Get events/messages by BOOK.
# [3.2.1] Get events by BOOK, scopes and time interval.
events: Data[dict] = data_source.command(
commands.GetEventsByBookByScopes(
start_timestamp=START_TIME, end_timestamp=END_TIME, book_id=book_id, scopes=scopes
)
)
# [3.2.2] Get messages by BOOK, streams and time interval.
# streams: a list of aliases to request. If a direction is not specified, all directions
# will be requested for the stream.
# You can also use the Stream and Streams classes to set them up (see the streams section [0]).
messages_by_stream: Data[dict] = data_source.command(
commands.GetMessagesByBookByStreams(
start_timestamp=START_TIME,
end_timestamp=END_TIME,
streams=streams,
book_id=book_id,
)
)
# [3.2.3] Get messages by BOOK, groups and time interval.
messages_by_group: Data[dict] = data_source.command(
commands.GetMessagesByBookByGroupsSse(
start_timestamp=START_TIME, end_timestamp=END_TIME, groups=groups, book_id=book_id
)
)
# [3.3] Get events/messages by PAGE.
# This set of commands allows you to get data by a specific page instead of a datetime range.
# GetByPage commands accept Page class objects as an argument.
# Alternatively, they also accept a page name together with a book id.
page: Page = list(pages)[0]
events_by_page_by_scopes: Data[dict] = data_source.command(
commands.GetEventsByPageByScopes(page=page, scopes=["th2-scope"])
)
events_by_page_name_by_scopes: Data[dict] = data_source.command(
commands.GetEventsByPageByScopes(page=page_name, book_id=book_id, scopes=["th2-scope"])
)
messages_by_page_by_streams: Data[dict] = data_source.command(
commands.GetMessagesByPageByStreams(page=page, stream=streams)
)
messages_by_page_name_by_streams: Data[dict] = data_source.command(
commands.GetMessagesByPageByStreams(page=page_name, book_id=book_id, stream=streams)
)
messages_by_page_by_groups: Data[dict] = data_source.command(
commands.GetMessagesByPageByGroupsSse(page=page, groups=groups)
)
messages_by_page_name_by_groups: Data[dict] = data_source.command(
commands.GetMessagesByPageByGroupsSse(page=page_name, book_id=book_id, groups=groups)
)
# [4] ETCDriver
# To work with EventTreeCollection and its children we need to use a special driver.
# This driver contains the lwdp-related methods that the ETC requires.
# [4.1] Init driver
etc_driver = ETCDriver(data_source=data_source, use_stub=False)
# [4.2] Init ETC object
etc = EventTreeCollection(etc_driver)
# [4.3] Build Event Trees inside the ETC and recover unknown events if there are any.
etc.build(events)
etc.recover_unknown_events()
# See more about how to use the ETC in the th2-data-services library documentation.
```
<!-- end get_started_example.py -->
## Changes in LwDP 3.* compared to LwDP 2.*
Changes mostly affect how messages are represented in LwDP V3.
In V3, the message id also gets a group section, so the new format looks like:
`book:group:session_alias:direction:timestamp:sequence`
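For illustration, the six sections can be recovered by splitting on `:`. This is a minimal sketch; the function name and the sample id below are hypothetical.
```python
# Split a V3 message id into its documented sections:
# book:group:session_alias:direction:timestamp:sequence
def split_v3_message_id(message_id: str) -> dict:
    book, group, session_alias, direction, timestamp, sequence = message_id.split(":")
    return {
        "book": book,
        "group": group,
        "session_alias": session_alias,
        "direction": direction,
        "timestamp": timestamp,
        "sequence": sequence,
    }

# Hypothetical id built from the format above, not taken from a real system.
print(split_v3_message_id("demo_book_1:fix-group:fix-client1:1:20230105135705000000000:1"))
```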
The main changes are in the body field of a message:
* METADATA
    * The metadata no longer duplicates information from the top-level message (direction, sequence, timestamp, sessionId). Only subsequence, messageType and protocol are left (inside metadata).
    * The metadata block doesn't have a fixed structure, so if only one parsed message was produced from the raw message, metadata may not contain the subsequence key.
    * Only messageType is a required field in metadata.
* BODY
    * The body is now a collection (a list). If the raw message produced a single parsed message, the list has only one element; if it produced more than one, all of them are in the collection in the order they were produced.
* subsequence
    * is always a list.
    * may be missing in metadata; a missing subsequence means [1] (see the sketch below).
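For example, when reading a parsed entry from the body list, a missing subsequence can be treated as [1]. A minimal sketch; the `parsed_entry` dict below uses hypothetical values.
```python
# A parsed entry taken from the body list of a V3 message (hypothetical values).
parsed_entry = {
    "metadata": {"messageType": "OrderExecuted"},  # no "subsequence" key
    "fields": {"OrderToken": "abc1"},
}

# Per the rules above, a missing subsequence means [1].
subsequence = parsed_entry.get("metadata", {}).get("subsequence", [1])
print(subsequence)  # [1]
```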
Here is a small example of how to use the expander to expand a single message into multiple ones:
```python
from th2_data_services.data import Data
from th2_data_services.data_source.lwdp.resolver import MessageFieldResolver
# The message in this example has 2 items in its body.
message = {
"timestamp":{"epochSecond":1682680778,"nano":807953000},
"direction":"IN",
"sessionId":"ouch_1_1",
"attachedEventIds":[],
"body":[
{
"metadata":
{
"subsequence":[1],
"messageType":"SequencedDataPacket",
"protocol":"protocol"
},
"fields":
{
"MessageLength":55,
"MessageType":83
}
},
{
"metadata":
{
"subsequence":[2],
"messageType":"OrderExecuted",
"protocol":"protocol"
},
"fields":
{
"MessageType":69,
"Timestamp":1682399803928773173,
"OrderToken":"lzgjaynpgynbg1",
"OrderBookID":110616,
"TradedQuantity":50,
"TradePrice":5000,
"MatchID":"j\ufffdh\u0003\u0000\u0000\u0000\u0006\u0000\u0000\u0000",
"DealSource":1,
"MatchAttributes":5
}
}
],
"messageId":"store_perf_test:ouch_1_1:1:20230428111938807953000:1682680778806000001"
}
message_data = Data([message])
mfr = MessageFieldResolver()
message_data = message_data.map(mfr.expand_message)
print(message_data)  # We should now have 2 messages built from the body list of the original message.
```