ast-tool-py


Nameast-tool-py JSON
Version 0.16.1 PyPI version JSON
download
home_pageNone
SummaryAsterix data processing tool
upload_time2024-09-01 17:43:04
maintainerNone
docs_urlNone
authorNone
requires_python>=3.8
licenseNone
keywords asterix eurocontrol radar
VCS
bugtrack_url
requirements No requirements were recorded.
Travis-CI No Travis.
coveralls test coverage No coveralls.
            # Asterix processing tool - python version

Features:

- random asterix data generator
- asterix decoder to text output
- UDP receiver and transmitter with multicast mode support
- asterix category/edition detector
- stream recorder and replay
- support for multiple recording file formats
- simple integration with other standard command line tools via stdin/stdout
- user defined asterix data processing with custom script

## Installation

Install from python package index:

``` bash
pip install ast-tool-py
```

### Other installation methods and remarks

This installation procedures requires `python >= 3.7` and `pip >= 21.3`.
Tested under `ubuntu-22.04` and `ubuntu-20.04`.

Prepare virtual environment:

```bash
sudo apt -y install python3-venv
python3 -m venv env
source env/bin/activate
python3 -m pip install wheel # might be required (e.g. under ubuntu 20.04)
```

Under some older OS versions (like ubuntu-18.04) it might be necessary to upgrade the
the required versions first. In this case, the procedure to prepare the environment
should be something like:

```bash
sudo apt -y install python3.8 python3.8-venv python3-pip
python3.8 -m venv env
source env/bin/activate
python3 -m pip install --upgrade pip
python --version # check version
pip --version # check version
python3 -m pip install wheel
```

Install latest git version of this project and check installation:

```bash
python3 -m pip install "git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py"
ast-tool-py --version
ast-tool-py --help
```

### Offline installation

If the target server is offline (that is: without internet access), use the
following installation procedure (tested with `ubuntu-22.04`):

- prepare installation bundle on auxilary server with internet access
- transfer files to offline server
- on the offline server, install from local disk

**NOTE**:
At the time of writing, `ubuntu-22.04` contains `pip` version `22.0.2` with a bug.
For proper operation, `pip` needs to be upgraded on both auxilary and target
server. Tested with `pip` version `23.3.1`.

#### Prepare installation bundle

Run on the server with internet access.

```bash
sudo apt -y install python3-pip
mkdir ast-tool-py-bundle
cd ast-tool-py-bundle

# check pip version, upgrade if necessary (see note above)
pip3 --version
python3 -m pip install --upgrade pip
pip3 --version
# download python support packages and 'ast-tool-py' package
python3 -m pip download -d . pip setuptools wheel
python3 -m pip download -d . "git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py"
```

#### Install on offline server

It is assumed that target server has `python`, `pip` and `venv` installed already.
If required, install:

```bash
sudo apt -y install python3-pip python3-venv
```

Manually transfer `ast-tool-py-bundle/` to the target server and install.

```bash
# prepare 'env'
cd
python3 -m venv env
source env/bin/activate

cd ast-tool-py-bundle

# check pip version, upgrade if necessary (see note above)
pip3 --version
python3 -m pip install --upgrade --no-index ./pip*
pip3 --version

# install ast-tool-py package
python3 -m pip install --no-index --find-links=./ ./ast-tool*
ast-tool-py --version
ast-tool-py --help
```

## Common arguments

### Category, edition and expansion selection

Some commands (`random`, `decode`...) need selection of asterix categories
and edition. By default all available categories and the latest editions
are used. The following command line arguments are available to adjust the
setting (can be specified multiple times).

- `--empty-selection` - start with empty selection instead of *all latest*
- `--cat CAT EDITION` - add/replace category with explicit edition
- `--ref CAT EDITION` - add/replace expansion with explicit edition
- `--expand CAT ITEM-NAME` - Use expansion definition with selected topitem,
  for example `--expand 62 RE`

### Data flow operating mode

The following options are available when program is used in a
*bash pipeline*:

- `--simple-input` - force simple mode for data input
- `--simple-output` - force simple mode for data output
- `-s` or `--simple` - force simple mode for data input and output
  (this is the same as setting `--simple-input` and `--simple-output`)
- `--no-flush` - do not flush output on each event (use buffering)

Examples:

```bash
# generate random output stream (CTRL-C to terminate)
ast-tool-py random

# generate random output with simple output mode
ast-tool-py --simple-output random

# next process must use '--simple-input', to match
ast-tool-py --simple-output random | ast-tool-py --simple-input decode
# ... or using the short form
ast-tool-py -s random | ast-tool-py -s decode
```

## Getting help

```bash
ast-tool-py --help                   # general help
ast-tool-py {subcommand-name} --help # subcommand help
```

## Subcommands

### `random` command

*Pseudo random asterix data generator*

Examples:

```bash
# generate random data
ast-tool-py random

# limit number of generated samples
ast-tool-py random | stdbuf -oL head -n 10

# random streem is different on each program run
# try multiple times, expect different result
ast-tool-py -s random | stdbuf -oL head -n 10 | sha1sum

# unless a seed value is fixed in which case
# try multiple times, expect same result
ast-tool-py -s random --seed 0 | stdbuf -oL head -n 10 | sha1sum

# generate only asterix category 062, edition 1.19
ast-tool-py manifest | grep "062" # show available editions
ast-tool-py --empty-selection --cat 062 1.19 random

# generate all categories and for cat062, generate 'RE' expansion field too
ast-tool-py --expand 62 RE random

# limit sample generation speed/rate (various options)
ast-tool-py random --sleep 0.5
ast-tool-py random | while read x; do echo "$x"; sleep 0.5; done
ast-tool-py random | pv -qL 300

# prepend/append some string to generated samples in simple format
ast-tool-py -s random | awk '{print "0: "$1}'

# set random channel name for each event, choose from ['ch1', 'ch2']
ast-tool-py random --channel ch1 --channel ch2
```

### `decode` command

*Asterix decoder to plain text*

Examples:

```bash
# decode random data
ast-tool-py random | ast-tool-py decode

# decode random data, truncate output to 80 chars
ast-tool-py random | ast-tool-py decode --truncate 80

# decode, truncate, parse only up to the level of 'records' or 'items
ast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 3
ast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 4

# generate and decode 'RE' expansion field too
ast-tool-py --expand 62 RE random | ast-tool-py --expand 62 RE decode
```

Run simple self-test:

Check if the tool can decode it's own random data, ignore decoding results.
This bash pipeline shall run without error until interrupted.

```bash
ast-tool-py random | ast-tool-py decode --stop-on-error > /dev/null
# press CTRL-C to interrupt
```

### `from-udp`, `to-udp` commands

*UDP datagram receiver/transmitter*

Examples:

