# sling

- **Version:** 1.4.15
- **Summary:** Slings data from a source to a target
- **Home page:** https://github.com/slingdata-io/sling-python
- **Author:** Sling Data
- **Keywords:** sling, etl, elt, extract, load
- **Uploaded:** 2025-07-26 00:36:30
            <p align="center"><img src="https://github.com/slingdata-io/sling-python/raw/main/logo-with-text.png" alt="logo" width="250"/></p>

<p align="center">Slings from a data source to a data target.</p>

## Installation

`pip install sling`, or `pip install sling[arrow]` for Arrow-based streaming support.

Then you should be able to run `sling --help` from the command line.

## Running an Extract-Load Task

### CLI

```shell
sling run --src-conn MY_PG --src-stream myschema.mytable \
  --tgt-conn YOUR_SNOWFLAKE --tgt-object yourschema.yourtable \
  --mode full-refresh
```

Or define the replication in a YAML (or JSON) file and run it:

```shell
cat > /path/to/replication.yaml << 'EOF'
source: MY_POSTGRES
target: MY_SNOWFLAKE

# default config options which apply to all streams
defaults:
  mode: full-refresh
  object: new_schema.{stream_schema}_{stream_table}

streams:
  my_schema.*:
EOF

sling run -r /path/to/replication.yaml
```

### Using the `Replication` class

Run a replication from file:

```python
import yaml
from sling import Replication

# From a YAML file
replication = Replication(file_path="path/to/replication.yaml")
replication.run()

# Or load into object
with open('path/to/replication.yaml') as file:
  config = yaml.load(file, Loader=yaml.FullLoader)

replication = Replication(**config)

replication.run()
```

Build a replication dynamically:

```python
from sling import Replication, ReplicationStream, Mode

# build sling replication
streams = {}
# `folders` is assumed to be an iterable of (folder, table_name) pairs
for (folder, table_name) in list(folders):
  streams[folder] = ReplicationStream(
    mode=Mode.FULL_REFRESH, object=table_name, primary_key='_hash_id')

replication = Replication(
  source='aws_s3',
  target='snowflake',
  streams=streams,
  env=dict(SLING_STREAM_URL_COLUMN='true', SLING_LOADED_AT_COLUMN='true'),
  debug=True,
)

replication.run()
```

### Using the `Sling` class

For more direct control and streaming capabilities, you can use the `Sling` class, which mirrors the CLI interface.
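
As an illustration of that mirroring, the CLI invocation from the section above translates directly into `Sling` keyword arguments. This is a sketch: the connection names are the same placeholders used in the CLI example and must resolve to configured connections.

```python
from sling import Sling, Mode

# CLI equivalent:
#   sling run --src-conn MY_PG --src-stream myschema.mytable \
#     --tgt-conn YOUR_SNOWFLAKE --tgt-object yourschema.yourtable --mode full-refresh
Sling(
    src_conn="MY_PG",
    src_stream="myschema.mytable",
    tgt_conn="YOUR_SNOWFLAKE",
    tgt_object="yourschema.yourtable",
    mode=Mode.FULL_REFRESH,
).run()
```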

#### Basic Usage with `run()` method

```python
import os
from sling import Sling, Mode

# Set postgres & snowflake connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'
os.environ["SNOWFLAKE"] = 'snowflake://...'

# Database to database transfer
Sling(
    src_conn="postgres",
    src_stream="public.users",
    tgt_conn="snowflake",
    tgt_object="public.users_copy",
    mode=Mode.FULL_REFRESH
).run()

# Database to file
Sling(
    src_conn="postgres", 
    src_stream="select * from users where active = true",
    tgt_object="file:///tmp/active_users.csv"
).run()

# File to database
Sling(
    src_stream="file:///path/to/data.csv",
    tgt_conn="snowflake",
    tgt_object="public.imported_data"
).run()
```


#### Input Streaming - Python Data to Target

> **💡 Tip:** Install `pip install sling[arrow]` for better streaming performance and improved data type handling.

> **📊 DataFrame Support:** The `input` parameter accepts lists of dictionaries, pandas DataFrames, or polars DataFrames. DataFrame support preserves data types when using Arrow format.

> **⚠️ Note:** Be careful with large numbers of `Sling` invocations using `input` or `stream()` methods when working with external systems (databases, file systems). Each call re-opens the connection since it invokes the underlying sling binary. For better performance and connection reuse, consider using the `Replication` class instead, which maintains open connections across multiple operations (see the sketch after the examples below).

```python
import os
from sling import Sling, Format

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Stream Python data to CSV file
data = [
    {"id": 1, "name": "John", "age": 30},
    {"id": 2, "name": "Jane", "age": 25},
    {"id": 3, "name": "Bob", "age": 35}
]

Sling(
    input=data,
    tgt_object="file:///tmp/output.csv"
).run()

# Stream Python data to database
Sling(
    input=data,
    tgt_conn="postgres",
    tgt_object="public.users"
).run()

# Stream Python data to JSON Lines file
Sling(
    input=data,
    tgt_object="file:///tmp/output.jsonl",
    tgt_options={"format": Format.JSONLINES}
).run()

# Stream from generator (memory efficient for large datasets)
def data_generator():
    for i in range(10000):
        yield {"id": i, "value": f"item_{i}", "timestamp": "2023-01-01"}

Sling(input=data_generator(), tgt_object="file:///tmp/large_dataset.csv").run()

# Stream pandas DataFrame to database
import pandas as pd

df = pd.DataFrame({
    "id": [1, 2, 3, 4],
    "name": ["Alice", "Bob", "Charlie", "Diana"],
    "age": [25, 30, 35, 28],
    "salary": [50000, 60000, 70000, 55000]
})

Sling(
    input=df,
    tgt_conn="postgres",
    tgt_object="public.employees"
).run()

# Stream polars DataFrame to CSV file
import polars as pl

df = pl.DataFrame({
    "product_id": [101, 102, 103],
    "product_name": ["Laptop", "Mouse", "Keyboard"],
    "price": [999.99, 25.50, 75.00],
    "in_stock": [True, False, True]
})

Sling(
    input=df,
    tgt_object="file:///tmp/products.csv"
).run()

# DataFrame with column selection
Sling(
    input=df,
    select=["product_name", "price"],  # Only export specific columns
    tgt_object="file:///tmp/product_prices.csv"
).run()
```
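
As the note above suggests, many separate `Sling` invocations against the same systems can be replaced by a single `Replication` with multiple streams, so the configuration (and connection setup) happens once. A minimal sketch reusing the `Replication`/`ReplicationStream` API shown earlier; the connection and table names are illustrative.

```python
from sling import Replication, ReplicationStream, Mode

# one replication with several streams, instead of one Sling call per table
replication = Replication(
    source="postgres",
    target="snowflake",
    streams={
        "public.users": ReplicationStream(mode=Mode.FULL_REFRESH, object="public.users_copy"),
        "public.orders": ReplicationStream(mode=Mode.FULL_REFRESH, object="public.orders_copy"),
    },
)
replication.run()
```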

#### Output Streaming with `stream()`

```python
import os
from sling import Sling

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Stream data from database
sling = Sling(
    src_conn="postgres",
    src_stream="public.users",
    limit=1000
)

for record in sling.stream():
    print(f"User: {record['name']}, Age: {record['age']}")

# Stream data from file
sling = Sling(
    src_stream="file:///path/to/data.csv"
)

# Process records one by one (memory efficient)
for record in sling.stream():
    # Process each record (transform_record is a placeholder for your own logic)
    processed_data = transform_record(record)
    # Could save to another system, send to API, etc.

# Stream with parameters
sling = Sling(
    src_conn="postgres",
    src_stream="public.orders",
    select=["order_id", "customer_name", "total"],
    where="total > 100",
    limit=500
)

records = list(sling.stream())
print(f"Found {len(records)} high-value orders")
```

#### High-Performance Streaming with `stream_arrow()`

> **🚀 Performance:** The `stream_arrow()` method provides the highest-performance streaming, with full data type preservation, by using Apache Arrow's columnar format. Requires `pip install sling[arrow]`.

> **📊 Type Safety:** Unlike `stream()` which may convert data types during CSV serialization, `stream_arrow()` preserves exact data types including integers, floats, timestamps, and more.

```python
import os
from sling import Sling

# Set postgres connection  
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Basic Arrow streaming from database
sling = Sling(src_conn="postgres", src_stream="public.users", limit=1000)

# Get Arrow RecordBatchStreamReader for maximum performance
reader = sling.stream_arrow()

# Convert to Arrow Table for analysis
table = reader.read_all()
print(f"Received {table.num_rows} rows with {table.num_columns} columns")
print(f"Column names: {table.column_names}")
print(f"Schema: {table.schema}")

# Convert to pandas DataFrame with preserved types
if table.num_rows > 0:
    df = table.to_pandas()
    print(df.dtypes)  # Shows preserved data types

# Stream Arrow file with type preservation
sling = Sling(
    src_stream="file:///path/to/data.arrow",
    src_options={"format": "arrow"}
)

reader = sling.stream_arrow()
table = reader.read_all()

# Access columnar data directly (very efficient)
for column_name in table.column_names:
    column = table.column(column_name)
    print(f"{column_name}: {column.type}")

# Process Arrow batches for large datasets (memory efficient)
sling = Sling(
    src_conn="postgres", 
    src_stream="select * from large_table"
)

reader = sling.stream_arrow()
for batch in reader:
    # Process each batch separately to manage memory
    print(f"Processing batch with {batch.num_rows} rows")
    # Convert batch to pandas if needed
    batch_df = batch.to_pandas()
    # Process batch_df...

# Round-trip with Arrow format preservation
import pandas as pd

# Write DataFrame to Arrow file with type preservation
df = pd.DataFrame({
    "id": [1, 2, 3],
    "amount": [100.50, 250.75, 75.25],
    "timestamp": pd.to_datetime(["2023-01-01", "2023-01-02", "2023-01-03"]),
    "active": [True, False, True]
})

Sling(
    input=df,
    tgt_object="file:///tmp/data.arrow",
    tgt_options={"format": "arrow"}
).run()

# Read back with full type preservation
sling = Sling(
    src_stream="file:///tmp/data.arrow",
    src_options={"format": "arrow"}
)

reader = sling.stream_arrow()
restored_table = reader.read_all()
restored_df = restored_table.to_pandas()

# Types are exactly preserved (no string conversion)
print(restored_df.dtypes)
assert restored_df['active'].dtype == 'bool'
assert 'datetime64' in str(restored_df['timestamp'].dtype)
```

**Notes:**
- `stream_arrow()` requires PyArrow: `pip install sling[arrow]`
- Cannot be used with a target object (use `run()` instead)
- Provides the best performance for large datasets
- Preserves exact data types including timestamps, decimals, and booleans
- Ideal for analytics workloads and data science applications (see the Parquet sketch below)
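
Because `stream_arrow()` yields standard PyArrow objects, its output plugs directly into the PyArrow ecosystem. The sketch below writes a streamed table to Parquet for downstream analytics; the connection and table are the same placeholders used above, and the output path is arbitrary.

```python
import pyarrow.parquet as pq
from sling import Sling

# stream a table via Arrow and persist it as Parquet, keeping column types intact
reader = Sling(src_conn="postgres", src_stream="public.users").stream_arrow()
table = reader.read_all()
pq.write_table(table, "/tmp/users.parquet")
```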

#### Round-trip Examples

```python
import os
from sling import Sling

# Set postgres connection
# see https://docs.slingdata.io/connections/database-connections
os.environ["POSTGRES"] = 'postgres://...'

# Python → File → Python
original_data = [
    {"id": 1, "name": "Alice", "score": 95.5},
    {"id": 2, "name": "Bob", "score": 87.2}
]

# Step 1: Python data to file
sling_write = Sling(
    input=original_data,
    tgt_object="file:///tmp/scores.csv"
)
sling_write.run()

# Step 2: File back to Python
sling_read = Sling(
    src_stream="file:///tmp/scores.csv"
)
loaded_data = list(sling_read.stream())

# Python → Database → Python (with transformations)
sling_to_db = Sling(
    input=original_data,
    tgt_conn="postgres",
    tgt_object="public.temp_scores"
)
sling_to_db.run()

sling_from_db = Sling(
    src_conn="postgres", 
    src_stream="select *, score * 1.1 as boosted_score from public.temp_scores",
)
transformed_data = list(sling_from_db.stream())

# DataFrame → Database → DataFrame (with pandas/polars)
import pandas as pd

# Start with pandas DataFrame
df = pd.DataFrame({
    "user_id": [1, 2, 3],
    "purchase_amount": [100.50, 250.75, 75.25],
    "category": ["electronics", "clothing", "books"]
})

# Write DataFrame to database
Sling(
    input=df,
    tgt_conn="postgres",
    tgt_object="public.purchases"
).run()

# Read back with SQL transformations as pandas DataFrame
sling_query = Sling(
    src_conn="postgres",
    src_stream="""
        SELECT category, 
               COUNT(*) as purchase_count,
               AVG(purchase_amount) as avg_amount
        FROM public.purchases 
        GROUP BY category
    """
)
summary_data = list(sling_query.stream())
summary_df = pd.DataFrame(summary_data)
print(summary_df)
```


### Using the `Pipeline` class

Run a [Pipeline](https://docs.slingdata.io/concepts/pipeline):

```python
from sling import Pipeline
from sling.hooks import StepLog, StepCopy, StepReplication, StepHTTP, StepCommand

# From a YAML file
pipeline = Pipeline(file_path="path/to/pipeline.yaml")
pipeline.run()

# Or using Hook objects for type safety
pipeline = Pipeline(
    steps=[
        StepLog(message="Hello world"),
        StepCopy(from_="sftp//path/to/file", to="aws_s3/path/to/file"),
        StepReplication(path="path/to/replication.yaml"),
        StepHTTP(url="https://trigger.webhook.com"),
        StepCommand(command=["ls", "-l"], print_output=True)
    ],
    env={"MY_VAR": "value"}
)
pipeline.run()

# Or programmatically using dictionaries
pipeline = Pipeline(
    steps=[
        {"type": "log", "message": "Hello world"},
        {"type": "copy", "from": "sftp//path/to/file", "to": "aws_s3/path/to/file"},
        {"type": "replication", "path": "path/to/replication.yaml"},
        {"type": "http", "url": "https://trigger.webhook.com"},
        {"type": "command", "command": ["ls", "-l"], "print": True}
    ],
    env={"MY_VAR": "value"}
)
pipeline.run()
```


## Testing

```bash
pytest sling/tests/tests.py -v
pytest sling/tests/test_sling_class.py -v
```

            
