th2-data-services-lwdp


Name: th2-data-services-lwdp (JSON)
Version: 3.1.0.0 (PyPI version JSON)
Download
home_page: https://github.com/th2-net/th2-ds-source-lwdp
Summary: th2_data_services_lwdp
upload_time: 2024-03-11 08:03:40
maintainer:
docs_url: None
author: TH2-devs
requires_python: >=3.7
license: Apache License 2.0
keywords
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
# Lightweight Data Provider Data Source (major version 3)
# Introduction
This repository is an implementation of a data-services data source for Lightweight Data Provider V3.

Most of the commands this library provides are shown in the example below.

# Example
<!-- start get_started_example.py -->
```python
from typing import List

from th2_data_services.event_tree import EventTreeCollection

from th2_data_services.data_source.lwdp.commands import http as commands
from th2_data_services.data_source.lwdp.data_source import DataSource
from th2_data_services.data_source.lwdp.event_tree import ETCDriver
from th2_data_services.data_source.lwdp.streams import Streams, Stream
from th2_data_services.data import Data
from datetime import datetime
from th2_data_services.data_source.lwdp import Page

# About this example
#   The following document shows common features of the library.
#   Read command's docstrings to know more about commands features.


# Initialize some variables that will be used in this example.
book_id = "demo_book_1"  # demo_book_1 is an example book from host namespace
page_name = "1"  # example page name from book demo_book_1
scopes = ["th2-scope"]  # Event scope - similar to stream for messages.

# [0] Streams
#   Stream is a string that looks like `alias:direction`
#   - You can provide only aliases as streams, in this way all directions
#   will be requested for stream.
#   - Stream objects to set up exact direction.
#   - Streams object to set up exact direction for all aliases.
#   - Mix of them.

# We can use a list of aliases.
streams = [
    "default-message-producer-alias",
    "fix-demo-server1",
    "fix-demo-server2",
    "fix-client2",
    "fix-client1",
]
# A list of Stream objects.
streams_list_with_stream_object = [
    Stream("default-message-producer-alias", direction=1),
    Stream("fix-demo-server1", direction=2),
    Stream("fix-demo-server2"),  # Both directions.
    Stream("fix-client1", direction=1),
    Stream("fix-client2", direction=1),
]
# Or a Streams object, which takes a list of aliases as argument.
streams_direction1 = Streams(streams, direction=1)


groups = streams  # In this namespace groups and streams have same name.

# Date has to be in utc timezone.
START_TIME = datetime(year=2023, month=1, day=5, hour=13, minute=57, second=5, microsecond=0)
END_TIME = datetime(year=2023, month=1, day=5, hour=13, minute=57, second=6, microsecond=0)

# [1] Create data source object to connect to lightweight data provider.
provider_url_link = f"http://10.100.66.105:32681"
data_source = DataSource(provider_url_link)

# [2] Getting books, pages, scopes, groups and aliases.

# [2.1] Get books.
#   On database data is segregated with books, such as they never intersect.
#   To get the names of the books we have a command GetBooks which takes no argument.
books: List[str] = data_source.command(commands.GetBooks())

# [2.2] Get pages.
# This command returns objects of Page class
# GetPages with only book_id returns all pages.
pages_all: Data[Page] = data_source.command(commands.GetPages(book_id))

# GetPages with timestamps returns all pages within that time frame.
pages: Data[Page] = data_source.command(commands.GetPages(book_id, START_TIME, END_TIME))

# [2.3] Get scopes.
# Some events are grouped by scopes, which we can get using GetScopes command.
book_scopes: List[str] = data_source.command(commands.GetEventScopes(book_id))

# [2.4] Get message aliases.
aliases: List[str] = data_source.command(commands.GetMessageAliases(book_id))

# [2.5] Get message groups.
book_groups: List[str] = data_source.command(commands.GetMessageGroups(book_id))

# [3] Getting events and messages.

# [3.1] Get events/messages by ID.
#   These commands will raise Exception if the event/message is not found.
#   If you don't want to get Exception, use `use_stub=True` commands parameter.
#     In this way you will get event/message stub.

# [3.1.1] Get events by id.
single_event: dict = data_source.command(
    commands.GetEventById(
        "demo_book_1:th2-scope:20221226140719671764000:9c59694b-8526-11ed-8311-df33e1b504e4"
    )
)
multiple_events: List[dict] = data_source.command(
    commands.GetEventsById(
        [
            "demo_book_1:th2-scope:20221226140719671764000:9c59694b-8526-11ed-8311-df33e1b504e4",
            "demo_book_1:th2-scope:20221226140723967243000:9ee8edcc-8526-11ed-8311-df33e1b504e4",
            "demo_book_1:th2-scope:20221226140724065522000:9ef7e1ed-8526-11ed-8311-df33e1b504e4",
        ]
    )
)

# [3.1.2] Get messages by id.
single_message: dict = data_source.command(
    commands.GetMessageById("case3:arfq02fix30:2:20221111165012889502000:1668182272676097251")
)
multiple_messages: List[dict] = data_source.command(
    commands.GetMessagesById(
        [
            "case3:arfq02fix30:2:20221111165012889502000:1668182272676097251",
            "case3:arfq02fix30:2:20221111165252889876000:1668182272676097315",
        ]
    )
)

# [3.2] Get events/messages by BOOK.

# [3.2.1] Get events by BOOK, scopes and time interval.
events: Data[dict] = data_source.command(
    commands.GetEventsByBookByScopes(
        start_timestamp=START_TIME, end_timestamp=END_TIME, book_id=book_id, scopes=scopes
    )
)

# [3.2.2] Get messages by BOOK, streams and time interval.
#   streams: List of aliases to request. If direction is not specified all directions
#   will be requested for stream.
#   You can also use Stream and Streams classes to set up them (see streams section [0]).
messages_by_stream: Data[dict] = data_source.command(
    commands.GetMessagesByBookByStreams(
        start_timestamp=START_TIME,
        end_timestamp=END_TIME,
        streams=streams,
        book_id=book_id,
    )
)

# [3.2.3] Get messages by BOOK, groups and time interval.
messages_by_group: Data[dict] = data_source.command(
    commands.GetMessagesByBookByGroups(
        start_timestamp=START_TIME, end_timestamp=END_TIME, groups=groups, book_id=book_id
    )
)

# [3.3] Get events/messages by PAGE.
#   This set of commands allows you to get data by specific page instead of datetime range.
#   GetByPage commands accept Page class objects as argument.
#   Alternatively they also accept page name with book id.

page: Page = list(pages)[0]

events_by_page_by_scopes: Data[dict] = data_source.command(
    commands.GetEventsByPageByScopes(page=page, scopes=["th2-scope"])
)
events_by_page_name_by_scopes: Data[dict] = data_source.command(
    commands.GetEventsByPageByScopes(page=page_name, book_id=book_id, scopes=["th2-scope"])
)

messages_by_page_by_streams: Data[dict] = data_source.command(
    commands.GetMessagesByPageByStreams(page=page, stream=streams)
)
messages_by_page_name_by_streams: Data[dict] = data_source.command(
    commands.GetMessagesByPageByStreams(page=page_name, book_id=book_id, stream=streams)
)

messages_by_page_by_groups: Data[dict] = data_source.command(
    commands.GetMessagesByPageByGroups(page=page, groups=groups)
)
messages_by_page_name_by_groups: Data[dict] = data_source.command(
    commands.GetMessagesByPageByGroups(page=page_name, book_id=book_id, groups=groups)
)

# [4] ETCDriver
#   To work with EventTreeCollection and its children we need to use special driver.
#   This driver contains lwdp-related methods that ETC required.

# [4.1] Init driver
etc_driver = ETCDriver(data_source=data_source, use_stub=False)
# [4.2] Init ETC object
etc = EventTreeCollection(etc_driver)

# [4.3] Build Event Trees inside ETC and recover unknown events if it has them.
etc.build(events)
etc.recover_unknown_events()
# See more info about how to use ETC in th2-data-services lib documentation.

```
<!-- end get_started_example.py -->