```bash
# send random data to UDP
ast-tool-py random | ast-tool-py to-udp --unicast "*" 127.0.0.1 56780

# forward UDP from one port to another
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56780 \
    | ast-tool-py to-udp --unicast "*" 127.0.0.1 56781

# decode data from UDP
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56781 | ast-tool-py decode

# distribute data by channel name (ch1 -> 56001, ch2 -> 56002)
ast-tool-py random --sleep 0.3 --channel ch1 --channel ch2 \
    | ast-tool-py to-udp \
        --unicast "ch1" 127.0.0.1 56001 \
        --unicast "ch2" 127.0.0.1 56002

# monitor result on individual UDP ports
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56001
ast-tool-py from-udp --unicast "ch2" 127.0.0.1 56002
```

### `inspect` command

*Detect valid/invalid asterix editions in a stream*

This command inspects a stream and tryes to decode asterix with all defined
asterix category/edition combinations. It runs until the stream is exhausted
or until the process is interrupted.

Examples:

```bash
# inspect random samples:
ast-tool-py random | stdbuf -oL head -n 1000 | ast-tool-py inspect

# inspect network traffic (CTRL-C to stop after some observation time)
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56780 | ast-tool-py inspect
```

### `record`, `replay` commands

*Record/replay data to/from a file*

Examples:

```bash
# save random data
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record
# ... to a file
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record | tee recording.simple
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record > recording.simple

# use binary final file format
ast-tool-py record --help # check supported recording file formats
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 \
    | ast-tool-py record --format final > recording.ff

# replay at normal/full speed
ast-tool-py replay recording.simple
ast-tool-py replay recording.simple --full-speed

# use different replay file format
ast-tool-py replay --help # check supported replay file formats
ast-tool-py replay --format final recording.ff

# replay from gzipped file (not supported with 'pcap' format)
gzip recording.simple
cat recording.simple.gz | gunzip | ast-tool-py replay
zcat recording.simple.gz | ast-tool-py replay # or using 'zcat'
```

### `custom` command

*Running custom python script*

This command dynamically imports a custom `python` script and runs required
function (custom script entry point). The entry point function shall accept
the following arguments:

- `base`, `gen` - base and generated asterix module (encoder/decoder), see
  python asterix library:
  [libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)
- `io` - standard input/output instance
- `args` - program arguments

Custom script can use:

- `io.rx` to fetch events as data *consumer*,
  for example to decode and display asterix data in any data-serialization
  format (json, xml, bson...)
- `io.tx` to generate events as data *producer*,
  for example: reading and parsing non-standard recording file from disk
- both `io.rx` and `io.tx` to act as custom data *filter* with arbitrary
  data manipulation capabilities

#### Minimal example

```python
# -- custom.py script
def custom(base, gen, io, args):
    print("Hello from custom script!")
    print(args.args) # explicit arguments
    print(args)      # all program arguments
```

Test:

```bash
ast-tool-py custom --script custom.py --call custom --args "additional arguments, any string"
```

#### Example: transparent filter

Basic filtering loop (transparent filter), use `io.rx` and `io.tx`.

```python
# -- custom.py script
def custom(base, gen, io, args):
    for event in io.rx():
        io.tx(event)
```

Test:

```bash
ast-tool-py random \
    | ast-tool-py custom --script custom.py --call custom \
    | ast-tool-py decode
```

#### Example: Channel filter

Drop events unless `channel == "ch1"`.

```python
# -- custom.py script
def custom(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        if channel != "ch1":
            continue
        io.tx(event)
```

```bash
# expect only 'ch1' on output
ast-tool-py random --channel ch1 --channel ch2 --channel ch3 \
    | ast-tool-py custom --script custom.py --call custom
```

#### Example: Make channel name configurable from command line

Use `args`.

```python
# -- custom.py script
def custom(base, gen, io, args):
    valid_channels = args.args.strip().split()
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        if not channel in valid_channels:
            continue
        io.tx(event)
```

Specify channels with command line argument.

```bash
# expect 'ch1' and 'ch2' on output
ast-tool-py random --channel ch1 --channel ch2 --channel ch3 \
    | ast-tool-py custom --script custom.py --call custom --args "ch1 ch2"
```

### Custom asterix processing examples

Note: This project is using
[libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)
for asterix data processing. The `asterix` module is automatically imported
and available in custom script (it does not require separate installation step).

In general, if both `rx` and `tx` are used, custom scripts are in the form similar to
the code snipped below. User might decide to handle exceptions differently.

```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup(base, gen)
    for event in io.rx():
        try:
            result = handle_event(cfg, event)
        except Exception as e:
            raise Exception('problem: ', event, e)
        io.tx(result)

# prepare configuration for handle_event function
def setup(base, gen):
    return (base,gen) # for example if complete module is required

# actual event handler
def handle_event(cfg, event):
    (t_mono, t_utc, channel, data) = event
    # process data in some way...
    return event # possible modified event
```

#### Example: Print number of datablocks per datagram

```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup(base)
    for event in io.rx():
        handle_event(cfg, event)

def setup(base):
    return (base.RawDatablock, base.Bits)

def handle_event(cfg, event):
    RawDatablock, Bits = cfg
    (t_mono, t_utc, channel, data) = event
    raw_datablocks = RawDatablock.parse(Bits.from_bytes(data))
    print(t_utc, channel, len(raw_datablocks))
```

Test:

```bash
ast-tool-py random --channel ch1 --channel ch2 --channel ch3 \
    | ast-tool-py custom --script custom.py --call custom
```

#### Example: Search for some specific events

... for example *north marker* and *sector crossing* message in category 034.

```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup((base, gen))
    for event in io.rx():
        handle_event(cfg, event)

def setup(cfg):
    base, gen = cfg
    return (base, gen.Cat_034_1_29) # use cat 034, explicit edition

def handle_event(cfg, event):
    base, Spec = cfg
    (t_mono, t_utc, channel, data) = event
    # parse to raw datablocks
    bits = base.Bits.from_bytes(data)
    raw_datablocks = base.RawDatablock.parse(bits)
    for raw_db in raw_datablocks:
        # focus on one category only
        if raw_db.get_category() != Spec.cv_category:
            continue
        # fully parse raw datablock
        records = Spec.cv_uap.parse(raw_db.get_raw_records())
        for rec in records:
            handle_record(t_utc, channel, rec)

def handle_record(t_utc, channel, rec):
    msg_type = rec.get_item('000')
    if msg_type is None:
        return
    x = msg_type.as_uint()
    s = None
    if x == 1:
        s = 'north marker'
    elif x == 2:
        s = 'sector crossing'
    if s is not None:
        print(t_utc, channel, s)
```

Test:

```bash
ast-tool-py --empty-selection --cat 34 1.29 random --channel ch1 --channel ch2 --channel ch3 \
    | ast-tool-py custom --script custom.py --call custom
```

#### Example: Reverse datablocks in each datagram

```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        data2 = handle_datagram(base, data)
        event2 = (t_mono, t_utc, channel, data2)
        io.tx(event2)

def handle_datagram(base, data):
    bits = base.Bits.from_bytes(data)
    raw_datablocks = base.RawDatablock.parse(bits)
    if len(raw_datablocks) <= 1:
        return data
    return b''.join([db.unparse().to_bytes() for db in reversed(raw_datablocks)])
```

Test:

```bash
ast-tool-py random | ast-tool-py custom --script custom.py --call custom
```

