| Field | Value |
|---|---|
| Name | ast-tool-py |
| Version | 0.16.1 |
| Summary | Asterix data processing tool |
| upload_time | 2024-09-01 17:43:04 |
| requires_python | >=3.8 |
| keywords | asterix, eurocontrol, radar |
| requirements | No requirements were recorded. |
# Asterix processing tool - python version
Features:
- random asterix data generator
- asterix decoder to text output
- UDP receiver and transmitter with multicast mode support
- asterix category/edition detector
- stream recorder and replay
- support for multiple recording file formats
- simple integration with other standard command line tools via stdin/stdout
- user defined asterix data processing with custom script
## Installation
Install from python package index:
```bash
pip install ast-tool-py
```
### Other installation methods and remarks
This installation procedure requires `python >= 3.8` and `pip >= 21.3`.
Tested under `ubuntu-22.04` and `ubuntu-20.04`.
Prepare virtual environment:
```bash
sudo apt -y install python3-venv
python3 -m venv env
source env/bin/activate
python3 -m pip install wheel # might be required (e.g. under ubuntu 20.04)
```
Under some older OS versions (like ubuntu-18.04) it might be necessary to upgrade
`python` and `pip` to the required versions first. In this case, the procedure to
prepare the environment should be something like:
```bash
sudo apt -y install python3.8 python3.8-venv python3-pip
python3.8 -m venv env
source env/bin/activate
python3 -m pip install --upgrade pip
python --version # check version
pip --version # check version
python3 -m pip install wheel
```
Install latest git version of this project and check installation:
```bash
python3 -m pip install "git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py"
ast-tool-py --version
ast-tool-py --help
```
### Offline installation
If the target server is offline (that is: without internet access), use the
following installation procedure (tested with `ubuntu-22.04`):
- prepare installation bundle on auxiliary server with internet access
- transfer files to offline server
- on the offline server, install from local disk
**NOTE**:
At the time of writing, `ubuntu-22.04` contains `pip` version `22.0.2` with a bug.
For proper operation, `pip` needs to be upgraded on both the auxiliary and the target
server. Tested with `pip` version `23.3.1`.
#### Prepare installation bundle
Run on the server with internet access.
```bash
sudo apt -y install python3-pip
mkdir ast-tool-py-bundle
cd ast-tool-py-bundle
# check pip version, upgrade if necessary (see note above)
pip3 --version
python3 -m pip install --upgrade pip
pip3 --version
# download python support packages and 'ast-tool-py' package
python3 -m pip download -d . pip setuptools wheel
python3 -m pip download -d . "git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py"
```
#### Install on offline server
It is assumed that the target server has `python`, `pip` and `venv` installed already.
If required, install:
```bash
sudo apt -y install python3-pip python3-venv
```
Manually transfer `ast-tool-py-bundle/` to the target server and install.
```bash
# prepare 'env'
cd
python3 -m venv env
source env/bin/activate
cd ast-tool-py-bundle
# check pip version, upgrade if necessary (see note above)
pip3 --version
python3 -m pip install --upgrade --no-index ./pip*
pip3 --version
# install ast-tool-py package
python3 -m pip install --no-index --find-links=./ ./ast-tool*
ast-tool-py --version
ast-tool-py --help
```
## Common arguments
### Category, edition and expansion selection
Some commands (`random`, `decode`...) need a selection of asterix categories
and editions. By default, all available categories and their latest editions
are used. The following command line arguments adjust the selection
(each can be specified multiple times).
- `--empty-selection` - start with empty selection instead of *all latest*
- `--cat CAT EDITION` - add/replace category with explicit edition
- `--ref CAT EDITION` - add/replace expansion with explicit edition
- `--expand CAT ITEM-NAME` - use expansion definition with selected topitem,
for example `--expand 62 RE`
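The selection rules above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: `MANIFEST` is a made-up table (not the tool's real data) and `select` is a hypothetical helper, not part of `ast-tool-py`. It shows why editions have to be compared numerically: as plain strings, `"1.9"` would sort after `"1.20"`.

```python
# Illustration only: 'MANIFEST' is a made-up table, not the tool's real data.
MANIFEST = {
    34: ["1.27", "1.29"],
    62: ["1.9", "1.19", "1.20"],
}

def edition_key(edition):
    """Turn '1.19' into (1, 19) so that editions compare numerically."""
    major, minor = edition.split(".")
    return (int(major), int(minor))

def select(manifest, empty_selection=False, cats=()):
    """Return {category: edition} after applying the selection options."""
    if empty_selection:
        selection = {}
    else:
        # default: every known category with its latest edition
        selection = {cat: max(eds, key=edition_key) for cat, eds in manifest.items()}
    for cat, edition in cats:      # one pair per '--cat CAT EDITION' occurrence
        cat = int(cat)             # '062' and '62' name the same category
        if edition not in manifest[cat]:
            raise ValueError(f"unknown edition {edition} for category {cat}")
        selection[cat] = edition
    return selection

print(select(MANIFEST))
print(select(MANIFEST, cats=[("062", "1.19")]))
print(select(MANIFEST, empty_selection=True, cats=[("062", "1.19")]))
```

The three calls correspond to the default selection, to `--cat 062 1.19`, and to `--empty-selection --cat 062 1.19`.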
### Data flow operating mode
The following options are available when the program is used in a
*bash pipeline*:
- `--simple-input` - force simple mode for data input
- `--simple-output` - force simple mode for data output
- `-s` or `--simple` - force simple mode for data input and output
(this is the same as setting `--simple-input` and `--simple-output`)
- `--no-flush` - do not flush output on each event (use buffering)
Examples:
```bash
# generate random output stream (CTRL-C to terminate)
ast-tool-py random
# generate random output with simple output mode
ast-tool-py --simple-output random
# next process must use '--simple-input', to match
ast-tool-py --simple-output random | ast-tool-py --simple-input decode
# ... or using the short form
ast-tool-py -s random | ast-tool-py -s decode
```
## Getting help
```bash
ast-tool-py --help # general help
ast-tool-py {subcommand-name} --help # subcommand help
```
## Subcommands
### `random` command
*Pseudo random asterix data generator*
Examples:
```bash
# generate random data
ast-tool-py random
# limit number of generated samples
ast-tool-py random | stdbuf -oL head -n 10
# random stream is different on each program run
# try multiple times, expect different result
ast-tool-py -s random | stdbuf -oL head -n 10 | sha1sum
# unless a seed value is fixed in which case
# try multiple times, expect same result
ast-tool-py -s random --seed 0 | stdbuf -oL head -n 10 | sha1sum
# generate only asterix category 062, edition 1.19
ast-tool-py manifest | grep "062" # show available editions
ast-tool-py --empty-selection --cat 062 1.19 random
# generate all categories and for cat062, generate 'RE' expansion field too
ast-tool-py --expand 62 RE random
# limit sample generation speed/rate (various options)
ast-tool-py random --sleep 0.5
ast-tool-py random | while read x; do echo "$x"; sleep 0.5; done
ast-tool-py random | pv -qL 300
# prepend/append some string to generated samples in simple format
ast-tool-py -s random | awk '{print "0: "$1}'
# set random channel name for each event, choose from ['ch1', 'ch2']
ast-tool-py random --channel ch1 --channel ch2
```
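The `--seed` behaviour shown above (same seed, same `sha1sum`) is the usual property of a seeded pseudo random generator. A minimal sketch using only the Python standard library follows; `sample_digest` is a hypothetical helper and does not reproduce the tool's actual generator or output format.

```python
import hashlib
import random

def sample_digest(seed=None, count=10, size=16):
    """Hash 'count' pseudo random byte strings drawn from one generator."""
    rng = random.Random(seed)      # seed=None: seeded from OS entropy or time
    digest = hashlib.sha1()
    for _ in range(count):
        sample = bytes(rng.getrandbits(8) for _ in range(size))
        digest.update(sample)
    return digest.hexdigest()

# fixed seed: the digest is reproducible across calls and program runs
print(sample_digest(seed=0) == sample_digest(seed=0))
# no seed: the two digests are expected to differ
print(sample_digest() == sample_digest())
```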
### `decode` command
*Asterix decoder to plain text*
Examples:
```bash
# decode random data
ast-tool-py random | ast-tool-py decode
# decode random data, truncate output to 80 chars
ast-tool-py random | ast-tool-py decode --truncate 80
# decode, truncate, parse only up to the level of 'records' or 'items'
ast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 3
ast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 4
# generate and decode 'RE' expansion field too
ast-tool-py --expand 62 RE random | ast-tool-py --expand 62 RE decode
```
Run simple self-test:
Check if the tool can decode its own random data, ignoring the decoding results.
This bash pipeline should run without error until interrupted.
```bash
ast-tool-py random | ast-tool-py decode --stop-on-error > /dev/null
# press CTRL-C to interrupt
```
### `from-udp`, `to-udp` commands
*UDP datagram receiver/transmitter*
Examples:
```bash
# send random data to UDP
ast-tool-py random | ast-tool-py to-udp --unicast "*" 127.0.0.1 56780
# forward UDP from one port to another
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56780 \
    | ast-tool-py to-udp --unicast "*" 127.0.0.1 56781
# decode data from UDP
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56781 | ast-tool-py decode
# distribute data by channel name (ch1 -> 56001, ch2 -> 56002)
ast-tool-py random --sleep 0.3 --channel ch1 --channel ch2 \
    | ast-tool-py to-udp \
        --unicast "ch1" 127.0.0.1 56001 \
        --unicast "ch2" 127.0.0.1 56002
# monitor result on individual UDP ports
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56001
ast-tool-py from-udp --unicast "ch2" 127.0.0.1 56002
```
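For readers who want to see what a unicast datagram exchange on the loopback interface looks like outside the tool, here is a minimal sketch with Python's standard `socket` module. It is not how `to-udp`/`from-udp` are implemented; `udp_roundtrip` is a hypothetical helper, and the three payload bytes are arbitrary.

```python
import socket

def udp_roundtrip(payload, host="127.0.0.1"):
    """Send one datagram to a local receiver and return what it received."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as rx, \
         socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as tx:
        rx.bind((host, 0))          # port 0: let the OS pick a free port
        rx.settimeout(2.0)          # do not block forever if nothing arrives
        tx.sendto(payload, rx.getsockname())
        data, _sender = rx.recvfrom(65535)
        return data

print(udp_roundtrip(bytes.fromhex("220003")).hex())
```

Each `sendto` call produces exactly one datagram, and each `recvfrom` call returns exactly one, which is why datagram boundaries are preserved end to end.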
### `inspect` command
*Detect valid/invalid asterix editions in a stream*
This command inspects a stream and tries to decode asterix with all defined
asterix category/edition combinations. It runs until the stream is exhausted
or until the process is interrupted.
Examples:
```bash
# inspect random samples:
ast-tool-py random | stdbuf -oL head -n 1000 | ast-tool-py inspect
# inspect network traffic (CTRL-C to stop after some observation time)
ast-tool-py from-udp --unicast "ch1" 127.0.0.1 56780 | ast-tool-py inspect
```
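The idea of trying every category/edition combination on each sample can be sketched as follows. This is an assumption-laden illustration, not the tool's implementation: the `decoders` mapping and the `fixed_length` stand-in decoders are hypothetical, and a real decoder would come from the asterix library.

```python
from collections import Counter

def inspect(stream, decoders):
    """Count, per (category, edition), how many samples decode or fail."""
    stats = {key: Counter() for key in decoders}
    for sample in stream:
        for key, decode in decoders.items():
            try:
                decode(sample)
                stats[key]["ok"] += 1
            except ValueError:
                stats[key]["failed"] += 1
    return stats

# Hypothetical stand-in decoders: each accepts samples of one fixed length only.
def fixed_length(n):
    def decode(sample):
        if len(sample) != n:
            raise ValueError("unexpected length")
        return sample
    return decode

decoders = {("062", "1.19"): fixed_length(4), ("062", "1.20"): fixed_length(5)}
stats = inspect([b"abcd", b"abcd", b"abcde"], decoders)
for key, counter in stats.items():
    print(key, dict(counter))
```

An edition that never fails over a long observation period is a plausible candidate; an edition that fails on some samples can be ruled out.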
### `record`, `replay` commands
*Record/replay data to/from a file*
Examples:
```bash
# save random data
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record
# ... to a file
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record | tee recording.simple
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record > recording.simple
# use binary final file format
ast-tool-py record --help # check supported recording file formats
ast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 \
    | ast-tool-py record --format final > recording.ff
# replay at normal/full speed
ast-tool-py replay recording.simple
ast-tool-py replay recording.simple --full-speed
# use different replay file format
ast-tool-py replay --help # check supported replay file formats
ast-tool-py replay --format final recording.ff
# replay from gzipped file (not supported with 'pcap' format)
gzip recording.simple
cat recording.simple.gz | gunzip | ast-tool-py replay
zcat recording.simple.gz | ast-tool-py replay # or using 'zcat'
```
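The difference between normal and full speed replay can be pictured as waiting (or not waiting) for the recorded gap between consecutive events. The sketch below is a guess at the general technique, not the tool's code: it assumes events are `(t_mono, t_utc, channel, data)` tuples with `t_mono` in seconds, and `replay` here is a hypothetical function.

```python
import time

def replay(events, full_speed=False, sleep=time.sleep):
    """Yield recorded events, optionally waiting out the recorded gaps."""
    previous = None
    for event in events:
        t_mono = event[0]
        if not full_speed and previous is not None:
            # wait as long as the gap between the two recorded events
            sleep(max(0.0, t_mono - previous))
        previous = t_mono
        yield event

recorded = [(1.0, None, "ch1", b"a"), (1.5, None, "ch1", b"b"), (3.0, None, "ch2", b"c")]
for event in replay(recorded, full_speed=True):
    print(event[2], event[3])
```

Passing `sleep` as a parameter keeps the pacing logic testable without real delays.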
### `custom` command
*Running custom python script*
This command dynamically imports a custom `python` script and runs the required
function (the custom script entry point). The entry point function shall accept
the following arguments:
- `base`, `gen` - base and generated asterix module (encoder/decoder), see
python asterix library:
[libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)
- `io` - standard input/output instance
- `args` - program arguments
Custom script can use:
- `io.rx` to fetch events as data *consumer*,
for example to decode and display asterix data in any data-serialization
format (json, xml, bson...)
- `io.tx` to generate events as data *producer*,
for example: reading and parsing non-standard recording file from disk
- both `io.rx` and `io.tx` to act as custom data *filter* with arbitrary
data manipulation capabilities
#### Minimal example
```python
# -- custom.py script
def custom(base, gen, io, args):
    print("Hello from custom script!")
    print(args.args) # explicit arguments
    print(args) # all program arguments
```
Test:
```bash
ast-tool-py custom --script custom.py --call custom --args "additional arguments, any string"
```
#### Example: transparent filter
Basic filtering loop (transparent filter), use `io.rx` and `io.tx`.
```python
# -- custom.py script
def custom(base, gen, io, args):
    for event in io.rx():
        io.tx(event)
```
Test:
```bash
ast-tool-py random \
| ast-tool-py custom --script custom.py --call custom \
| ast-tool-py decode
```
#### Example: Channel filter
Drop events unless `channel == "ch1"`.
```python
# -- custom.py script
def custom(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        if channel != "ch1":
            continue
        io.tx(event)
```
```bash
# expect only 'ch1' on output
ast-tool-py random --channel ch1 --channel ch2 --channel ch3 \
| ast-tool-py custom --script custom.py --call custom
```
#### Example: Make channel name configurable from command line
Use `args`.
```python
# -- custom.py script
def custom(base, gen, io, args):
    # channel names are given as one whitespace separated string
    valid_channels = args.args.strip().split()
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        if channel not in valid_channels:
            continue
        io.tx(event)
```
Specify channels with a command line argument.
```bash
# expect 'ch1' and 'ch2' on output
ast-tool-py random --channel ch1 --channel ch2 --channel ch3 \
| ast-tool-py custom --script custom.py --call custom --args "ch1 ch2"
```
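Filters like the ones above can be tried out without running the tool by passing a stand-in object in place of `io`. The sketch below is an assumption: `FakeIO` is a hypothetical test helper that only mimics the `io.rx()` / `io.tx(event)` usage shown in this document, and the event field types (float, `datetime`, `str`, `bytes`) are guesses.

```python
import datetime

class FakeIO:
    """Hypothetical stand-in for the 'io' argument, for offline experiments."""
    def __init__(self, events):
        self._events = list(events)
        self.sent = []              # everything the script passed to io.tx

    def rx(self):
        return iter(self._events)

    def tx(self, event):
        self.sent.append(event)

# the channel filter from the example above
def custom(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        if channel != "ch1":
            continue
        io.tx(event)

now = datetime.datetime.now(tz=datetime.timezone.utc)
events = [(float(i), now, ch, b"\x00") for i, ch in enumerate(["ch1", "ch2", "ch1"])]
fake = FakeIO(events)
custom(base=None, gen=None, io=fake, args=None)   # this filter ignores base/gen/args
print([event[2] for event in fake.sent])
```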
### Custom asterix processing examples
Note: This project uses
[libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)
for asterix data processing. The `asterix` module is automatically imported
and available in the custom script (it does not require a separate installation step).
In general, if both `rx` and `tx` are used, custom scripts take a form similar to
the code snippet below. Users might decide to handle exceptions differently.
```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup(base, gen)
    for event in io.rx():
        try:
            result = handle_event(cfg, event)
        except Exception as e:
            raise Exception('problem: ', event, e)
        io.tx(result)

# prepare configuration for handle_event function
def setup(base, gen):
    return (base,gen) # for example if complete module is required

# actual event handler
def handle_event(cfg, event):
    (t_mono, t_utc, channel, data) = event
    # process data in some way...
    return event # possibly modified event
```
#### Example: Print number of datablocks per datagram
```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup(base)
    for event in io.rx():
        handle_event(cfg, event)

def setup(base):
    return (base.RawDatablock, base.Bits)

def handle_event(cfg, event):
    RawDatablock, Bits = cfg
    (t_mono, t_utc, channel, data) = event
    raw_datablocks = RawDatablock.parse(Bits.from_bytes(data))
    print(t_utc, channel, len(raw_datablocks))
```
Test:
```bash
ast-tool-py random --channel ch1 --channel ch2 --channel ch3 \
| ast-tool-py custom --script custom.py --call custom
```
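To make the notion of "datablocks per datagram" concrete: an asterix datablock starts with a one octet category followed by a two octet big-endian length that counts the whole datablock, header included. The sketch below splits a datagram on that framing in plain Python. It is not the library's `RawDatablock.parse`; `split_datablocks` is a hypothetical helper and the sample payload octets are arbitrary, not valid records.

```python
def split_datablocks(data):
    """Split one datagram into (category, raw_datablock) pairs."""
    result = []
    offset = 0
    while offset < len(data):
        header = data[offset:offset + 3]
        if len(header) < 3:
            raise ValueError("truncated datablock header")
        category = header[0]
        length = int.from_bytes(header[1:3], "big")   # includes the 3 header octets
        if length < 3 or offset + length > len(data):
            raise ValueError("invalid datablock length")
        result.append((category, data[offset:offset + length]))
        offset += length
    return result

# two datablocks: category 34 (0x22, 4 octets) and category 62 (0x3e, 5 octets)
datagram = bytes.fromhex("220004aa" "3e0005bbcc")
print([(cat, len(raw)) for cat, raw in split_datablocks(datagram)])
```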
#### Example: Search for some specific events
... for example *north marker* and *sector crossing* messages in category 034.
```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup((base, gen))
    for event in io.rx():
        handle_event(cfg, event)

def setup(cfg):
    base, gen = cfg
    return (base, gen.Cat_034_1_29) # use cat 034, explicit edition

def handle_event(cfg, event):
    base, Spec = cfg
    (t_mono, t_utc, channel, data) = event
    # parse to raw datablocks
    bits = base.Bits.from_bytes(data)
    raw_datablocks = base.RawDatablock.parse(bits)
    for raw_db in raw_datablocks:
        # focus on one category only
        if raw_db.get_category() != Spec.cv_category:
            continue
        # fully parse raw datablock
        records = Spec.cv_uap.parse(raw_db.get_raw_records())
        for rec in records:
            handle_record(t_utc, channel, rec)

def handle_record(t_utc, channel, rec):
    msg_type = rec.get_item('000')
    if msg_type is None:
        return
    x = msg_type.as_uint()
    s = None
    if x == 1:
        s = 'north marker'
    elif x == 2:
        s = 'sector crossing'
    if s is not None:
        print(t_utc, channel, s)
```
Test:
```bash
ast-tool-py --empty-selection --cat 34 1.29 random --channel ch1 --channel ch2 --channel ch3 \
| ast-tool-py custom --script custom.py --call custom
```
#### Example: Reverse datablocks in each datagram
```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        data2 = handle_datagram(base, data)
        event2 = (t_mono, t_utc, channel, data2)
        io.tx(event2)

def handle_datagram(base, data):
    bits = base.Bits.from_bytes(data)
    raw_datablocks = base.RawDatablock.parse(bits)
    if len(raw_datablocks) <= 1:
        return data
    return b''.join([db.unparse().to_bytes() for db in reversed(raw_datablocks)])
```
Test:
```bash
ast-tool-py random | ast-tool-py custom --script custom.py --call custom
```
#### Example: Filter asterix by category
Accept category number as argument, drop other categories.
```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    cat = int(args.args.strip())
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        lst = base.RawDatablock.parse(bits)
        lst = [db for db in lst if db.get_category() == cat]
        if not lst:
            continue
        data2 = b''.join([db.unparse().to_bytes() for db in lst])
        event2 = (t_mono, t_utc, channel, data2)
        io.tx(event2)
```
Run the custom filter on random data, filtering out all categories but `62`.
```bash
ast-tool-py random \
| ast-tool-py custom --script custom.py --call custom --args 62 \
| ast-tool-py decode --parsing-level 2 --truncate 80
```
#### Example: Modify category `062`, set `SAC/SIC` codes in item `010` to zero
Keep other items unmodified.
```python
# -- custom.py script

# custom script entry point
def custom(base, gen, io, args):
    Spec = gen.Cat_062_1_20
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        lst = base.RawDatablock.parse(bits)
        lst = [handle_datablock(Spec, db) for db in lst]
        data2 = b''.join([db.unparse().to_bytes() for db in lst])
        event2 = (t_mono, t_utc, channel, data2)
        io.tx(event2)

def handle_datablock(Spec, raw_db):
    if raw_db.get_category() != Spec.cv_category:
        return raw_db
    records1 = Spec.cv_uap.parse(raw_db.get_raw_records())
    records2 = map(handle_record, records1)
    return Spec.create(records2)

def handle_record(rec):
    return rec.set_item('010', (('SAC', 0), ('SIC', 0)))
```
Test:
```bash
ast-tool-py --empty-selection --cat 62 1.20 random | \
ast-tool-py custom --script custom.py --call custom | \
ast-tool-py decode | grep \'010\' | grep -E 'SAC|SIC'
```
#### Example: Convert binary asterix to `json` output
This example fully decodes and converts each *event* to `json` format.
Obviously, there are multiple ways to perform such conversion, depending
on user preferences and information that needs to be preserved.
This is one example.
```python
# -- custom.py script

import json

# custom script entry point
def custom(base, gen, io, args):
    cfg = setup(base, gen, args)
    for event in io.rx():
        try:
            obj = convert_event(cfg, event)
        except Exception as e:
            raise Exception('problem: ', event, e)
        print(json.dumps(obj))

def string_to_edition(ed):
    """Convert edition string to a tuple, for example "1.2" -> (1,2)"""
    a,b = ed.split('.')
    return (int(a), int(b))

def get_selection(gen, empty, explicit_cats, explicit_refs):
    """Get category selection."""

    def get_latest(lst):
        return sorted(lst, key=lambda pair: string_to_edition(pair[0]), reverse=True)[0]

    # get latest
    cats = {cat: get_latest(gen.manifest['CATS'][cat].items())[1] for cat in gen.manifest['CATS'].keys()}
    refs = {cat: get_latest(gen.manifest['REFS'][cat].items())[1] for cat in gen.manifest['REFS'].keys()}

    # cleanup if required
    if empty:
        cats = {}
        refs = {}

    # update with explicit editions
    for (a,b,c) in [
            (cats, 'CATS', explicit_cats),
            (refs, 'REFS', explicit_refs),
        ]:
        for (cat,ed) in c:
            cat = int(cat)
            a.update({cat: gen.manifest[b][cat][ed]})
    return {'CATS': cats, 'REFS': refs}

def get_expansions(base, gen, selection, expansions):
    result = []
    for (cat, name) in expansions:
        cat = int(cat)
        assert cat in selection['REFS'].keys(), 'REF not defined'
        spec = selection['CATS'][cat]
        subitem = spec.cv_record.spec(name)
        assert issubclass(subitem.cv_rule.cv_variation, base.Explicit)
        result.append((cat, name))
    return result

def setup(base, gen, args):
    # use command line arguments for asterix category/edition selection
    sel = get_selection(gen, args.empty_selection, args.cat or [], args.ref or [])
    exp = get_expansions(base, gen, sel, args.expand or [])
    return (base, sel, exp)

def convert_event(cfg, event):
    """Turn 'event' to json-serializable object"""
    (t_mono, t_utc, channel, data) = event
    return {
        'tMono': t_mono,
        'tUtc': t_utc.isoformat(),
        'channel': channel,
        'data': convert_datagram(cfg, data),
    }

def convert_datagram(cfg, data):
    (base, sel, exp) = cfg
    # parse to raw datablocks
    bits = base.Bits.from_bytes(data)
    raw_datablocks = base.RawDatablock.parse(bits)
    return [convert_datablock(cfg, raw_db) for raw_db in raw_datablocks]

def convert_datablock(cfg, raw_db):
    (base, sel, exp) = cfg
    cat = raw_db.get_category()
    spec = sel['CATS'].get(cat)
    if spec is None: # asterix category unknown
        return raw_db.unparse().to_bytes().hex()
    records = spec.cv_uap.parse(raw_db.get_raw_records())
    return {
        'cat': cat,
        'records': [convert_record(cfg, cat, rec) for rec in records]
    }

def convert_record(cfg, cat, rec):
    (base, sel, exp) = cfg

    def convert_content(content):
        if isinstance(content, base.ContentRaw):
            return {'type': 'raw', 'raw': content.as_uint()}
        elif isinstance(content, base.ContentTable):
            return {'type': 'table', 'raw': content.as_uint(), 'str': content.table_value()}
        elif isinstance(content, base.ContentString):
            return {'type': 'string', 'raw': content.as_uint(), 'str': content.as_string()}
        elif isinstance(content, base.ContentInteger):
            return {'type': 'integer', 'raw': content.as_uint(), 'int': content.as_integer()}
        elif isinstance(content, base.ContentQuantity):
            return {'type': 'quantity', 'raw': content.as_uint(), 'float': content._as_quantity()
                , 'unit': content.__class__.cv_unit}
        elif isinstance(content, base.ContentBds):
            return {'type': 'bds', 'raw': content.as_uint()}
        else:
            raise Exception('internal error, unexpected content', content)

    def convert_rulecontent(rule):
        if isinstance(rule, base.RuleContentContextFree):
            return convert_content(rule.content)
        elif isinstance(rule, base.RuleContentDependent):
            return '(content dependent structure...)'
        else:
            raise Exception('internal error, unexpected rule', rule)

    def convert_variation(var, path):
        if isinstance(var, base.Element):
            return {
                'type': 'Element',
                'content': convert_rulecontent(var.rule),
            }
        elif isinstance (var, base.Group):
            return {
                'type': 'Group',
                'items': [convert_item(i, path) for i in var.arg]
            }
        elif isinstance (var, base.Extended):
            items = []
            for lists in var.arg:
                for i in lists:
                    if i is not None:
                        items.append((convert_item(i, path),))
                    else:
                        items.append('(FX)')
            return {
                'type': 'Extended',
                'items': items,
            }
        elif isinstance (var, base.Repetitive):
            return {
                'type': 'Repetitive',
                'items': [convert_variation(sub, path+[cnt]) for (cnt, sub) in enumerate(var.arg)]
            }
        elif isinstance (var, base.Explicit):
            this_item = (cat, path[0])
            b = var.get_bytes()
            content = b.hex()
            if this_item in exp:
                sub = sel['REFS'][cat].cv_expansion
                bits = base.Bits.from_bytes(b)
                (val, remaining) = sub.parse(bits)
                assert not len(remaining)
                content = convert_expansion(val, path)
            return {
                'type': 'Explicit',
                'content': content,
            }
        elif isinstance (var, base.Compound):
            return {
                'type': 'Compound',
                'items': {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()},
            }
        else:
            raise Exception('internal error, unexpected variation', var.variation, var)

    def convert_item(item, path):
        if isinstance(item, base.Spare):
            return item.as_uint()
        elif isinstance(item, base.Item):
            nsp = item.arg
            name = nsp.__class__.cv_name
            return (name, convert_rulevariation(nsp.rule, path+[name]))
        else:
            raise Exception('internal error, unexpected item', item)

    def convert_rulevariation(rule, path):
        if isinstance(rule, base.RuleVariationContextFree):
            return convert_variation(rule.variation, path)
        elif isinstance(rule, base.RuleVariationDependent):
            return '(content dependent structure...)'
        else:
            raise Exception('internal error, unexpected rule', rule)

    def convert_expansion(var, path):
        return {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()}

    result = {}
    for ui in rec.cv_items_list:
        if not issubclass(ui, base.UapItem):
            continue
        name = ui.cv_non_spare.cv_name
        nsp = rec.get_item(name)
        if nsp is None:
            continue
        result[name] = convert_rulevariation(nsp.rule, [name])
    return result
```
As an example:
- use random input data
- use latest edition for each known category
- in addition, decode cat021 'RE' expansion field
```bash
sudo apt install jq
ast-tool-py --expand 21 RE random --seed 0 --populate-all-items \
| ast-tool-py --expand 21 RE custom --script custom.py --call custom \
| jq
```
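Because the script prints one JSON object per event, the output can also be consumed line by line from Python instead of `jq`. The following sketch counts decoded records per category. It relies only on the structure produced by `convert_event` and `convert_datablock` above (a `data` list whose entries are either a `{'cat', 'records'}` object or a hex string for unknown categories); `count_records` and the sample line are made up for illustration.

```python
import json
from collections import Counter

def count_records(lines):
    """Count decoded records per category in a stream of JSON lines."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue
        event = json.loads(line)
        for datablock in event["data"]:
            if isinstance(datablock, str):      # undecoded datablock, kept as hex
                counts["undecoded"] += 1
            else:
                counts[datablock["cat"]] += len(datablock["records"])
    return counts

# made-up sample line with one decoded and one undecoded datablock
sample = [
    '{"tMono": 1.0, "tUtc": "2000-01-01T00:00:00+00:00", "channel": "ch1", '
    '"data": [{"cat": 62, "records": [{}, {}]}, "ff0003"]}',
]
print(dict(count_records(sample)))
```

In a pipeline, the same function could be called as `count_records(sys.stdin)` after importing `sys`.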
#### Restamp asterix data to current UTC time
Scenario:
- read data from recording file, filter required channels
- filter out non-asterix data
- restamp several asterix categories to current time
- send to udp destination, according to channel name mapping
```python
# -- custom.py script

import datetime

# cleanup entry point
def cleanup(base, gen, io, args):
    for event in io.rx():
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        result = base.RawDatablock.parse(bits)
        if isinstance(result, ValueError): # skip non-asterix
            continue
        io.tx(event)

# restamp entry point
def restamp(base, gen, io, args):
    # for each category specify (edition, item to modify)
    updates = {
        1: (gen.Cat_001_1_4, '141'),
        2: (gen.Cat_002_1_1, '030'),
        19: (gen.Cat_019_1_3, '140'),
        20: (gen.Cat_020_1_10, '140'),
        34: (gen.Cat_034_1_29, '030'),
        48: (gen.Cat_048_1_32, '140'),
        # add more categories here...
    }
    for event in io.rx():
        now = datetime.datetime.now(tz=datetime.timezone.utc)
        (t_mono, t_utc, channel, data) = event
        bits = base.Bits.from_bytes(data)
        result = base.RawDatablock.parse(bits)
        if isinstance(result, ValueError):
            continue
        raw_datablocks = [db for db in result if db.get_category() in updates]
        if not raw_datablocks:
            continue
        t2 = seconds_since_midnight(t_utc)
        t3 = seconds_since_midnight(now)
        lst = [handle_datablock(t2, t3, updates, db) for db in raw_datablocks]
        data2 = b''.join([db.unparse().to_bytes() for db in lst])
        event2 = (t_mono, now, channel, data2)
        io.tx(event2)

def seconds_since_midnight(t):
    """Calculate seconds since midnight."""
    midnight = datetime.datetime.combine(t, datetime.time(0), t.tzinfo)
    dt = t - midnight
    return dt.total_seconds()

def handle_datablock(t2, t3, updates, raw_db):
    cat = raw_db.get_category()
    (Spec, name) = updates.get(cat)
    result = Spec.cv_uap.parse(raw_db.get_raw_records())
    assert not isinstance(result, ValueError)
    records = [handle_record(t2, t3, rec, name) for rec in result]
    return Spec.create(records)

def handle_record(t2, t3, rec, name):
    # compensate original delay/jitter from the recording
    def stamp(t1):
        t1 = t1.variation.content.as_quantity()
        original_delay = t2 - t1
        return t3 - original_delay

    x = rec.get_item(name)
    if x is None:
        return rec
    return rec.set_item(name, stamp(x))
```
Test:
```bash
# create recording file
ast-tool-py random --seed 0 --sleep 0.05 --channel ch1 --channel ch2 \
| stdbuf -oL head -n 100 \
| ast-tool-py record | tee recording
# inspect recording file, make sure to use valid editions in custom script
cat recording | ast-tool-py replay | ast-tool-py inspect
# replay, restamp, resend
cat recording | ast-tool-py replay \
    | ast-tool-py custom --script custom.py --call cleanup \
    | ast-tool-py custom --script custom.py --call restamp \
    | ast-tool-py to-udp \
        --unicast ch1 127.0.0.1 56001 \
        --unicast ch2 127.0.0.1 56002
```
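The delay compensation in `handle_record` is plain arithmetic on seconds since midnight, and can be checked in isolation. The sketch below reuses `seconds_since_midnight` from the script and wraps the calculation in a hypothetical `restamped` helper; the timestamps are invented for the worked example.

```python
import datetime

def seconds_since_midnight(t):
    """Calculate seconds since midnight."""
    midnight = datetime.datetime.combine(t, datetime.time(0), t.tzinfo)
    return (t - midnight).total_seconds()

def restamped(item_time, t_utc, now):
    """New time-of-day value that preserves the recorded delay."""
    original_delay = seconds_since_midnight(t_utc) - item_time
    return seconds_since_midnight(now) - original_delay

utc = datetime.timezone.utc
t_utc = datetime.datetime(2024, 1, 1, 12, 0, 1, tzinfo=utc)   # recorded receive time
now = datetime.datetime(2024, 1, 2, 13, 0, 0, tzinfo=utc)     # replay time
# the item was stamped 0.5 s before it was recorded (43200.5 s = 12:00:00.5)
print(restamped(43200.5, t_utc, now))   # 46799.5, that is 0.5 s before 13:00:00
```

Events recorded or replayed within a few seconds of midnight would need additional wrap-around handling, which neither this sketch nor the script above attempts.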
Raw data
{
"_id": null,
"home_page": null,
"name": "ast-tool-py",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.8",
"maintainer_email": null,
"keywords": "asterix, eurocontrol, radar",
"author": null,
"author_email": "Zoran Bo\u0161njak <zoran.bosnjak@sloveniacontrol.si>",
"download_url": "https://files.pythonhosted.org/packages/2e/96/c7ca5141cff153cf88967d67cdca19d809fe715501ab07883a59236685be/ast_tool_py-0.16.1.tar.gz",
"platform": null,
"description": "# Asterix processing tool - python version\n\nFeatures:\n\n- random asterix data generator\n- asterix decoder to text output\n- UDP receiver and transmitter with multicast mode support\n- asterix category/edition detector\n- stream recorder and replay\n- support for multiple recording file formats\n- simple integration with other standard command line tools via stdin/stdout\n- user defined asterix data processing with custom script\n\n## Installation\n\nInstall from python package index:\n\n``` bash\npip install ast-tool-py\n```\n\n### Other installation methods and remarks\n\nThis installation procedures requires `python >= 3.7` and `pip >= 21.3`.\nTested under `ubuntu-22.04` and `ubuntu-20.04`.\n\nPrepare virtual environment:\n\n```bash\nsudo apt -y install python3-venv\npython3 -m venv env\nsource env/bin/activate\npython3 -m pip install wheel # might be required (e.g. under ubuntu 20.04)\n```\n\nUnder some older OS versions (like ubuntu-18.04) it might be necessary to upgrade the\nthe required versions first. 
In this case, the procedure to prepare the environment\nshould be something like:\n\n```bash\nsudo apt -y install python3.8 python3.8-venv python3-pip\npython3.8 -m venv env\nsource env/bin/activate\npython3 -m pip install --upgrade pip\npython --version # check version\npip --version # check version\npython3 -m pip install wheel\n```\n\nInstall latest git version of this project and check installation:\n\n```bash\npython3 -m pip install \"git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py\"\nast-tool-py --version\nast-tool-py --help\n```\n\n### Offline installation\n\nIf the target server is offline (that is: without internet access), use the\nfollowing installation procedure (tested with `ubuntu-22.04`):\n\n- prepare installation bundle on auxilary server with internet access\n- transfer files to offline server\n- on the offline server, install from local disk\n\n**NOTE**:\nAt the time of writing, `ubuntu-22.04` contains `pip` version `22.0.2` with a bug.\nFor proper operation, `pip` needs to be upgraded on both auxilary and target\nserver. Tested with `pip` version `23.3.1`.\n\n#### Prepare installation bundle\n\nRun on the server with internet access.\n\n```bash\nsudo apt -y install python3-pip\nmkdir ast-tool-py-bundle\ncd ast-tool-py-bundle\n\n# check pip version, upgrade if necessary (see note above)\npip3 --version\npython3 -m pip install --upgrade pip\npip3 --version\n# download python support packages and 'ast-tool-py' package\npython3 -m pip download -d . pip setuptools wheel\npython3 -m pip download -d . 
\"git+https://github.com/zoranbosnjak/asterix-tool.git#subdirectory=ast-tool-py\"\n```\n\n#### Install on offline server\n\nIt is assumed that target server has `python`, `pip` and `venv` installed already.\nIf required, install:\n\n```bash\nsudo apt -y install python3-pip python3-venv\n```\n\nManually transfer `ast-tool-py-bundle/` to the target server and install.\n\n```bash\n# prepare 'env'\ncd\npython3 -m venv env\nsource env/bin/activate\n\ncd ast-tool-py-bundle\n\n# check pip version, upgrade if necessary (see note above)\npip3 --version\npython3 -m pip install --upgrade --no-index ./pip*\npip3 --version\n\n# install ast-tool-py package\npython3 -m pip install --no-index --find-links=./ ./ast-tool*\nast-tool-py --version\nast-tool-py --help\n```\n\n## Common arguments\n\n### Category, edition and expansion selection\n\nSome commands (`random`, `decode`...) need selection of asterix categories\nand edition. By default all available categories and the latest editions\nare used. The following command line arguments are available to adjust the\nsetting (can be specified multiple times).\n\n- `--empty-selection` - start with empty selection instead of *all latest*\n- `--cat CAT EDITION` - add/replace category with explicit edition\n- `--ref CAT EDITION` - add/replace expansion with explicit edition\n- `--expand CAT ITEM-NAME` - Use expansion definition with selected topitem,\n for example `--expand 62 RE`\n\n### Data flow operating mode\n\nThe following options are available when program is used in a\n*bash pipeline*:\n\n- `--simple-input` - force simple mode for data input\n- `--simple-output` - force simple mode for data output\n- `-s` or `--simple` - force simple mode for data input and output\n (this is the same as setting `--simple-input` and `--simple-output`)\n- `--no-flush` - do not flush output on each event (use buffering)\n\nExamples:\n\n```bash\n# generate random output stream (CTRL-C to terminate)\nast-tool-py random\n\n# generate random output with 
simple output mode\nast-tool-py --simple-output random\n\n# next process must use '--simple-input', to match\nast-tool-py --simple-output random | ast-tool-py --simple-input decode\n# ... or using the short form\nast-tool-py -s random | ast-tool-py -s decode\n```\n\n## Getting help\n\n```bash\nast-tool-py --help # general help\nast-tool-py {subcommand-name} --help # subcommand help\n```\n\n## Subcommands\n\n### `random` command\n\n*Pseudo random asterix data generator*\n\nExamples:\n\n```bash\n# generate random data\nast-tool-py random\n\n# limit number of generated samples\nast-tool-py random | stdbuf -oL head -n 10\n\n# random streem is different on each program run\n# try multiple times, expect different result\nast-tool-py -s random | stdbuf -oL head -n 10 | sha1sum\n\n# unless a seed value is fixed in which case\n# try multiple times, expect same result\nast-tool-py -s random --seed 0 | stdbuf -oL head -n 10 | sha1sum\n\n# generate only asterix category 062, edition 1.19\nast-tool-py manifest | grep \"062\" # show available editions\nast-tool-py --empty-selection --cat 062 1.19 random\n\n# generate all categories and for cat062, generate 'RE' expansion field too\nast-tool-py --expand 62 RE random\n\n# limit sample generation speed/rate (various options)\nast-tool-py random --sleep 0.5\nast-tool-py random | while read x; do echo \"$x\"; sleep 0.5; done\nast-tool-py random | pv -qL 300\n\n# prepend/append some string to generated samples in simple format\nast-tool-py -s random | awk '{print \"0: \"$1}'\n\n# set random channel name for each event, choose from ['ch1', 'ch2']\nast-tool-py random --channel ch1 --channel ch2\n```\n\n### `decode` command\n\n*Asterix decoder to plain text*\n\nExamples:\n\n```bash\n# decode random data\nast-tool-py random | ast-tool-py decode\n\n# decode random data, truncate output to 80 chars\nast-tool-py random | ast-tool-py decode --truncate 80\n\n# decode, truncate, parse only up to the level of 'records' or 'items\nast-tool-py 
random | ast-tool-py decode --truncate 80 --parsing-level 3\nast-tool-py random | ast-tool-py decode --truncate 80 --parsing-level 4\n\n# generate and decode 'RE' expansion field too\nast-tool-py --expand 62 RE random | ast-tool-py --expand 62 RE decode\n```\n\nRun simple self-test:\n\nCheck that the tool can decode its own random data, ignoring the decoding results.\nThis bash pipeline shall run without error until interrupted.\n\n```bash\nast-tool-py random | ast-tool-py decode --stop-on-error > /dev/null\n# press CTRL-C to interrupt\n```\n\n### `from-udp`, `to-udp` commands\n\n*UDP datagram receiver/transmitter*\n\nExamples:\n\n```bash\n# send random data to UDP\nast-tool-py random | ast-tool-py to-udp --unicast \"*\" 127.0.0.1 56780\n\n# forward UDP from one port to another\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56780 \\\n    | ast-tool-py to-udp --unicast \"*\" 127.0.0.1 56781\n\n# decode data from UDP\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56781 | ast-tool-py decode\n\n# distribute data by channel name (ch1 -> 56001, ch2 -> 56002)\nast-tool-py random --sleep 0.3 --channel ch1 --channel ch2 \\\n    | ast-tool-py to-udp \\\n        --unicast \"ch1\" 127.0.0.1 56001 \\\n        --unicast \"ch2\" 127.0.0.1 56002\n\n# monitor result on individual UDP ports\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56001\nast-tool-py from-udp --unicast \"ch2\" 127.0.0.1 56002\n```\n\n### `inspect` command\n\n*Detect valid/invalid asterix editions in a stream*\n\nThis command inspects a stream and tries to decode asterix with all defined\nasterix category/edition combinations. 
It runs until the stream is exhausted\nor until the process is interrupted.\n\nExamples:\n\n```bash\n# inspect random samples:\nast-tool-py random | stdbuf -oL head -n 1000 | ast-tool-py inspect\n\n# inspect network traffic (CTRL-C to stop after some observation time)\nast-tool-py from-udp --unicast \"ch1\" 127.0.0.1 56780 | ast-tool-py inspect\n```\n\n### `record`, `replay` commands\n\n*Record/replay data to/from a file*\n\nExamples:\n\n```bash\n# save random data\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record\n# ... to a file\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record | tee recording.simple\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 | ast-tool-py record > recording.simple\n\n# use binary final file format\nast-tool-py record --help # check supported recording file formats\nast-tool-py random --sleep 0.2 | stdbuf -oL head -n 10 \\\n    | ast-tool-py record --format final > recording.ff\n\n# replay at normal/full speed\nast-tool-py replay recording.simple\nast-tool-py replay recording.simple --full-speed\n\n# use different replay file format\nast-tool-py replay --help # check supported replay file formats\nast-tool-py replay --format final recording.ff\n\n# replay from gzipped file (not supported with 'pcap' format)\ngzip recording.simple\ncat recording.simple.gz | gunzip | ast-tool-py replay\nzcat recording.simple.gz | ast-tool-py replay # or using 'zcat'\n```\n\n### `custom` command\n\n*Running custom python script*\n\nThis command dynamically imports a custom `python` script and runs the required\nfunction (custom script entry point). 
The entry point function shall accept\nthe following arguments:\n\n- `base`, `gen` - base and generated asterix module (encoder/decoder), see\n  python asterix library:\n  [libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)\n- `io` - standard input/output instance\n- `args` - program arguments\n\nCustom script can use:\n\n- `io.rx` to fetch events as data *consumer*,\n  for example to decode and display asterix data in any data-serialization\n  format (json, xml, bson...)\n- `io.tx` to generate events as data *producer*,\n  for example: reading and parsing non-standard recording file from disk\n- both `io.rx` and `io.tx` to act as custom data *filter* with arbitrary\n  data manipulation capabilities\n\n#### Minimal example\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    print(\"Hello from custom script!\")\n    print(args.args) # explicit arguments\n    print(args) # all program arguments\n```\n\nTest:\n\n```bash\nast-tool-py custom --script custom.py --call custom --args \"additional arguments, any string\"\n```\n\n#### Example: transparent filter\n\nBasic filtering loop (transparent filter), use `io.rx` and `io.tx`.\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    for event in io.rx():\n        io.tx(event)\n```\n\nTest:\n\n```bash\nast-tool-py random \\\n    | ast-tool-py custom --script custom.py --call custom \\\n    | ast-tool-py decode\n```\n\n#### Example: Channel filter\n\nDrop events unless `channel == \"ch1\"`.\n\n```python\n# -- custom.py script\ndef custom(base, gen, io, args):\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        if channel != \"ch1\":\n            continue\n        io.tx(event)\n```\n\n```bash\n# expect only 'ch1' on output\nast-tool-py random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Make channel name configurable from command line\n\nUse `args`.\n\n```python\n# -- custom.py script\ndef custom(base, 
gen, io, args):\n    valid_channels = args.args.strip().split()\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        if channel not in valid_channels:\n            continue\n        io.tx(event)\n```\n\nSpecify channels with command line argument.\n\n```bash\n# expect 'ch1' and 'ch2' on output\nast-tool-py random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom --args \"ch1 ch2\"\n```\n\n### Custom asterix processing examples\n\nNote: This project is using\n[libasterix](https://github.com/zoranbosnjak/asterix-libs/tree/main/libs/python#readme)\nfor asterix data processing. The `asterix` module is automatically imported\nand available in the custom script (it does not require a separate installation step).\n\nIn general, if both `rx` and `tx` are used, custom scripts are in the form similar to\nthe code snippet below. Users might decide to handle exceptions differently.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup(base, gen)\n    for event in io.rx():\n        try:\n            result = handle_event(cfg, event)\n        except Exception as e:\n            raise Exception('problem: ', event, e)\n        io.tx(result)\n\n# prepare configuration for handle_event function\ndef setup(base, gen):\n    return (base,gen) # for example if complete module is required\n\n# actual event handler\ndef handle_event(cfg, event):\n    (t_mono, t_utc, channel, data) = event\n    # process data in some way...\n    return event # possibly modified event\n```\n\n#### Example: Print number of datablocks per datagram\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup(base)\n    for event in io.rx():\n        handle_event(cfg, event)\n\ndef setup(base):\n    return (base.RawDatablock, base.Bits)\n\ndef handle_event(cfg, event):\n    RawDatablock, Bits = cfg\n    (t_mono, t_utc, channel, data) = event\n    raw_datablocks = RawDatablock.parse(Bits.from_bytes(data))\n    print(t_utc, channel, 
len(raw_datablocks))\n```\n\nTest:\n\n```bash\nast-tool-py random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Search for some specific events\n\n... for example *north marker* and *sector crossing* messages in category 034.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup((base, gen))\n    for event in io.rx():\n        handle_event(cfg, event)\n\ndef setup(cfg):\n    base, gen = cfg\n    return (base, gen.Cat_034_1_29) # use cat 034, explicit edition\n\ndef handle_event(cfg, event):\n    base, Spec = cfg\n    (t_mono, t_utc, channel, data) = event\n    # parse to raw datablocks\n    bits = base.Bits.from_bytes(data)\n    raw_datablocks = base.RawDatablock.parse(bits)\n    for raw_db in raw_datablocks:\n        # focus on one category only\n        if raw_db.get_category() != Spec.cv_category:\n            continue\n        # fully parse raw datablock\n        records = Spec.cv_uap.parse(raw_db.get_raw_records())\n        for rec in records:\n            handle_record(t_utc, channel, rec)\n\ndef handle_record(t_utc, channel, rec):\n    msg_type = rec.get_item('000')\n    if msg_type is None:\n        return\n    x = msg_type.as_uint()\n    s = None\n    if x == 1:\n        s = 'north marker'\n    elif x == 2:\n        s = 'sector crossing'\n    if s is not None:\n        print(t_utc, channel, s)\n```\n\nTest:\n\n```bash\nast-tool-py --empty-selection --cat 34 1.29 random --channel ch1 --channel ch2 --channel ch3 \\\n    | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Reverse datablocks in each datagram\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        data2 = handle_datagram(base, data)\n        event2 = (t_mono, t_utc, channel, data2)\n        io.tx(event2)\n\ndef handle_datagram(base, data):\n    bits = base.Bits.from_bytes(data)\n    raw_datablocks = base.RawDatablock.parse(bits)\n    if len(raw_datablocks) <= 1:\n        return data\n    return 
b''.join([db.unparse().to_bytes() for db in reversed(raw_datablocks)])\n```\n\nTest:\n\n```bash\nast-tool-py random | ast-tool-py custom --script custom.py --call custom\n```\n\n#### Example: Filter asterix by category\n\nAccept category number as argument, drop other categories.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cat = int(args.args.strip())\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        lst = base.RawDatablock.parse(bits)\n        lst = [db for db in lst if db.get_category() == cat]\n        if not lst:\n            continue\n        data2 = b''.join([db.unparse().to_bytes() for db in lst])\n        event2 = (t_mono, t_utc, channel, data2)\n        io.tx(event2)\n```\n\nRun custom filter on random data, filter out all categories but `62`.\n\n```bash\nast-tool-py random \\\n    | ast-tool-py custom --script custom.py --call custom --args 62 \\\n    | ast-tool-py decode --parsing-level 2 --truncate 80\n```\n\n#### Example: Modify category `062`, set `SAC/SIC` codes in item `010` to zero\n\nKeep other items unmodified.\n\n```python\n# -- custom.py script\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    Spec = gen.Cat_062_1_20\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        lst = base.RawDatablock.parse(bits)\n        lst = [handle_datablock(Spec, db) for db in lst]\n        data2 = b''.join([db.unparse().to_bytes() for db in lst])\n        event2 = (t_mono, t_utc, channel, data2)\n        io.tx(event2)\n\ndef handle_datablock(Spec, raw_db):\n    if raw_db.get_category() != Spec.cv_category:\n        return raw_db\n    records1 = Spec.cv_uap.parse(raw_db.get_raw_records())\n    records2 = map(handle_record, records1)\n    return Spec.create(records2)\n\ndef handle_record(rec):\n    return rec.set_item('010', (('SAC', 0), ('SIC', 0)))\n```\n\nTest:\n\n```bash\nast-tool-py --empty-selection --cat 62 1.20 random | \\\n    ast-tool-py custom --script custom.py --call custom | \\\n    
ast-tool-py decode | grep \\'010\\' | grep -E 'SAC|SIC'\n```\n\n#### Example: Convert binary asterix to `json` output\n\nThis example fully decodes and converts each *event* to `json` format.\nObviously, there are multiple ways to perform such conversion, depending\non user preferences and information that needs to be preserved.\nThis is one example.\n\n```python\n# -- custom.py script\n\nimport json\n\n# custom script entry point\ndef custom(base, gen, io, args):\n    cfg = setup(base, gen, args)\n    for event in io.rx():\n        try:\n            obj = convert_event(cfg, event)\n        except Exception as e:\n            raise Exception('problem: ', event, e)\n        print(json.dumps(obj))\n\ndef string_to_edition(ed):\n    \"\"\"Convert edition string to a tuple, for example \"1.2\" -> (1,2)\"\"\"\n    a,b = ed.split('.')\n    return (int(a), int(b))\n\ndef get_selection(gen, empty, explicit_cats, explicit_refs):\n    \"\"\"Get category selection.\"\"\"\n\n    def get_latest(lst):\n        return sorted(lst, key=lambda pair: string_to_edition(pair[0]), reverse=True)[0]\n\n    # get latest\n    cats = {cat: get_latest(gen.manifest['CATS'][cat].items())[1] for cat in gen.manifest['CATS'].keys()}\n    refs = {cat: get_latest(gen.manifest['REFS'][cat].items())[1] for cat in gen.manifest['REFS'].keys()}\n\n    # cleanup if required\n    if empty:\n        cats = {}\n        refs = {}\n\n    # update with explicit editions (look them up in gen.manifest)\n    for (a,b,c) in [\n        (cats, 'CATS', explicit_cats),\n        (refs, 'REFS', explicit_refs),\n        ]:\n        for (cat,ed) in c:\n            cat = int(cat)\n            a.update({cat: gen.manifest[b][cat][ed]})\n    return {'CATS': cats, 'REFS': refs}\n\ndef get_expansions(base, gen, selection, expansions):\n    result = []\n    for (cat, name) in expansions:\n        cat = int(cat)\n        assert cat in selection['REFS'].keys(), 'REF not defined'\n        spec = selection['CATS'][cat]\n        subitem = spec.cv_record.spec(name)\n        assert issubclass(subitem.cv_rule.cv_variation, base.Explicit)\n        result.append((cat, name))\n    return result\n\ndef setup(base, gen, args):\n    # use command line arguments for asterix 
category/edition selection\n    sel = get_selection(gen, args.empty_selection, args.cat or [], args.ref or [])\n    exp = get_expansions(base, gen, sel, args.expand or [])\n    return (base, sel, exp)\n\ndef convert_event(cfg, event):\n    \"\"\"Turn 'event' to json-serializable object\"\"\"\n    (t_mono, t_utc, channel, data) = event\n    return {\n        'tMono': t_mono,\n        'tUtc': t_utc.isoformat(),\n        'channel': channel,\n        'data': convert_datagram(cfg, data),\n    }\n\ndef convert_datagram(cfg, data):\n    (base, sel, exp) = cfg\n    # parse to raw datablocks\n    bits = base.Bits.from_bytes(data)\n    raw_datablocks = base.RawDatablock.parse(bits)\n    return [convert_datablock(cfg, raw_db) for raw_db in raw_datablocks]\n\ndef convert_datablock(cfg, raw_db):\n    (base, sel, exp) = cfg\n    cat = raw_db.get_category()\n    spec = sel['CATS'].get(cat)\n    if spec is None: # asterix category unknown\n        return raw_db.unparse().to_bytes().hex()\n    records = spec.cv_uap.parse(raw_db.get_raw_records())\n    return {\n        'cat': cat,\n        'records': [convert_record(cfg, cat, rec) for rec in records]\n    }\n\ndef convert_record(cfg, cat, rec):\n    (base, sel, exp) = cfg\n\n    def convert_content(content):\n        if isinstance(content, base.ContentRaw):\n            return {'type': 'raw', 'raw': content.as_uint()}\n        elif isinstance(content, base.ContentTable):\n            return {'type': 'table', 'raw': content.as_uint(), 'str': content.table_value()}\n        elif isinstance(content, base.ContentString):\n            return {'type': 'string', 'raw': content.as_uint(), 'str': content.as_string()}\n        elif isinstance(content, base.ContentInteger):\n            return {'type': 'integer', 'raw': content.as_uint(), 'int': content.as_integer()}\n        elif isinstance(content, base.ContentQuantity):\n            return {'type': 'quantity', 'raw': content.as_uint(), 'float': content._as_quantity()\n                , 'unit': content.__class__.cv_unit}\n        elif isinstance(content, base.ContentBds):\n            return {'type': 'bds', 'raw': content.as_uint()}\n        else:\n            raise Exception('internal error, unexpected content', content)\n\n    def 
convert_rulecontent(rule):\n        if isinstance(rule, base.RuleContentContextFree):\n            return convert_content(rule.content)\n        elif isinstance(rule, base.RuleContentDependent):\n            return '(content dependent structure...)'\n        else:\n            raise Exception('internal error, unexpected rule', rule)\n\n    def convert_variation(var, path):\n        if isinstance(var, base.Element):\n            return {\n                'type': 'Element',\n                'content': convert_rulecontent(var.rule),\n            }\n        elif isinstance(var, base.Group):\n            return {\n                'type': 'Group',\n                'items': [convert_item(i, path) for i in var.arg]\n            }\n        elif isinstance(var, base.Extended):\n            items = []\n            for lists in var.arg:\n                for i in lists:\n                    if i is not None:\n                        items.append((convert_item(i, path),))\n                    else:\n                        items.append('(FX)')\n            return {\n                'type': 'Extended',\n                'items': items,\n            }\n        elif isinstance(var, base.Repetitive):\n            return {\n                'type': 'Repetitive',\n                'items': [convert_variation(sub, path+[cnt]) for (cnt, sub) in enumerate(var.arg)]\n            }\n        elif isinstance(var, base.Explicit):\n            this_item = (cat, path[0])\n            b = var.get_bytes()\n            content = b.hex()\n            if this_item in exp:\n                sub = sel['REFS'][cat].cv_expansion\n                bits = base.Bits.from_bytes(b)\n                (val, remaining) = sub.parse(bits)\n                assert not len(remaining)\n                content = convert_expansion(val, path)\n            return {\n                'type': 'Explicit',\n                'content': content,\n            }\n        elif isinstance(var, base.Compound):\n            return {\n                'type': 'Compound',\n                'items': {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()},\n            }\n        else:\n            raise Exception('internal error, unexpected variation', var.variation, var)\n\n    def convert_item(item, path):\n        if isinstance(item, base.Spare):\n            return item.as_uint()\n        elif isinstance(item, base.Item):\n            nsp = item.arg\n            name = nsp.__class__.cv_name\n            return (name, convert_rulevariation(nsp.rule, path+[name]))\n        else:\n            raise Exception('internal error, unexpected item', item)\n\n    def convert_rulevariation(rule, path):\n        if isinstance(rule, 
base.RuleVariationContextFree):\n            return convert_variation(rule.variation, path)\n        elif isinstance(rule, base.RuleVariationDependent):\n            return '(content dependent structure...)'\n        else:\n            raise Exception('internal error, unexpected rule', rule)\n\n    def convert_expansion(var, path):\n        return {name: convert_rulevariation(nsp.rule, path+[name]) for name, nsp in var.arg.items()}\n\n    result = {}\n    for ui in rec.cv_items_list:\n        if not issubclass(ui, base.UapItem):\n            continue\n        name = ui.cv_non_spare.cv_name\n        nsp = rec.get_item(name)\n        if nsp is None:\n            continue\n        result[name] = convert_rulevariation(nsp.rule, [name])\n    return result\n```\n\nAs an example:\n\n- use random input data\n- use latest edition for each known category\n- in addition, decode cat021 'RE' expansion field\n\n```bash\nsudo apt install jq\nast-tool-py --expand 21 RE random --seed 0 --populate-all-items \\\n    | ast-tool-py --expand 21 RE custom --script custom.py --call custom \\\n    | jq\n```\n\n#### Restamp asterix data to current UTC time\n\nScenario:\n\n- read data from recording file, filter required channels\n- filter out non-asterix data\n- restamp several asterix categories to current time\n- send to udp destination, according to channel name mapping\n\n```python\n# -- custom.py script\n\nimport datetime\n\n# cleanup entry point\ndef cleanup(base, gen, io, args):\n    for event in io.rx():\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        result = base.RawDatablock.parse(bits)\n        if isinstance(result, ValueError): # skip non-asterix\n            continue\n        io.tx(event)\n\n# restamp entry point\ndef restamp(base, gen, io, args):\n    # for each category specify (edition, item to modify)\n    updates = {\n        1: (gen.Cat_001_1_4, \"141\"),\n        2: (gen.Cat_002_1_1, '030'),\n        19: (gen.Cat_019_1_3, '140'),\n        20: (gen.Cat_020_1_10, '140'),\n        34: (gen.Cat_034_1_29, '030'),\n        48: (gen.Cat_048_1_32, '140'),\n        # add more categories here...\n    }\n\n    for event in io.rx():\n        now = 
datetime.datetime.now(tz=datetime.timezone.utc)\n        (t_mono, t_utc, channel, data) = event\n        bits = base.Bits.from_bytes(data)\n        result = base.RawDatablock.parse(bits)\n        if isinstance(result, ValueError):\n            continue\n        raw_datablocks = [db for db in result if db.get_category() in updates]\n        if not raw_datablocks:\n            continue\n        t2 = seconds_since_midnight(t_utc)\n        t3 = seconds_since_midnight(now)\n        lst = [handle_datablock(t2, t3, updates, db) for db in raw_datablocks]\n        data2 = b''.join([db.unparse().to_bytes() for db in lst])\n        event2 = (t_mono, now, channel, data2)\n        io.tx(event2)\n\ndef seconds_since_midnight(t):\n    \"\"\"Calculate seconds since midnight.\"\"\"\n    midnight = datetime.datetime.combine(t, datetime.time(0), t.tzinfo)\n    dt = t - midnight\n    return dt.total_seconds()\n\ndef handle_datablock(t2, t3, updates, raw_db):\n    cat = raw_db.get_category()\n    (Spec, name) = updates.get(cat)\n    result = Spec.cv_uap.parse(raw_db.get_raw_records())\n    assert not isinstance(result, ValueError)\n    records = [handle_record(t2, t3, rec, name) for rec in result]\n    return Spec.create(records)\n\ndef handle_record(t2, t3, rec, name):\n    # compensate original delay/jitter from the recording\n    def stamp(t1):\n        t1 = t1.variation.content.as_quantity()\n        original_delay = t2 - t1\n        return t3 - original_delay\n    x = rec.get_item(name)\n    if x is None:\n        return rec\n    return rec.set_item(name, stamp(x))\n```\n\nTest:\n\n```bash\n# create recording file\nast-tool-py random --seed 0 --sleep 0.05 --channel ch1 --channel ch2 \\\n    | stdbuf -oL head -n 100 \\\n    | ast-tool-py record | tee recording\n\n# inspect recording file, make sure to use valid editions in custom script\ncat recording | ast-tool-py replay | ast-tool-py inspect\n\n# replay, restamp, resend\ncat recording | ast-tool-py replay \\\n    | ast-tool-py custom --script custom.py --call cleanup \\\n    | ast-tool-py custom --script custom.py --call restamp \\\n    | ast-tool-py to-udp \\\n        --unicast ch1 127.0.0.1 56001 \\\n        --unicast ch2 127.0.0.1 
56002\n```\n",
"bugtrack_url": null,
"license": null,
"summary": "Asterix data processing tool",
"version": "0.16.1",
"project_urls": {
"Bug Tracker": "https://github.com/zoranbosnjak/ast-tool/issues",
"Homepage": "https://github.com/zoranbosnjak/asterix-tool/tree/master/ast-tool-py#readme"
},
"split_keywords": [
"asterix",
" eurocontrol",
" radar"
],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "11a2b4cdf09466c0cb99a46254ec371cb61cd1d9db5df1ab20b6a74724b6ddaa",
"md5": "c73114bde908bab289376cc53ebbe13d",
"sha256": "4378819f5325fb23445f47feed1fe35983715d4c8d7ac4d156ce6b9e0d0ea56c"
},
"downloads": -1,
"filename": "ast_tool_py-0.16.1-py3-none-any.whl",
"has_sig": false,
"md5_digest": "c73114bde908bab289376cc53ebbe13d",
"packagetype": "bdist_wheel",
"python_version": "py3",
"requires_python": ">=3.8",
"size": 20762,
"upload_time": "2024-09-01T17:43:02",
"upload_time_iso_8601": "2024-09-01T17:43:02.411986Z",
"url": "https://files.pythonhosted.org/packages/11/a2/b4cdf09466c0cb99a46254ec371cb61cd1d9db5df1ab20b6a74724b6ddaa/ast_tool_py-0.16.1-py3-none-any.whl",
"yanked": false,
"yanked_reason": null
},
{
"comment_text": "",
"digests": {
"blake2b_256": "2e96c7ca5141cff153cf88967d67cdca19d809fe715501ab07883a59236685be",
"md5": "1a7ad7b9d68a1bef4379a1c28b4cc6ae",
"sha256": "f075485d2b3537ca4ee166991c0de676846dd93e03e8f246e9867f2b8355bd6b"
},
"downloads": -1,
"filename": "ast_tool_py-0.16.1.tar.gz",
"has_sig": false,
"md5_digest": "1a7ad7b9d68a1bef4379a1c28b4cc6ae",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.8",
"size": 27818,
"upload_time": "2024-09-01T17:43:04",
"upload_time_iso_8601": "2024-09-01T17:43:04.472387Z",
"url": "https://files.pythonhosted.org/packages/2e/96/c7ca5141cff153cf88967d67cdca19d809fe715501ab07883a59236685be/ast_tool_py-0.16.1.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-09-01 17:43:04",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "zoranbosnjak",
"github_project": "ast-tool",
"github_not_found": true,
"lcname": "ast-tool-py"
}