synpp

Name: synpp
Version: 1.6.0
Home page: https://github.com/eqasim-org/synpp
Summary: Synthetic population pipeline package for eqasim
Author: Sebastian Hörl
Requires Python: >=3.0
Keywords: pipeline automation synthetic population dependency management transport
Requirements: networkx, PyYAML, pyzmq
Upload time: 2025-07-08 09:38:50

# Synthetic Population Pipeline (synpp)

[![Build Status](https://travis-ci.org/eqasim-org/synpp.svg?branch=develop)](https://travis-ci.org/eqasim-org/synpp)

The *synpp* module is a tool to chain different stages of a (population)
synthesis pipeline. This means that self-contained pieces of code can be
run, which are dependent on the outputs of other self-contained pieces
of code. Those pieces, or steps, are called *stages* in this module.

The following will describe the components of the pipeline and how it can
be set up and configured. Scroll to the bottom to find a full example of such
a pipeline which automatically downloads NYC taxi data sets, merges them together
and calculates the average vehicle occupancy during a predefined period.

## Installation

The `synpp` package releases can be installed via `pip`:

```sh
pip install synpp
```

Currently, version `1.6.0` is the active release version. Alternatively, you can
clone the `develop` branch of this repository to use the development version.
It can be installed by calling

```sh
pip install .
```

inside the repository directory.

## Concepts

A typical chain of stages could, for instance, be: **(C1)** load raw census data,
**(C2)** clean raw census data *(dependent on C1)*, **(H1)** load raw household travel survey data,
**(H2)** clean survey data *(dependent on H1)*, **(P1)** merge census *(C2)* and survey *(H2)* data,
**(P2)** generate a synthetic population from merged data *(P1)*.

In *synpp* each *stage* is defined by:

* A *descriptor*, which contains the stage logic.
* *Configuration options* that parameterize each *stage*.

### Defining a descriptor

A descriptor can be defined in its compact form or in its full form.
Both work in the same way and can be used interchangeably in most cases.

In this readme the full form is preferred to explain each of `synpp`'s features, as it is more expressive;
a closer look at the compact form is provided towards the end.

A descriptor in its full form looks like:  

```python
def configure(context):
  pass

def execute(context):
  pass

def validate(context):
  pass
```

These functions are provided in a Python object, such as a module,
a class or a class's instance.
`synpp` expects either a string containing the path to the object,
such as `"pkg.subpkg.module"`, or the instantiated object directly.

In its compact form, the stage is defined as a function, and looks like:

```python
@synpp.stage
def stage_to_run():
  pass
```

The `@synpp.stage` decorator informs `synpp` that it should handle
this function as a stage and tells it how to do so.

### Configuration and Parameterization

Whenever the pipeline explores a stage, *configure* is called first. Note that
in the example above we use a Python module, but the same procedure would work
analogously with a class. In *configure* one can tell the pipeline what the *stage*
expects in terms of other input *stages* and in terms of
*configuration options*:

```python
def configure(context):
  # Expect an output directory
  value = context.config("output_path")

  # Expect a random seed
  value = context.config("random_seed")

  # Expect a certain stage (no return value)
  context.stage("my.pipeline.raw_data")
```

We could add this stage (let's call it `my.pipeline.raw_data`)
as a dependency of another one. However, as we did not define a default
value in the `config` call, we need to pass one explicitly, like so:

```python
def configure(context):
  context.stage("my.pipeline.raw_data", { "random_seed": 1234 })
```

Note that it is even possible to build recursive chains of stages using only
one stage definition:

```python
def configure(context):
  i = context.config("i")

  if i > 0:
    context.stage("this.stage", { "i": i - 1 })
```

Configuration options can also be defined globally for the pipeline. If no
default value is given for an option in `configure` and no specific value is
passed to the stage, the value is looked up in the global configuration of
the pipeline.
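
As a minimal sketch (the stage and the `"sampling_rate"` option are made up), a stage can simply declare an option and rely on the global configuration to provide it:

```python
# my/pipeline/sampling.py (hypothetical stage)
def configure(context):
  # No default value and no stage-specific value are given, so the value
  # is taken from the global configuration passed to synpp.run
  context.config("sampling_rate")

def execute(context):
  return context.config("sampling_rate")
```

If this stage is then requested via `synpp.run(..., config = { "sampling_rate": 0.01 })`, the global value is used.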

Note that the call to `context.config` can take a default value as the second argument, and you can set `volatile = True`, which means that the stage will not be re-evaluated if this config value changes. Such behavior is useful if you change, for instance, the number of cores or the available memory, but don't want to rerun the parts of the pipeline that already passed with the previous values.
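
As a sketch of how this might look (the `"processes"` option is made up; the `default` and `volatile` keyword arguments are those described above):

```python
def configure(context):
  # Changing the number of processes should not trigger a re-run of this stage
  context.config("processes", default = 4, volatile = True)

def execute(context):
  # The value is still available as usual
  return context.config("processes")
```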

### Execution

The requested configuration values and stages are afterwards available
to the `execute` step of a *stage*. There those values can be used to do the
"heavy work" of the stage. As the `configure` step already defined what kind
of values to expect, we can be sure that those values and dependencies are
present once `execute` is called.

```python
def execute(context):
  # Load some data from another stage
  df = context.stage("my.pipeline.census.raw")

  df = df.dropna()
  df["age"] = df["age"].astype(int)

  # We could access some values if we wanted
  value = context.config("...")

  return df
```

Note that the `execute` step returns a value. This value will be *pickled* (see
*pickle* package of Python) and cached on the hard drive. This means that whenever
the output of this stage is requested by another stage, it doesn't need to be
run again. The pipeline can simply load the cached result from hard drive.

If one has a very complex pipeline with many stages, this means that a change
in one stage will likely not require re-running the whole pipeline, but only a
fraction of it. The *synpp* framework includes intelligent exploration
algorithms which automatically figure out which *stages* need to be re-run.

### Running a pipeline

A pipeline can be started using the `synpp.run` method. A typical run would
look like this:

```python
config = { "random_seed": 1234 }
working_directory = "~/pipeline/cache"

synpp.run([
    { "descriptor": "my.pipeline.final_population" },
    { "descriptor": "my.pipeline.paper_analysis", "config": { "font_size": 12 } }
], config = config, working_directory = working_directory)
```

Here we call the *stage* defined by the module `my.pipeline.final_population`,
which should be available in the Python path. We also run the
`my.pipeline.paper_analysis` stage with a font size parameter of `12`. Note that
in both cases we could also have passed the bare Python module objects instead
of strings.
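
For instance, the run call above could be rewritten as follows (a sketch assuming the stage modules are importable):

```python
from my.pipeline import final_population, paper_analysis

synpp.run([
    { "descriptor": final_population },
    { "descriptor": paper_analysis, "config": { "font_size": 12 } }
], config = config, working_directory = working_directory)
```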

The pipeline will now figure out how to run those *stages*. Probably they have
dependencies and the analysis *stage* may even depend on the other one. Therefore,
*synpp* explores the tree of dependencies as follows:

* Consider the requested stages (two in this case)
* Step by step, go through the dependencies of those stages
* Then again, go through the dependencies of all added stages, and so on

By that the pipeline traverses the whole tree of dependencies as they are defined
by the `configure` steps of all stages. At the same time it collects information
about which configuration options and parameters are required by each stage. Note
that a stage can occur twice in this dependency tree if it has different
parameters.

After constructing the tree of *stages*, *synpp* devalidates some of them according
to the following scheme. A *stage* is devalidated if ...

- ... it is requested by the `run` call (and `rerun_required` is set to `True`, the default; see the sketch below)
- ... it is new (no meta data from a previous call is present)
- ... the code of the stage has changed (verified with inspection)
- ... at least one of the requested configuration options has changed
- ... at least one dependency has been re-run since the last run of the stage
- ... the list of dependencies has changed
- ... manual *validation* of the stage has failed (see below)
- ... any ascendant of the stage has been devalidated

This list of conditions makes sure that in almost any case of pipeline
modification we end up in a consistent situation (though we cannot prove it).
The only measure that may be important to enforce 'by convention' is to
*always run a stage after the code has been modified*. Though even this can
be automated.
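
For example, the first condition can be relaxed. The following is a sketch, assuming `rerun_required` is passed as an argument to `synpp.run` (as the list above suggests) and with the other arguments as in the earlier example:

```python
# Do not force a re-run of the requested stages if their cached
# results are still valid
synpp.run([
    { "descriptor": "my.pipeline.final_population" }
], config = config, working_directory = working_directory,
   rerun_required = False)
```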

### Validation

Each *stage* has an additional `validate` step, which also receives the
configuration options and the parameters. Its purpose is to return a hash
value that represents the environment of the *stage*. To learn about the concept
in general, search for "md5 hash", for instance. The idea is the following:
After the `execute` step, the `validate` step is called and returns a certain
value. The next time the pipeline is resolved, the `validate` step is called
during devalidation, i.e. before the stage is actually *executed*. If the return
value of `validate` now differs from what it was before, the stage will be
devalidated.

This is useful to check the integrity of data that is not generated inside of
the pipeline but comes from the outside, for instance:

```python
def configure(context):
  context.config("input_path")

def validate(context):
  path = context.config("input_path")
  filesize = get_filesize(path)

  # If the file size has changed, the file must have changed,
  # hence we want to run the stage again.
  return filesize

def execute(context):
  pass # Do something with the file
```

### Cache paths

Sometimes, the results of a *stage* are not easily representable in Python.
Moreover, stages may call Java or shell scripts which simply generate an output
file. For these cases each stage has its own *cache path*. It can be accessed
through the stage context:

```python
def execute(context):
  # In this case we write a file to the cache path of the current stage
  with open("%s/myfile.txt" % context.path(), "w") as f:
    f.write("my content")

  # In this case we read a file from the cache path of another stage
  with open("%s/otherfile.txt" % context.path("my.other.stage")) as f:
    value = f.read()
```

As the example shows, we can also access cache paths of other stages. The pipeline
will make sure that you only have access to the cache path of stages that
have been defined as dependencies before. Note that the pipeline cannot enforce
that one stage is not corrupting the cache path of another stage. Therefore,
by convention, a stage should never *write* to the cache path of another stage.

### Aliases

Once a pipeline has been defined, the structure is relatively rigid as stages
are referenced by their names. To provide more flexibility, it is possible to
define aliases, for instance:

```python
synpp.run(..., aliases = {
  "my.pipeline.final_population": "my.pipeline.final_population_replacement"
})
```

Whenever `my.pipeline.final_population` is requested, `my.pipeline.final_population_replacement`
will be used instead. Note that this makes it possible to define entirely virtual
stages that are referenced from other stages and are only bound to a concrete
stage when running the pipeline (see example above).
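
As an illustration (with made-up stage names), a stage can depend on a purely virtual name that is only bound when the pipeline is run:

```python
# my/pipeline/analysis.py
def configure(context):
  # "my.pipeline.population" does not need to exist as a module; it is
  # resolved through the aliases mapping when the pipeline is run.
  context.stage("my.pipeline.population")

def execute(context):
  population = context.stage("my.pipeline.population")
  return len(population)
```

Calling `synpp.run` with `aliases = { "my.pipeline.population": "my.pipeline.final_population" }` then binds the virtual name to a concrete stage.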

### Parallel execution

The *synpp* package comes with some simplified ways of parallelizing code,
which are built on top of the `multiprocessing` package. To set up a parallel
routine, one can use the following pattern:

```python
def run_parallel(context, x):
  return x**2 + context.data("y")

def execute(context):
  data = { "y": 5 }

  with context.parallel(data) as parallel:
    result = parallel.map(run_parallel, [1, 2, 3, 4, 5])
```

This approach looks similar to the `Pool` object of `multiprocessing` but has
some simplifications. First, the initial argument of the parallel routine is a
context object, which provides configuration options and parameters. Furthermore,
it provides the data that has been passed beforehand in the `execute` function. This
simplifies passing data to all parallel workers considerably compared to the more
flexible approach in `multiprocessing`. Otherwise, the `parallel` object
provides most of the functionality of `Pool`, such as `map`, `async_map`,
`imap`, and `unordered_imap`.
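
Reusing `run_parallel` from above, the streaming variants follow the same pattern; the following sketch uses `unordered_imap`, one of the methods listed:

```python
def execute(context):
  data = { "y": 5 }

  with context.parallel(data) as parallel:
    # Results are yielded as soon as individual tasks finish,
    # in arbitrary order
    for result in parallel.unordered_imap(run_parallel, range(100)):
      print(result)
```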

### Info

While running the pipeline, a lot of additional information may be of interest,
for instance how many samples of a data set have been discarded in a certain stage.
However, this information is often only used at the very end of the pipeline, when
a paper, a report or some explanatory graphics are generated. For that, the pipeline
provides the `set_info` method:

```python
def execute(context):
  # ...
  context.set_info("dropped_samples", number_of_dropped_samples)
  # ...
```

The information can later be retrieved from another stage (which has the
stage in question as a dependency):

```python
def execute(context):
  # ...
  value = context.get_info("my.other.stage", "dropped_samples")
  # ...
```

Note that the *info* functionality should only be used for light-weight
information like integers, short strings, etc.

### Progress

The *synpp* package provides functionality to show the progress of a stage
similar to `tqdm`. However, `tqdm` tends to spam the console output which is
especially undesired if pipelines have long runtimes and run, for instance, in
Continuous Integration environments. Therefore, *synpp* provides its own
functionality, although `tqdm` could still be used:

```python
def execute(context):
  # Usage as a context manager
  with context.progress(label = "My progress...", total = 100) as progress:
    i = 0

    while i < 100:
      progress.update()
      i += 1

  # Usage as an iterable wrapper
  for i in context.progress(range(100)):
    pass
```

### Compact stage definition

As briefly [introduced before](#defining-a-descriptor), stages can also be defined in a compact form.
The example offered is the simplest possible, where a stage takes no configuration parameters.
Consider now the more elaborate setting:

```python
@synpp.stage(loaded_census="my.pipeline.census.raw", sample_size="census_sample_size")
def clean_census(loaded_census, sample_size=0.1):
    ...
```

When `synpp` sees `clean_census`, it will convert it under the hood to a stage in its full form.
Basically, `@synpp.stage` says how the stage should be configured, and the function defines the stage's logic.
Concretely, the stage above is converted by `synpp` to something like:

```python
def configure(context):
  context.stage("my.pipeline.census.raw")
  context.config("census_sample_size", default=0.1)

def execute(context):
  loaded_census = context.stage("my.pipeline.census.raw")
  sample_size = context.config("census_sample_size")
  return clean_census(loaded_census, sample_size)
```

As you may have noticed, `census_sample_size` is a config option defined in the config file,
and in case it isn't found, `synpp` will simply use the function's default.
Notice also that the following wouldn't work: `@synpp.stage(..., sample_size=0.2)`,
since `synpp` would try to look up a config option called `"0.2"` in the config file, which doesn't exist.

In case a parameterized stage must be passed as a dependency, this can be done
in a similar way, by simply wrapping the stage in a `synpp.stage()` call.
Following the previous example, we may replace the first argument with
`loaded_census=synpp.stage("my.pipeline.census.raw", file_path="path/to/alternative/census")`,
as shown in the sketch below.
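
Put together, this could look like the following sketch (`file_path` and the alternative census path are made up, as above):

```python
@synpp.stage(
    loaded_census=synpp.stage("my.pipeline.census.raw", file_path="path/to/alternative/census"),
    sample_size="census_sample_size")
def clean_census(loaded_census, sample_size=0.1):
    ...
```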

This compact way of defining stages does not support all functionality, for instance custom stage devalidation,
but functionality that requires the context object is still available via the helper method `synpp.get_context()`.
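
For instance, progress reporting inside a compact stage might look like the following sketch (assuming `synpp.get_context()` returns the context of the currently running stage, as described above):

```python
@synpp.stage
def count_to_hundred():
    # Obtain the context of the currently running stage
    context = synpp.get_context()

    total = 0
    for i in context.progress(range(100)):
        total += i

    return total
```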

### Command-line tool

The `synpp` pipeline comes with a command line tool, which can be called like

```sh
python3 -m synpp [config_path]
```

If the config path is not given, it will assume `config.yml`. This file should
contain everything to run a pipeline. A simple version would look like this:

```yaml
# General pipeline settings
working_directory: /path/to/my/working_directory

# Requested stages
run:
  - my_first_module.my_first_stage
  - my_first_parameterized_stage:
    param1: 123
    param2: 345

# These are configuration options that are used in the pipeline
config:
  my_option: 123
```

It receives the working directory, a list of stages (which may be parameterized)
and all configuration options. The stages listed above should be available
as Python modules or classes. Furthermore, `aliases` can be defined as a top-level
element of the file.

You can override settings via the command line. To override the working directory, use:

```bash
--working-directory directory
```

To define the list of stages to run, provide individual `--run` options. They will fully replace the stages defined in the configuration:

```bash
--run stage1 --run stage2
```

To override configuration options, prepend them with two dashes:

```bash
--my.config.option value
```

If a value is already present in the config file, the provided value will automatically be cast to the same data type. If the value does not exist yet, any argument is treated as a string.

## NYC Taxi Example

This repository contains an example of the pipeline. To run it, you will need
`pandas` as an additional Python dependency. For testing, you can clone this
repository to any directory on your machine. Inside the repository directory
you can find the `examples` directory. If you did not install `synpp` yet,
you can do this by executing

```sh
pip install .
```

inside of the repository directory. Afterwards, open `examples/config.yml`
and adjust the `working_directory` path. This is a path that should exist on
your machine and it should be empty. It is best to simply create a new
folder and add its path in `config.yml`.

You can now go to `examples` and call the pipeline code:

```sh
cd examples
python3 -m synpp
```

It will automatically discover `config.yml` (but you could pass a different
config file path manually as a command line argument). It will then download
the NYC taxi data for January, February and March 2018 (see configuration
options in `config.yml`). Note that this is happening in one stage for which
you can find the code in `nyc_taxi.download`. It is parameterized by a month
and a year to download the respective data set. These data sets then go into
`nyc_taxi.aggregate`, where they are merged together. Finally, an average
occupancy value is printed out in `nyc_taxi.print_occupancy`. So the dependency
structure is as follows:

```
nyc_taxi.aggregate depends on multiple nyc_taxi.download(year, month)
nyc_taxi.print_occupancy depends on nyc_taxi.aggregate
```

After one successful run of the pipeline you can start it again. You will notice
that the pipeline does *not* download the data again, because nothing has changed
for those stages. However, if you change the requested months in `config.yml`,
the pipeline will download the additional data sets.