#### Example: Filter asterix by category

Accept category number as argument, drop other categories.

```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cat = int(args.args.strip())
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        lst = base.RawDatablock.parse(bits)
        lst = [db for db in lst if db.get_category() == cat]
        if not lst:
            continue
        data2 = b''.join([db.unparse().to_bytes() for db in lst])
        event2 = (t_mono, t_utc, channel, data2)
        io.tx(event2)
```

Run custom filter on random data, filte out all categories but `62`.

```bash
ast-tool-py random \
    | ast-tool-py custom --script custom.py --call custom --args 62 \
    | ast-tool-py decode --parsing-level 2 --truncate 80
```

#### Example: Modify category `062`, set `SAC/SIC` codes in item `010` to zero

Keep other items unmodified.

```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    Spec = gen.Cat_062_1_20
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        lst = base.RawDatablock.parse(bits)
        lst = [handle_datablock(Spec, db) for db in lst]
        data2 = b''.join([db.unparse().to_bytes() for db in lst])
        event2 = (t_mono, t_utc, channel, data2)
        io.tx(event2)

def handle_datablock(Spec, raw_db):
    if raw_db.get_category() != Spec.cv_category:
        return raw_db
    records1 = Spec.cv_uap.parse(raw_db.get_raw_records())
    records2 = map(handle_record, records1)
    return Spec.create(records2)

def handle_record(rec):
    return rec.set_item('010', (('SAC', 0), ('SIC', 0)))
```

Test:

```bash
ast-tool-py --empty-selection --cat 62 1.20 random | \
    ast-tool-py custom --script custom.py --call custom | \
    ast-tool-py decode | grep \'010\' | grep -E 'SAC|SIC'
```

#### Example: Convert binary asterix to `json` output

This example fully decodes and converts each *event* to `json` format.
Obviously, there are multiple ways to perform such conversion, depending
on user preferences and information that needs to be preserved.
This is one example.

```python
# -- custom.py script

import json

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup(base, gen, args)
    for event in io.rx():
        try:
            obj = convert_event(cfg, event)
        except Exception as e:
            raise Exception('problem: ', event, e)
        print(json.dumps(obj))

def string_to_edition(ed):
    """Convert edition string to a tuple, for example "1.2" -> (1,2)"""
    a,b = ed.split('.')
    return (int(a), int(b))

def get_selection(gen, empty, explicit_cats, explicit_refs):
    """Get category selection."""

    def get_latest(lst):
        return sorted(lst, key=lambda pair: string_to_edition(pair[0]), reverse=True)[0]

    # get latest
    cats = {cat: get_latest(gen.manifest['CATS'][cat].items())[1] for cat in gen.manifest['CATS'].keys()}
    refs = {cat: get_latest(gen.manifest['REFS'][cat].items())[1] for cat in gen.manifest['REFS'].keys()}

    # cleanup if required
    if empty:
        cats = {}
        refs = {}

    # update with explicit editions
    for (a,b,c) in [
        (cats, 'CATS', explicit_cats),
        (refs, 'REFS', explicit_refs),
        ]:
        for (cat,ed) in c:
            cat = int(cat)
            a.update({cat: manifest[b][cat][ed]})
    return {'CATS': cats, 'REFS': refs}

def get_expansions(base, gen, selection, expansions):
    result = []
    for (cat, name) in expansions:
        cat = int(cat)
        assert cat in selection['REFS'].keys(), 'REF not defined'
        spec = selection['CATS'][cat]
        subitem = spec.cv_record.spec(name)
        assert issubclass(subitem.cv_rule.cv_variation, base.Explicit)
        result.append((cat, name))
    return result

def setup(base, gen, args):
    # use command line arguments for asterix category/edition selection
    sel = get_selection(gen, args.empty_selection, args.cat or [], args.ref or [])
    exp = get_expansions(base, gen, sel, args.expand or [])
    return (base, sel, exp)

def convert_event(cfg, event):
    """Turn 'event' to json-serializable object"""
    (t_mono, t_utc, channel, data) = event
    return {
        'tMono':    t_mono,
        'tUtc':     t_utc.isoformat(),
        'channel':  channel,
        'data':     convert_datagram(cfg, data),
    }

def convert_datagram(cfg, data):
    (base, sel, exp) = cfg
    # parse to raw datablocks
    bits = base.Bits.from_bytes(data)
    raw_datablocks = base.RawDatablock.parse(bits)
    return [convert_datablock(cfg, raw_db) for raw_db in raw_datablocks]

def convert_datablock(cfg, raw_db):
    (base, sel, exp) = cfg
    cat = raw_db.get_category()
    spec = sel['CATS'].get(cat)
    if spec is None: # asterix category unknown
        return raw_db.unparse().to_bytes().hex()
    records = spec.cv_uap.parse(raw_db.get_raw_records())
    return {
        'cat': cat,
        'records': [convert_record(cfg, cat, rec) for rec in records]
    }

def convert_record(cfg, cat, rec):
    (base, sel, exp) = cfg

    def convert_content(content):
        if isinstance(content, base.ContentRaw):
            return {'type': 'raw', 'raw': content.as_uint()}
        elif isinstance(content, base.ContentTable):
            return {'type': 'table', 'raw': content.as_uint(), 'str': content.table_value()}
        elif isinstance(content, base.ContentString):
            return {'type': 'string', 'raw': content.as_uint(), 'str': content.as_string()}
        elif isinstance(content, base.ContentInteger):
            return {'type': 'integer', 'raw': content.as_uint(), 'int': content.as_integer()}
        elif isinstance(content, base.ContentQuantity):
            return {'type': 'quantity', 'raw': content.as_uint(), 'float': content._as_quantity()
                    , 'unit': content.__class__.cv_unit}
        elif isinstance(content, base.ContentBds):
            return {'type': 'bds', 'raw': content.as_uint()}
        else:
            raise Exception('internal error, unexpected content', content)

    def convert_rulecontent(rule):
        if isinstance(rule, base.RuleContentContextFree):
            return convert_content(rule.content)
        elif isinstance(rule, base.RuleContentDependent):
            return '(content dependent structure...)'
        else:
            raise Exception('internal error, unexpected rule', rule)

    def convert_variation(var, path):
        if isinstance(var, base.Element):
            return {
                'type': 'Element',
                'content': convert_rulecontent(var.rule),
            }
        elif isinstance (var, base.Group):
            return {
                'type': 'Group',
                'items': [convert_item(i, path) for i in var.arg]
            }
        elif isinstance (var, base.Extended):
            items = []
            for lists in var.arg:
                for i in lists:
                    if i is not None:
                        items.append((convert_item(i, path),))
                    else:
                        items.append('(FX)')
            return {
                'type': 'Extended',
                'items': items,
            }
        elif isinstance (var, base.Repetitive):
            return {
                'type': 'Repetitive',
                'items': [convert_variation(sub, path+[cnt]) for (cnt, sub) in enumerate(var.arg)]
            }
        elif isinstance (var, base.Explicit):
            this_item = (cat, path[0])
            b = var.get_bytes()
            content = b.hex()
            if this_item in exp:
                sub = sel['REFS'][cat].cv_expansion
                bits = base.Bits.from_bytes(b)
                (val, remaining) = sub.parse(bits)
                assert not len(remaining)
                content = convert_expansion(val, path)
            return {
                'type': 'Explicit',
                'content': content,
            }
        elif isinstance (var, base.Compound):
            return {
                'type': 'Compound',
                'items': {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()},
            }
        else:
            raise Exception('internal error, unexpected variation', var.variation, var)

    def convert_item(item, path):
        if isinstance(item, base.Spare):
            return item.as_uint()
        elif isinstance(item, base.Item):
            nsp = item.arg
            name = nsp.__class__.cv_name
            return (name, convert_rulevariation(nsp.rule, path+[name]))
        else:
            raise Exception('internal error, unexpected item', item)

    def convert_rulevariation(rule, path):
        if isinstance(rule, base.RuleVariationContextFree):
            return convert_variation(rule.variation, path)
        elif isinstance(rule, base.RuleVariationDependent):
            return '(content dependent structure...)'
        else:
            raise Exception('internal error, unexpected rule', rule)

    def convert_expansion(var, path):
        return {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()}

    result = {}
    for ui in rec.cv_items_list:
        if not issubclass(ui, base.UapItem):
            continue
        name = ui.cv_non_spare.cv_name
        nsp = rec.get_item(name)
        if nsp is None:
            continue
        result[name] = convert_rulevariation(nsp.rule, [name])
    return result
```

