py-sc-kpm


Namepy-sc-kpm JSON
Version 0.3.0 PyPI version JSON
download
home_pagehttps://github.com/ostis-ai/py-sc-kpm
SummaryThe Python implementation of the knowledge processing module for knowledge manipulations
upload_time2024-07-29 17:24:16
maintainerNone
docs_urlNone
authorostis-ai
requires_python<4,>=3.8
licenseMIT
keywords sc-kpm kpm knowledge processing
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # py-sc-kpm

The python implementation of the knowledge processing machine (kpm)
for [sc-machine](https://github.com/ostis-ai/sc-machine).
Library provides tools for interacting with knowledge bases.
Communication with server is implemented in separate library
named [py-sc-client](https://github.com/ostis-ai/py-sc-client).
This module is compatible with 0.9.0 version
of [OSTIS Platform](https://github.com/ostis-ai/ostis-web-platform).

# API Reference

1. [Classes](#classes)
   + [ScKeynodes](#sckeynodes)
   + [ScAgent](#scagent-and-scagentclassic)
   + [ScModule](#scmodule)
   + [ScServer](#scserver)
2. [Utils](#utils)
   + [Common utils](#common-utils)
   + [Creating utils](#creating-utils)
   + [Retrieve utils](#retrieve-utils)
   + [Action utils](#action-utils)
3. [Use-cases](#use-cases)

## Classes

The library contains the python implementation of useful classes and functions to work with the sc-memory.

### ScKeynodes

Class which provides the ability to cache the identifier and ScAddr of keynodes stored in the KB.

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes

# Get the provided identifier
ScKeynodes["identifier_of_keynode"]  # Returns an ScAddr of the given identifier

# Get the unprovided identifier
ScKeynodes["not_stored_in_kb"]  # Raises InvalidValueError if an identifier doesn't exist in the KB
ScKeynodes.get("not_stored_in_kb")  # Returns an invalid ScAddr(0) in the same situation

# Resolve identifier
ScKeynodes.resolve("my_class_node", sc_types.NODE_CONST_CLASS)  # Returns the element if it exists, otherwise creates
ScKeynodes.resolve("some_node", None)  # Returns the element if it exists, otherwise returns an invalid ScAddr(0)

# Delete identifier
ScKeynodes.delete("identifier_to_delete")  # Delete keynode from kb and ScKeynodes cache

# Get rrel node
ScKeynodes.rrel_index(1)  # Returns valid ScAddr of 'rrel_1'
ScKeynodes.rrel_index(11)  # Raises KeyError if index more than 10
ScKeynodes.rrel_index("some_str")  # Raises TypeError if index is not int
```

### ScAgent and ScAgentClassic

A classes for handling a single ScEvent. Define your agents like this:

```python
from sc_client.models import ScAddr
from sc_kpm import ScAgent, ScAgentClassic, ScResult


class ScAgentTest(ScAgent):
    def on_event(self, class_node: ScAddr, edge: ScAddr, action_node: ScAddr) -> ScResult:
        ...
        return ScResult.OK


class ScAgentClassicTest(ScAgentClassic):
    def on_event(self, class_node: ScAddr, edge: ScAddr, action_node: ScAddr) -> ScResult:
        # ScAgentClassic automatically checks its action
        ...
        return ScResult.OK
```

For the ScAgent initialization you should define the sc-element and the type of the ScEvent.

For the ScAgentClassic initialization
you should define the identifier of the action class node and arguments of the ScAgent.
`event_class` is set to the `action_initiated` keynode by default.
`event_type` is set to the `ScEventType.ADD_OUTGOING_EDGE` type by default.

**ClassicScAgent checks its action element automatically and doesn't run `on_event` method if checking fails.**

```python
from sc_client.constants import sc_types
from sc_client.constants.common import ScEventType
from sc_kpm import ScKeynodes

action_class = ScKeynodes.resolve("test_class", sc_types.NODE_CONST_CLASS)
agent = ScAgentTest(action_class, ScEventType.ADD_OUTGOING_EDGE)

classic_agent = ScAgentClassicTest("classic_test_class")
classic_agent_ingoing = ScAgentClassicTest("classic_test_class", ScEventType.ADD_INGOING_EDGE)
```

### ScModule

A class for handling multiple ScAgent objects.
Define your modules like this:

```python
from sc_kpm import ScModule

module = ScModule(
    agent1,
    agent2,
)
...
module.add_agent(agent3)
...
module.remove_agent(agent3)
```

_Note: you don't need remove agents in the end of program._

### ScServer

A class for serving, register ScModule objects.

Firstly you need connect to server. You can use connect/disconnect methods:

```python
from sc_kpm import ScServer

SC_SERVER_URL = "ws://localhost:8090/ws_json"
server = ScServer(SC_SERVER_URL)
server.connect()
...
server.disconnect()
```

Or with-statement. We recommend it because it easier to use, and it's safe:

```python
from sc_kpm import ScServer

SC_SERVER_URL = "ws://localhost:8090/ws_json"
server = ScServer(SC_SERVER_URL)
with server.connect():
    ...
```

After connection, you can add and remove your modules. Manage your modules like this:

```python
...
with server.connect():
    module = ScModule(...)
    server.add_modules(module)
    ...
    server.remove_modules(module)
```

But the modules are still not registered. For this use register_modules/unregister_modules methods:

```python
...
with server.connect():
    ...
    server.register_modules()
    ...
    server.unregister_modules()
```

Or one mode with-statement.
We also recommend to use so because it guarantees a safe agents unregistration if errors occur:

```python
...
with server.connect():
    ...
    with server.register_modules():
        ...
```

If you needn't separate connecting and registration, you can do it all using one command:

```python
with server.start():
    ...
# or
server.start()
...
server.stop()
```

There is also method for stopping program until a SIGINT signal (or ^C, or terminate in IDE) is received.
So you can leave agents registered for a long time:

```python
...
with server.connect():
    # Creating some agents
    with server.register_modules():
        # Registration some agents
        server.serve()  # Agents will be active until ^C
```

### ScSets

Sc-set is a construction that presents main node called `set_node` and linked elements.
There is no limit in types for sc-set elements:

![sc-set example](docs/schemes/png/set.png)

#### ScSet

- *sc_kpm.sc_sets*.**ScSet**

Class for handling simple sc-sets.
It is the parent class for `ScOrientedSet` and `ScNumberedSet`

Methods and properties:

1. *ScSet*(*elements: ScAddr, set_node: ScAddr = None, set_node_type: ScType = None) -> None

   Constructor of sc-set receives all elements to add, optional `set_node` and optional `set_node_type`.
   If you don't specify `set_node`, it will be created with `set_node_type` (default NODE_CONST).

2. *ScSet*.**add**(*elements: ScAddr) -> None

   Add elements to the end of sc-set construction.

3. *ScSet*.**set_node** -> ScAddr

   Property to give the **main node** of sc-set.

4. *ScSet* == *ScSet* -> bool

   Check sc-sets have the same set_nodes.

5. *ScSet*.**elements_set** -> Set[ScAddr]

   Property to give all elements from sc-set as a set.
   Use it if you don't need order in ordered sets.

6. **iter**(*ScSet*) -> Iterator[ScAddr]

   Dunder method for iterating by sc-set.
   Lazy algorithm (ScOrientedSet and ScNumberedSet).

7. **len**(*ScSet*) -> int

   Fast dunder method to give **count of elements** (power of sc-set).

8. **bool**(*ScSet*) -> bool

   Dunder method for if-statement: True if there are elements in sc-set.

9. *ScSet*.**is_empty**() -> bool

   True if there are **no elements** in sc-set.

10. *ScAddr* **in** *ScSet* -> bool

    Dunder method: True if sc-set contains element.

11. *ScSet*.**clear**() -> bool

    Remove all elements from sc-set.

12. *ScSet*.**remove**(*elements: ScAddr) -> None

    Remove elements from sc-set.
    *WARNING*: method isn't optimized in ScOrientedSet and ScNumberedSet

```python
from sc_client.constants import sc_types
from sc_client.models import ScAddr

from sc_kpm.sc_sets import ScSet
from sc_kpm.utils import create_node, create_nodes

# Example elements to add
example_set_node: ScAddr = create_node(sc_types.NODE_CONST)
elements: list[ScAddr] = create_nodes(*[sc_types.NODE_CONST] * 5)

# Init sc-set and add elements
empty_set = ScSet()

set_with_elements = ScSet(elements[0], elements[1])
set_with_elements.add(elements[2], elements[3])
set_with_elements.add(elements[4])

set_with_elements_and_set_node = ScSet(elements[2], set_node=example_set_node)
empty_set_with_specific_set_node_type = ScSet(set_node_type=sc_types.NODE_VAR)

# Get set node
set_node = empty_set.set_node
assert set_with_elements_and_set_node.set_node == example_set_node

# Get elements: list and set
assert set_with_elements.elements_set == set(elements)  # set view faster and safe

# Iterate by elements
for element in set_with_elements:
    print(element)

# Length, bool, is_empty, in
assert len(set_with_elements) == len(elements)
assert bool(set_with_elements)
assert not set_with_elements.is_empty()
assert empty_set.is_empty()
assert elements[2] in set_with_elements

# Clear and remove
set_with_elements.remove(elements[4])
assert len(set_with_elements), len(elements) - 1
set_with_elements.clear()
assert set_with_elements.is_empty()
```

##### ScStructure

If `set_node` has type `sc_types.NODE_CONST_STRUCT` construction is called sc-structure
and looks like a loop in SCg:

![structure](docs/schemes/png/structure.png)
![structure2](docs/schemes/png/structure_circuit.png)

- *sc_kpm*.**ScStructure**

Class for handling structure construction in the kb.
The same logic as in `ScSet`, but *set_node_type* if set NODE_CONST_STRUCT.
There are checks that set node has struct sc-type:

```python
from sc_kpm.sc_sets import ScStructure

sc_struct = ScStructure(..., set_node=..., set_node_type=...)

sc_struct = ScStructure(..., set_node=create_node(sc_types.NODE_CONST))  # InvalidTypeError - not struct type
sc_struct = ScStructure(..., set_node_type=sc_types.NODE_CONST)  # InvalidTypeError - not struct type
```

#### Ordered sc-sets

ScOrientedSet and ScNumberedSet are ordered sc-constructions.
Child classes of ScSet, have the order in iteration and get list elements:

- *Sc{ordered}Set*.**elements_set** -> List[ScAddr]

  Get the list of all elements with order

##### ScOrientedSet

- *sc_kpm*.**ScOrientedSet**

![oriented_set_example](docs/schemes/png/oriented_set.png)

Class for handling sc-oriented-set construction.
Has marked edges between edges from set_node to elements.
Easy lazy iterating.
No access by index.

```python
from sc_client.constants import sc_types

from sc_kpm.sc_sets import ScOrientedSet
from sc_kpm.utils import create_nodes


elements = create_nodes(*[sc_types.NODE_CONST] * 5)
numbered_set = ScOrientedSet(*elements)
assert numbered_set.elements_list == elements
```

##### ScNumberedSet

- *sc_kpm*.**ScNumberedSet**

![numbered_set_example](docs/schemes/png/numbered_set.png)

Class for handling sc-numbered-set construction.
Set-node is edged with numerating each element with rrel node.
Easy access to elements by index (index i is marked with rrel(i + 1))

```python
from sc_client.constants import sc_types

from sc_kpm.sc_sets import ScNumberedSet
from sc_kpm.utils import create_nodes


elements = create_nodes(*[sc_types.NODE_CONST] * 5)
numbered_set = ScNumberedSet(*elements)
assert numbered_set.elements_list == elements
assert numbered_set[2] == elements[2]
numbered_set[5]  # raise KeyError
```

## Utils

There are some functions for working with nodes, edges, links: create them, search, get content, delete, etc.
There is also possibility to wrap in set or oriented set.

## Common utils

There are utils to work with basic elements

_You can import these utils from `sc_kpm.utils`_

### Nodes creating

If you want to create one or more nodes use these functions with type setting argument:

```python
def create_node(node_type: ScType, sys_idtf: str = None) -> ScAddr: ...


def create_nodes(*node_types: ScType) -> List[ScAddr]: ...
```

`sys_idtf` is optional name of keynode if you want to add it there.

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.utils.common_utils import create_node, create_nodes

lang = create_node(sc_types.NODE_CONST_CLASS)  # ScAddr(...)
lang_en = create_node(sc_types.NODE_CONST_CLASS)  # ScAddr(...)
assert lang.is_valid() and lang_en.is_valid()
elements = create_nodes(sc_types.NODE_CONST, sc_types.NODE_VAR)  # [ScAddr(...), ScAddr(...)]
assert len(elements) == 2
assert all(element.is_valid() for element in elements)
```

### Edges creating

For creating edge between **src** and **trg** with setting its type use **create_edge** function:

```python
def create_edge(edge_type: ScType, src: ScAddr, trg: ScAddr) -> ScAddr: ...

def create_edges(edge_type: ScType, src: ScAddr, *targets: ScAddr) -> List[ScAddr]: ...
```

```python
from sc_client.constants import sc_types
from sc_kpm.utils import create_nodes
from sc_kpm.utils import create_edge, create_edges

src, trg, trg2, trg3 = create_nodes(*[sc_types.NODE_CONST] * 4)
edge = create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg)  # ScAddr(...)
edges = create_edges(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg2, trg3)  # [ScAddr(...), ScAddr(...)]
assert edge.is_valid()
assert all(edges)
```

Function **is_valid()** is used for validation addresses of nodes or edges.

### Links creating

For creating links with string type content (by default) use these functions:

```python
def create_link(
        content: Union[str, int],
        content_type: ScLinkContentType = ScLinkContentType.STRING,
        link_type: ScType = sc_types.LINK_CONST
) -> ScAddr: ...


def create_links(
        *contents: Union[str, int],
        content_type: ScLinkContentType = ScLinkContentType.STRING,
        link_type: ScType = sc_types.LINK_CONST,
) -> List[ScAddr]: ...
```

You may use **ScLinkContentType.STRING** and **ScLinkContentType.INT** types for content of created links.

```python
from sc_client.constants import sc_types
from sc_client.models import ScLinkContentType
from sc_kpm.utils import create_link, create_links

msg = create_link("hello")  # ScAddr(...)
four = create_link(4, ScLinkContentType.INT)  # ScAddr(...)
water = create_link("water", link_type=sc_types.LINK_VAR)  # ScAddr(...)
names = create_links("Sam", "Pit")  # [ScAddr(...), ScAddr(...)]
```

### Relations creating

Create different binary relations with these functions:

```python
def create_binary_relation(edge_type: ScType, src: ScAddr, trg: ScAddr, *relations: ScAddr) -> ScAddr: ...


def create_role_relation(src: ScAddr, trg: ScAddr, *rrel_nodes: ScAddr) -> ScAddr: ...


def create_norole_relation(src: ScAddr, trg: ScAddr, *nrel_nodes: ScAddr) -> ScAddr: ...
```

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.utils import create_node, create_nodes
from sc_kpm.utils import create_binary_relation, create_role_relation, create_norole_relation

src, trg = create_nodes(*[sc_types.NODE_CONST] * 2)
increase_relation = create_node(sc_types.NODE_CONST_CLASS)

brel = create_binary_relation(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg, increase_relation)  # ScAddr(...)
rrel = create_role_relation(src, trg, ScKeynodes.rrel_index(1))  # ScAddr(...)
nrel = create_norole_relation(src, trg, create_node(sc_types.NODE_CONST_NOROLE))  # ScAddr(...)
```

### Deleting utils

If you want to remove all edges between two nodes, which define by their type use

```python
def delete_edges(source: ScAddr, target: ScAddr, *edge_types: ScType) -> bool: ...
```

It return **True** if operations was successful and **False** otherwise.

```python
from sc_client.constants import sc_types
from sc_kpm.utils import create_nodes, create_edge
from sc_kpm.utils import delete_edges

src, trg = create_nodes(*[sc_types.NODE_CONST] * 2)
edge = create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg)
delete_edges(src, trg, sc_types.EDGE_ACCESS_CONST_POS_PERM)  # True
```

### Getting edges

For getting edge or edges between two nodes use:

```python
def get_edge(source: ScAddr, target: ScAddr, edge_type: ScType) -> ScAddr: ...


def get_edges(source: ScAddr, target: ScAddr, *edge_types: ScType) -> List[ScAddr]: ...
```

_**NOTE: Use VAR type instead of CONST in getting utils**_

```python
from sc_client.constants import sc_types
from sc_kpm.utils import create_nodes, create_edge
from sc_kpm.utils import get_edge, get_edges

src, trg = create_nodes(*[sc_types.NODE_CONST] * 2)
edge1 = create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg)
edge2 = create_edge(sc_types.EDGE_D_COMMON_CONST, src, trg)

class_edge = get_edge(src, trg, sc_types.EDGE_ACCESS_VAR_POS_PERM)  # ScAddr(...)
assert class_edge == edge1
edges = get_edges(src, trg, sc_types.EDGE_ACCESS_VAR_POS_PERM, sc_types.EDGE_D_COMMON_VAR)  # [ScAddr(...), ScAddr(...)]
assert edges == [edge1, edge2]
```

### Getting elements by relation

Get target element by source element and relation:

```python
def get_element_by_role_relation(src: ScAddr, rrel_node: ScAddr) -> ScAddr: ...


def get_element_by_norole_relation(src: ScAddr, nrel_node: ScAddr) -> ScAddr: ...
```

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.identifiers import CommonIdentifiers
from sc_kpm.utils import create_nodes, create_role_relation, create_norole_relation
from sc_kpm.utils import get_element_by_role_relation, get_element_by_norole_relation

src, trg_rrel, trg_nrel = create_nodes(*[sc_types.NODE_CONST] * 3)
rrel = create_role_relation(src, trg_rrel, ScKeynodes.rrel_index(1))  # ScAddr(...)
nrel = create_norole_relation(src, trg_nrel, ScKeynodes[CommonIdentifiers.NREL_SYSTEM_IDENTIFIER])  # ScAddr(...)

result_rrel = get_element_by_role_relation(src, ScKeynodes.rrel_index(1))  # ScAddr(...)
assert result_rrel == trg_rrel
result_nrel = get_element_by_norole_relation(src, ScKeynodes[CommonIdentifiers.NREL_SYSTEM_IDENTIFIER])  # ScAddr(...)
assert result_nrel == trg_nrel
```

### Getting link content

For existed links you may get their content by address with this function:

```python
def get_link_content(link: ScAddr) -> Union[str, int]: ...
```

```python
from sc_kpm.utils import create_link
from sc_kpm.utils import get_link_content_data

water = create_link("water")
content = get_link_content_data(water)  # "water"
```

### Getting system identifier

For getting system identifier of keynode use:

```python
def get_system_idtf(addr: ScAddr) -> str: ...
```

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.utils import create_node
from sc_kpm.utils import get_system_idtf

lang_en = create_node(sc_types.NODE_CONST_CLASS)  # ScAddr(...)
idtf = get_system_idtf(lang_en)  # "lang_en"
assert ScKeynodes[idtf] == lang_en
```

## Action utils

Utils to work with actions, events and agents

![agent_base](docs/schemes/png/agent_base.png)

### Check action class

```python
def check_action_class(action_class: Union[ScAddr, Idtf], action_node: ScAddr) -> bool: ...
```

This function returns **True** if action class has connection to action node.
You can use identifier of action class instead of ScAddr.
This function should not be used in the ScAgentClassic.

![check action class](docs/schemes/png/check_action_class.png)

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.identifiers import CommonIdentifiers
from sc_kpm.utils import create_node, create_edge
from sc_kpm.utils.action_utils import check_action_class

action_node = create_node(sc_types.NODE_CONST)
create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, ScKeynodes[CommonIdentifiers.ACTION], action_node)
action_class = create_node(sc_types.NODE_CONST_CLASS)
create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, action_class, action_node)

assert check_action_class(action_class, action_node)
# or
assert check_action_class("some_classification", action_node)
```

### Get action arguments

For getting list of action arguments concatenated by `rrel_[1 -> count]` use:

```python
def get_action_arguments(action_class: Union[ScAddr, Idtf], count: int) -> List[ScAddr]: ...
```

![check action class](docs/schemes/png/get_arguments.png)

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.identifiers import CommonIdentifiers
from sc_kpm.utils import create_node, create_edge, create_role_relation
from sc_kpm.utils.action_utils import get_action_arguments

action_node = create_node(sc_types.NODE_CONST)

# Static argument
argument1 = create_node(sc_types.NODE_CONST)
create_role_relation(action_node, argument1, ScKeynodes.rrel_index(1))

# Dynamic argument
dynamic_node = create_node(sc_types.NODE_CONST)
rrel_dynamic_arg = ScKeynodes[CommonIdentifiers.RREL_DYNAMIC_ARGUMENT]
create_role_relation(action_node, dynamic_node, rrel_dynamic_arg, ScKeynodes.rrel_index(2))
argument2 = create_node(sc_types.NODE_CONST)
create_edge(sc_types.EDGE_ACCESS_CONST_POS_TEMP, dynamic_node, argument2)

arguments = get_action_arguments(action_node, 2)
assert arguments == [argument1, dynamic_node]
```

### Create and get action answer

```python
def create_action_answer(action_node: ScAddr, *elements: ScAddr) -> None: ...


def get_action_answer(action_node: ScAddr) -> ScAddr: ...
```

Create and get structure with output of action

![agent answer](docs/schemes/png/agent_answer.png)

```python
from sc_client.constants import sc_types
from sc_kpm.utils import create_node
from sc_kpm.utils.action_utils import create_action_answer, get_action_answer
from sc_kpm.sc_sets import ScStructure

action_node = create_node(sc_types.NODE_CONST_STRUCT)
answer_element = create_node(sc_types.NODE_CONST_STRUCT)
create_action_answer(action_node, answer_element)
result = get_action_answer(action_node)
result_elements = ScStructure(result).elements_set
assert result_elements == {answer_element}
```

### Call, execute and wait agent

Agent call function: creates **action node** with some arguments, concepts and connects it to the node with initiation identifier.
Returns **action node**

```python
def call_agent(
        arguments: Dict[ScAddr, IsDynamic],
        concepts: List[Idtf],
        initiation: Idtf = ActionStatus.ACTION_INITIATED,
) -> ScAddr: ...
```

Agent wait function: Waits for creation of edge to reaction node for some seconds.
Default reaction_node is `action_finished`.

```python
def wait_agent(seconds: float, action_node: ScAddr, reaction_node: ScAddr = None) -> None: ...
```

Agent execute function: combines two previous functions -- calls, waits and returns action node and **True** if success

```python
def execute_agent(
        arguments: Dict[ScAddr, IsDynamic],
        concepts: List[Idtf],
        initiation: Idtf = ActionStatus.ACTION_INITIATED,
        reaction: Idtf = ActionStatus.ACTION_FINISHED_SUCCESSFULLY,
        wait_time: int = COMMON_WAIT_TIME,  # 5
) -> Tuple[ScAddr, bool]: ...
```


![execute_agent](docs/schemes/png/execute_agent.png)

```python
from sc_client.models import ScLinkContentType
from sc_kpm import ScKeynodes
from sc_kpm.identifiers import CommonIdentifiers, ActionStatus
from sc_kpm.utils import create_link
from sc_kpm.utils.action_utils import execute_agent, call_agent, wait_agent

arg1 = create_link(2, ScLinkContentType.INT)
arg2 = create_link(3, ScLinkContentType.INT)

kwargs = dict(
    arguments={arg1: False, arg2: False},
    concepts=[CommonIdentifiers.ACTION, "some_class_name"],
)

action = call_agent(**kwargs)  # ScAddr(...)
wait_agent(3, action, ScKeynodes[ActionStatus.ACTION_FINISHED])
# or
action, is_successfully = execute_agent(**kwargs, wait_time=3)  # ScAddr(...), bool
```

### Create, call and execute action

Functions that allow to create action and add arguments and call later.

Function that creates action with concepts and return its ScAddr.

```python
def create_action(*concepts: Idtf) -> ScAddr: ...
```

Function that creates arguments of action

```python
def add_action_arguments(action_node: ScAddr, arguments: Dict[ScAddr, IsDynamic]) -> None: ...
```

Now you can call or execute action.
Action call functions don't return action_node because it's parameter to them.

```python
def call_action(
        action_node: ScAddr, initiation: Idtf = ActionStatus.ACTION_INITIATED
) -> None: ...
```

```python
def execute_action(
        action_node: ScAddr,
        initiation: Idtf = ActionStatus.ACTION_INITIATED,
        reaction: Idtf = ActionStatus.ACTION_FINISHED_SUCCESSFULLY,
        wait_time: float = COMMON_WAIT_TIME,
) -> bool: ...
```

Example:

```python
from sc_client.models import ScLinkContentType
from sc_kpm import ScKeynodes
from sc_kpm.identifiers import CommonIdentifiers, ActionStatus
from sc_kpm.utils import create_link
from sc_kpm.utils.action_utils import add_action_arguments, call_action, create_action, execute_action, wait_agent

arg1 = create_link(2, ScLinkContentType.INT)
arg2 = create_link(3, ScLinkContentType.INT)

action_node = create_action(CommonIdentifiers.ACTION, "some_class_name")  # ScAddr(...)
# Do something here
arguments = {arg1: False, arg2: False}

add_action_arguments(action_node, arguments)
call_action(action_node)
wait_agent(3, action_node, ScKeynodes[ActionStatus.ACTION_FINISHED])
# or
is_successful = execute_action(action_node, wait_time=3)  # bool
```

### Finish action

Function `finish_action` connects status class to action node:

```python
def finish_action(action_node: ScAddr, status: Idtf = ActionStatus.ACTION_FINISHED) -> ScAddr: ...
```

Function `finish_action_with_status` connects `action_finished` and `action_finished_(un)successfully` statuses to it:

```python
def finish_action_with_status(action_node: ScAddr, is_success: bool = True) -> None: ...
```

![finish_action](docs/schemes/png/finish_action.png)

```python
from sc_client.constants import sc_types
from sc_kpm import ScKeynodes
from sc_kpm.identifiers import ActionStatus
from sc_kpm.utils import create_node, check_edge
from sc_kpm.utils.action_utils import finish_action, finish_action_with_status

action_node = create_node(sc_types.NODE_CONST)

finish_action_with_status(action_node)
# or
finish_action_with_status(action_node, True)
# or
finish_action(action_node, ActionStatus.ACTION_FINISHED)
finish_action(action_node, ActionStatus.ACTION_FINISHED_SUCCESSFULLY)

action_finished = ScKeynodes[ActionStatus.ACTION_FINISHED]
action_finished_successfully = ScKeynodes[ActionStatus.ACTION_FINISHED_SUCCESSFULLY]
assert check_edge(sc_types.EDGE_ACCESS_VAR_POS_PERM, action_finished, action_node)
assert check_edge(sc_types.EDGE_ACCESS_VAR_POS_PERM, action_finished_successfully, action_node)
```

# Use-cases

- Script for creating and registration agent until user press ^C:
    - [based on ScAgentClassic](docs/examples/register_and_wait_for_user.py)
- Scripts for creating agent to calculate sum of two arguments and confirm result:
    - [based on ScAgent](docs/examples/sum_agent.py)
    - [based on ScAgentClassic](docs/examples/sum_agent_classic.py)
- Logging examples:
    - [pretty project logging](docs/examples/pretty_logging.py)

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/ostis-ai/py-sc-kpm",
    "name": "py-sc-kpm",
    "maintainer": null,
    "docs_url": null,
    "requires_python": "<4,>=3.8",
    "maintainer_email": null,
    "keywords": "sc-kpm, kpm, knowledge processing",
    "author": "ostis-ai",
    "author_email": null,
    "download_url": "https://files.pythonhosted.org/packages/3e/8d/2e56857b1d5ef1986b6b6502eaca73ee6c60f2603074813662666e789cf5/py_sc_kpm-0.3.0.tar.gz",
    "platform": null,
    "description": "# py-sc-kpm\n\nThe python implementation of the knowledge processing machine (kpm)\nfor [sc-machine](https://github.com/ostis-ai/sc-machine).\nLibrary provides tools for interacting with knowledge bases.\nCommunication with server is implemented in separate library\nnamed [py-sc-client](https://github.com/ostis-ai/py-sc-client).\nThis module is compatible with 0.9.0 version\nof [OSTIS Platform](https://github.com/ostis-ai/ostis-web-platform).\n\n# API Reference\n\n1. [Classes](#classes)\n   + [ScKeynodes](#sckeynodes)\n   + [ScAgent](#scagent-and-scagentclassic)\n   + [ScModule](#scmodule)\n   + [ScServer](#scserver)\n2. [Utils](#utils)\n   + [Common utils](#common-utils)\n   + [Creating utils](#creating-utils)\n   + [Retrieve utils](#retrieve-utils)\n   + [Action utils](#action-utils)\n3. [Use-cases](#use-cases)\n\n## Classes\n\nThe library contains the python implementation of useful classes and functions to work with the sc-memory.\n\n### ScKeynodes\n\nClass which provides the ability to cache the identifier and ScAddr of keynodes stored in the KB.\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\n\n# Get the provided identifier\nScKeynodes[\"identifier_of_keynode\"]  # Returns an ScAddr of the given identifier\n\n# Get the unprovided identifier\nScKeynodes[\"not_stored_in_kb\"]  # Raises InvalidValueError if an identifier doesn't exist in the KB\nScKeynodes.get(\"not_stored_in_kb\")  # Returns an invalid ScAddr(0) in the same situation\n\n# Resolve identifier\nScKeynodes.resolve(\"my_class_node\", sc_types.NODE_CONST_CLASS)  # Returns the element if it exists, otherwise creates\nScKeynodes.resolve(\"some_node\", None)  # Returns the element if it exists, otherwise returns an invalid ScAddr(0)\n\n# Delete identifier\nScKeynodes.delete(\"identifier_to_delete\")  # Delete keynode from kb and ScKeynodes cache\n\n# Get rrel node\nScKeynodes.rrel_index(1)  # Returns valid ScAddr of 'rrel_1'\nScKeynodes.rrel_index(11)  # Raises KeyError if index more than 10\nScKeynodes.rrel_index(\"some_str\")  # Raises TypeError if index is not int\n```\n\n### ScAgent and ScAgentClassic\n\nA classes for handling a single ScEvent. Define your agents like this:\n\n```python\nfrom sc_client.models import ScAddr\nfrom sc_kpm import ScAgent, ScAgentClassic, ScResult\n\n\nclass ScAgentTest(ScAgent):\n    def on_event(self, class_node: ScAddr, edge: ScAddr, action_node: ScAddr) -> ScResult:\n        ...\n        return ScResult.OK\n\n\nclass ScAgentClassicTest(ScAgentClassic):\n    def on_event(self, class_node: ScAddr, edge: ScAddr, action_node: ScAddr) -> ScResult:\n        # ScAgentClassic automatically checks its action\n        ...\n        return ScResult.OK\n```\n\nFor the ScAgent initialization you should define the sc-element and the type of the ScEvent.\n\nFor the ScAgentClassic initialization\nyou should define the identifier of the action class node and arguments of the ScAgent.\n`event_class` is set to the `action_initiated` keynode by default.\n`event_type` is set to the `ScEventType.ADD_OUTGOING_EDGE` type by default.\n\n**ClassicScAgent checks its action element automatically and doesn't run `on_event` method if checking fails.**\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_client.constants.common import ScEventType\nfrom sc_kpm import ScKeynodes\n\naction_class = ScKeynodes.resolve(\"test_class\", sc_types.NODE_CONST_CLASS)\nagent = ScAgentTest(action_class, ScEventType.ADD_OUTGOING_EDGE)\n\nclassic_agent = ScAgentClassicTest(\"classic_test_class\")\nclassic_agent_ingoing = ScAgentClassicTest(\"classic_test_class\", ScEventType.ADD_INGOING_EDGE)\n```\n\n### ScModule\n\nA class for handling multiple ScAgent objects.\nDefine your modules like this:\n\n```python\nfrom sc_kpm import ScModule\n\nmodule = ScModule(\n    agent1,\n    agent2,\n)\n...\nmodule.add_agent(agent3)\n...\nmodule.remove_agent(agent3)\n```\n\n_Note: you don't need remove agents in the end of program._\n\n### ScServer\n\nA class for serving, register ScModule objects.\n\nFirstly you need connect to server. You can use connect/disconnect methods:\n\n```python\nfrom sc_kpm import ScServer\n\nSC_SERVER_URL = \"ws://localhost:8090/ws_json\"\nserver = ScServer(SC_SERVER_URL)\nserver.connect()\n...\nserver.disconnect()\n```\n\nOr with-statement. We recommend it because it easier to use, and it's safe:\n\n```python\nfrom sc_kpm import ScServer\n\nSC_SERVER_URL = \"ws://localhost:8090/ws_json\"\nserver = ScServer(SC_SERVER_URL)\nwith server.connect():\n    ...\n```\n\nAfter connection, you can add and remove your modules. Manage your modules like this:\n\n```python\n...\nwith server.connect():\n    module = ScModule(...)\n    server.add_modules(module)\n    ...\n    server.remove_modules(module)\n```\n\nBut the modules are still not registered. For this use register_modules/unregister_modules methods:\n\n```python\n...\nwith server.connect():\n    ...\n    server.register_modules()\n    ...\n    server.unregister_modules()\n```\n\nOr one mode with-statement.\nWe also recommend to use so because it guarantees a safe agents unregistration if errors occur:\n\n```python\n...\nwith server.connect():\n    ...\n    with server.register_modules():\n        ...\n```\n\nIf you needn't separate connecting and registration, you can do it all using one command:\n\n```python\nwith server.start():\n    ...\n# or\nserver.start()\n...\nserver.stop()\n```\n\nThere is also method for stopping program until a SIGINT signal (or ^C, or terminate in IDE) is received.\nSo you can leave agents registered for a long time:\n\n```python\n...\nwith server.connect():\n    # Creating some agents\n    with server.register_modules():\n        # Registration some agents\n        server.serve()  # Agents will be active until ^C\n```\n\n### ScSets\n\nSc-set is a construction that presents main node called `set_node` and linked elements.\nThere is no limit in types for sc-set elements:\n\n![sc-set example](docs/schemes/png/set.png)\n\n#### ScSet\n\n- *sc_kpm.sc_sets*.**ScSet**\n\nClass for handling simple sc-sets.\nIt is the parent class for `ScOrientedSet` and `ScNumberedSet`\n\nMethods and properties:\n\n1. *ScSet*(*elements: ScAddr, set_node: ScAddr = None, set_node_type: ScType = None) -> None\n\n   Constructor of sc-set receives all elements to add, optional `set_node` and optional `set_node_type`.\n   If you don't specify `set_node`, it will be created with `set_node_type` (default NODE_CONST).\n\n2. *ScSet*.**add**(*elements: ScAddr) -> None\n\n   Add elements to the end of sc-set construction.\n\n3. *ScSet*.**set_node** -> ScAddr\n\n   Property to give the **main node** of sc-set.\n\n4. *ScSet* == *ScSet* -> bool\n\n   Check sc-sets have the same set_nodes.\n\n5. *ScSet*.**elements_set** -> Set[ScAddr]\n\n   Property to give all elements from sc-set as a set.\n   Use it if you don't need order in ordered sets.\n\n6. **iter**(*ScSet*) -> Iterator[ScAddr]\n\n   Dunder method for iterating by sc-set.\n   Lazy algorithm (ScOrientedSet and ScNumberedSet).\n\n7. **len**(*ScSet*) -> int\n\n   Fast dunder method to give **count of elements** (power of sc-set).\n\n8. **bool**(*ScSet*) -> bool\n\n   Dunder method for if-statement: True if there are elements in sc-set.\n\n9. *ScSet*.**is_empty**() -> bool\n\n   True if there are **no elements** in sc-set.\n\n10. *ScAddr* **in** *ScSet* -> bool\n\n    Dunder method: True if sc-set contains element.\n\n11. *ScSet*.**clear**() -> bool\n\n    Remove all elements from sc-set.\n\n12. *ScSet*.**remove**(*elements: ScAddr) -> None\n\n    Remove elements from sc-set.\n    *WARNING*: method isn't optimized in ScOrientedSet and ScNumberedSet\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_client.models import ScAddr\n\nfrom sc_kpm.sc_sets import ScSet\nfrom sc_kpm.utils import create_node, create_nodes\n\n# Example elements to add\nexample_set_node: ScAddr = create_node(sc_types.NODE_CONST)\nelements: list[ScAddr] = create_nodes(*[sc_types.NODE_CONST] * 5)\n\n# Init sc-set and add elements\nempty_set = ScSet()\n\nset_with_elements = ScSet(elements[0], elements[1])\nset_with_elements.add(elements[2], elements[3])\nset_with_elements.add(elements[4])\n\nset_with_elements_and_set_node = ScSet(elements[2], set_node=example_set_node)\nempty_set_with_specific_set_node_type = ScSet(set_node_type=sc_types.NODE_VAR)\n\n# Get set node\nset_node = empty_set.set_node\nassert set_with_elements_and_set_node.set_node == example_set_node\n\n# Get elements: list and set\nassert set_with_elements.elements_set == set(elements)  # set view faster and safe\n\n# Iterate by elements\nfor element in set_with_elements:\n    print(element)\n\n# Length, bool, is_empty, in\nassert len(set_with_elements) == len(elements)\nassert bool(set_with_elements)\nassert not set_with_elements.is_empty()\nassert empty_set.is_empty()\nassert elements[2] in set_with_elements\n\n# Clear and remove\nset_with_elements.remove(elements[4])\nassert len(set_with_elements), len(elements) - 1\nset_with_elements.clear()\nassert set_with_elements.is_empty()\n```\n\n##### ScStructure\n\nIf `set_node` has type `sc_types.NODE_CONST_STRUCT` construction is called sc-structure\nand looks like a loop in SCg:\n\n![structure](docs/schemes/png/structure.png)\n![structure2](docs/schemes/png/structure_circuit.png)\n\n- *sc_kpm*.**ScStructure**\n\nClass for handling structure construction in the kb.\nThe same logic as in `ScSet`, but *set_node_type* if set NODE_CONST_STRUCT.\nThere are checks that set node has struct sc-type:\n\n```python\nfrom sc_kpm.sc_sets import ScStructure\n\nsc_struct = ScStructure(..., set_node=..., set_node_type=...)\n\nsc_struct = ScStructure(..., set_node=create_node(sc_types.NODE_CONST))  # InvalidTypeError - not struct type\nsc_struct = ScStructure(..., set_node_type=sc_types.NODE_CONST)  # InvalidTypeError - not struct type\n```\n\n#### Ordered sc-sets\n\nScOrientedSet and ScNumberedSet are ordered sc-constructions.\nChild classes of ScSet, have the order in iteration and get list elements:\n\n- *Sc{ordered}Set*.**elements_set** -> List[ScAddr]\n\n  Get the list of all elements with order\n\n##### ScOrientedSet\n\n- *sc_kpm*.**ScOrientedSet**\n\n![oriented_set_example](docs/schemes/png/oriented_set.png)\n\nClass for handling sc-oriented-set construction.\nHas marked edges between edges from set_node to elements.\nEasy lazy iterating.\nNo access by index.\n\n```python\nfrom sc_client.constants import sc_types\n\nfrom sc_kpm.sc_sets import ScOrientedSet\nfrom sc_kpm.utils import create_nodes\n\n\nelements = create_nodes(*[sc_types.NODE_CONST] * 5)\nnumbered_set = ScOrientedSet(*elements)\nassert numbered_set.elements_list == elements\n```\n\n##### ScNumberedSet\n\n- *sc_kpm*.**ScNumberedSet**\n\n![numbered_set_example](docs/schemes/png/numbered_set.png)\n\nClass for handling sc-numbered-set construction.\nSet-node is edged with numerating each element with rrel node.\nEasy access to elements by index (index i is marked with rrel(i + 1))\n\n```python\nfrom sc_client.constants import sc_types\n\nfrom sc_kpm.sc_sets import ScNumberedSet\nfrom sc_kpm.utils import create_nodes\n\n\nelements = create_nodes(*[sc_types.NODE_CONST] * 5)\nnumbered_set = ScNumberedSet(*elements)\nassert numbered_set.elements_list == elements\nassert numbered_set[2] == elements[2]\nnumbered_set[5]  # raise KeyError\n```\n\n## Utils\n\nThere are some functions for working with nodes, edges, links: create them, search, get content, delete, etc.\nThere is also possibility to wrap in set or oriented set.\n\n## Common utils\n\nThere are utils to work with basic elements\n\n_You can import these utils from `sc_kpm.utils`_\n\n### Nodes creating\n\nIf you want to create one or more nodes use these functions with type setting argument:\n\n```python\ndef create_node(node_type: ScType, sys_idtf: str = None) -> ScAddr: ...\n\n\ndef create_nodes(*node_types: ScType) -> List[ScAddr]: ...\n```\n\n`sys_idtf` is optional name of keynode if you want to add it there.\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.utils.common_utils import create_node, create_nodes\n\nlang = create_node(sc_types.NODE_CONST_CLASS)  # ScAddr(...)\nlang_en = create_node(sc_types.NODE_CONST_CLASS)  # ScAddr(...)\nassert lang.is_valid() and lang_en.is_valid()\nelements = create_nodes(sc_types.NODE_CONST, sc_types.NODE_VAR)  # [ScAddr(...), ScAddr(...)]\nassert len(elements) == 2\nassert all(element.is_valid() for element in elements)\n```\n\n### Edges creating\n\nFor creating edge between **src** and **trg** with setting its type use **create_edge** function:\n\n```python\ndef create_edge(edge_type: ScType, src: ScAddr, trg: ScAddr) -> ScAddr: ...\n\ndef create_edges(edge_type: ScType, src: ScAddr, *targets: ScAddr) -> List[ScAddr]: ...\n```\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm.utils import create_nodes\nfrom sc_kpm.utils import create_edge, create_edges\n\nsrc, trg, trg2, trg3 = create_nodes(*[sc_types.NODE_CONST] * 4)\nedge = create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg)  # ScAddr(...)\nedges = create_edges(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg2, trg3)  # [ScAddr(...), ScAddr(...)]\nassert edge.is_valid()\nassert all(edges)\n```\n\nFunction **is_valid()** is used for validation addresses of nodes or edges.\n\n### Links creating\n\nFor creating links with string type content (by default) use these functions:\n\n```python\ndef create_link(\n        content: Union[str, int],\n        content_type: ScLinkContentType = ScLinkContentType.STRING,\n        link_type: ScType = sc_types.LINK_CONST\n) -> ScAddr: ...\n\n\ndef create_links(\n        *contents: Union[str, int],\n        content_type: ScLinkContentType = ScLinkContentType.STRING,\n        link_type: ScType = sc_types.LINK_CONST,\n) -> List[ScAddr]: ...\n```\n\nYou may use **ScLinkContentType.STRING** and **ScLinkContentType.INT** types for content of created links.\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_client.models import ScLinkContentType\nfrom sc_kpm.utils import create_link, create_links\n\nmsg = create_link(\"hello\")  # ScAddr(...)\nfour = create_link(4, ScLinkContentType.INT)  # ScAddr(...)\nwater = create_link(\"water\", link_type=sc_types.LINK_VAR)  # ScAddr(...)\nnames = create_links(\"Sam\", \"Pit\")  # [ScAddr(...), ScAddr(...)]\n```\n\n### Relations creating\n\nCreate different binary relations with these functions:\n\n```python\ndef create_binary_relation(edge_type: ScType, src: ScAddr, trg: ScAddr, *relations: ScAddr) -> ScAddr: ...\n\n\ndef create_role_relation(src: ScAddr, trg: ScAddr, *rrel_nodes: ScAddr) -> ScAddr: ...\n\n\ndef create_norole_relation(src: ScAddr, trg: ScAddr, *nrel_nodes: ScAddr) -> ScAddr: ...\n```\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.utils import create_node, create_nodes\nfrom sc_kpm.utils import create_binary_relation, create_role_relation, create_norole_relation\n\nsrc, trg = create_nodes(*[sc_types.NODE_CONST] * 2)\nincrease_relation = create_node(sc_types.NODE_CONST_CLASS)\n\nbrel = create_binary_relation(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg, increase_relation)  # ScAddr(...)\nrrel = create_role_relation(src, trg, ScKeynodes.rrel_index(1))  # ScAddr(...)\nnrel = create_norole_relation(src, trg, create_node(sc_types.NODE_CONST_NOROLE))  # ScAddr(...)\n```\n\n### Deleting utils\n\nIf you want to remove all edges between two nodes, which define by their type use\n\n```python\ndef delete_edges(source: ScAddr, target: ScAddr, *edge_types: ScType) -> bool: ...\n```\n\nIt return **True** if operations was successful and **False** otherwise.\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm.utils import create_nodes, create_edge\nfrom sc_kpm.utils import delete_edges\n\nsrc, trg = create_nodes(*[sc_types.NODE_CONST] * 2)\nedge = create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg)\ndelete_edges(src, trg, sc_types.EDGE_ACCESS_CONST_POS_PERM)  # True\n```\n\n### Getting edges\n\nFor getting edge or edges between two nodes use:\n\n```python\ndef get_edge(source: ScAddr, target: ScAddr, edge_type: ScType) -> ScAddr: ...\n\n\ndef get_edges(source: ScAddr, target: ScAddr, *edge_types: ScType) -> List[ScAddr]: ...\n```\n\n_**NOTE: Use VAR type instead of CONST in getting utils**_\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm.utils import create_nodes, create_edge\nfrom sc_kpm.utils import get_edge, get_edges\n\nsrc, trg = create_nodes(*[sc_types.NODE_CONST] * 2)\nedge1 = create_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, src, trg)\nedge2 = create_edge(sc_types.EDGE_D_COMMON_CONST, src, trg)\n\nclass_edge = get_edge(src, trg, sc_types.EDGE_ACCESS_VAR_POS_PERM)  # ScAddr(...)\nassert class_edge == edge1\nedges = get_edges(src, trg, sc_types.EDGE_ACCESS_VAR_POS_PERM, sc_types.EDGE_D_COMMON_VAR)  # [ScAddr(...), ScAddr(...)]\nassert edges == [edge1, edge2]\n```\n\n### Getting elements by relation\n\nGet target element by source element and relation:\n\n```python\ndef get_element_by_role_relation(src: ScAddr, rrel_node: ScAddr) -> ScAddr: ...\n\n\ndef get_element_by_norole_relation(src: ScAddr, nrel_node: ScAddr) -> ScAddr: ...\n```\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.identifiers import CommonIdentifiers\nfrom sc_kpm.utils import create_nodes, create_role_relation, create_norole_relation\nfrom sc_kpm.utils import get_element_by_role_relation, get_element_by_norole_relation\n\nsrc, trg_rrel, trg_nrel = create_nodes(*[sc_types.NODE_CONST] * 3)\nrrel = create_role_relation(src, trg_rrel, ScKeynodes.rrel_index(1))  # ScAddr(...)\nnrel = create_norole_relation(src, trg_nrel, ScKeynodes[CommonIdentifiers.NREL_SYSTEM_IDENTIFIER])  # ScAddr(...)\n\nresult_rrel = get_element_by_role_relation(src, ScKeynodes.rrel_index(1))  # ScAddr(...)\nassert result_rrel == trg_rrel\nresult_nrel = get_element_by_norole_relation(src, ScKeynodes[CommonIdentifiers.NREL_SYSTEM_IDENTIFIER])  # ScAddr(...)\nassert result_nrel == trg_nrel\n```\n\n### Getting link content\n\nFor existed links you may get their content by address with this function:\n\n```python\ndef get_link_content(link: ScAddr) -> Union[str, int]: ...\n```\n\n```python\nfrom sc_kpm.utils import create_link\nfrom sc_kpm.utils import get_link_content_data\n\nwater = create_link(\"water\")\ncontent = get_link_content_data(water)  # \"water\"\n```\n\n### Getting system identifier\n\nFor getting system identifier of keynode use:\n\n```python\ndef get_system_idtf(addr: ScAddr) -> str: ...\n```\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.utils import create_node\nfrom sc_kpm.utils import get_system_idtf\n\nlang_en = create_node(sc_types.NODE_CONST_CLASS)  # ScAddr(...)\nidtf = get_system_idtf(lang_en)  # \"lang_en\"\nassert ScKeynodes[idtf] == lang_en\n```\n\n## Action utils\n\nUtils to work with actions, events and agents\n\n![agent_base](docs/schemes/png/agent_base.png)\n\n### Check action class\n\n```python\ndef check_action_class(action_class: Union[ScAddr, Idtf], action_node: ScAddr) -> bool: ...\n```\n\nThis function returns **True** if action class has connection to action node.\nYou can use identifier of action class instead of ScAddr.\nThis function should not be used in the ScAgentClassic.\n\n![check action class](docs/schemes/png/check_action_class.png)\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.identifiers import CommonIdentifiers\nfrom sc_kpm.utils import create_node, create_edge\nfrom sc_kpm.utils.action_utils import check_action_class\n\naction_node = create_node(sc_types.NODE_CONST)\ncreate_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, ScKeynodes[CommonIdentifiers.ACTION], action_node)\naction_class = create_node(sc_types.NODE_CONST_CLASS)\ncreate_edge(sc_types.EDGE_ACCESS_CONST_POS_PERM, action_class, action_node)\n\nassert check_action_class(action_class, action_node)\n# or\nassert check_action_class(\"some_classification\", action_node)\n```\n\n### Get action arguments\n\nFor getting list of action arguments concatenated by `rrel_[1 -> count]` use:\n\n```python\ndef get_action_arguments(action_class: Union[ScAddr, Idtf], count: int) -> List[ScAddr]: ...\n```\n\n![check action class](docs/schemes/png/get_arguments.png)\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.identifiers import CommonIdentifiers\nfrom sc_kpm.utils import create_node, create_edge, create_role_relation\nfrom sc_kpm.utils.action_utils import get_action_arguments\n\naction_node = create_node(sc_types.NODE_CONST)\n\n# Static argument\nargument1 = create_node(sc_types.NODE_CONST)\ncreate_role_relation(action_node, argument1, ScKeynodes.rrel_index(1))\n\n# Dynamic argument\ndynamic_node = create_node(sc_types.NODE_CONST)\nrrel_dynamic_arg = ScKeynodes[CommonIdentifiers.RREL_DYNAMIC_ARGUMENT]\ncreate_role_relation(action_node, dynamic_node, rrel_dynamic_arg, ScKeynodes.rrel_index(2))\nargument2 = create_node(sc_types.NODE_CONST)\ncreate_edge(sc_types.EDGE_ACCESS_CONST_POS_TEMP, dynamic_node, argument2)\n\narguments = get_action_arguments(action_node, 2)\nassert arguments == [argument1, dynamic_node]\n```\n\n### Create and get action answer\n\n```python\ndef create_action_answer(action_node: ScAddr, *elements: ScAddr) -> None: ...\n\n\ndef get_action_answer(action_node: ScAddr) -> ScAddr: ...\n```\n\nCreate and get structure with output of action\n\n![agent answer](docs/schemes/png/agent_answer.png)\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm.utils import create_node\nfrom sc_kpm.utils.action_utils import create_action_answer, get_action_answer\nfrom sc_kpm.sc_sets import ScStructure\n\naction_node = create_node(sc_types.NODE_CONST_STRUCT)\nanswer_element = create_node(sc_types.NODE_CONST_STRUCT)\ncreate_action_answer(action_node, answer_element)\nresult = get_action_answer(action_node)\nresult_elements = ScStructure(result).elements_set\nassert result_elements == {answer_element}\n```\n\n### Call, execute and wait agent\n\nAgent call function: creates **action node** with some arguments, concepts and connects it to the node with initiation identifier.\nReturns **action node**\n\n```python\ndef call_agent(\n        arguments: Dict[ScAddr, IsDynamic],\n        concepts: List[Idtf],\n        initiation: Idtf = ActionStatus.ACTION_INITIATED,\n) -> ScAddr: ...\n```\n\nAgent wait function: Waits for creation of edge to reaction node for some seconds.\nDefault reaction_node is `action_finished`.\n\n```python\ndef wait_agent(seconds: float, action_node: ScAddr, reaction_node: ScAddr = None) -> None: ...\n```\n\nAgent execute function: combines two previous functions -- calls, waits and returns action node and **True** if success\n\n```python\ndef execute_agent(\n        arguments: Dict[ScAddr, IsDynamic],\n        concepts: List[Idtf],\n        initiation: Idtf = ActionStatus.ACTION_INITIATED,\n        reaction: Idtf = ActionStatus.ACTION_FINISHED_SUCCESSFULLY,\n        wait_time: int = COMMON_WAIT_TIME,  # 5\n) -> Tuple[ScAddr, bool]: ...\n```\n\n\n![execute_agent](docs/schemes/png/execute_agent.png)\n\n```python\nfrom sc_client.models import ScLinkContentType\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.identifiers import CommonIdentifiers, ActionStatus\nfrom sc_kpm.utils import create_link\nfrom sc_kpm.utils.action_utils import execute_agent, call_agent, wait_agent\n\narg1 = create_link(2, ScLinkContentType.INT)\narg2 = create_link(3, ScLinkContentType.INT)\n\nkwargs = dict(\n    arguments={arg1: False, arg2: False},\n    concepts=[CommonIdentifiers.ACTION, \"some_class_name\"],\n)\n\naction = call_agent(**kwargs)  # ScAddr(...)\nwait_agent(3, action, ScKeynodes[ActionStatus.ACTION_FINISHED])\n# or\naction, is_successfully = execute_agent(**kwargs, wait_time=3)  # ScAddr(...), bool\n```\n\n### Create, call and execute action\n\nFunctions that allow to create action and add arguments and call later.\n\nFunction that creates action with concepts and return its ScAddr.\n\n```python\ndef create_action(*concepts: Idtf) -> ScAddr: ...\n```\n\nFunction that creates arguments of action\n\n```python\ndef add_action_arguments(action_node: ScAddr, arguments: Dict[ScAddr, IsDynamic]) -> None: ...\n```\n\nNow you can call or execute action.\nAction call functions don't return action_node because it's parameter to them.\n\n```python\ndef call_action(\n        action_node: ScAddr, initiation: Idtf = ActionStatus.ACTION_INITIATED\n) -> None: ...\n```\n\n```python\ndef execute_action(\n        action_node: ScAddr,\n        initiation: Idtf = ActionStatus.ACTION_INITIATED,\n        reaction: Idtf = ActionStatus.ACTION_FINISHED_SUCCESSFULLY,\n        wait_time: float = COMMON_WAIT_TIME,\n) -> bool: ...\n```\n\nExample:\n\n```python\nfrom sc_client.models import ScLinkContentType\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.identifiers import CommonIdentifiers, ActionStatus\nfrom sc_kpm.utils import create_link\nfrom sc_kpm.utils.action_utils import add_action_arguments, call_action, create_action, execute_action, wait_agent\n\narg1 = create_link(2, ScLinkContentType.INT)\narg2 = create_link(3, ScLinkContentType.INT)\n\naction_node = create_action(CommonIdentifiers.ACTION, \"some_class_name\")  # ScAddr(...)\n# Do something here\narguments = {arg1: False, arg2: False}\n\nadd_action_arguments(action_node, arguments)\ncall_action(action_node)\nwait_agent(3, action_node, ScKeynodes[ActionStatus.ACTION_FINISHED])\n# or\nis_successful = execute_action(action_node, wait_time=3)  # bool\n```\n\n### Finish action\n\nFunction `finish_action` connects status class to action node:\n\n```python\ndef finish_action(action_node: ScAddr, status: Idtf = ActionStatus.ACTION_FINISHED) -> ScAddr: ...\n```\n\nFunction `finish_action_with_status` connects `action_finished` and `action_finished_(un)successfully` statuses to it:\n\n```python\ndef finish_action_with_status(action_node: ScAddr, is_success: bool = True) -> None: ...\n```\n\n![finish_action](docs/schemes/png/finish_action.png)\n\n```python\nfrom sc_client.constants import sc_types\nfrom sc_kpm import ScKeynodes\nfrom sc_kpm.identifiers import ActionStatus\nfrom sc_kpm.utils import create_node, check_edge\nfrom sc_kpm.utils.action_utils import finish_action, finish_action_with_status\n\naction_node = create_node(sc_types.NODE_CONST)\n\nfinish_action_with_status(action_node)\n# or\nfinish_action_with_status(action_node, True)\n# or\nfinish_action(action_node, ActionStatus.ACTION_FINISHED)\nfinish_action(action_node, ActionStatus.ACTION_FINISHED_SUCCESSFULLY)\n\naction_finished = ScKeynodes[ActionStatus.ACTION_FINISHED]\naction_finished_successfully = ScKeynodes[ActionStatus.ACTION_FINISHED_SUCCESSFULLY]\nassert check_edge(sc_types.EDGE_ACCESS_VAR_POS_PERM, action_finished, action_node)\nassert check_edge(sc_types.EDGE_ACCESS_VAR_POS_PERM, action_finished_successfully, action_node)\n```\n\n# Use-cases\n\n- Script for creating and registration agent until user press ^C:\n    - [based on ScAgentClassic](docs/examples/register_and_wait_for_user.py)\n- Scripts for creating agent to calculate sum of two arguments and confirm result:\n    - [based on ScAgent](docs/examples/sum_agent.py)\n    - [based on ScAgentClassic](docs/examples/sum_agent_classic.py)\n- Logging examples:\n    - [pretty project logging](docs/examples/pretty_logging.py)\n",
    "bugtrack_url": null,
    "license": "MIT",
    "summary": "The Python implementation of the knowledge processing module for knowledge manipulations",
    "version": "0.3.0",
    "project_urls": {
        "Bug Reports": "https://github.com/ostis-ai/py-sc-kpm/issues",
        "Homepage": "https://github.com/ostis-ai/py-sc-kpm",
        "Source": "https://github.com/ostis-ai/py-sc-kpm"
    },
    "split_keywords": [
        "sc-kpm",
        " kpm",
        " knowledge processing"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "8aa0a04bedb4ba3f9256eb5f252e5adb2b19350a6d843835d4e4e1e3c1c38b04",
                "md5": "42486970389336417253bbdb4312825d",
                "sha256": "86c08d7b061c3da5f47e31857236e10bcb1a0980dd514ed02e49e159aa84fd95"
            },
            "downloads": -1,
            "filename": "py_sc_kpm-0.3.0-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "42486970389336417253bbdb4312825d",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": "<4,>=3.8",
            "size": 25136,
            "upload_time": "2024-07-29T17:24:14",
            "upload_time_iso_8601": "2024-07-29T17:24:14.940334Z",
            "url": "https://files.pythonhosted.org/packages/8a/a0/a04bedb4ba3f9256eb5f252e5adb2b19350a6d843835d4e4e1e3c1c38b04/py_sc_kpm-0.3.0-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "3e8d2e56857b1d5ef1986b6b6502eaca73ee6c60f2603074813662666e789cf5",
                "md5": "a5be490aaa9ac847c2752e8e41704fdc",
                "sha256": "df00bace096ed051babcb673c565eb0902a115c7e991218ac4af10d2e649941c"
            },
            "downloads": -1,
            "filename": "py_sc_kpm-0.3.0.tar.gz",
            "has_sig": false,
            "md5_digest": "a5be490aaa9ac847c2752e8e41704fdc",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": "<4,>=3.8",
            "size": 26550,
            "upload_time": "2024-07-29T17:24:16",
            "upload_time_iso_8601": "2024-07-29T17:24:16.567711Z",
            "url": "https://files.pythonhosted.org/packages/3e/8d/2e56857b1d5ef1986b6b6502eaca73ee6c60f2603074813662666e789cf5/py_sc_kpm-0.3.0.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-07-29 17:24:16",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "ostis-ai",
    "github_project": "py-sc-kpm",
    "travis_ci": false,
    "coveralls": false,
    "github_actions": true,
    "tox": true,
    "lcname": "py-sc-kpm"
}
        
Elapsed time: 0.63618s