ckanext-ingest


* Name: ckanext-ingest
* Version: 1.4.1
* Home page: https://github.com/DataShades/ckanext-ingest
* Upload time: 2024-04-02 15:25:46
* Author: Sergey Motornyuk
* Requires Python: >=3.8
* License: AGPL
* Keywords: ckan

[![Tests](https://github.com/DataShades/ckanext-ingest/workflows/Tests/badge.svg)](https://github.com/DataShades/ckanext-ingest/actions/workflows/test.yml)

# ckanext-ingest

Framework for data import from arbitrary sources.

Note: this extension does not aim to import every possible data source into
CKAN. Instead, it defines a structure and rules that make imports more
predictable, reusable and flexible.

This extension can be used if you need to:

* create datasets/resources/etc. using data from multiple files, importing all
  files in a similar manner without spending time introducing and explaining
  the whole process
* reuse ingestion logic in different projects
* share pieces of logic between different ingestion workflows

And you probably don't need it if you want to:

* import a single file using the CLI once and never do it again.


## Structure

* [Requirements](#requirements)
* [Installation](#installation)
* [Usage](#usage)
* [Examples](#examples)
* [Advanced](#advanced)
* [Configuration](#configuration)
* [Interfaces](#interfaces)
* [API](#api)
  * [`ingest_extract_records`](#ingest_extract_records)
  * [`ingest_import_records`](#ingest_import_records)

## Requirements

Compatibility with core CKAN versions:

| CKAN version | Compatible? |
|--------------|-------------|
| 2.9          | no          |
| 2.10         | yes         |
| master       | yes         |


## Installation

To install ckanext-ingest:

1. Install it via **pip**:
   ```sh
   pip install ckanext-ingest

   ## with basic XLSX strategy
   # pip install 'ckanext-ingest[xlsx]'
   ```
1. Add `ingest` to the `ckan.plugins` setting in your CKAN config file.

## Usage

Data can be ingested into CKAN via the `ingest_import_records` API action. It
requires a `source` with the data, and it's recommended to pass an extraction
`strategy` to get full control over the process.

```sh
ckanapi action ingest_import_records source@path/to/data.zip strategy="myext:extract_archive"
```

But before anything can be ingested, you have to register a `strategy` that
produces `records`. A `strategy` defines how the source is parsed and divided
into data chunks, and a `record` wraps a single data chunk and performs actions
using information from the chunk.

A `strategy` is registered via the `IIngest` interface. It has to be a subclass
of `ckanext.ingest.shared.ExtractionStrategy`. The only requirement for a
`strategy` is to return an iterable of `records` from its `extract` method.

A `record` is created by the `strategy` and it has to be a subclass of
`ckanext.ingest.shared.Record`. Its `ingest` method is responsible for
ingestion: depending on the record's purpose, it can create/update/delete data
or perform any other task that makes sense.
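
The relationship between a strategy and its records can be sketched without CKAN at all. The classes below are simplified stand-ins written for this illustration, not the real `ckanext.ingest.shared` classes, and `import_records` is a hypothetical driver that only mimics what the API action is described to do: extract records, then ingest them one by one.

```python
class Record:
    """Simplified stand-in for `ckanext.ingest.shared.Record`."""

    def __init__(self, raw, options):
        self.data = raw
        self.options = options

    def ingest(self, context):
        raise NotImplementedError


class ExtractionStrategy:
    """Simplified stand-in for `ckanext.ingest.shared.ExtractionStrategy`."""

    def extract(self, source, options):
        raise NotImplementedError


class EchoRecord(Record):
    def ingest(self, context):
        # a real record would call a CKAN API action here
        return {"success": True, "result": self.data, "details": {}}


class LinesStrategy(ExtractionStrategy):
    def extract(self, source, options):
        # every non-empty line of the source becomes one record
        for line in source.splitlines():
            if line.strip():
                yield EchoRecord(line.strip(), {})


def import_records(strategy, source, context):
    """Hypothetical driver: extract records and ingest each of them."""
    return [record.ingest(context) for record in strategy.extract(source, {})]


results = import_records(LinesStrategy(), "first\n\nsecond\n", {})
```

Here the source is a plain string for brevity; in the extension the source is a file-like object.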


## Examples

### Register custom strategy

```python

import ckan.plugins as p

from ckanext.ingest.interfaces import IIngest

class MyPlugin(p.SingletonPlugin):
    p.implements(IIngest)

    def get_ingest_strategies(self):
        # CustomStrategy is any ExtractionStrategy subclass defined elsewhere
        return {
            "my:custom_strategy": CustomStrategy,
        }

```

### Strategy that reads a JSON file and creates a single dataset from it

```python
import json

import ckan.plugins.toolkit as tk
import ckan.types
from ckanext.ingest.shared import ExtractionStrategy, Storage, Record, IngestionResult

class SingleJsonStrategy(ExtractionStrategy):

    def extract(self, source: Storage, options):
        # source is a readable IO stream(werkzeug.datastructures.FileStorage)
        data = json.load(source)

        # `extract` returns iterable over records. When the strategy produces
        # a single record, this record can be either yielded or returned as
        # a list with a single element
        yield SimplePackageRecord(data, {})

class SimplePackageRecord(Record):
    def ingest(self, context: ckan.types.Context) -> IngestionResult:

        dataset = tk.get_action("package_create")(context, self.data)

        # `ingest` returns a brief overview of the ingestion result
        return {
            "success": True,
            "result": dataset,
            "details": {}
        }

```

### Strategy that reads organization names from a CSV file and removes those organizations from the portal

```python
import csv
from io import StringIO

import ckan.plugins.toolkit as tk
import ckan.types
from ckanext.ingest.shared import ExtractionStrategy, Record

class DropOrganizationsUsingCsvStrategy(ExtractionStrategy):

    def extract(self, source, options):
        # `source` is an `IO[bytes]`, so we turn it into `IO[str]`
        str_stream = StringIO(source.read().decode())
        rows = csv.DictReader(str_stream)

        for row in rows:
            # record's constructor requires two arguments:
            # the raw data and the mapping with record options.
            yield DropOrganizationRecord(row, {})

class DropOrganizationRecord(Record):
    def ingest(self, context: ckan.types.Context):
        try:
            tk.get_action("organization_delete")(context, {"id": self.data["name"]})
        except tk.ObjectNotFound:
            success = False
        else:
            success = True

        return {
            "success": success,
            "result": None,
            "details": {}
        }

```

### Pull datasets from a CKAN instance specified in JSON (like ckanext-harvest), and remove datasets that were not updated during ingestion

```python
import json
from datetime import datetime

from ckanapi import RemoteCKAN

import ckan.plugins.toolkit as tk
import ckan.types
from ckanext.ingest.shared import ExtractionStrategy, IngestionResult, Record

class HarvestStrategy(ExtractionStrategy):

    def extract(self, source, options):
        details = json.load(source)
        client = RemoteCKAN(**details)

        now = datetime.utcnow()

        # produce a record that creates a package for every remote dataset
        for dataset in client.action.package_search()["results"]:
            yield SimplePackageRecord(dataset, {})

        # produce an additional record that removes stale datasets
        # (datasets that were modified before ingestion started and were
        # not updated during current ingestion)
        yield DeleteStaleDatasetsRecord({"before": now}, {})

class SimplePackageRecord(Record):
    def ingest(self, context: ckan.types.Context) -> IngestionResult:

        dataset = tk.get_action("package_create")(context, self.data)

        return {
            "success": True,
            "result": dataset,
            "details": {"remote_id": self.data["id"]}
        }


class DeleteStaleDatasetsRecord(Record):
    def ingest(self, context: ckan.types.Context) -> IngestionResult:
        before = self.data["before"].isoformat()
        result = tk.get_action("package_search")(
            context,
            {"fq": f"metadata_modified:[* TO {before}]", "fl": "id"}
        )

        deleted = []
        for dataset in result["results"]:
            tk.get_action("package_delete")(context, {"id": dataset["id"]})
            deleted.append(dataset["id"])

        return {
            "success": True,
            "result": deleted,
            "details": {"count": len(deleted), "before": before}
        }

```

## Advanced

To get the most from ingestion workflows, try writing reusable strategies and
records using the details below.

### Strategy autodetection

The `strategy` argument for actions is optional. When it is missing, the plugin
chooses the most appropriate strategy for the ingested source. This feature
relies on the `can_handle` and `must_handle` methods of the extraction
strategy. Both methods receive the mimetype of the source and the source itself
and return `True`/`False`.

Among all strategies that return `True` from `can_handle`, the plugin selects
the first strategy that returns `True` from `must_handle` as well. If there is
no such strategy, the first strategy that returned `True` from `can_handle` wins.

`ckanext.ingest.shared.ExtractionStrategy` defines both of these
methods. `must_handle` always returns `False`. `can_handle` returns `True` if
the source's mimetype is listed in the `mimetypes` property of the handler:

```python
class ExtractionStrategy:
    mimetypes: ClassVar[set[str]] = set()

    @classmethod
    def can_handle(cls, mime: str | None, source) -> bool:
        return mime in cls.mimetypes

    @classmethod
    def must_handle(cls, mime, source) -> bool:
        return False

```

If you want to register a strategy that can handle JSON sources, just register
a strategy with the appropriate `mimetypes`:

```python
class JsonStrategy(ExtractionStrategy):
    mimetypes = {"application/json"}
```

If more than one strategy supports the JSON mimetype, the first registered
strategy is selected. If you want to register a strategy that always handles
JSON sources with a specific name (`DRINK_ME.json`), regardless of the
registration order, you can use `must_handle`.

Note that `must_handle` is checked only when `can_handle` returns `True`, so
we are still using the default `mimetypes` logic:

```python
class DrinkMeJsonStrategy(ExtractionStrategy):
    mimetypes = {"application/json"}

    @classmethod
    def must_handle(cls, mime, source: Storage) -> bool:
        return source.filename == "DRINK_ME.json"
```
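
The selection rule from this section can be sketched in a few lines. This is only an illustration under stated assumptions: `ExtractionStrategy` below is a simplified stand-in for the real base class, `pick_strategy` is a hypothetical helper rather than a function of the extension, and the source is faked with an object that has only a `filename` attribute.

```python
from types import SimpleNamespace


class ExtractionStrategy:
    """Simplified stand-in for `ckanext.ingest.shared.ExtractionStrategy`."""

    mimetypes = set()

    @classmethod
    def can_handle(cls, mime, source):
        return mime in cls.mimetypes

    @classmethod
    def must_handle(cls, mime, source):
        return False


class JsonStrategy(ExtractionStrategy):
    mimetypes = {"application/json"}


class DrinkMeJsonStrategy(ExtractionStrategy):
    mimetypes = {"application/json"}

    @classmethod
    def must_handle(cls, mime, source):
        return source.filename == "DRINK_ME.json"


def pick_strategy(strategies, mime, source):
    """Hypothetical helper: apply the selection rule to registered strategies."""
    # keep registration order among strategies that can handle the source
    candidates = [s for s in strategies if s.can_handle(mime, source)]
    for strategy in candidates:
        # the first strategy that insists on handling the source wins
        if strategy.must_handle(mime, source):
            return strategy
    # otherwise fall back to the first capable strategy, if any
    return candidates[0] if candidates else None


registered = [JsonStrategy, DrinkMeJsonStrategy]
special = SimpleNamespace(filename="DRINK_ME.json")
regular = SimpleNamespace(filename="data.json")

chosen_special = pick_strategy(registered, "application/json", special)
chosen_regular = pick_strategy(registered, "application/json", regular)
chosen_csv = pick_strategy(registered, "text/csv", regular)
```

Even though `JsonStrategy` is registered first, `DRINK_ME.json` goes to `DrinkMeJsonStrategy`, while any other JSON file goes to `JsonStrategy`.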

### Record factories

`ExtractionStrategy` has a default implementation of `extract`. This default
implementation calls the `chunks` method to parse the source and get ingestable
data fragments. Then, for every data chunk, the `chunk_into_record` method is
called to transform arbitrary data into a `Record`. Finally, `extract` yields
whatever is produced by `chunk_into_record`.

The default implementation of `chunks` ignores the source and returns an empty
list. As a result, by default any source produces zero records and nothing
happens.

The first thing you can do to produce data is to override `chunks`.

If you are working with a CSV file, `chunks` can return rows from the file:

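```python
import csv
from io import StringIO
from typing import Any, Iterable

from ckanext.ingest.shared import ExtractionStrategy

class CsvRowsStrategy(ExtractionStrategy):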
    mimetypes = {"text/csv"}

    def chunks(self, source, options) -> Iterable[Any]:
        str_stream = StringIO(source.read().decode())
        rows = csv.reader(str_stream)

        yield from rows
```

Such a strategy will produce a `ckanext.ingest.shared.Record` for every row of
the source CSV. But the base `Record` class doesn't do much, so you need to
replace it with your own `Record` subclass.

As mentioned before, a data chunk is converted into a record via the
`chunk_into_record` method. You can either override it or use the default
implementation, which creates instances of the class stored in the
`record_factory` attribute of the strategy. The default value of this attribute
is `ckanext.ingest.shared.Record`. If you want to use a different record
implementation, do the following:

```python
class CsvRowsStrategy(ExtractionStrategy):
    record_factory = MyCustomRecord
    ...
```
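
Putting the pieces of this section together, the default flow can be approximated as follows. The classes are simplified stand-ins, the exact signature of `chunk_into_record` is an assumption, and `WordRecord` and `WordsStrategy` are hypothetical names that exist only in this sketch.

```python
class Record:
    """Simplified stand-in for `ckanext.ingest.shared.Record`."""

    def __init__(self, raw, options):
        self.data = raw
        self.options = options


class ExtractionStrategy:
    """Simplified stand-in for `ckanext.ingest.shared.ExtractionStrategy`."""

    record_factory = Record

    def chunks(self, source, options):
        # default: ignore the source and produce nothing
        return []

    def chunk_into_record(self, chunk, options):
        # assumed signature: build one record from one chunk
        return self.record_factory(chunk, options.get("record_options", {}))

    def extract(self, source, options):
        for chunk in self.chunks(source, options):
            yield self.chunk_into_record(chunk, options)


class WordRecord(Record):
    """Hypothetical record used only for this illustration."""


class WordsStrategy(ExtractionStrategy):
    record_factory = WordRecord

    def chunks(self, source, options):
        # every whitespace-separated word is a separate chunk
        return source.split()


default_records = list(ExtractionStrategy().extract("a b", {}))
word_records = list(WordsStrategy().extract("a b", {}))
```

The base strategy yields nothing, while the subclass only had to override `chunks` and set `record_factory` to get two `WordRecord` instances.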

### Strategy delegation

The `ExtractionStrategy.extract` method is responsible for producing records.
But that doesn't mean the strategy has to generate records itself. Instead, a
strategy can do some preparation and use another strategy to make the records.

Let's imagine a `UrlStrategy` that accepts a file with a single line, the URL
of a remote portal, and pulls data from this portal. As we don't know the type
of the data, we cannot tell how records should be created from it. So, when the
data is fetched, we can use its mimetype to select the most suitable strategy
and delegate record generation to its `extract` method:

```python
import requests
import magic
from io import BytesIO
from typing import Any, Iterable

from ckanext.ingest import shared

class UrlStrategy(shared.ExtractionStrategy):

    def extract(self, source, options) -> Iterable[Any]:
        # read URL from file-like source (bytes) and strip the trailing newline
        url = source.read().decode().strip()
        resp = requests.get(url)

        # convert response bytes into `source`
        sub_source = shared.make_storage(BytesIO(resp.content))

        # identify mimetype (`mime=True` returns e.g. "application/json"
        # instead of a human-readable description)
        mime = magic.from_buffer(sub_source.read(1024), mime=True)
        sub_source.seek(0)

        # choose the appropriate strategy
        handler = shared.get_handler_for_mimetype(mime, sub_source)

        # delegate extraction of the downloaded data
        if handler:
            yield from handler.extract(sub_source, options)
```

### Strategy and Record options

`ExtractionStrategy.extract` accepts `options` as its second argument, and
every `Record` receives `options` as the second argument of its constructor. In
both cases it's a dictionary that can be used to modify the logic of the
strategy or record. Strategy options are described by
`ckanext.ingest.shared.StrategyOptions`, and record options are described by
`ckanext.ingest.shared.RecordOptions`.

Keys defined on the top level make sense for every strategy/record. For
example, `RecordOptions` defines the `update_existing` flag. If a record that
creates data detects an existing conflicting entity, the `update_existing` flag
should be taken into account when the record decides what to do in such a case.
It's only a recommendation: this flag can be ignored, or you can use a
different option. But using common options makes the workflow easier to
understand.

For strategy there are 3 common keys:

* `record_options`: these options should be passed into every record produced
  by the strategy (`RecordOptions`)
* `nested_strategy`: if the strategy delegates record creation to a different
  strategy, `nested_strategy` should be preferred over the auto-detected
  strategy (mimetype detection)
* `locator`: if source is represented by some kind of collection, `locator` is
  a callable that returns specific members of collection. It can be used when
  parsing archives, so that strategy can extract package's metadata from one
  file and upload resources returned by `locator` into it. Or, when parsing
  XLSX, `locator` can return sheets by title to simplify processing of multiple
  sheets.

For any options that can be used only by a specific strategy, there is an
`extras` option inside both `StrategyOptions` and `RecordOptions`. This
dictionary can hold any data and there are no expectations about its structure.

Keys that are used often inside `extras` may eventually be added as recommended
options to the top level. But, as these are only recommendations, you can just
ignore them and pass whatever data you need as options.
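
As a rough illustration of how these options might be consumed, the sketch below reads a strategy-specific setting from `extras` and forwards `record_options` to every chunk. The `delimiter` and `owner_org` keys and the `rows_with_options` helper are hypothetical and are not defined by the extension.

```python
import csv
from io import StringIO

# strategy options: common keys on the top level, custom keys inside `extras`
options = {
    "record_options": {
        "update_existing": True,
        "extras": {"owner_org": "my-org"},  # hypothetical record-specific key
    },
    "extras": {"delimiter": ";"},  # hypothetical strategy-specific key
}


def rows_with_options(text, options):
    """Hypothetical helper: parse CSV text according to strategy options."""
    delimiter = options.get("extras", {}).get("delimiter", ",")
    record_options = options.get("record_options", {})

    for row in csv.DictReader(StringIO(text), delimiter=delimiter):
        # pair every chunk with the options its record should receive
        yield row, record_options


pairs = list(rows_with_options("name;title\na;A\n", options))
```

A real strategy would pass each pair into a record constructor instead of yielding a tuple.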

### Data transformation in Record

The `ckanext.ingest.shared.Record` class requires two parameters for
initialization: `raw` data and `options` for the record. When a record is
created, it calls its `transform` method, which copies `raw` data into the
`data` property. This is the best place for data mapping, before the record's
`ingest` method is called. If you want to remove all empty members from the
record's `data`, it can be done in the following way:

```python
class DenseRecord(Record):
    def transform(self, raw: Any):
        self.data = {
            key: value
            for key, value in raw.items()
            if value is not None
        }

```

### Record ingestion and results

A record usually calls one of the CKAN API actions during ingestion. In order
to do it properly, the record needs an action `context`, which is passed as the
single argument into the `ingest` method. But this is only the most common
workflow, so if you don't use any action, just ignore the `context`. What is
more important is the output of `ingest`. It must be a dictionary described by
`ckanext.ingest.shared.IngestionResult`. It has three members:

* `success`: flag that indicates whether ingestion succeeded or failed
* `result`: data produced by ingestion(package, resource, organization, etc.)
* `details`: any other details that may be useful. For example, how many
  entities were modified during ingestion, which API action was used, what were
  the errors if ingestion failed.

These details are not required by ingestion, but they may be used to build an
ingestion report.
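
For instance, a report could be assembled from a list of such dictionaries roughly like this. `summarize` is a hypothetical helper, and the keys used inside `details` (`action`, `error`) are made up for the example.

```python
from collections import Counter


def summarize(results):
    """Hypothetical report builder over IngestionResult-like dictionaries."""
    counts = Counter("ok" if r["success"] else "failed" for r in results)
    # collect details of failed records to show them in the report
    errors = [r["details"] for r in results if not r["success"]]
    return {"ok": counts["ok"], "failed": counts["failed"], "errors": errors}


results = [
    {"success": True, "result": {"id": "a"}, "details": {"action": "package_create"}},
    {"success": False, "result": None, "details": {"error": "Not found"}},
]
report = summarize(results)
```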

### Configure record transformation with ckanext-scheming

The `ckanext.ingest.record` module contains `PackageRecord` and
`ResourceRecord` classes that create a package/resource. But their `transform`
method is much more interesting. It maps `raw` into `data` using the field
configuration from the metadata schema defined by ckanext-scheming.

In order to configure the mapping, add an `ingest_options` attribute to the
field definition:

```yaml
- field_name: title
  label: Title
  ingest_options: {}
```

During transformation, every key in `raw` is checked against the schema. If the
schema contains a field with `ingest_options` whose `field_name` or `label`
matches the key from `raw`, this key is copied into `data` and mapped to the
corresponding `field_name`. I.e., for the field definition above, both `raw`
versions, `{"title": "hello"}` and `{"Title": "hello"}`, will turn into `data`
with value `{"title": "hello"}`.

If you have completely different names in `raw`, use the `aliases`
(`list[str]`) attribute of `ingest_options`:

```yaml
- field_name: title
  label: Title
  ingest_options:
      aliases: [TITLE, "name of the dataset"]
```

In this case, `{"name of the dataset": "hello"}` and `{"TITLE": "hello"}` will
turn into `{"title": "hello"}`.

If a value requires additional processing before it can be used as a field
value, specify all applied validators in the `convert` attribute of
`ingest_options`:

```yaml
- field_name: title
  label: Title
  ingest_options:
      convert: conver_to_lowercase json_list_or_string
```

`convert` uses the same syntax as the `validators` attribute of the field
definition. You can use any registered validator inside this field. But, unlike
validators, if an `Invalid` error is raised during transformation, the field is
silently ignored and ingestion continues.

Any field from `raw` that has no corresponding field in the schema (detected by
`field_name`/`label` or `ingest_options.aliases`) is not added to `data` and
won't be used for package/resource creation.
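
The name-matching part of this mapping can be approximated with plain dictionaries. This is a sketch of the behaviour described above, not the extension's actual code: `map_raw` is a hypothetical function, the schema is reduced to a list of field dictionaries, and the `convert` step is left out.

```python
def map_raw(raw, fields):
    """Approximate the described key mapping (without `convert`)."""
    lookup = {}
    for field in fields:
        options = field.get("ingest_options")
        if options is None:
            # fields without `ingest_options` do not take part in the mapping
            continue

        names = [field["field_name"], field.get("label"), *options.get("aliases", [])]
        for name in names:
            if name:
                lookup[name] = field["field_name"]

    # keys that match no configured field are dropped
    return {lookup[key]: value for key, value in raw.items() if key in lookup}


fields = [
    {"field_name": "title", "label": "Title", "ingest_options": {"aliases": ["TITLE"]}},
    {"field_name": "notes", "label": "Description"},
]

by_label = map_raw({"Title": "hello", "notes": "ignored", "other": 1}, fields)
by_alias = map_raw({"TITLE": "hello"}, fields)
```

Both calls produce `{"title": "hello"}`: `notes` is skipped because its field has no `ingest_options`, and `other` is skipped because the schema has no such field.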

### Generic strategies

There are a number of strategies available out of the box. You probably won't
use them as-is, but creating a subclass of these strategies may simplify the
process and solve a couple of common problems.

#### `ingest:scheming_csv`

Defined by `ckanext.ingest.strategy.csv.CsvStrategy`.

#### `ingest:recursive_zip`

Defined by `ckanext.ingest.strategy.zip.ZipStrategy`.

#### `ingest:xlsx`

Defined by `ckanext.ingest.strategy.xlsx.XlsxStrategy`.

## Configuration

```ini
# List of allowed ingestion strategies. If empty, all registered strategies
# are allowed
# (optional, default: )
ckanext.ingest.strategy.allowed = ingest:recursive_zip

# List of disabled ingestion strategies.
# (optional, default: )
ckanext.ingest.strategy.disabled = ingest:scheming_csv

# Base template for WebUI
# (optional, default: page.html)
ckanext.ingest.base_template = admin/index.html

# Allow moving existing resources between packages.
# (optional, default: false)
ckanext.ingest.allow_resource_transfer = true

# Rename strategies using `{"import.path.of:StrategyClass": "new_name"}` JSON
# object
# (optional, default: )
ckanext.ingest.strategy.name_mapping = {"ckanext.ingest.strategy.zip:ZipStrategy": "zip"}
```
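
How the `allowed` and `disabled` lists interact is not spelled out above. One plausible reading, shown here purely as an assumption, is that a strategy is usable when it passes the allow-list (or the allow-list is empty) and is not in the disabled list. `enabled_strategies` is a hypothetical helper, not part of the extension.

```python
def enabled_strategies(registered, allowed, disabled):
    """Hypothetical filter: names that remain usable under both settings."""
    return [
        name
        for name in registered
        # an empty `allowed` list means that every strategy is allowed
        if (not allowed or name in allowed) and name not in disabled
    ]


registered = ["ingest:recursive_zip", "ingest:scheming_csv", "ingest:xlsx"]
only_zip = enabled_strategies(registered, ["ingest:recursive_zip"], [])
no_csv = enabled_strategies(registered, [], ["ingest:scheming_csv"])
```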

## Interfaces

Implementations of the `ckanext.ingest.interfaces.IIngest` interface can
register custom extraction strategies via the `get_ingest_strategies` method:

```python
def get_ingest_strategies(self) -> dict[str, type[ckanext.ingest.shared.ExtractionStrategy]]:
    """Return extraction strategies."""
    return {
        "my_plugin:xlsx_datasets": MyXlsxStrategy,
    }
```

## API

### `ingest_extract_records`

Extract records from the source.

This method mainly exists for debugging. It doesn't create anything: it just
parses the source, produces records and returns the records' data as a
list. Because it aggregates all extracted records into a single list, it
can consume a lot of memory. If you want to iterate over records, consider
using the `iter_records` function, which produces an iterable over records.

Args:

    source: str|FileStorage - data source for records

    strategy: str|None - record extraction strategy. If missing, strategy
    is guessed depending on source's mimetype

    options: SourceOptions - dictionary with configuration for strategy and
    records. Consumed by strategies so heavily depends on the chosen
    strategy.


### `ingest_import_records`

Ingest records extracted from source.

Parse the source, convert it into Records using selected strategy, and call
`Record.ingest`, potentially creating/updating data.

Args:

    source: str|FileStorage - data source for records

    strategy: str|None - record extraction strategy. If missing, strategy
    is guessed depending on source's mimetype

    options: SourceOptions - dictionary with configuration for strategy and
    records. Consumed by strategies so heavily depends on the chosen
    strategy.

    defaults: dict[str, Any] - default data added to every record(if missing)

    overrides: dict[str, Any] - data that unconditionally overrides record details

    skip: int - number of records that are skipped without ingestion

    take: int - max number of records that will be ingested
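
The effect of `defaults`, `overrides`, `skip` and `take` can be pictured with a small generator. This is a guess at the semantics based on the argument descriptions above: it assumes that `skip` is applied before `take` and that "missing" means an absent key. `prepare` is a hypothetical function, not the action's implementation.

```python
from itertools import islice


def prepare(records, defaults=None, overrides=None, skip=0, take=None):
    """Hypothetical illustration of `defaults`, `overrides`, `skip`, `take`."""
    stop = None if take is None else skip + take

    for data in islice(records, skip, stop):
        # defaults only fill missing keys, overrides always win
        yield {**(defaults or {}), **data, **(overrides or {})}


rows = [{"name": "a"}, {"name": "b", "private": True}, {"name": "c"}]
prepared = list(
    prepare(
        rows,
        defaults={"private": False},
        overrides={"owner_org": "org"},
        skip=1,
        take=1,
    )
)
```

Only the second row survives `skip=1, take=1`; it keeps its own `private` value and gains `owner_org` from `overrides`.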

            

Raw data

            {
    "_id": null,
    "home_page": "https://github.com/DataShades/ckanext-ingest",
    "name": "ckanext-ingest",
    "maintainer": null,
    "docs_url": null,
    "requires_python": ">=3.8",
    "maintainer_email": null,
    "keywords": "CKAN",
    "author": "Sergey Motornyuk",
    "author_email": "sergey.motornyuk@linkdigital.com.au",
    "download_url": "https://files.pythonhosted.org/packages/9f/57/e2d4ae352bc5f644c412fdded4b31769fe323f779544145cee3ed5095b9a/ckanext-ingest-1.4.1.tar.gz",
    "platform": null,
    "description": "[![Tests](https://github.com/DataShades/ckanext-ingest/workflows/Tests/badge.svg)](https://github.com/DataShades/ckanext-ingest/actions/workflows/test.yml)\n\n# ckanext-ingest\n\nFramework for data import from arbitrary sources.\n\nNote: this extension has no aim to perform import of every possible data source\ninto CKAN. Instead, it defines a structure and rules for making import more\npredictable, reusable and flexible.\n\nThis extension can be used if you need to:\n\n* Create datasets/resources/etc using data from multiple files. But you want do\n  import all files in a similar manner and don't want to spend time introducing\n  and explaining the whole process.\n* reuse ingestion logic in different projects\n* share pieces of logic between different ingestion workflows\n\nAnd you probably don't need it if you want to:\n\n* import a single file using CLI once and and never do it again.\n\n\n## Structure\n\n* [Requirements](#requirements)\n* [Installation](#installation)\n* [Usage](#usage)\n* [Examples](#examples)\n* [Advanced](#advanced)\n* [Configuration](#configuration)\n* [Interfaces](#interfaces)\n* [API](#api)\n  * [`ingest_extract_records`](#ingest_extract_records)\n  * [`ingest_import_records`](#ingest_import_records)\n\n## Requirements\n\nCompatibility with core CKAN versions:\n\n| CKAN version | Compatible? |\n|--------------|-------------|\n| 2.9          | no          |\n| 2.10         | yes         |\n| master       | yes         |\n\n\n## Installation\n\nTo install ckanext-ingest:\n\n1. Install it via **pip**:\n   ```sh\n   pip install ckanext-ingest\n\n   ## with basic XLSX strategy\n   # pip install 'ckanext-ingest[xlsx]'\n   ```\n1. Add `ingest` to the `ckan.plugins` setting in your CKAN config file.\n\n## Usage\n\nData can be ingested into CKAN via `ingest_import_records` API action. 
It\nrequires a `source` with the data, and it's recommended to pass an extraction\n`strategy`, to get a full control over the process.\n\n```sh\nckanapi action ingest_import_records source@path/to/data.zip strategy=\"myext:extract_archive\"\n```\n\nBut before anything can be ingested you have to regiser a `strategy` that\nproduces `records`. `strategy` defines how source is parsed and divided into\ndata chunks, and `record` wraps single data chunk and perform actions using\ninformation from the chunk.\n\n`strategy` is registered via `IIngest` interface. It has to be a subclass of\n`ckanext.ingest.shared.ExtractionStrategy`. The only requirement for\n`strategy` is to return iterable of `records` from its `extract` method.\n\n`record` is created by `strategy` and it has to be a subclass of\n`ckanext.ingest.shared.Record`. Its `ingest` method is responsible for\ningestion: depending on the record purpose, it can create/update/delete data or\nperform any other task that has sense.\n\n\n## Examples\n\n### Register custom strategy\n\n```python\n\nimport ckan.plugins as p\n\nfrom ckanext.ingest.interfaces import IIngest\n\nclass MyPlugin(p.SingletonPlugin):\n    p.implements(IIngest)\n\n    def get_ingest_strategies(self):\n        return {\n          \"my:custom_strategy\": CustomStrategy\n        }\n\n```\n\n### Strategy thay reads JSON file and creates a single dataset from it.\n```python\nimport ckan.plugins.toolkit as tk\nfrom ckanext.ingest.shared import ExtractionStrategy, Storage, Record, IngestionResult\n\nclass SingleJsonStrategy(ExtractionStrategy):\n\n    def extract(self, source: Storage, options):\n        # source is a readable IO stream(werkzeug.datastructures.FileStorage)\n        data = json.load(source)\n\n        # `extract` returns iterable over records. 
When the strategy produces\n        # a single record, this record can be either yielded or returned as\n        # a list with a single element\n        yield SimplePackageRecord(data, {})\n\nclass SimplePackageRecord(Record):\n    def ingest(self, context: ckan.types.Context) -> IngestionResult:\n\n        dataset = tk.get_action(\"package_create\")(context, self.data)\n\n        # `ingest` returns a brief overview of the ingestion result\n        return {\n            \"success\": True,\n            \"result\": dataset,\n            \"details\": {}\n        }\n\n```\n\n### Strategy that reads from CSV names of organizations that must be removed from the portal\n\n```python\nimport csv\nimport ckan.plugins.toolkit as tk\nfrom ckanext.ingest.shared import ExtractionStrategy, Record\n\nclass DropOrganizationsUsingCsvStrategy(ExtractionStrategy):\n\n    def extract(self, source, options):\n        # `source` is an `IO[bytes]`, so we turn in into `IO[str]`\n        str_stream = StringIO(source.read().decode())\n        rows = csv.DictReader(st_stream)\n\n        for row in rows:\n            # record's constructor requires two arguments:\n            # the raw data and the mapping with record options.\n            yield DropOrganiationRecord(row, {})\n\nclass DropOrganizationRecord(Record):\n    def ingest(self, context: ckan.types.Context):\n        try:\n            tk.get_action(\"organization_delete\")(context, {\"id\": self.data[\"name\"]})\n        except tk.ObjectNotFound:\n            success = False\n        else:\n            success = True\n\n        return {\n            \"success\": success,\n            \"result\": None,\n            \"details\": {}\n        }\n\n```\n\n### Pull datasets from CKAN instance specified in JSON(like ckanext-harvest), and remove datasets that were not updated during ingestion\n\n```python\nimport json\nfrom datetime import datetime\nfrom ckanapi import RemoteCKAN\nimport ckan.plugins.toolkit as tk\nfrom ckanext.ingest.shared 
import ExtractionStrategy, Record\n\nclass HarvestStrategy(ExtractionStrategy):\n\n    def extract(self, source, options):\n        details = json.load(source)\n        client = RemoteCKAN(**details)\n\n        now = datetime.utcnow()\n\n        # produce a record that creates a package for every remote dataset\n        for dataset in client.action.package_search()[\"results\"]:\n            yield SimpleDatasetRecord(row, {})\n\n        # produce an additional record that removes stale datasets\n        # (datasets that were modified before ingestion started and were\n        # not updated during current ingestion)\n        yield DeleteStaleDatasetsRecord({\"before\": now}, {})\n\nclass SimplePackageRecord(Record):\n    def ingest(self, context: ckan.types.Context) -> IngestionResult:\n\n        dataset = tk.get_action(\"package_create\")(context, self.data)\n\n        return {\n            \"success\": True,\n            \"result\": dataset,\n            \"details\": {\"remote_id\": self.data[\"id\"]}\n        }\n\n\nclass DeleteStaleDatasetsRecord(Record):\n    def ingest(self, context: ckan.types.Context) -> IngestionResult:\n        before = self.data[\"before\"].isoformat()\n        result = tk.get_action(\"package_search\")(\n            context,\n            {\"fq\": f\"metadata_modified:[* TO {before}]\", \"fl\": \"id\"}\n        )\n\n        deleted = []\n        for dataset in result[\"results\"]\n            tk.get_action(\"package_delete\")(context, {\"id\": dataset[\"id\"]})\n            deleted.append(id)\n\n        return {\n            \"success\": True,\n            \"result\": deleted,\n            \"details\": {\"count\": len(deleted), \"before\": before}\n        }\n\n```\n\n## Advanced\n\nTo get the most from ingestion workflows, try writing reusable strategies and\nrecords using details below\n\n### Strategy autodetection\n\n`strategy` argument for actions is optional. 
When it missing, the plugins\nchooses the most appropriate strategy for the ingested source. This feature\nrelies on `can_handle` and `must_handle` methods of the extraction\nstrategy. Both methods receive the mimetype of the source and the source itself\nand return `True`/`False`.\n\nAmong all strategies that return `True` from `can_handle`, plugin selects the\nfirst strategy that returns `True` from `must_handle` as well. If there is no\nsuch strategy, the first `can_handle` wins.\n\n`ckanext.ingest.shared.ExtractionStrategy` defines both these\nmethods. `must_handle` always returns `False`. `can_handle` return `True` if\nsource's mimetype is listed in `mimetypes` property of the handler:\n\n```python\nclass ExtractionStrategy:\n    mimetypes: ClassVar[set[str]] = set()\n\n    @classmethod\n    def can_handle(cls, mime: str | None, source) -> bool:\n        return mime in cls.mimetypes\n\n    @classmethod\n    def must_handle(cls, mime, source) -> bool:\n        return False\n\n```\n\nIf you want to register strategy that can handle JSON sources, just register\nstrategy with an appropriate `mimetypes`:\n\n```python\nclass JsonStrategy(ExtractionStrategy):\n    mimetypes = {\"application/json\"}\n```\n\nIf there are more than one strategy that supports JSON mimetype, the first\nregistered strategy is selected. If you want to register strategy that aalways\nhandles JSON sources with specific name(`DRINK_ME.json`), disregarding the\norder, you can use `must_handle`.\n\nNote, that `must_handle` is checked only when `can_handle` returns `True`, so\nwe still using default `mimetypes` logic:\n\n```python\nclass DrinkMeJsonStrategy(ExtractionStrategy):\n    mimetypes = {\"application/json\"}\n\n    @classmethod\n    def must_handle(cls, mime, source: Storage) -> bool:\n        return source.filename == \"DRINK_ME.json\"\n```\n\n### Record factories\n\n`ExtractionStrategy` has a default implementation of `extract`. 
This default\nimplementation calls `chunks` method to parse the source and get ingestable\ndata fragments. Then, for every data chunk `chunk_into_record` method is\ncalled, to transform arbitrary data into a `Record`. Finally, `extract` yields\nwhatever is produced by `chunk_into_record`.\n\nDefault implementation of `chunks` ignores the source and returns an empty\nlist. As result, by default any source produce zero records and nothing happens.\n\nThe first thing you can do to produce a data is overriding `chunks`.\n\nIf you are working with CSV file, `chunks` can return rows from the file:\n\n```python\nclass CsvRowsStrategy(ExtractionStrategy):\n    mimetypes = {\"text/csv\"}\n\n    def chunks(self, source, options) -> Iterable[Any]:\n        str_stream = StringIO(source.read().decode())\n        rows = csv.reader(str_stream)\n\n        yield from rows\n```\n\nSuch strategy will produce `ckanext.ingest.shared.Record` for every row of the\nsource CSV. But base `Record` class doesn't do much, so you need to replace it\nwith your own `Record` subclass.\n\nAs mentioned before, data chunk converted into a record via `chunk_into_record`\nmethod. You can either override it, or use default implemmentation, which\ncreates instances of the class stored under `record_factory` attribute of the\nstrategy. Default value of this attribute is `ckanext.ingest.shared.Record` and\nif you want to use a different record implementation, do the following:\n\n```python\nclass CsvRowsStrategy(ExtractionStrategy):\n    record_factory = MyCustomRecord\n    ...\n```\n\n### Strategy delegation\n\n`ExtractionStrategy.extract` method is responsible for producing records. But\nit doesn't mean that strategy have to generate records itself. Instead,\nstrategy can do some preparations and use another strategy in order to make records.\n\nLet's imagine `UrlStrategy` that accepts file with a single line - URL of the\nremote portal - and pulls data from this portal. 
As we don't know the\ntype of the data, we cannot tell how records can be created from it. So, when\nthe data is fetched, we can use its mimetype to select the most suitable strategy\nand delegate record generation to its `extract` method:\n\n```python\nfrom io import BytesIO\nfrom typing import Any, Iterable\n\nimport magic\nimport requests\n\nfrom ckanext.ingest import shared\n\n\nclass UrlStrategy(shared.ExtractionStrategy):\n    def extract(self, source, options) -> Iterable[Any]:\n        # read the URL from the file-like source (bytes -> str, no trailing newline)\n        url = source.read().decode().strip()\n        resp = requests.get(url)\n\n        # convert response bytes into a new `source`\n        sub_source = shared.make_storage(BytesIO(resp.content))\n\n        # identify the mimetype from the first bytes of the content\n        mime = magic.from_buffer(sub_source.read(1024), mime=True)\n        sub_source.seek(0)\n\n        # choose the appropriate strategy\n        handler = shared.get_handler_for_mimetype(mime, sub_source)\n\n        # delegate extraction of the downloaded content\n        if handler:\n            yield from handler.extract(sub_source, options)\n```\n\n### Strategy and Record options\n\n`ExtractionStrategy.extract` and `Record.ingest` accept a second argument,\n`options`. In both cases it's a dictionary that can be used to modify the logic\ninside the corresponding methods. Strategy options are described by\n`ckanext.ingest.shared.StrategyOptions`, and record options are described by\n`ckanext.ingest.shared.RecordOptions`.\n\nKeys defined at the top level make sense for every strategy/record. For\nexample, `RecordOptions` defines the `update_existing` flag. If a record that creates\ndata detects an existing conflicting entity, the `update_existing` flag should be\ntaken into account when the record decides what to do in such a case. It's\nonly a recommendation, and this flag can be ignored or you can use a different\noption. 
But using common options simplifies understanding of the workflow.\n\nFor a strategy there are 3 common keys:\n\n* `record_options`: these options should be passed into every record produced\n  by the strategy (`RecordOptions`)\n* `nested_strategy`: if the strategy delegates record creation to a different\n  strategy, `nested_strategy` should be preferred over the auto-detected\n  strategy (mimetype detection)\n* `locator`: if the source is represented by some kind of collection, `locator` is\n  a callable that returns specific members of the collection. It can be used when\n  parsing archives, so that the strategy can extract the package's metadata from one\n  file and upload resources returned by `locator` into it. Or, when parsing\n  XLSX, `locator` can return sheets by title to simplify processing of multiple\n  sheets.\n\nFor any options that can be used only by a specific strategy, there is an\n`extras` option inside both `StrategyOptions` and `RecordOptions`. This\ndictionary can hold any data and there are no expectations about its structure.\n\nKeys that are often used inside `extras` may eventually be added as recommended\noptions to the top level. But, as these are only recommendations, you can just\nignore them and pass whatever data you need as options.\n\n### Data transformation in Record\n\nThe `ckanext.ingest.shared.Record` class requires two parameters for\ninitialization: `raw` data and `options` for the record. When a record is\ncreated, it calls its `transform` method, which copies `raw` data into the `data`\nproperty. This is the best place for data mapping, before the record's `ingest`\nmethod is called. 
If you want to remove all empty members from the record's `data`,\nit can be done in the following way:\n\n```python\nfrom typing import Any\n\nfrom ckanext.ingest.shared import Record\n\n\nclass DenseRecord(Record):\n    def transform(self, raw: Any):\n        # keep only the members that have a value\n        self.data = {\n            key: value\n            for key, value in raw.items()\n            if value is not None\n        }\n```\n\n### Record ingestion and results\n\nA record usually calls one of the CKAN API actions during ingestion. In order to do\nit properly, the record needs an action `context`, which is passed as a single\nargument into the `ingest` method. But this is only the most common workflow, so if\nyou don't use any action, just ignore the `context`. What is more important is\nthe output of `ingest`. It must be a dictionary described by\n`ckanext.ingest.shared.IngestionResult`. It has three members:\n\n* `success`: flag that indicates whether ingestion succeeded or failed\n* `result`: data produced by ingestion (package, resource, organization, etc.)\n* `details`: any other details that may be useful. For example, how many\n  entities were modified during ingestion, which API action was used, what were\n  the errors if ingestion failed.\n\nThese details are not required by ingestion, but they may be used for building\nan ingestion report.\n\n### Configure record transformation with ckanext-scheming\n\nThe `ckanext.ingest.record` module contains `PackageRecord` and `ResourceRecord`\nclasses that create a package/resource. But their `transform` method is much more\ninteresting. It maps `raw` into `data` using field configuration from the metadata\nschema defined by ckanext-scheming.\n\nIn order to configure the mapping, add an `ingest_options` attribute to the field definition:\n```yaml\n- field_name: title\n  label: Title\n  ingest_options: {}\n```\n\nDuring transformation, every key in `raw` is checked against the schema. 
If the\nschema contains a field with `ingest_options` whose `field_name` or `label`\nmatches the key from `raw`, this key is copied into `data` and mapped to the\ncorresponding `field_name`. I.e., for the field definition above, both `raw`\nversions - `{\"title\": \"hello\"}` and `{\"Title\": \"hello\"}` - will turn into `data`\nwith the value `{\"title\": \"hello\"}`.\n\nIf you have completely different names in `raw`, use the `aliases` (`list[str]`)\nattribute of `ingest_options`:\n\n```yaml\n- field_name: title\n  label: Title\n  ingest_options:\n      aliases: [TITLE, \"name of the dataset\"]\n```\n\nIn this case, `{\"name of the dataset\": \"hello\"}` and `{\"TITLE\": \"hello\"}` will\nturn into `{\"title\": \"hello\"}`.\n\nIf a value requires additional processing before it can be used as a field\nvalue, specify all applied validators as the `convert` attribute of\n`ingest_options`:\n\n```yaml\n- field_name: title\n  label: Title\n  ingest_options:\n      convert: conver_to_lowercase json_list_or_string\n```\n\n`convert` uses the same syntax as the `validators` attribute of the field\ndefinition. You can use any registered validator inside this field. But, unlike\nvalidators, if an `Invalid` error is raised during transformation, the field is silently\nignored and ingestion continues.\n\nAny field from `raw` that has no corresponding field in the schema (detected by\n`field_name`/`label` or `ingest_options.aliases`) is not added to `data`\nand won't be used for package/resource creation.\n\n### Generic strategies\n\nThere are a number of strategies available out of the box. 
You probably won't\nuse them as-is, but creating a subclass of these strategies may simplify the\nprocess and solve a couple of common problems.\n\n#### `ingest:scheming_csv`\n\nDefined by `ckanext.ingest.strategy.csv.CsvStrategy`.\n\n#### `ingest:recursive_zip`\n\nDefined by `ckanext.ingest.strategy.zip.ZipStrategy`.\n\n#### `ingest:xlsx`\n\nDefined by `ckanext.ingest.strategy.xlsx.XlsxStrategy`.\n\n## Configuration\n\n```ini\n# List of allowed ingestion strategies. If empty, all registered strategies\n# are allowed\n# (optional, default: )\nckanext.ingest.strategy.allowed = ingest:recursive_zip\n\n# List of disabled ingestion strategies.\n# (optional, default: )\nckanext.ingest.strategy.disabled = ingest:scheming_csv\n\n# Base template for WebUI\n# (optional, default: page.html)\nckanext.ingest.base_template = admin/index.html\n\n# Allow moving existing resources between packages.\n# (optional, default: false)\nckanext.ingest.allow_resource_transfer = true\n\n# Rename strategies using `{\"import.path.of:StrategyClass\": \"new_name\"}` JSON\n# object\n# (optional, default: )\nckanext.ingest.strategy.name_mapping = {\"ckanext.ingest.strategy.zip:ZipStrategy\": \"zip\"}\n```\n\n## Interfaces\n\nImplementations of the `ckanext.ingest.interfaces.IIngest` interface can register\ncustom extraction strategies via the `get_ingest_strategies` method:\n\n```python\ndef get_ingest_strategies(self) -> dict[str, type[ckanext.ingest.shared.ExtractionStrategy]]:\n    \"\"\"Return extraction strategies.\"\"\"\n    return {\n        \"my_plugin:xlsx_datasets\": MyXlsxStrategy,\n    }\n```\n\n## API\n\n### `ingest_extract_records`\n\nExtract records from the source.\n\nThis method mainly exists for debugging. It doesn't create anything; it just\nparses the source, produces records and returns the records' data as a\nlist. Because it aggregates all extracted records into a single list, it\ncan consume a lot of memory. 
If you want to iterate over the records instead, consider using the\n`iter_records` function, which produces an iterable over records.\n\nArgs:\n\n    source: str|FileStorage - data source for records\n\n    strategy: str|None - record extraction strategy. If missing, strategy\n    is guessed depending on source's mimetype\n\n    options: SourceOptions - dictionary with configuration for strategy and\n    records. Consumed by strategies so heavily depends on the chosen\n    strategy.\n\n\n### `ingest_import_records`\n\nIngest records extracted from source.\n\nParse the source, convert it into Records using the selected strategy, and call\n`Record.ingest`, potentially creating/updating data.\n\nArgs:\n\n    source: str|FileStorage - data source for records\n\n    strategy: str|None - record extraction strategy. If missing, strategy\n    is guessed depending on source's mimetype\n\n    options: SourceOptions - dictionary with configuration for strategy and\n    records. Consumed by strategies so heavily depends on the chosen\n    strategy.\n\n    defaults: dict[str, Any] - default data added to every record (if missing)\n\n    overrides: dict[str, Any] - data that unconditionally overrides record details\n\n    skip: int - number of records that are skipped without ingestion\n\n    take: int - max number of records that will be ingested\n",
    "bugtrack_url": null,
    "license": "AGPL",
    "summary": null,
    "version": "1.4.1",
    "project_urls": {
        "Homepage": "https://github.com/DataShades/ckanext-ingest"
    },
    "split_keywords": [
        "ckan"
    ],
    "urls": [
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "b163a176147d433ba7416d934010f8fc24e5bc2f18a51a358cb2dfb844044bbb",
                "md5": "2861f28528cd2625fdd9d527ef597a41",
                "sha256": "84544a08e66073edbc3cdf32876c04c31510a68cbc050f7e648f53e2dab50e84"
            },
            "downloads": -1,
            "filename": "ckanext_ingest-1.4.1-py3-none-any.whl",
            "has_sig": false,
            "md5_digest": "2861f28528cd2625fdd9d527ef597a41",
            "packagetype": "bdist_wheel",
            "python_version": "py3",
            "requires_python": ">=3.8",
            "size": 41973,
            "upload_time": "2024-04-02T15:25:42",
            "upload_time_iso_8601": "2024-04-02T15:25:42.887222Z",
            "url": "https://files.pythonhosted.org/packages/b1/63/a176147d433ba7416d934010f8fc24e5bc2f18a51a358cb2dfb844044bbb/ckanext_ingest-1.4.1-py3-none-any.whl",
            "yanked": false,
            "yanked_reason": null
        },
        {
            "comment_text": "",
            "digests": {
                "blake2b_256": "9f57e2d4ae352bc5f644c412fdded4b31769fe323f779544145cee3ed5095b9a",
                "md5": "071aca6881e6dcf4790ba3927dd37400",
                "sha256": "43c909a05d4f12c5fcf352f82ccedd454377f766fc3bf243ef4b0f3417a2d81a"
            },
            "downloads": -1,
            "filename": "ckanext-ingest-1.4.1.tar.gz",
            "has_sig": false,
            "md5_digest": "071aca6881e6dcf4790ba3927dd37400",
            "packagetype": "sdist",
            "python_version": "source",
            "requires_python": ">=3.8",
            "size": 42301,
            "upload_time": "2024-04-02T15:25:46",
            "upload_time_iso_8601": "2024-04-02T15:25:46.657287Z",
            "url": "https://files.pythonhosted.org/packages/9f/57/e2d4ae352bc5f644c412fdded4b31769fe323f779544145cee3ed5095b9a/ckanext-ingest-1.4.1.tar.gz",
            "yanked": false,
            "yanked_reason": null
        }
    ],
    "upload_time": "2024-04-02 15:25:46",
    "github": true,
    "gitlab": false,
    "bitbucket": false,
    "codeberg": false,
    "github_user": "DataShades",
    "github_project": "ckanext-ingest",
    "travis_ci": true,
    "coveralls": false,
    "github_actions": true,
    "requirements": [],
    "lcname": "ckanext-ingest"
}
        