As an example:

- use random input data
- use latest edition for each known category
- in addition, decode cat021 'RE' expansion field

```bash
sudo apt install jq
ast-tool-py --expand 21 RE random --seed 0 --populate-all-items \
    | ast-tool-py --expand 21 RE custom --script custom.py --call custom \
    | jq
```

#### Restamp asterix data to current UTC time

Scenario:

- read data from recording file, filter required channels
- filter out non-asterix data
- restamp several asterix categories to current time
- send to udp destination, according to channel name mapping

```python
# -- custom.py script

import datetime

# cleanup entry point
def cleanup(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        result = base.RawDatablock.parse(bits)
        if isinstance(result, ValueError): # skip non-asterix
            continue
        io.tx(event)

# restamp entry point
def restamp(base, gen, io, args):
    # for each category specify (edition, item to modify)
    updates = {
        1: (gen.Cat_001_1_4, "141"),
        2: (gen.Cat_002_1_1, '030'),
        19: (gen.Cat_019_1_3, '140'),
        20: (gen.Cat_020_1_10, '140'),
        34: (gen.Cat_034_1_29, '030'),
        48: (gen.Cat_048_1_32, '140'),
        # add more categories here...
    }

    for event in io.rx():
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        result = base.RawDatablock.parse(bits)
        if isinstance(result, ValueError):
            continue
        raw_datablocks = [db for db in result if db.get_category() in updates]
        if not raw_datablocks:
            continue
        t2 = seconds_since_midnight(t_utc)
        t3 = seconds_since_midnight(now)
        lst = [handle_datablock(t2, t3, updates, db) for db in raw_datablocks]
        data2 = b''.join([db.unparse().to_bytes() for db in lst])
        event2 = (t_mono, now, channel, data2)
        io.tx(event2)

def seconds_since_midnight(t):
    """Calculate seconds since midnight."""
    midnight = datetime.datetime.combine(t, datetime.time(0), t.tzinfo)
    dt = t - midnight
    return dt.total_seconds()

def handle_datablock(t2, t3, updates, raw_db):
    cat = raw_db.get_category()
    (Spec, name) = updates.get(cat)
    result = Spec.cv_uap.parse(raw_db.get_raw_records())
    assert not isinstance(result, ValueError)
    records = [handle_record(t2, t3, rec, name) for rec in result]
    return Spec.create(records)

def handle_record(t2, t3, rec, name):
    # compensate original delay/jitter from the recording
    def stamp(t1):
        t1 = t1.variation.content.as_quantity()
        original_delay = t2 - t1
        return t3 - original_delay
    x = rec.get_item(name)
    if x is None:
        return rec
    return rec.set_item(name, stamp(x))
```

Test:

```bash
# create recording file
ast-tool-py random --seed 0 --sleep 0.05 --channel ch1 --channel ch2 \
    | stdbuf -oL head -n 100 \
    | ast-tool-py record | tee recording

# inspect recording file, make sure to use valid editions in custom script
cat recording | ast-tool-py replay | ast-tool-py inspect

# replay, restamp, resend
cat recording | ast-tool-py replay \
    | ast-tool-py custom --script custom.py --call cleanup \
    | ast-tool-py custom --script custom.py --call restamp \
    | ast-tool-py to-udp \
        --unicast ch1 127.0.0.1 56001 \
        --unicast ch2 127.0.0.1 56002
```

            