# Changes in LwDP 3.0.0

Changes mostly affect how messages are represented in LwDP V3.

In V3, the message ID also gets a group section, and the new format looks like:
`book:group:session_alias:direction:timestamp:sequence`

The main changes are in the body field of the message:
* METADATA
  * The metadata no longer contains information duplicated from the top-level message (direction, sequence, timestamp, sessionId). Only subsequence, messageType and protocol remain inside the metadata.
  * The metadata block doesn’t have a fixed structure. For example, if only one parsed message was produced from the raw message, the metadata may not contain the `subsequence` key.
  * `messageType` is the ONLY required field in the metadata.

* BODY
  * The body is now a collection (list). If the raw message produced a single parsed message, the list has only 1 element. If the raw message produced more than 1 message, all of them are in that collection, in the order they were produced.

* subsequence
  * is always a list.
  * may be missing from the metadata, in which case it means [1].

Here is a small example of how to use the expander to expand a single message into multiple messages:

```python
from th2_data_services.data import Data
from th2_data_services.data_source.lwdp.resolver import MessageFieldResolver
# message in this example have 2 items in its body
message = {
    "timestamp":{"epochSecond":1682680778,"nano":807953000},
    "direction":"IN",
    "sessionId":"ouch_1_1",
    "attachedEventIds":[],
    "body":[
        {
            "metadata":
            {
                "subsequence":[1],
                "messageType":"SequencedDataPacket",
                "protocol":"protocol"
            },
            "fields":
            {
                "MessageLength":55,
                "MessageType":83
            }
        },
        {
            "metadata":
            {   
                "subsequence":[2],
                "messageType":"OrderExecuted",
                "protocol":"protocol"
            },
            "fields":
            {
                "MessageType":69,
                "Timestamp":1682399803928773173,
                "OrderToken":"lzgjaynpgynbg1",
                "OrderBookID":110616,
                "TradedQuantity":50,
                "TradePrice":5000,
                "MatchID":"j\ufffdh\u0003\u0000\u0000\u0000\u0006\u0000\u0000\u0000",
                "DealSource":1,
                "MatchAttributes":5
            }
        }
    ],
    "messageId":"store_perf_test:ouch_1_1:1:20230428111938807953000:1682680778806000001"
}

message_data = Data([message])
mfr = MessageFieldResolver()
message_data = message_data.map(mfr.expand_message)
print(message_data) # we should now have 2 messages built from the body list of original message.
```
            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/th2-net/th2-ds-source-lwdp",
    "name": "th2-data-services-lwdp",
    "maintainer": "",
    "docs_url": null,
    "requires_python": ">=3.7",
    "maintainer_email": "",
    "keywords": "",
    "author": "TH2-devs",
    "author_email": "th2-devs@exactprosystems.com",
    "download_url": "https://files.pythonhosted.org/packages/96/bb/184c047a84783d6c1c2faec2963c596c5195036a07cb8939c383aebeccb1/th2_data_services_lwdp-3.1.0.0.tar.gz",
    "platform": null,
    "description": "# Lightweight Data Provider Data Source (major version 3).\n# Introduction\nThis repository is an implementation of data-services data source for Lightweight Data Provider V3.\n\nMost commands that this library provides can be seen in example below.\n\n# Example\n<!-- start get_started_example.py -->\n```python\nfrom typing import List\n\nfrom th2_data_services.event_tree import EventTreeCollection\n\nfrom th2_data_services.data_source.lwdp.commands import http as commands\nfrom th2_data_services.data_source.lwdp.data_source import DataSource\nfrom th2_data_services.data_source.lwdp.event_tree import ETCDriver\nfrom th2_data_services.data_source.lwdp.streams import Streams, Stream\nfrom th2_data_services.data import Data\nfrom datetime import datetime\nfrom th2_data_services.data_source.lwdp import Page\n\n# About this example\n#   The following document shows common features of the library.\n#   Read command's docstrings to know more about commands features.\n\n\n# Initialize some variables that will be used in this example.\nbook_id = \"demo_book_1\"  # demo_book_1 is an example book from host namespace\npage_name = \"1\"  # example page name from book demo_book_1\nscopes = [\"th2-scope\"]  # Event scope - similar to stream for messages.\n\n# [0] Streams\n#   Stream is a string that looks like `alias:direction`\n#   - You can provide only aliases as streams, in this way all directions\n#   will be requested for stream.\n#   - Stream objects to set up exact direction.\n#   - Streams object to set up exact direction for all aliases.\n#   - Mix of them.\n\n# We can use a list of aliases.\nstreams = [\n    \"default-message-producer-alias\",\n    \"fix-demo-server1\",\n    \"fix-demo-server2\",\n    \"fix-client2\",\n    \"fix-client1\",\n]\n# A list of Stream objects.\nstreams_list_with_stream_object = [\n    Stream(\"default-message-producer-alias\", direction=1),\n    Stream(\"fix-demo-server1\", direction=2),\n    Stream(\"fix-demo-server2\"), 
 # Both directions.\n    Stream(\"fix-client1\", direction=1),\n    Stream(\"fix-client2\", direction=1),\n]\n# Or a Streams object, which takes a list of aliases as argument.\nstreams_direction1 = Streams(streams, direction=1)\n\n\ngroups = streams  # In this namespace groups and streams have same name.\n\n# Date has to be in utc timezone.\nSTART_TIME = datetime(year=2023, month=1, day=5, hour=13, minute=57, second=5, microsecond=0)\nEND_TIME = datetime(year=2023, month=1, day=5, hour=13, minute=57, second=6, microsecond=0)\n\n# [1] Create data source object to connect to lightweight data provider.\nprovider_url_link = f\"http://10.100.66.105:32681\"\ndata_source = DataSource(provider_url_link)\n\n# [2] Getting books, pages, scopes, groups and aliases.\n\n# [2.1] Get books.\n#   On database data is segregated with books, such as they never intersect.\n#   To get the names of the books we have a command GetBooks which takes no argument.\nbooks: List[str] = data_source.command(commands.GetBooks())\n\n# [2.2] Get pages.\n# This command returns objects of Page class\n# GetPages with only book_id returns all pages.\npages_all: Data[Page] = data_source.command(commands.GetPages(book_id))\n\n# GetPages with timestamps returns all pages within that time frame.\npages: Data[Page] = data_source.command(commands.GetPages(book_id, START_TIME, END_TIME))\n\n# [2.3] Get scopes.\n# Some events are grouped by scopes, which we can get using GetScopes command.\nbook_scopes: List[str] = data_source.command(commands.GetEventScopes(book_id))\n\n# [2.4] Get message aliases.\naliases: List[str] = data_source.command(commands.GetMessageAliases(book_id))\n\n# [2.5] Get message groups.\nbook_groups: List[str] = data_source.command(commands.GetMessageGroups(book_id))\n\n# [3] Getting events and messages.\n\n# [3.1] Get events/messages by ID.\n#   These commands will raise Exception if the event/message is not found.\n#   If you don't want to get Exception, use `use_stub=True` commands 
