fairspace-pipeline

- Name: fairspace-pipeline
- Version: 0.0.3
- Summary: Building blocks for creating a pipeline for Fairspace
- Upload time: 2024-09-10 14:26:55
- Requires Python: >=3.12
- License: Apache-2.0
- Keywords: fairspace, pipeline
# Fairspace Pipeline

[![PyPI](https://img.shields.io/pypi/v/fairspace_pipeline.svg)](https://pypi.org/project/fairspace_pipeline/)
![PyPI - Status](https://img.shields.io/pypi/status/fairspace_pipeline.svg)
[![Apache 2.0 license](https://img.shields.io/pypi/l/fairspace_pipeline.svg)](LICENSE)

This repository contains the Fairspace Pipeline, 
a package that can be used as a base to create custom ETL pipelines for Fairspace.

## Installation

Requires Python 3.12 or newer.

### Installing from PyPI

```shell
python -m pip install fairspace-pipeline
```

### Installing from sources

```shell
git clone https://github.com/thehyve/fairspace-pipeline.git
cd fairspace-pipeline
python -m build
python -m pip install .
```

## Usage

Fairspace Pipeline cannot be used as a standalone package. It requires a custom implementation of several of its interfaces, along with specific configuration.
The sections below describe the steps needed to create a fully functioning custom pipeline from the reusable components and interface implementations.

### Configuration
Create a `.env` file with the following environment variables:

```shell
# Keycloak
KEYCLOAK_CLIENT_ID="" # e.g. KEYCLOAK_CLIENT_ID="workspace-client"
KEYCLOAK_CLIENT_SECRET="" # e.g. KEYCLOAK_CLIENT_SECRET="********"
KEYCLOAK_USERNAME="" # e.g. KEYCLOAK_USERNAME="organisation-admin"
KEYCLOAK_PASSWORD="" # e.g. KEYCLOAK_PASSWORD="fairspace123"
KEYCLOAK_SERVER_URL="" # e.g. KEYCLOAK_SERVER_URL="https://my-keycloak-instance.com"
KEYCLOAK_REALM="" # e.g. KEYCLOAK_REALM="fairspace"

# Fairspace
FAIRSPACE_URL="" # e.g. FAIRSPACE_URL="https://my-fairspace-instance.com"

# Amazon S3
IS_AWS_S3=False # e.g. IS_AWS_S3=True if using AWS S3 bucket for source data
AWS_SOURCE_BUCKET_NAME="" # e.g. AWS_SOURCE_BUCKET_NAME="fairspace-metadata-source"
AWS_OUTPUT_BUCKET_NAME="" # e.g. AWS_OUTPUT_BUCKET_NAME="fairspace-metadata-output"

# Pipeline settings
TAXONOMIES_TTL_PATH="config/custom_taxonomies.ttl"
SOURCE_STUDIES_PATHS='' # e.g. '["./test_data/test_study_input1", "./test_data/test_study_input2"]'
OUTPUT_DATA_PATH="" # e.g. OUTPUT_DATA_PATH="./test_data/.output_data"
```
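Note that `SOURCE_STUDIES_PATHS` holds a JSON array encoded as a string, not a plain path list. A minimal sketch of how such a value is parsed (standard library only; the environment value below is the example from the `.env` template above):

```python
import json
import os

# Hypothetical value, matching the .env example above
os.environ["SOURCE_STUDIES_PATHS"] = '["./test_data/test_study_input1", "./test_data/test_study_input2"]'

# json.loads turns the JSON array string into a Python list of directory paths
paths = json.loads(os.environ.get("SOURCE_STUDIES_PATHS", "[]"))
print(paths)  # → ['./test_data/test_study_input1', './test_data/test_study_input2']
```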

Next, implement a function that creates a `FairspacePipelineConfig` object from environment variables:

```python
import json
import os
import pathlib

from fairspace_pipeline.pipeline import FairspacePipelineConfig


def get_config() -> FairspacePipelineConfig:
    config = FairspacePipelineConfig()
    config.source_study_prefixes = json.loads(str(os.environ.get('SOURCE_STUDIES_PREFIXES', '[""]')))
    config.output_data_directory = os.environ.get('OUTPUT_DATA_PATH',
                                                  os.path.join(os.getcwd(), 'test_data/.output_data'))
    config.is_aws_s3 = os.getenv("IS_AWS_S3", 'False').lower() in ('true', '1', 't')
    if config.is_aws_s3:
        config.source_study_directories = json.loads(os.environ.get('SOURCE_STUDIES_PATHS'))
    else:
        config.source_study_directories = [os.path.join(os.getcwd(), val) for val in json.loads(
            os.environ.get('SOURCE_STUDIES_PATHS', '["test_data/source_data"]'))]
    config.source_bucket_name = os.environ.get('AWS_SOURCE_BUCKET_NAME')
    config.output_bucket_name = os.environ.get('AWS_OUTPUT_BUCKET_NAME')
    config.taxonomies_directory = os.environ.get(
        'TAXONOMIES_TTL_PATH',
        os.path.join(pathlib.Path(__file__).parent.absolute(), "config", "taxonomies.ttl")
    )
    return config
```


### Taxonomies
Implement a custom taxonomies graph class that extends [the `TaxonomiesGraph` class](src/fairspace_pipeline/graph/taxonomy_graph.py).
The class needs to:
- include all custom taxonomies as class attributes,
- define the `TAXONOMY_PREFIX` to be used in custom taxonomy term URIs.

Sample implementation:
```python
from fairspace_pipeline.graph.taxonomy_graph import TaxonomiesGraph

TAXONOMY_PREFIX = "https://fairspace.com/custom_ontology#"  # Prefix for custom taxonomies

class CustomTaxonomiesGraph(TaxonomiesGraph):
    def __init__(self, taxonomies_dir):
        super().__init__(taxonomies_dir, TAXONOMY_PREFIX)
        self.countries = self.query_taxonomy(taxonomy_name='Country')
        self.file_types = self.query_taxonomy(taxonomy_name='FileType')
        self.species = self.query_taxonomy(taxonomy_name='Species')
        self.gender = self.query_taxonomy(taxonomy_name='Gender')
        self.study_status = self.query_taxonomy(taxonomy_name='StudyStatus')
        self.study_phases = self.query_taxonomy(taxonomy_name='StudyPhase')
        self.sample_treatments = self.query_taxonomy(taxonomy_name='Treatment')
```
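The `TAXONOMY_PREFIX` is the namespace under which taxonomy term URIs are minted. As a sketch of the idea (the exact URI scheme used by `TaxonomiesGraph` is an assumption here, for illustration only):

```python
TAXONOMY_PREFIX = "https://fairspace.com/custom_ontology#"  # Prefix for custom taxonomies

def term_uri(taxonomy_name: str, term_id: str) -> str:
    # Illustrative scheme: prefix + taxonomy name + term identifier.
    # The actual scheme is defined by TaxonomiesGraph.
    return f"{TAXONOMY_PREFIX}{taxonomy_name}_{term_id}"

print(term_uri("Country", "NL"))  # → https://fairspace.com/custom_ontology#Country_NL
```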

### Mapping

Implement a custom graph class that extends [the `FairspaceGraph` class](src/fairspace_pipeline/graph/fairspace_graph.py).
It is the module responsible for mapping source data to RDF graphs specific to the custom data model.

The class needs to implement the `create_graph()` and `get_file_suffix_processing_order()` methods of the `FairspaceGraph` interface.

Sample implementation:
```python
from rdflib import Graph, Literal, Namespace
from rdflib.namespace import DCTERMS, RDF, RDFS

from fairspace_pipeline.graph.fairspace_graph import FairspaceGraph

CUSTOM_NS = Namespace("https://custom.com/ontology#")

class CustomGraph(FairspaceGraph):
    def __init__(self, taxonomies_graph: CustomTaxonomiesGraph):
        super().__init__(taxonomies_graph)

    def map_study(self, study_dict: dict) -> Study:
        # Implement custom study processing logic
        pass

    def map_sample(self, sample_dict: dict) -> Sample:
        # Implement custom sample processing logic
        pass

    def map_subject(self, subject_dict: dict) -> Subject:
        # Implement custom subject processing logic
        pass

    def create_study_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        studies = self.map_studies(data, prefix)
        for study in studies:
            graph.add((study.uri, RDF.type, CUSTOM_NS.Study))
            if study.label is not None:
                graph.add((study.uri, RDFS.label, Literal(study.label)))
            if study.identifier is not None:
                graph.add((study.uri, DCTERMS.identifier, Literal(study.identifier)))
            ...
        return graph

    def create_sample_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        samples = self.map_sample(data, prefix)
        for sample in samples:
            ...
        return graph
            
    def create_subject_graph(self, data: dict, prefix: str) -> Graph:
        graph = self.create_new_graph()
        subjects = self.map_subject(data, prefix)
        for subject in subjects:
            ...
        return graph
            
    def create_graph(self, file_path: str, data: dict, prefix: str) -> Graph:
        if str(file_path).endswith(STUDY_MANIFEST_FILE_SUFFIX):
            return self.create_study_graph(data, prefix)
        if str(file_path).endswith(SAMPLE_INFORMATION_SUFFIX):
            return self.create_sample_graph(data, prefix)
        ...
        return self.create_subjects_data_files_graph(data, prefix)

    # Define the order of processing files by file suffix in the source study directory
    # e.g. ['_study.json', '_sample_information.json']
    def get_file_suffix_processing_order(self) -> list[str]:
        return [STUDY_MANIFEST_FILE_SUFFIX, SAMPLE_INFORMATION_SUFFIX]
```
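The suffix list returned by `get_file_suffix_processing_order()` determines which files in a study directory are handled first. A standalone sketch of how such an ordering could be applied to a directory listing (this is not the library's actual implementation, and the suffix values are the examples from the comment above):

```python
# Assumed example suffixes, taken from the comment in the sample above
STUDY_MANIFEST_FILE_SUFFIX = "_study.json"
SAMPLE_INFORMATION_SUFFIX = "_sample_information.json"

def order_files(files: list[str], suffix_order: list[str]) -> list[str]:
    # Files matching an earlier suffix sort first; unmatched files go last.
    def rank(name: str) -> int:
        for i, suffix in enumerate(suffix_order):
            if name.endswith(suffix):
                return i
        return len(suffix_order)
    return sorted(files, key=rank)

files = ["a_sample_information.json", "a_study.json", "a_subjects.json"]
ordered = order_files(files, [STUDY_MANIFEST_FILE_SUFFIX, SAMPLE_INFORMATION_SUFFIX])
print(ordered)  # → ['a_study.json', 'a_sample_information.json', 'a_subjects.json']
```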

### Pipeline runner

Define the main function to run the pipeline:
```python
import argparse

from dotenv import load_dotenv

from fairspace_pipeline.pipeline import FairspacePipeline, FairspacePipelineConfig

def main():
    load_dotenv()

    parser = argparse.ArgumentParser()
    parser.add_argument("-i", "--init", action="store_true")
    parser.add_argument("-p", "--process", action="store_true")
    parser.add_argument("-u", "--upload", action="store_true")
    parser.add_argument("-r", "--reindex", action="store_true")
    parser.add_argument("-d", "--delete", action="store_true")
    parser.add_argument("-c", "--compact", action="store_true")
    parser.add_argument("-ms", "--maintenance_status", action="store_true")
    args = parser.parse_args()

    config = get_config()

    taxonomies_graph = CustomTaxonomiesGraph(config.taxonomies_directory)
    graph = CustomGraph(taxonomies_graph)

    pipeline = FairspacePipeline(config, graph)
    pipeline.run(args.init, args.process, args.upload, args.delete, args.reindex, args.compact, args.maintenance_status)


if __name__ == '__main__':
    main()
```

### Optional implementation

If custom processing needs to be applied, implement a custom IO handler class that extends [the `IOHandler` class](src/fairspace_pipeline/io/io_handler.py),
following the examples of [the `LocalIOHandler` class](src/fairspace_pipeline/io/local_io_handler.py) and [the `S3IOHandler` class](src/fairspace_pipeline/io/s3_io_handler.py).
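The idea of the extension point can be sketched with a toy in-memory handler. The class and method names below are illustrative assumptions only, not the actual `IOHandler` interface; consult `io_handler.py` for the real API:

```python
# Illustrative sketch: names and signatures are assumptions, not the
# actual IOHandler interface defined in io_handler.py.
class InMemoryIOHandler:
    """Hypothetical handler that reads and writes files from a dict."""

    def __init__(self, storage: dict[str, str]):
        self.storage = storage  # maps path -> file content

    def read(self, path: str) -> str:
        return self.storage[path]

    def write(self, path: str, content: str) -> None:
        self.storage[path] = content

handler = InMemoryIOHandler({"study.json": "{}"})
handler.write("out.ttl", "@prefix ex: <http://example.org/> .")
print(handler.read("study.json"))  # → {}
```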

## Running the complete pipeline

When all the components are implemented, the pipeline can be run with the following command:

```shell
python main.py --init --process --upload --compact
```
Include only the options that you need:

- `--init` or `-i` - prepare the required roles for the configured user and upload taxonomies
- `--process` or `-p` - read the input source data, transform it to the Fairspace model and save it to the configured output directory
- `--upload` or `-u` - upload the transformed data into Fairspace
- `--reindex` or `-r` - reindex the Fairspace views database
- `--compact` or `-c` - compact the Fairspace Jena database to reduce its size
- `--maintenance_status` or `-ms` - get the maintenance status to see if reindexing or compacting is in progress