Raw data

            {
    "_id": null,
    "home_page": null,
    "name": "ast-tool-py",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "asterix, eurocontrol, radar",
    "author": null,
    "author_email": "Zoran Bo\u0161njak <zoran.bosnjak@sloveniacontrol.si>",
    "download_url": "https://files.pythonhosted.org/packages/2e/96/c7ca5141cff153cf88967d67cdca19d809fe715501ab07883a59236685be/ast_tool_py-0.16.1.tar.gz",
    "platform": null,
    "description": "# Asterix processing tool - python version\n\nFeatures:\n\n- random asterix data generator\n- asterix decoder to text output\n- UDP receiver and transmitter with multicast mode support\n- asterix category/edition detector\n- stream recorder and replay\n- support for multiple recording file formats\n- simple integration with other standard command line tools via stdin/stdout\n- user defined asterix data processing with custom script\n\n## Installation\n\nInstall from python package index:\n\n``` bash\npip install ast-tool-py\n```\n\n### Other installation methods and remarks\n\nThis installation procedures requires `python >= 3.7` and `pip >= 21.3`.\nTested under `ubuntu-22.04` and `ubuntu-20.04`.\n\nPrepare virtual environment:\n\n```bash\nsudo apt -y install python3-venv\npython3 -m venv env\nsource env/bin/activate\npython3 -m pip install wheel # might be required (e.g. under ubuntu 20.04)\n```\n\nUnder some older OS versions (like ubuntu-18.04) it might be necessary to upgrade the\nthe required versions first. In this case, the procedure to prepare the environment\nshould be something like:\n\n```bash\nsudo apt -y install python3.8 python3.8-venv python3-pip\npython3.8 -m venv env\nsource env/bin/activate\npython3 -m pip install --upgrade pip\npython --version # check version\npip --version # check version\npython3 -m pip install wheel\n```\n\nInstall latest git version of this project and check installation:\n\n```bash\npython3 -m pip install \"git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py\"\nast-tool-py --version\nast-tool-py --help\n```\n\n### Offline installation\n\nIf the target server is offline (that is: without internet access), use the\nfollowing installation procedure (tested with `ubuntu-22.04`):\n\n- prepare installation bundle on auxilary server with internet access\n- transfer files to offline server\n- on the offline server, install from local disk\n\n**NOTE**:\nAt the time of writing, `ubuntu-22.04` contains `pip` version `22.0.2` with a bug.\nFor proper operation, `pip` needs to be upgraded on both auxilary and target\nserver. Tested with `pip` version `23.3.1`.\n\n#### Prepare installation bundle\n\nRun on the server with internet access.\n\n```bash\nsudo apt -y install python3-pip\nmkdir ast-tool-py-bundle\ncd ast-tool-py-bundle\n\n# check pip version, upgrade if necessary (see note above)\npip3 --version\npython3 -m pip install --upgrade pip\npip3 --version\n# download python support packages and 'ast-tool-py' package\npython3 -m pip download -d . pip setuptools wheel\npython3 -m pip download -d . \"git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py\"\n```\n\n#### Install on offline server\n\nIt is assumed that target server has `python`, `pip` and `venv` installed already.\nIf required, install:\n\n```bash\nsudo apt -y install python3-pip python3-venv\n```\n\nManually transfer `ast-tool-py-bundle/` to the target server and install.\n\n```bash\n# prepare 'env'\ncd\npython3 -m venv env\nsource env/bin/activate\n\ncd ast-tool-py-bundle\n\n# check pip version, upgrade if necessary (see note above)\npip3 --version\npython3 -m pip install --upgrade --no-index ./pip*\npip3 --version\n\n# install ast-tool-py package\npython3 -m pip install --no-index --find-links=./ ./ast-tool*\nast-tool-py --version\nast-tool-py --help\n```\n\n## Common arguments\n\n### Category, edition and expansion selection\n\nSome commands (`random`, `decode`...) need selection of asterix categories\nand edition. By default all available categories and the latest editions\nare used. The following command line arguments are available to adjust the\nsetting (can be specified multiple times).\n\n- `--empty-selection` - start with empty selection instead of *all latest*\n- `--cat CAT EDITION` - add/replace category with explicit edition\n- `--ref CAT EDITION` - add/replace expansion with explicit edition\n- `--expand CAT ITEM-NAME` - Use expansion definition with selected topitem,\n  for example `--expand 62 RE`\n\n### Data flow operating mode\n\nThe following options are available when program is used in a\n*bash pipeline*:\n\n- `--simple-input` - force simple mode for data input\n- `--simple-output` - force simple mode for data output\n- `-s` or `--simple` - force simple mode for data input and output\n  (this is the same as setting `--simple-input` and `--simple-output`)\n- `--no-flush` - do not flush output on each event (use buffering)\n\nExamples:\n\n```bash\n# generate random output stream (CTRL-C to terminate)\nast-tool-py random\n\n# generate random output with simple output mode\nast-tool-py --simple-output random\n\n# next process must use '--simple-input', to match\nast-tool-py --simple-output random | ast-tool-py --simple-input decode\n# ... or using the short form\nast-tool-py -s random | ast-tool-py -s decode\n```\n\n## Getting help\n\n```bash\nast-tool-py --help                   # general help\nast-tool-py {subcommand-name} --help # subcommand help\n```\n\n## Subcommands\n\n### `random` command\n\n*Pseudo random asterix data generator*\n\nExamples:\n\n```bash\n# generate random data\nast-tool-py random\n\n# limit number of generated samples\nast-tool-py random | stdbuf -oL head -n 10\n\n# random streem is different on each program run\n# try multiple times, expect different result\nast-tool-py -s random | stdbuf -oL head -n 10 | sha1sum\n\n# unless a seed value is fixed in which case\n# try multiple times, expect same result\nast-tool-py -s random --seed 0 | stdbuf -oL head -n 10 | sha1sum\n\n# generate only asterix category 062, edition 1.19\nast-tool-py manifest | grep \"062\" # show available editions\nast-tool-py --empty-selection --cat 062 1.19 random\n\n# generate all categories and for cat062, generate 'RE' expansion field too\nast-tool-py --expand 62 RE random\n\n# limit sample generation speed/rate (various options)\nast-tool-py random --sleep 0.5\nast-tool-py random | while read x; do echo \"$x\"; sleep 0.5; done\nast-tool-py random | pv -qL 300\n\n# prepend/append some string to generated samples in simple format\nast-tool-py -s random | awk '{print \"0: \"$1}'\n\n# set random channel name for each event, choose from ['ch1', 'ch2']\nast-tool-py random --channel ch1 --channel ch2\n```\n\n### `decode` command\n\n*Asterix decoder to plain text*\n\nExamples:\n\n```bash\n# decode random data\nast-tool-py random | ast-tool-py decode\n\n# decode random data, truncate output to 80 chars\nast-tool-py random | ast-tool-py decode --truncate 80\n\n# decode, truncate, parse only up to the level of 'records' or 'items\nast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 3\nast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 4\n\n# generate and decode 'RE' expansion field too\nast-tool-py --expand 62 RE random | ast-tool-py --expand 62 RE decode\n```\n\nRun simple self-test:\n\nCheck if the tool can decode it's own random data, ignore decoding results.\nThis bash pipeline shall run without error until interrupted.\n\n```bash\nast-tool-py random | ast-tool-py decode --stop-on-error > /dev/null\n# press CTRL-C to interrupt\n```\n\n### `from-udp`, `to-udp` commands\n\n*UDP datagram receiver/transmitter*\n\nExamples:\n\n```bash\n# send random data to UDP\nast-tool-py random | ast-tool-py to-udp --unicast \"*\" 127.0.0.1 56780\n\n# forward UDP from one port to another\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56780 \\\n    | ast-tool-py to-udp --unicast \"*\" 127.0.0.1 56781\n\n# decode data from UDP\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56781 | ast-tool-py decode\n\n# distribute data by channel name (ch1 -> 56001, ch2 -> 56002)\nast-tool-py random --sleep 0.3 --channel ch1 --channel ch2 \\\n    | ast-tool-py to-udp \\\n        --unicast \"ch1\" 127.0.0.1 56001 \\\n        --unicast \"ch2\" 127.0.0.1 56002\n\n# monitor result on individual UDP ports\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56001\nast-tool-py from-udp --unicast \"ch2\" 127.0.0.1 56002\n```\n\n### `inspect` command\n\n*Detect valid/invalid asterix editions in a stream*\n\nThis command inspects a stream and tryes to decode asterix with all defined\nasterix category/edition combinations. It runs until the stream is exhausted\nor until the process is interrupted.\n\nExamples:\n\n```bash\n# inspect random samples:\nast-tool-py random | stdbuf -oL head -n 1000 | ast-tool-py inspect\n\n# inspect network traffic (CTRL-C to stop after some observation time)\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56780 | ast-tool-py inspect\n```\n\n### `record`, `replay` commands\n\n*Record/replay data to/from a file*\n\nExamples:\n\n```bash\n# save random data\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record\n# ... to a file\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record | tee recording.simple\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record > recording.simple\n\n# use binary final file format\nast-tool-py record --help # check supported recording file formats\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 \\\n    | ast-tool-py record --format final > recording.ff\n\n# replay at normal/full speed\nast-tool-py replay recording.simple\nast-tool-py replay recording.simple --full-speed\n\n# use different replay file format\nast-tool-py replay --help # check supported replay file formats\nast-tool-py replay --format final recording.ff\n\n# replay from gzipped file (not supported with 'pcap' format)\ngzip recording.simple\ncat recording.simple.gz | gunzip | ast-tool-py replay\nzcat recording.simple.gz | ast-tool-py replay # or using 'zcat'\n```\n\n### `custom` command\n\n*Running custom python script*\n\nThis command dynamically imports a custom `python` script and runs required\nfunction (custom script entry point). The entry point function shall accept\nthe following arguments:\n\n- `base`, `gen` - base and generated asterix module (encoder/decoder), see\n  python asterix library:\n  [libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)\n- `io` - standard input/output instance\n- `args` - program arguments\n\nCustom script can use:\n\n- `io.rx` to fetch events as data *consumer*,\n  for example to decode and display asterix data in any data-serialization\n  format (json, xml, bson...)\n- `io.tx` to generate events as data *producer*,\n  for example: reading and parsing non-standard recording file from disk\n- both `io.rx` and `io.tx` to act as custom data *filter* with arbitrary\n  data manipulation capabilities\n\n#### Minimal example\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    print(\"Hello from custom script!\")\n    print(args.args) # explicit arguments\n    print(args)      # all program arguments\n```\n\nTest:\n\n```bash\nast-tool-py custom --script custom.py --call custom --args \"additional arguments, any string\"\n```\n\n#### Example: transparent filter\n\nBasic filtering loop (transparent filter), use `io.rx` and `io.tx`.\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    for event in io.rx():\n        io.tx(event)\n```\n\nTest:\n\n```bash\nast-tool-py random \\\n    | ast-tool-py custom --script custom.py --call custom \\\n    | ast-tool-py decode\n```\n\n#### Example: Channel filter\n\nDrop events unless `channel == \"ch1\"`.\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        if channel != \"ch1\":\n            continue\n        io.tx(event)\n```\n\n```bash\n# expect only 'ch1' on output\nast-tool-py random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Make channel name configurable from command line\n\nUse `args`.\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    valid_channels = args.args.strip().split()\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        if not channel in valid_channels:\n            continue\n        io.tx(event)\n```\n\nSpecify channels with command line argument.\n\n```bash\n# expect 'ch1' and 'ch2' on output\nast-tool-py random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom --args \"ch1 ch2\"\n```\n\n### Custom asterix processing examples\n\nNote: This project is using\n[libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)\nfor asterix data processing. The `asterix` module is automatically imported\nand available in custom script (it does not require separate installation step).\n\nIn general, if both `rx` and `tx` are used, custom scripts are in the form similar to\nthe code snipped below. User might decide to handle exceptions differently.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup(base, gen)\n    for event in io.rx():\n        try:\n            result = handle_event(cfg, event)\n        except Exception as e:\n            raise Exception('problem: ', event, e)\n        io.tx(result)\n\n# prepare configuration for handle_event function\ndef setup(base, gen):\n    return (base,gen) # for example if complete module is required\n\n# actual event handler\ndef handle_event(cfg, event):\n    (t_mono, t_utc, channel, data) = event\n    # process data in some way...\n    return event # possible modified event\n```\n\n#### Example: Print number of datablocks per datagram\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup(base)\n    for event in io.rx():\n        handle_event(cfg, event)\n\ndef setup(base):\n    return (base.RawDatablock, base.Bits)\n\ndef handle_event(cfg, event):\n    RawDatablock, Bits = cfg\n    (t_mono, t_utc, channel, data) = event\n    raw_datablocks = RawDatablock.parse(Bits.from_bytes(data))\n    print(t_utc, channel, len(raw_datablocks))\n```\n\nTest:\n\n```bash\nast-tool-py random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Search for some specific events\n\n... for example *north marker* and *sector crossing* message in category 034.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup((base, gen))\n    for event in io.rx():\n        handle_event(cfg, event)\n\ndef setup(cfg):\n    base, gen = cfg\n    return (base, gen.Cat_034_1_29) # use cat 034, explicit edition\n\ndef handle_event(cfg, event):\n    base, Spec = cfg\n    (t_mono, t_utc, channel, data) = event\n    # parse to raw datablocks\n    bits = base.Bits.from_bytes(data)\n    raw_datablocks = base.RawDatablock.parse(bits)\n    for raw_db in raw_datablocks:\n        # focus on one category only\n        if raw_db.get_category() != Spec.cv_category:\n            continue\n        # fully parse raw datablock\n        records = Spec.cv_uap.parse(raw_db.get_raw_records())\n        for rec in records:\n            handle_record(t_utc, channel, rec)\n\ndef handle_record(t_utc, channel, rec):\n    msg_type = rec.get_item('000')\n    if msg_type is None:\n        return\n    x = msg_type.as_uint()\n    s = None\n    if x == 1:\n        s = 'north marker'\n    elif x == 2:\n        s = 'sector crossing'\n    if s is not None:\n        print(t_utc, channel, s)\n```\n\nTest:\n\n```bash\nast-tool-py --empty-selection --cat 34 1.29 random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Reverse datablocks in each datagram\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        data2 = handle_datagram(base, data)\n        event2 = (t_mono, t_utc, channel, data2)\n        io.tx(event2)\n\ndef handle_datagram(base, data):\n    bits = base.Bits.from_bytes(data)\n    raw_datablocks = base.RawDatablock.parse(bits)\n    if len(raw_datablocks) <= 1:\n        return data\n    return b''.join([db.unparse().to_bytes() for db in reversed(raw_datablocks)])\n```\n\nTest:\n\n```bash\nast-tool-py random | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Filter asterix by category\n\nAccept category number as argument, drop other categories.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cat = int(args.args.strip())\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        lst = base.RawDatablock.parse(bits)\n        lst = [db for db in lst if db.get_category() == cat]\n        if not lst:\n            continue\n        data2 = b''.join([db.unparse().to_bytes() for db in lst])\n        event2 = (t_mono, t_utc, channel, data2)\n        io.tx(event2)\n```\n\nRun custom filter on random data, filte out all categories but `62`.\n\n```bash\nast-tool-py random \\\n    | ast-tool-py custom --script custom.py --call custom --args 62 \\\n    | ast-tool-py decode --parsing-level 2 --truncate 80\n```\n\n#### Example: Modify category `062`, set `SAC/SIC` codes in item `010` to zero\n\nKeep other items unmodified.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    Spec = gen.Cat_062_1_20\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        lst = base.RawDatablock.parse(bits)\n        lst = [handle_datablock(Spec, db) for db in lst]\n        data2 = b''.join([db.unparse().to_bytes() for db in lst])\n        event2 = (t_mono, t_utc, channel, data2)\n        io.tx(event2)\n\ndef handle_datablock(Spec, raw_db):\n    if raw_db.get_category() != Spec.cv_category:\n        return raw_db\n    records1 = Spec.cv_uap.parse(raw_db.get_raw_records())\n    records2 = map(handle_record, records1)\n    return Spec.create(records2)\n\ndef handle_record(rec):\n    return rec.set_item('010', (('SAC', 0), ('SIC', 0)))\n```\n\nTest:\n\n```bash\nast-tool-py --empty-selection --cat 62 1.20 random | \\\n    ast-tool-py custom --script custom.py --call custom | \\\n    ast-tool-py decode | grep \\'010\\' | grep -E 'SAC|SIC'\n```\n\n#### Example: Convert binary asterix to `json` output\n\nThis example fully decodes and converts each *event* to `json` format.\nObviously, there are multiple ways to perform such conversion, depending\non user preferences and information that needs to be preserved.\nThis is one example.\n\n```python\n# -- custom.py script\n\nimport json\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup(base, gen, args)\n    for event in io.rx():\n        try:\n            obj = convert_event(cfg, event)\n        except Exception as e:\n            raise Exception('problem: ', event, e)\n        print(json.dumps(obj))\n\ndef string_to_edition(ed):\n    \"\"\"Convert edition string to a tuple, for example \"1.2\" -> (1,2)\"\"\"\n    a,b = ed.split('.')\n    return (int(a), int(b))\n\ndef get_selection(gen, empty, explicit_cats, explicit_refs):\n    \"\"\"Get category selection.\"\"\"\n\n    def get_latest(lst):\n        return sorted(lst, key=lambda pair: string_to_edition(pair[0]), reverse=True)[0]\n\n    # get latest\n    cats = {cat: get_latest(gen.manifest['CATS'][cat].items())[1] for cat in gen.manifest['CATS'].keys()}\n    refs = {cat: get_latest(gen.manifest['REFS'][cat].items())[1] for cat in gen.manifest['REFS'].keys()}\n\n    # cleanup if required\n    if empty:\n        cats = {}\n        refs = {}\n\n    # update with explicit editions\n    for (a,b,c) in [\n        (cats, 'CATS', explicit_cats),\n        (refs, 'REFS', explicit_refs),\n        ]:\n        for (cat,ed) in c:\n            cat = int(cat)\n            a.update({cat: manifest[b][cat][ed]})\n    return {'CATS': cats, 'REFS': refs}\n\ndef get_expansions(base, gen, selection, expansions):\n    result = []\n    for (cat, name) in expansions:\n        cat = int(cat)\n        assert cat in selection['REFS'].keys(), 'REF not defined'\n        spec = selection['CATS'][cat]\n        subitem = spec.cv_record.spec(name)\n        assert issubclass(subitem.cv_rule.cv_variation, base.Explicit)\n        result.append((cat, name))\n    return result\n\ndef setup(base, gen, args):\n    # use command line arguments for asterix category/edition selection\n    sel = get_selection(gen, args.empty_selection, args.cat or [], args.ref or [])\n    exp = get_expansions(base, gen, sel, args.expand or [])\n    return (base, sel, exp)\n\ndef convert_event(cfg, event):\n    \"\"\"Turn 'event' to json-serializable object\"\"\"\n    (t_mono, t_utc, channel, data) = event\n    return {\n        'tMono':    t_mono,\n        'tUtc':     t_utc.isoformat(),\n        'channel':  channel,\n        'data':     convert_datagram(cfg, data),\n    }\n\ndef convert_datagram(cfg, data):\n    (base, sel, exp) = cfg\n    # parse to raw datablocks\n    bits = base.Bits.from_bytes(data)\n    raw_datablocks = base.RawDatablock.parse(bits)\n    return [convert_datablock(cfg, raw_db) for raw_db in raw_datablocks]\n\ndef convert_datablock(cfg, raw_db):\n    (base, sel, exp) = cfg\n    cat = raw_db.get_category()\n    spec = sel['CATS'].get(cat)\n    if spec is None: # asterix category unknown\n        return raw_db.unparse().to_bytes().hex()\n    records = spec.cv_uap.parse(raw_db.get_raw_records())\n    return {\n        'cat': cat,\n        'records': [convert_record(cfg, cat, rec) for rec in records]\n    }\n\ndef convert_record(cfg, cat, rec):\n    (base, sel, exp) = cfg\n\n    def convert_content(content):\n        if isinstance(content, base.ContentRaw):\n            return {'type': 'raw', 'raw': content.as_uint()}\n        elif isinstance(content, base.ContentTable):\n            return {'type': 'table', 'raw': content.as_uint(), 'str': content.table_value()}\n        elif isinstance(content, base.ContentString):\n            return {'type': 'string', 'raw': content.as_uint(), 'str': content.as_string()}\n        elif isinstance(content, base.ContentInteger):\n            return {'type': 'integer', 'raw': content.as_uint(), 'int': content.as_integer()}\n        elif isinstance(content, base.ContentQuantity):\n            return {'type': 'quantity', 'raw': content.as_uint(), 'float': content._as_quantity()\n                    , 'unit': content.__class__.cv_unit}\n        elif isinstance(content, base.ContentBds):\n            return {'type': 'bds', 'raw': content.as_uint()}\n        else:\n            raise Exception('internal error, unexpected content', content)\n\n    def convert_rulecontent(rule):\n        if isinstance(rule, base.RuleContentContextFree):\n            return convert_content(rule.content)\n        elif isinstance(rule, base.RuleContentDependent):\n            return '(content dependent structure...)'\n        else:\n            raise Exception('internal error, unexpected rule', rule)\n\n    def convert_variation(var, path):\n        if isinstance(var, base.Element):\n            return {\n                'type': 'Element',\n                'content': convert_rulecontent(var.rule),\n            }\n        elif isinstance (var, base.Group):\n            return {\n                'type': 'Group',\n                'items': [convert_item(i, path) for i in var.arg]\n            }\n        elif isinstance (var, base.Extended):\n            items = []\n            for lists in var.arg:\n                for i in lists:\n                    if i is not None:\n                        items.append((convert_item(i, path),))\n                    else:\n                        items.append('(FX)')\n            return {\n                'type': 'Extended',\n                'items': items,\n            }\n        elif isinstance (var, base.Repetitive):\n            return {\n                'type': 'Repetitive',\n                'items': [convert_variation(sub, path+[cnt]) for (cnt, sub) in enumerate(var.arg)]\n            }\n        elif isinstance (var, base.Explicit):\n            this_item = (cat, path[0])\n            b = var.get_bytes()\n            content = b.hex()\n            if this_item in exp:\n                sub = sel['REFS'][cat].cv_expansion\n                bits = base.Bits.from_bytes(b)\n                (val, remaining) = sub.parse(bits)\n                assert not len(remaining)\n                content = convert_expansion(val, path)\n            return {\n                'type': 'Explicit',\n                'content': content,\n            }\n        elif isinstance (var, base.Compound):\n            return {\n                'type': 'Compound',\n                'items': {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()},\n            }\n        else:\n            raise Exception('internal error, unexpected variation', var.variation, var)\n\n    def convert_item(item, path):\n        if isinstance(item, base.Spare):\n            return item.as_uint()\n        elif isinstance(item, base.Item):\n            nsp = item.arg\n            name = nsp.__class__.cv_name\n            return (name, convert_rulevariation(nsp.rule, path+[name]))\n        else:\n            raise Exception('internal error, unexpected item', item)\n\n    def convert_rulevariation(rule, path):\n        if isinstance(rule, base.RuleVariationContextFree):\n            return convert_variation(rule.variation, path)\n        elif isinstance(rule, base.RuleVariationDependent):\n            return '(content dependent structure...)'\n        else:\n            raise Exception('internal error, unexpected rule', rule)\n\n    def convert_expansion(var, path):\n        return {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()}\n\n    result = {}\n    for ui in rec.cv_items_list:\n        if not issubclass(ui, base.UapItem):\n            continue\n        name = ui.cv_non_spare.cv_name\n        nsp = rec.get_item(name)\n        if nsp is None:\n            continue\n        result[name] = convert_rulevariation(nsp.rule, [name])\n    return result\n```\n\nAs an example:\n\n- use random input data\n- use latest edition for each known category\n- in addition, decode cat021 'RE' expansion field\n\n```bash\nsudo apt install jq\nast-tool-py --expand 21 RE random --seed 0 --populate-all-items \\\n    | ast-tool-py --expand 21 RE custom --script custom.py --call custom \\\n    | jq\n```\n\n#### Restamp asterix data to current UTC time\n\nScenario:\n\n- read data from recording file, filter required channels\n- filter out non-asterix data\n- restamp several asterix categories to current time\n- send to udp destination, according to channel name mapping\n\n```python\n# -- custom.py script\n\nimport datetime\n\n# cleanup entry point\ndef cleanup(base, gen, io, args):\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        result = base.RawDatablock.parse(bits)\n        if isinstance(result, ValueError): # skip non-asterix\n            continue\n        io.tx(event)\n\n# restamp entry point\ndef restamp(base, gen, io, args):\n    # for each category specify (edition, item to modify)\n    updates = {\n        1: (gen.Cat_001_1_4, \"141\"),\n        2: (gen.Cat_002_1_1, '030'),\n        19: (gen.Cat_019_1_3, '140'),\n        20: (gen.Cat_020_1_10, '140'),\n        34: (gen.Cat_034_1_29, '030'),\n        48: (gen.Cat_048_1_32, '140'),\n        # add more categories here...\n    }\n\n    for event in io.rx():\n        now = datetime.datetime.now(tz=datetime.timezone.utc)\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        result = base.RawDatablock.parse(bits)\n        if isinstance(result, ValueError):\n            continue\n        raw_datablocks = [db for db in result if db.get_category() in updates]\n        if not raw_datablocks:\n            continue\n        t2 = seconds_since_midnight(t_utc)\n        t3 = seconds_since_midnight(now)\n        lst = [handle_datablock(t2, t3, updates, db) for db in raw_datablocks]\n        data2 = b''.join([db.unparse().to_bytes() for db in lst])\n        event2 = (t_mono, now, channel, data2)\n        io.tx(event2)\n\ndef seconds_since_midnight(t):\n    \"\"\"Calculate seconds since midnight.\"\"\"\n    midnight = datetime.datetime.combine(t, datetime.time(0), t.tzinfo)\n    dt = t - midnight\n    return dt.total_seconds()\n\ndef handle_datablock(t2, t3, updates, raw_db):\n    cat = raw_db.get_category()\n    (Spec, name) = updates.get(cat)\n    result = Spec.cv_uap.parse(raw_db.get_raw_records())\n    assert not isinstance(result, ValueError)\n    records = [handle_record(t2, t3, rec, name) for rec in result]\n    return Spec.create(records)\n\ndef handle_record(t2, t3, rec, name):\n    # compensate original delay/jitter from the recording\n    def stamp(t1):\n        t1 = t1.variation.content.as_quantity()\n        original_delay = t2 - t1\n        return t3 - original_delay\n    x = rec.get_item(name)\n    if x is None:\n        return rec\n    return rec.set_item(name, stamp(x))\n```\n\nTest:\n\n```bash\n# create recording file\nast-tool-py random --seed 0 --sleep 0.05 --channel ch1 --channel ch2 \\\n    | stdbuf -oL head -n 100 \\\n    | ast-tool-py record | tee recording\n\n# inspect recording file, make sure to use valid editions in custom script\ncat recording | ast-tool-py replay | ast-tool-py inspect\n\n# replay, restamp, resend\ncat recording | ast-tool-py replay \\\n    | ast-tool-py custom --script custom.py --call cleanup \\\n    | ast-tool-py custom --script custom.py --call restamp \\\n    | ast-tool-py to-udp \\\n        --unicast ch1 127.0.0.1 56001 \\\n        --unicast ch2 127.0.0.1 56002\n```\n",
    "bugtrack_url": null,
    "license": null,
    "summary": "Asterix data processing tool",
    "version": "0.16.1",
    "project_urls": {
        "Bug Tracker": "https://github.com/zoranbosnjak/ast-tool/issues",
        "Homepage": "https://github.com/zoranbosnjak/asterix-tool/tree/master/ast-tool-py#readme"
    },
    "split_keywords": [
        "asterix",
        " eurocontrol",
        " radar"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "11a2b4cdf09466c0cb99a46254ec371cb61cd1d9db5df1ab20b6a74724b6ddaa",
                "md5": "c73114bde908bab289376cc53ebbe13d",
                "sha256": "4378819f5325fb23445f47feed1fe35983715d4c8d7ac4d156ce6b9e0d0ea56c"
            },
            "downloads": -1,
            "filename": "ast_tool_py-0.16.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "c73114bde908bab289376cc53ebbe13d",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 20762,
            "upload_time": "2024-09-01T17:43:02",
            "upload_time_iso_8601": "2024-09-01T17:43:02.411986Z",
            "url": "https://files.pythonhosted.org/packages/11/a2/b4cdf09466c0cb99a46254ec371cb61cd1d9db5df1ab20b6a74724b6ddaa/ast_tool_py-0.16.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "2e96c7ca5141cff153cf88967d67cdca19d809fe715501ab07883a59236685be",
                "md5": "1a7ad7b9d68a1bef4379a1c28b4cc6ae",
                "sha256": "f075485d2b3537ca4ee166991c0de676846dd93e03e8f246e9867f2b8355bd6b"
            },
            "downloads": -1,
            "filename": "ast_tool_py-0.16.1.tar.gz",
            "has_sig": false,
            "md5_digest": "1a7ad7b9d68a1bef4379a1c28b4cc6ae",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 27818,
            "upload_time": "2024-09-01T17:43:04",
            "upload_time_iso_8601": "2024-09-01T17:43:04.472387Z",
            "url": "https://files.pythonhosted.org/packages/2e/96/c7ca5141cff153cf88967d67cdca19d809fe715501ab07883a59236685be/ast_tool_py-0.16.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-09-01 17:43:04",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "zoranbosnjak",
    "github_project": "ast-tool",
    "github_not_found": true,
    "lcname": "ast-tool-py"
}
        
Elapsed time: 0.95355s