parameter.\n#     In this way you will get event/message stub.\n\n# [3.1.1] Get events by id.\nsingle_event: dict = data_source.command(\n    commands.GetEventById(\n        \"demo_book_1:th2-scope:20221226140719671764000:9c59694b-8526-11ed-8311-df33e1b504e4\"\n    )\n)\nmultiple_events: List[dict] = data_source.command(\n    commands.GetEventsById(\n        [\n            \"demo_book_1:th2-scope:20221226140719671764000:9c59694b-8526-11ed-8311-df33e1b504e4\",\n            \"demo_book_1:th2-scope:20221226140723967243000:9ee8edcc-8526-11ed-8311-df33e1b504e4\",\n            \"demo_book_1:th2-scope:20221226140724065522000:9ef7e1ed-8526-11ed-8311-df33e1b504e4\",\n        ]\n    )\n)\n\n# [3.1.2] Get messages by id.\nsingle_message: dict = data_source.command(\n    commands.GetMessageById(\"case3:arfq02fix30:2:20221111165012889502000:1668182272676097251\")\n)\nmultiple_messages: List[dict] = data_source.command(\n    commands.GetMessagesById(\n        [\n            \"case3:arfq02fix30:2:20221111165012889502000:1668182272676097251\",\n            \"case3:arfq02fix30:2:20221111165252889876000:1668182272676097315\",\n        ]\n    )\n)\n\n# [3.2] Get events/messages by BOOK.\n\n# [3.2.1] Get events by BOOK, scopes and time interval.\nevents: Data[dict] = data_source.command(\n    commands.GetEventsByBookByScopes(\n        start_timestamp=START_TIME, end_timestamp=END_TIME, book_id=book_id, scopes=scopes\n    )\n)\n\n# [3.2.2] Get messages by BOOK, streams and time interval.\n#   streams: List of aliases to request. 
If direction is not specified all directions\n#   will be requested for stream.\n#   You can also use Stream and Streams classes to set up them (see streams section [0]).\nmessages_by_stream: Data[dict] = data_source.command(\n    commands.GetMessagesByBookByStreams(\n        start_timestamp=START_TIME,\n        end_timestamp=END_TIME,\n        streams=streams,\n        book_id=book_id,\n    )\n)\n\n# [3.2.3] Get messages by BOOK, groups and time interval.\nmessages_by_group: Data[dict] = data_source.command(\n    commands.GetMessagesByBookByGroups(\n        start_timestamp=START_TIME, end_timestamp=END_TIME, groups=groups, book_id=book_id\n    )\n)\n\n# [3.3] Get events/messages by PAGE.\n#   This set of commands allows you to get data by specific page instead of datetime range.\n#   GetByPage commands accept Page class objects as argument.\n#   Alternatively they also accept page name with book id.\n\npage: Page = list(pages)[0]\n\nevents_by_page_by_scopes: Data[dict] = data_source.command(\n    commands.GetEventsByPageByScopes(page=page, scopes=[\"th2-scope\"])\n)\nevents_by_page_name_by_scopes: Data[dict] = data_source.command(\n    commands.GetEventsByPageByScopes(page=page_name, book_id=book_id, scopes=[\"th2-scope\"])\n)\n\nmessages_by_page_by_streams: Data[dict] = data_source.command(\n    commands.GetMessagesByPageByStreams(page=page, stream=streams)\n)\nmessages_by_page_name_by_streams: Data[dict] = data_source.command(\n    commands.GetMessagesByPageByStreams(page=page_name, book_id=book_id, stream=streams)\n)\n\nmessages_by_page_by_groups: Data[dict] = data_source.command(\n    commands.GetMessagesByPageByGroups(page=page, groups=groups)\n)\nmessages_by_page_name_by_groups: Data[dict] = data_source.command(\n    commands.GetMessagesByPageByGroups(page=page_name, book_id=book_id, groups=groups)\n)\n\n# [4] ETCDriver\n#   To work with EventTreeCollection and its children we need to use special driver.\n#   This driver contains lwdp-related methods that 
ETC required.\n\n# [4.1] Init driver\netc_driver = ETCDriver(data_source=data_source, use_stub=False)\n# [4.2] Init ETC object\netc = EventTreeCollection(etc_driver)\n\n# [4.3] Build Event Trees inside ETC and recover unknown events if it has them.\netc.build(events)\netc.recover_unknown_events()\n# See more info about how to use ETC in th2-data-services lib documentation.\n\n```\n<!-- end get_started_example.py -->\n\n# Changes in LwDP 3.0.0\n\nChanges mostly affect how messages are represented in LwDP V3.\n\nIn V3 message id will also get a group section and new format will look like:\nbook:group:session_alias:direction:timestamp:sequence\n\nMain changes are in body field of message:\n* METADATA\n  * the metadata does not contain duplicated information from the top message (direction, sequence, timestamp, sessionId). Only subsequence, messageType and protocol are left (inside metadata).\n  * metadata block doesn\u2019t have fixed structure. So if there is only 1 parsed was produced from raw message, metadata will not have this key.\n  * ONLY messageType is required field in metadata.\n\n* BODY\n  * The body is a collection (list) now (if the raw message produced a single parsed message, it will have only 1 element. If raw messages produced more than 1 message, all of them will be in that collection in the order they were produced).\n\n* subsequence\n  * is always list.\n  * may be missing in metadata. 
It means [1].\n\nHere is a small example on how to use expander to expand single message into multiple ones:\n\n```python\nfrom th2_data_services.data import Data\nfrom th2_data_services.data_source.lwdp.resolver import MessageFieldResolver\n# message in this example have 2 items in its body\nmessage = {\n    \"timestamp\":{\"epochSecond\":1682680778,\"nano\":807953000},\n    \"direction\":\"IN\",\n    \"sessionId\":\"ouch_1_1\",\n    \"attachedEventIds\":[],\n    \"body\":[\n        {\n            \"metadata\":\n            {\n                \"subsequence\":[1],\n                \"messageType\":\"SequencedDataPacket\",\n                \"protocol\":\"protocol\"\n            },\n            \"fields\":\n            {\n                \"MessageLength\":55,\n                \"MessageType\":83\n            }\n        },\n        {\n            \"metadata\":\n            {   \n                \"subsequence\":[2],\n                \"messageType\":\"OrderExecuted\",\n                \"protocol\":\"protocol\"\n            },\n            \"fields\":\n            {\n                \"MessageType\":69,\n                \"Timestamp\":1682399803928773173,\n                \"OrderToken\":\"lzgjaynpgynbg1\",\n                \"OrderBookID\":110616,\n                \"TradedQuantity\":50,\n                \"TradePrice\":5000,\n                \"MatchID\":\"j\\ufffdh\\u0003\\u0000\\u0000\\u0000\\u0006\\u0000\\u0000\\u0000\",\n                \"DealSource\":1,\n                \"MatchAttributes\":5\n            }\n        }\n    ],\n    \"messageId\":\"store_perf_test:ouch_1_1:1:20230428111938807953000:1682680778806000001\"\n}\n\nmessage_data = Data([message])\nmfr = MessageFieldResolver()\nmessage_data = message_data.map(mfr.expand_message)\nprint(message_data) # we should now have 2 messages built from the body list of original message.\n```",
    "bugtrack_url": null,
    "license": "Apache License 2.0",
    "summary": "th2_data_services_lwdp",
    "version": "3.1.0.0",
    "project_urls": {
        "Homepage": "https://github.com/th2-net/th2-ds-source-lwdp"
    },
    "split_keywords": [],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "96bb184c047a84783d6c1c2faec2963c596c5195036a07cb8939c383aebeccb1",
                "md5": "9dc628033b4ada78565437ffa083e4a8",
                "sha256": "0849fdccca703ddd060bd2793d88382ea30fbdef3ddcafd4c8c39d0d2fa6790d"
            },
            "downloads": -1,
            "filename": "th2_data_services_lwdp-3.1.0.0.tar.gz",
            "has_sig": false,
            "md5_digest": "9dc628033b4ada78565437ffa083e4a8",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.7",
            "size": 29689,
            "upload_time": "2024-03-11T08:03:40",
            "upload_time_iso_8601": "2024-03-11T08:03:40.816937Z",
            "url": "https://files.pythonhosted.org/packages/96/bb/184c047a84783d6c1c2faec2963c596c5195036a07cb8939c383aebeccb1/th2_data_services_lwdp-3.1.0.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-03-11 08:03:40",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "th2-net",
    "github_project": "th2-ds-source-lwdp",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": false,
    "lcname": "th2-data-services-lwdp"
}
        
Elapsed time: 0.27116s