pycurie

Name: pycurie
Version: 0.1.16
Home page: https://github.com/EricDiGi/CurieKetl
Author: Eric DiGioacchino
Upload time: 2023-10-17 16:56:22
Keywords: etl, data, data engineering, data science, mlops, devops
Requirements: boto3, botocore, Jinja2, mysql_connector_repackaged, python-dotenv, PyYAML, redshift_connector, setuptools

# Welcome to Curie :test_tube:

Curie is a command line interface for ETL pipelines and local data workflows. It is designed to be simple, flexible, and extensible. It is written in Python and uses [JinjaSQL]() to write SQL queries.

## Table of Contents

1. [Getting Started](#getting-started-100)
    1. [Command Line Interface](#command-line-interface-110)
    2. [Configuration Files](#configuration-files-120)
    3. [Pipeline Definitions](#pipeline-definitions-130)
    4. [Project Structure](#project-structure-140)

# Getting Started 1.0.0 

**Definitions:**
* **project:** A project is a collection of pipelines. It is the highest level of organization in Curie.
* **pipeline:** A pipeline is a collection of tables/nodes. It is the second level of organization in Curie and the core of Curie.
* **tables and nodes:** Tables and nodes are the same thing. They are the smallest unit of organization in Curie. They can be thought of as a single actionable unit.
* **variants:** Variants are the different ways that a table can be saved. They are defined in pipeline definitions, for save operations only. For example, you may want to save a table as both a CSV and a JSON file: define each as a variant in the pipeline definition and invoke them when you run the pipeline. Variants also enable more complex save strategies, such as saving one CSV per categorical value in a field. Some use cases are provided later.
* **modes:** Modes are the different ways that a table can be run. Two actionable modes should be used in pipeline definitions: __run__ and __save__. Each lets you specify a list of dependencies to execute before the table is run, and a query that generates the data. __run__ executes the query and affects the database, while __save__ downloads the data generated by the query.

### Command Line Interface 1.1.0


1. **Initialize a new project** - Change your working directory to the location where you want to create your project. Then run the following command:

    ```bash
    curie init <project>
    ```

2. **Compile your project** - Compiling your project will generate the scripts that will be used to run your pipeline. By default these will be stored in `<root>/scripts/compiled/Unknown/` if no `compile_path` is defined in `project.yaml`. 

    Change your working directory to the location of your project. Then run the following command, substituting `run` or `save` for `<mode>` (a worked example of these commands follows this list):

    ```bash
    curie etl <mode> <pipeline> --compile
    ```
3. **Running your pipeline** - Running your pipeline will execute the scripts generated during compilation (all scripts will be recompiled with each run). This action affects your database. Common uses include: updating tables, building a new dataset, refreshing dependencies.

    Change your working directory to the location of your project. Then run the following command:

    ```bash
    curie etl run <pipeline> [start] [--tables <t1 t2 t3 ... tn (.)> ][--connection <myDB-Conn-Name>][--override-name <var1 var2 var3 ... varn>][--override-values <vala valb valc ... valn>]
    ```
4. **Saving your pipeline** - Saving your pipeline will download selections of the tables specified in the command according to terms defined in your config file. By default these will be stored in `<root>/data/Unknown/` if not specified in the `project.yaml`. This action does not affect your database. Common uses include: downloading data for analysis, downloading data for sharing. **Variant executions are supported in this mode.**

    Change your working directory to the location of your project. Then run the following command:

    ```bash
    curie etl save <pipeline> [start] [--tables <t1 t2 t3 ... tn (.)> ][--connection <myDB-Conn-Name>][--override-name <var1 var2 var3 ... varn>][--override-values <vala valb valc ... valn>]
    ```
5. **Cleaning your pipeline** - Cleaning your pipeline will remove all artifacts generated by the pipeline or project. This action does not affect your database. Common uses include: removing downloaded data, removing compiled scripts.

    Change your working directory to the location of your project. Then run the following command:
    ```bash
    curie etl clean <pipeline (.)>
    ```

    Alternatively, you can use the included Makefile, which supplies a number of commands for selectively removing artifacts.

6. **Automated Documentation** - Curie is self-documenting, with plenty of options to add more insight.

    To generate documentation for your project, change your working directory to the location of your project and run the following command:
    ```bash
    curie docs generate
    ```
    And to view the documentation, run the following command:
    ```bash
    curie docs serve
    ```
    This will launch a local server that will allow you to view your documentation in your browser.
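
As a worked example, assuming a project containing a pipeline named `PipelineX` and a connection named `my-db-conn` (matching the configuration examples below), a typical session might look like the following; all names are illustrative placeholders:

```bash
# Hypothetical end-to-end session; project, pipeline, connection, and table names are placeholders.
curie init my_project
cd my_project

# Compile the run-mode scripts for one pipeline.
curie etl run PipelineX --compile

# Execute the pipeline against a connection defined in connections.yaml.
curie etl run PipelineX --connection my-db-conn

# Download the tables defined in save mode for this pipeline.
curie etl save PipelineX --tables facilities employees

# Remove downloaded data and compiled scripts for this pipeline.
curie etl clean PipelineX
```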

### Configuration Files 1.2.0

Curie requires three configuration files to be operational: `project.yaml` in the root directory, `connections.yaml` in the `config/` directory, and a configuration file for each pipeline in the `pipelines/` directory.

#### Project Configuration 1.2.1

The root `project.yaml` file is the core of the project. It defines the project name, the project description, the project version, and the project author. It also defines the default connection to be used for all pipelines. `Project` is the root-most key and all definitions will be defined inside this key. The following is an example of a `project.yaml` file:

```yaml
Project:
  Documentation:
    site: # Variables to modify how the generated documentation looks
      name: My Curie project
      description: This is a description of my project
      primary_color: blue
    Connections: ./config/connections.yaml # Path to the connections file
    Pipelines:
      # The following is a list of pipeline definitions.
      - name: PipelineX
        pipeline: ./pipelines/PipelineX.yaml # Path to the pipeline definition
        compile_path: ./scripts/compiled/PipelineX # Path to the compiled scripts
        download: ./data/PipelineX/raw # Path to the saved data
        connection: my-db-conn # Connection to use for this pipeline
        meta: # ALL META IS OPTIONAL
          description: This is a description of the pipeline
          created_at: 2023-09-29
          authors:
            - name: Eric DiGioacchino
              email: test@email.com
          tags:
            - tag1
            - tag2              

```

#### Connections Configuration 1.2.2

The `connections.yaml` file defines the connections that will be used by the pipelines. Connections are grouped by database type (e.g. `Redshift`, `MySQL`), each with a unique name. Secrets can be integrated into the connections file using Jinja placeholders and dedicated YAML keys; the following example demonstrates this alongside the default plain-text configuration method.

```yaml

Redshift:
  secret-redshift:
    secrets:
      env_file:
        path: ./config/.env
    host: '{{REDSHIFT_HOST}}'
    port: '{{REDSHIFT_PORT}}'
    database: private
    user: '{{REDSHIFT_USER}}'
    password: '{{REDSHIFT_PASSWORD}}'
    reminder: 'This is a reminder to turn on your VPN' # This is a comment that will appear if any connection issues occur.

MySQL:
  default-mysql:
    host: localhost
    port: 3306
    database: public
    user: root
    password: password

```
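
For reference, the `./config/.env` file referenced by the `env_file` secrets block might look like the following. This is a minimal sketch assuming (not confirmed by the package docs) that variable names in the file map one-to-one to the Jinja placeholders above; the values are placeholders:

```bash
# Hypothetical .env file loaded via python-dotenv; all values are placeholders.
REDSHIFT_HOST=my-cluster.abc123xyz.us-east-1.redshift.amazonaws.com
REDSHIFT_PORT=5439
REDSHIFT_USER=etl_user
REDSHIFT_PASSWORD=change-me
```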

### Pipeline Definitions 1.3.0

Pipeline definitions are the core of Curie. They define the tables that will be run, the queries that will be executed, and the dependencies that will be required. They are written in YAML and are stored in the `pipelines/` directory. The following is an example of a pipeline definition:

```yaml
arguments: # Arguments are optional and are used to pass variables to the pipeline.
  limit: 100
  stateAbbrev: CA

etl: # The core definition
  
  facilities:
    run:
      script: ./scripts/pipeline1/facilities.sql
      method: seed
    save:
      store_results: true # This indicates to Curie that the results of this query should be used for context in other queries.
      query: SELECT id,name FROM facilities WHERE stateAbbrev = '{{stateAbbrev}}' LIMIT {{limit}}
      outputs: # Passes fields out of this node as parameters that can be called later.
        - id
    meta:
      description: This is a description of the table
      fields:
        - name: id
          description: This is a description of the field
          type: integer
          nullable: false
          primary_key: true
        - name: name
          description: This is a description of the field
          type: string
          nullable: false
  # An employees table that is seeded from a query.
  # And generates a csv for the employees at each facility for analysis.
  employees:
    run:
      script: ./scripts/pipeline1/employees.sql
      method: seed
    save:
      query: SELECT id,name,tenure,salary FROM employees WHERE facility_id = {{facility_id}}
      depends_on:
        - facilities
      variants:
        - name: "facility-{{id}}" # The prefix of the file name that will be saved.
          filetype: csv # The filetype of the file to be saved.
          iterate_on:
            id: "{{facilities.id}}" # The field to iterate on from any previous nodes. (Must be a dependency.)
          arguments:
            facility_id: "{{id}}" # The argument to pass to the query.

```
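
To save a pipeline like the one above, the CLI's override flags can replace values from its `arguments:` block at invocation time. The pipeline name `pipeline1` is an assumption here (matching the script paths in the example); use whatever name is registered in `project.yaml`:

```bash
# Hypothetical invocation: save the example pipeline, overriding the
# stateAbbrev and limit arguments defined in its arguments: block.
curie etl save pipeline1 --override-name stateAbbrev limit --override-values NY 50
```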




* **Modal Elements:** There are two modes supported: `run` and `save`. `run` will execute the pipeline and `save` will download the data specified in the pipeline. Each will allow you to specify a list of dependencies that will be executed before the pipeline is run, and a query that will generate the data. Of course, `run` will execute the query and affect the database, and `save` will download the data generated by the query.
    * **query:** The query is a string that will be executed by the database. It is the core of the pipeline. These should be written in Jinja SQL.

    * **script:** Scripts are Jinja SQL files that will be compiled then run to execute the pipeline. Store them where you prefer, but please reserve the `scripts/compiled/<pipeline>` directory for compiled JinjaSQL scripts.

    > **script** and **query** are mutually exclusive. If both are specified, **script** will be used.

    * **depends_on:** Defines a list of tables that must be formed before the current table is run. This allows the program to form a dependency graph and execute the tables in the correct order. This matters in procedural ETLs and in forming good data models.

    > ### Run Mode Only
    >    * **method:** Defines the manner in which a table is affected: `replace`, `truncate`, `merge`, `append`,`seed`. `replace` will drop the table and replace it with the new data. `truncate` will delete all rows from the table and insert the new data. `merge` will update the table with the new data using an identifier. `append` will insert the new data into the table. `seed` will not wrap the query in any additional logic. It will simply execute the query and insert the data into the table. This is useful for creating tables that will be used as dependencies for other tables.

### Project Structure 1.4.0

```
<PROJECT ROOT>
    │
    ├── pipelines
    │   ├── pipeline1.yaml
    │   └── ...
    │
    ├── config
    │   ├── connections.yaml
    │   └── .env
    ├── data
    │   └── pipeline1
    │       └── table1.csv
    ├── scripts
    │   ├── compiled
    │   │   └── pipeline1
    │   │       └── table1.sql (compiled)
    │   │
    │   └── pipeline1
    │       └── table1.sql (jinjasql)
    ├── docs
    │   ├── stylesheets (default for curie's mkdocs)
    │   ├── pipes (generated by curie)
    │   ├── index.md
    │   └── project.md
    │
    └── README-curie.md

```

# Deployment 2.0.0

Curie was built with cross-platform deployment in mind. Rather than requiring any special infrastructure, Curie is designed to run on any machine that can run Python, including Windows, Mac, and Linux. More specifically, Curie can be deployed to cloud infrastructure through GitHub Actions.

## GitHub Actions 2.1.0

### Submit Code to a Storage Bucket 2.1.1

#### AWS S3

```yaml
- name: Deploy ETL Code to S3
  run: |
    cd CurieProject

    pip install -r requirements.txt

    curie etl run "My Pipeline" --compile

    if [ ! -d "scripts/compiled/My Pipeline" ]; then
      echo "Compiled scripts not found"
      exit 1
    fi

    aws s3 sync "scripts/compiled/My Pipeline" "s3://my-bucket/My Pipeline" --delete
```

You can then also include a step to deploy CloudFormation for a Glue Job that runs the ETL on a schedule.
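
One way to do that (not part of Curie itself) is to deploy the stack from the same workflow with the AWS CLI; the template path, stack name, and capability below are placeholders for whatever your Glue Job template requires:

```bash
# Hypothetical workflow step: deploy a CloudFormation template that defines the Glue Job
# and its schedule trigger. The template path and stack name are placeholders.
aws cloudformation deploy \
  --template-file infra/glue-etl-job.yaml \
  --stack-name curie-etl-glue \
  --capabilities CAPABILITY_NAMED_IAM
```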

## AWS Deployment 2.2.0
### AWS Glue 2.2.1

The following code uses four parameters to run a Curie pipeline. The Job can be modified to run on a schedule by adding triggers. The parameters are as follows (an example argument set is shown after the setup steps below):
- s3-bucket: The bucket where the pipeline scripts are stored
- pipeline-keys: A list of keys (paths within the bucket) to the scripts to be executed
- connection-names: The name of the Glue connection to use (optional; defaults to the connection specified in the Job settings)
- database: The name of the database to use (optional)

The script resolves all database connections available to Glue in your cloud environment, and makes them accessible by name. This allows you to use the same script for multiple databases without having to modify the script. The script also resolves secrets from AWS Secrets Manager. This allows you to store your database credentials in Secrets Manager and reference them in your Glue connection. This is a more secure way to store your credentials than storing them in the Glue connection itself.

1. Create a new Glue Job with the code below.
2. Add a trigger with a schedule to run the job on a schedule.
3. Add the parameters to the trigger.
4. Let it run!
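
For example, the job arguments might be passed like this when the job is started (bucket, keys, connection, and database names are placeholders); note that the script parses `pipeline-keys` with `json.loads`, so it must be a JSON array:

```bash
# Hypothetical Glue job arguments consumed by getResolvedOptions in the script below.
--s3-bucket my-bucket \
--pipeline-keys '["My Pipeline/table1.sql", "My Pipeline/table2.sql"]' \
--connection-names my-glue-connection \
--database analytics
```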

```python
import os
import sys
import boto3
import logging as log
import json
import redshift_connector
import pandas as pd
from awsglue.utils import getResolvedOptions

region = os.environ['AWS_REGION']
client = boto3.client('glue', region_name=region)
secrets_client = boto3.client('secretsmanager', region_name=region)
s3_client = boto3.client('s3', region_name=region)

def prepare_connection_properties(client, connection_name, database=None):
    def get_connection(name , connections):
        for connection in connections:
            if connection['Name'] == name:
                return connection
        raise Exception(f'Connection {name} not found')
    def derive_secrets(connection_properties):
        props = {}
        if 'SECRET_ID' in connection_properties:
            secret = secrets_client.get_secret_value(SecretId=connection_properties['SECRET_ID'])
            secret = json.loads(secret['SecretString'])
            props['user'] = secret['username']
            props['password'] = secret['password']
        if 'PASSWORD' in connection_properties:
            props['password'] = connection_properties['PASSWORD']
        if 'USERNAME' in connection_properties:
            props['user'] = connection_properties['USERNAME']
        if 'user' not in props or 'password' not in props:
            raise Exception(f'Could not derive username and password from connection properties for {connection_name}')
        return props
    response = client.get_connections()
    connections = response['ConnectionList']

    props = {}
    connection_properties = get_connection(connection_name, connections)['ConnectionProperties']
    if 'JDBC_CONNECTION_URL' in connection_properties:
        props['host'] = connection_properties['JDBC_CONNECTION_URL'].split('/')[2].split(':')[0]
        props['port'] = int(connection_properties['JDBC_CONNECTION_URL'].split('/')[2].split(':')[1])
        props['database'] = connection_properties['JDBC_CONNECTION_URL'].split('/')[3]
    if 'HOST' in connection_properties:
        props['host'] = connection_properties['HOST']
    if 'PORT' in connection_properties:
        props['port'] = int(connection_properties['PORT'])
    if 'DATABASE' in connection_properties:
        props['database'] = connection_properties['DATABASE']

    props.update( derive_secrets(connection_properties) )
    
    if database is not None:
        props['database'] = database
    return props

# Load Args from Glue Job using sys.argv
# s3_bucket: s3 bucket name
# pipeline_keys: list of files to be processed
# connection_names: glue connection name
# database: database name
default_arg_keys = ['s3-bucket', 'pipeline-keys', 'connection-names']
if '--connection' in sys.argv:
    default_arg_keys.append('connection')
if '--database' in sys.argv:
    default_arg_keys.append('database')
args = getResolvedOptions(sys.argv, default_arg_keys)

# Database connection properties
conn_name = args['connection_names'] if '--connection' not in sys.argv else args['connection']
database = args['database'] if '--database' in sys.argv else None
props = prepare_connection_properties(client, conn_name, database)
print(f"Connecting to {props['host']}:{props['port']}:{props['database']} as {props['user']}")

def execute_query(query):
    conn = redshift_connector.connect(**props)
    cursor = conn.cursor()
    steps = query.split(';')
    for step in steps:
        if len(step.strip()) > 0:
            cursor.execute(step)
    if cursor.description is not None:
        print(cursor.description)
        df = pd.DataFrame(cursor.fetchall(), columns=[desc[0] for desc in cursor.description])
        print(df)
    conn.commit()
    conn.close()

def validate_file_exists(s3_client, bucket, key):
    try:
        s3_client.head_object(Bucket=bucket, Key=key)
    except Exception as e:
        raise Exception(f'File {key} does not exist in {bucket}')

def s3_read(client, bucket, key):
    obj = client.get_object(Bucket=bucket, Key=key)
    return obj['Body'].read().decode('utf-8')

print(f"Beginning execution of scripts from {args['s3_bucket']} on {args['connection_names']}")
for key in json.loads(args['pipeline_keys']):
    validate_file_exists(s3_client, args['s3_bucket'], key)
    print(f"Reading file {key}")
    script = s3_read(s3_client, args['s3_bucket'], key)
    try:
        print(f"Executing script {key}")
        execute_query(script)
    except Exception as e:
        print(f'Error executing script {key}: {e}')
        raise Exception(f'Error executing script {key}: {e}')
```
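
To trigger the job manually outside of a schedule, it can be started with the AWS CLI; the job name and argument values are placeholders:

```bash
# Hypothetical manual run of the Glue Job above; all names and values are placeholders.
aws glue start-job-run \
  --job-name curie-etl-job \
  --arguments '{"--s3-bucket":"my-bucket","--pipeline-keys":"[\"My Pipeline/table1.sql\"]","--connection-names":"my-glue-connection"}'
```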

            
