.. _readme:
onETL
=====
|Repo Status| |PyPI Latest Release| |PyPI License| |PyPI Python Version| |PyPI Downloads|
|Documentation| |CI Status| |Test Coverage| |pre-commit.ci Status|
.. |Repo Status| image:: https://www.repostatus.org/badges/latest/active.svg
:alt: Repo status - Active
:target: https://github.com/MobileTeleSystems/onetl
.. |PyPI Latest Release| image:: https://img.shields.io/pypi/v/onetl
:alt: PyPI - Latest Release
:target: https://pypi.org/project/onetl/
.. |PyPI License| image:: https://img.shields.io/pypi/l/onetl.svg
:alt: PyPI - License
:target: https://github.com/MobileTeleSystems/onetl/blob/develop/LICENSE.txt
.. |PyPI Python Version| image:: https://img.shields.io/pypi/pyversions/onetl.svg
:alt: PyPI - Python Version
:target: https://pypi.org/project/onetl/
.. |PyPI Downloads| image:: https://img.shields.io/pypi/dm/onetl
:alt: PyPI - Downloads
:target: https://pypi.org/project/onetl/
.. |Documentation| image:: https://readthedocs.org/projects/onetl/badge/?version=stable
:alt: Documentation - ReadTheDocs
:target: https://onetl.readthedocs.io/
.. |CI Status| image:: https://github.com/MobileTeleSystems/onetl/workflows/Tests/badge.svg
:alt: Github Actions - latest CI build status
:target: https://github.com/MobileTeleSystems/onetl/actions
.. |Test Coverage| image:: https://codecov.io/gh/MobileTeleSystems/onetl/branch/develop/graph/badge.svg?token=RIO8URKNZJ
:alt: Test coverage - percent
:target: https://codecov.io/gh/MobileTeleSystems/onetl
.. |pre-commit.ci Status| image:: https://results.pre-commit.ci/badge/github/MobileTeleSystems/onetl/develop.svg
:alt: pre-commit.ci - status
:target: https://results.pre-commit.ci/latest/github/MobileTeleSystems/onetl/develop
|Logo|
.. |Logo| image:: https://raw.githubusercontent.com/MobileTeleSystems/onetl/0.12.5/docs/_static/logo_wide.svg
:alt: onETL logo
:target: https://github.com/MobileTeleSystems/onetl
What is onETL?
--------------
Python ETL/ELT library powered by `Apache Spark <https://spark.apache.org/>`_ & other open-source tools.
Goals
-----
* Provide unified classes to extract data from (**E**) & load data to (**L**) various stores.
* Provide `Spark DataFrame API <https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.html>`_ for performing transformations (**T**) in terms of *ETL*.
* Provide direct access to databases, allowing you to execute SQL queries as well as DDL and DML statements, and to call functions/procedures. This can be used for building *ELT* pipelines.
* Support different `read strategies <https://onetl.readthedocs.io/en/stable/strategy/index.html>`_ for incremental and batch data fetching (see the sketch after this list).
* Provide a `hooks <https://onetl.readthedocs.io/en/stable/hooks/index.html>`_ & `plugins <https://onetl.readthedocs.io/en/stable/plugins.html>`_ mechanism for altering the behavior of internal classes.
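
For example, direct SQL execution and an incremental read look like this. This is a minimal sketch, assuming ``postgres`` is an already created onETL ``Postgres`` connection and ``reader`` is a ``DBReader`` configured with an HWM (see the Quick start and the strategy documentation); exact options depend on your onETL version:

.. code:: python

    from onetl.strategy import IncrementalStrategy

    # ELT-style: run DDL/DML directly on the database
    # (`postgres` is a hypothetical, already created onETL Postgres connection)
    postgres.execute("TRUNCATE TABLE public.staging_table")

    # Incremental read strategy: inside the context, `reader.run()` fetches
    # only rows newer than the previously stored HWM value
    with IncrementalStrategy():
        df = reader.run()
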
Non-goals
---------
* onETL is not a Spark replacement. It provides additional functionality that Spark does not have, and improves UX for end users.
* onETL is not a framework: it imposes no requirements on project structure, naming, the way of running ETL/ELT processes, configuration, etc. All of that should be implemented in some other tool.
* onETL is deliberately developed without any integration with scheduling software like Apache Airflow. All such integrations should be implemented as separate tools.
* Only batch operations, no streaming. For streaming, prefer `Apache Flink <https://flink.apache.org/>`_.
Requirements
------------
* **Python 3.7 - 3.12**
* PySpark 2.3.x - 3.5.x (depending on the connector used)
* Java 8+ (required by Spark, see below)
* Kerberos libs & GCC (required by ``Hive``, ``HDFS`` and ``SparkHDFS`` connectors)
Supported storages
------------------
Database
~~~~~~~~
+--------------------+--------------+-------------------------------------------------------------------------------------------------------------------------+
| Type | Storage | Powered by |
+====================+==============+=========================================================================================================================+
| Database | Clickhouse | Apache Spark `JDBC Data Source <https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html>`_ |
+ +--------------+ +
| | MSSQL | |
+ +--------------+ +
| | MySQL | |
+ +--------------+ +
| | Postgres | |
+ +--------------+ +
| | Oracle | |
+ +--------------+ +
| | Teradata | |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | Hive | Apache Spark `Hive integration <https://spark.apache.org/docs/latest/sql-data-sources-hive-tables.html>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | Kafka | Apache Spark `Kafka integration <https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | Greenplum | VMware `Greenplum Spark connector <https://docs.vmware.com/en/VMware-Greenplum-Connector-for-Apache-Spark/index.html>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | MongoDB | `MongoDB Spark connector <https://www.mongodb.com/docs/spark-connector/current>`_ |
+--------------------+--------------+-------------------------------------------------------------------------------------------------------------------------+
| File | HDFS | `HDFS Python client <https://pypi.org/project/hdfs/>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | S3 | `minio-py client <https://pypi.org/project/minio/>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | SFTP | `Paramiko library <https://pypi.org/project/paramiko/>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | FTP | `FTPUtil library <https://pypi.org/project/ftputil/>`_ |
+ +--------------+ +
| | FTPS | |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | WebDAV | `WebdavClient3 library <https://pypi.org/project/webdavclient3/>`_ |
+ +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | Samba | `pysmb library <https://pypi.org/project/pysmb/>`_ |
+--------------------+--------------+-------------------------------------------------------------------------------------------------------------------------+
| Files as DataFrame | SparkLocalFS | Apache Spark `File Data Source <https://spark.apache.org/docs/latest/sql-data-sources-generic-options.html>`_ |
| +--------------+ +
| | SparkHDFS | |
| +--------------+-------------------------------------------------------------------------------------------------------------------------+
| | SparkS3 | `Hadoop AWS <https://hadoop.apache.org/docs/current3/hadoop-aws/tools/hadoop-aws/index.html>`_ library |
+--------------------+--------------+-------------------------------------------------------------------------------------------------------------------------+
.. documentation
Documentation
-------------
See https://onetl.readthedocs.io/
How to install
---------------
.. _install:
Minimal installation
~~~~~~~~~~~~~~~~~~~~
.. _minimal-install:
The base ``onetl`` package contains:
* ``DBReader``, ``DBWriter`` and related classes
* ``FileDownloader``, ``FileUploader``, ``FileMover`` and related classes, like file filters & limits
* ``FileDFReader``, ``FileDFWriter`` and related classes, like file formats
* Read Strategies & HWM classes
* Plugins support
It can be installed via:
.. code:: bash
pip install onetl
.. warning::
This method does NOT include any connections.
    This method is recommended for use in third-party libraries which require ``onetl`` to be installed,
    but do not use its connection classes.
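
For example, all of the classes listed above can be imported with just the base package; a quick sanity check, assuming a plain ``pip install onetl`` (no Spark or file clients are required for the imports themselves):

.. code:: python

    # These imports need only the base `onetl` package, no extras
    from onetl.db import DBReader, DBWriter
    from onetl.file import FileDownloader, FileMover, FileUploader
    from onetl.file.filter import Glob
    from onetl.file.limit import MaxFilesCount
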
With DB and FileDF connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. _spark-install:
All DB connection classes (``Clickhouse``, ``Greenplum``, ``Hive`` and others)
and all FileDF connection classes (``SparkHDFS``, ``SparkLocalFS``, ``SparkS3``)
require Spark to be installed.
.. _java-install:
Firstly, you should install a JDK. The exact installation instructions depend on your OS; here are some examples:
.. code:: bash
yum install java-1.8.0-openjdk-devel # CentOS 7 + Spark 2
dnf install java-11-openjdk-devel # CentOS 8 + Spark 3
apt-get install openjdk-11-jdk # Debian-based + Spark 3
.. _spark-compatibility-matrix:
Compatibility matrix
^^^^^^^^^^^^^^^^^^^^
+--------------------------------------------------------------+-------------+-------------+-------+
| Spark | Python | Java | Scala |
+==============================================================+=============+=============+=======+
| `2.3.x <https://spark.apache.org/docs/2.3.1/#downloading>`_ | 3.7 only | 8 only | 2.11 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `2.4.x <https://spark.apache.org/docs/2.4.8/#downloading>`_ | 3.7 only | 8 only | 2.11 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.2.x <https://spark.apache.org/docs/3.2.4/#downloading>`_ | 3.7 - 3.10 | 8u201 - 11 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.3.x <https://spark.apache.org/docs/3.3.4/#downloading>`_ | 3.7 - 3.10 | 8u201 - 17 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.4.x <https://spark.apache.org/docs/3.4.3/#downloading>`_ | 3.7 - 3.12 | 8u362 - 20 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
| `3.5.x <https://spark.apache.org/docs/3.5.3/#downloading>`_ | 3.8 - 3.12 | 8u371 - 20 | 2.12 |
+--------------------------------------------------------------+-------------+-------------+-------+
.. _pyspark-install:
Then you should install PySpark by passing ``spark`` to ``extras``:
.. code:: bash
pip install onetl[spark] # install latest PySpark
or install PySpark explicitly:
.. code:: bash
pip install onetl pyspark==3.5.3 # install a specific PySpark version
or inject PySpark into ``sys.path`` in some other way BEFORE creating a connection instance.
**Otherwise the connection object cannot be created.**
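
If you go the ``sys.path`` route, something like the `findspark <https://pypi.org/project/findspark/>`_ package can be used; a minimal sketch (the Spark home path below is hypothetical):

.. code:: python

    # Inject an existing Spark distribution into sys.path
    # BEFORE importing or creating any Spark-based onETL connection classes
    import findspark

    findspark.init("/opt/spark")  # hypothetical SPARK_HOME location

    from onetl.connection import Hive  # now Spark-based classes can be created
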
With File connections
~~~~~~~~~~~~~~~~~~~~~
.. _files-install:
All File (but not *FileDF*) connection classes (``FTP``, ``SFTP``, ``HDFS`` and so on) require specific Python clients to be installed.
Each client can be installed explicitly by passing the connector name (in lowercase) to ``extras``:
.. code:: bash
pip install onetl[ftp] # specific connector
pip install onetl[ftp,ftps,sftp,hdfs,s3,webdav,samba] # multiple connectors
To install all file connectors at once you can pass ``files`` to ``extras``:
.. code:: bash
pip install onetl[files]
**Otherwise the class import will fail.**
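
A quick way to verify that the right extras are installed is simply to import the connection class; a sketch, assuming ``pip install onetl[sftp]``:

.. code:: python

    # The import fails if the `sftp` extra (Paramiko client) is not installed
    from onetl.connection import SFTP

    sftp = SFTP(host="sftp.test.com", user="someuser", password="somepassword")
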
With Kerberos support
~~~~~~~~~~~~~~~~~~~~~
.. _kerberos-install:
Most Hadoop instances are set up with Kerberos support,
so some connections require additional setup to work properly.
* ``HDFS``
Uses `requests-kerberos <https://pypi.org/project/requests-kerberos/>`_ and
`GSSApi <https://pypi.org/project/gssapi/>`_ for authentication.
  It also uses the ``kinit`` executable to generate a Kerberos ticket.
* ``Hive`` and ``SparkHDFS``
  require a Kerberos ticket to exist before creating the Spark session.
So you need to install OS packages with:
* ``krb5`` libs
* Headers for ``krb5``
* ``gcc`` or other compiler for C sources
The exact installation instructions depend on your OS; here are some examples:
.. code:: bash
dnf install krb5-devel gcc # CentOS, OracleLinux
apt install libkrb5-dev gcc # Debian-based
You should also pass ``kerberos`` to ``extras`` to install the required Python packages:
.. code:: bash
pip install onetl[kerberos]
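
For connections that accept a keytab instead of a password, a hedged sketch (host, user and keytab path are illustrative; see the ``HDFS`` connection documentation for the exact supported options):

.. code:: python

    from onetl.connection import HDFS

    # Kerberos-based authentication with a keytab instead of a password
    hdfs = HDFS(
        host="my.name.node",
        user="onetl",
        keytab="/etc/security/keytabs/onetl.keytab",  # hypothetical keytab path
    )
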
Full bundle
~~~~~~~~~~~
.. _full-bundle:
To install all connectors and dependencies, you can pass ``all`` to ``extras``:
.. code:: bash
pip install onetl[all]
# this is just the same as
pip install onetl[spark,files,kerberos]
.. warning::
    This method consumes a lot of disk space, and requires Java & Kerberos libraries to be installed on your OS.
.. _quick-start:
Quick start
------------
MSSQL → Hive
~~~~~~~~~~~~
Read data from MSSQL, transform & write to Hive.
.. code:: bash
# install onETL and PySpark
pip install onetl[spark]
.. code:: python
# Import pyspark to initialize the SparkSession
from pyspark.sql import SparkSession
# import function to setup onETL logging
from onetl.log import setup_logging
# Import required connections
from onetl.connection import MSSQL, Hive
# Import onETL classes to read & write data
from onetl.db import DBReader, DBWriter
# change logging level to INFO, and set up default logging format and handler
setup_logging()
# Initialize new SparkSession with MSSQL driver loaded
maven_packages = MSSQL.get_packages()
spark = (
SparkSession.builder.appName("spark_app_onetl_demo")
.config("spark.jars.packages", ",".join(maven_packages))
.enableHiveSupport() # for Hive
.getOrCreate()
)
# Initialize MSSQL connection and check if database is accessible
mssql = MSSQL(
host="mssqldb.demo.com",
user="onetl",
password="onetl",
database="Telecom",
spark=spark,
# These options are passed to MSSQL JDBC Driver:
extra={"applicationIntent": "ReadOnly"},
).check()
# >>> INFO:|MSSQL| Connection is available
# Initialize DBReader
reader = DBReader(
connection=mssql,
source="dbo.demo_table",
columns=["on", "etl"],
# Set some MSSQL read options:
options=MSSQL.ReadOptions(fetchsize=10000),
)
    # Check that there is data in the table, otherwise an exception is raised
reader.raise_if_no_data()
# Read data to DataFrame
df = reader.run()
df.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)
# Apply any PySpark transformations
from pyspark.sql.functions import lit
df_to_write = df.withColumn("engine", lit("onetl"))
df_to_write.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)
# |-- engine: string (nullable = false)
# Initialize Hive connection
hive = Hive(cluster="rnd-dwh", spark=spark)
# Initialize DBWriter
db_writer = DBWriter(
connection=hive,
target="dl_sb.demo_table",
# Set some Hive write options:
options=Hive.WriteOptions(if_exists="replace_entire_table"),
)
# Write data from DataFrame to Hive
db_writer.run(df_to_write)
# Success!
SFTP → HDFS
~~~~~~~~~~~
Download files from SFTP & upload them to HDFS.
.. code:: bash
# install onETL with SFTP and HDFS clients, and Kerberos support
pip install onetl[hdfs,sftp,kerberos]
.. code:: python
# import function to setup onETL logging
from onetl.log import setup_logging
# Import required connections
from onetl.connection import SFTP, HDFS
# Import onETL classes to download & upload files
from onetl.file import FileDownloader, FileUploader
# import filter & limit classes
from onetl.file.filter import Glob, ExcludeDir
from onetl.file.limit import MaxFilesCount
# change logging level to INFO, and set up default logging format and handler
setup_logging()
# Initialize SFTP connection and check it
sftp = SFTP(
host="sftp.test.com",
user="someuser",
password="somepassword",
).check()
# >>> INFO:|SFTP| Connection is available
# Initialize downloader
file_downloader = FileDownloader(
connection=sftp,
source_path="/remote/tests/Report", # path on SFTP
local_path="/local/onetl/Report", # local fs path
filters=[
# download only files matching the glob
Glob("*.csv"),
# exclude files from this directory
ExcludeDir("/remote/tests/Report/exclude_dir/"),
],
limits=[
# download max 1000 files per run
MaxFilesCount(1000),
],
options=FileDownloader.Options(
# delete files from SFTP after successful download
delete_source=True,
            # mark file as failed if it already exists in local_path
if_exists="error",
),
)
# Download files to local filesystem
    download_result = file_downloader.run()
    # Method run returns a DownloadResult object,
    # which contains a collection of downloaded files, divided into 4 categories
download_result
# DownloadResult(
# successful=[
# LocalPath('/local/onetl/Report/file_1.json'),
# LocalPath('/local/onetl/Report/file_2.json'),
# ],
# failed=[FailedRemoteFile('/remote/onetl/Report/file_3.json')],
# ignored=[RemoteFile('/remote/onetl/Report/file_4.json')],
# missing=[],
# )
# Raise exception if there are failed files, or there were no files in the remote filesystem
download_result.raise_if_failed() or download_result.raise_if_empty()
# Do any kind of magic with files: rename files, remove header for csv files, ...
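    # A hypothetical implementation of such a helper (not part of onETL):
    # strip "_" from each downloaded file name and return the new paths
    def my_rename_function(files):
        renamed = []
        for path in files:
            new_path = path.with_name(path.name.replace("_", ""))
            path.rename(new_path)
            renamed.append(new_path)
        return renamed
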
    renamed_files = my_rename_function(download_result.successful)
# function removed "_" from file names
# [
# LocalPath('/home/onetl/Report/file1.json'),
# LocalPath('/home/onetl/Report/file2.json'),
# ]
# Initialize HDFS connection
hdfs = HDFS(
host="my.name.node",
user="someuser",
password="somepassword", # or keytab
)
# Initialize uploader
file_uploader = FileUploader(
connection=hdfs,
target_path="/user/onetl/Report/", # hdfs path
)
# Upload files from local fs to HDFS
upload_result = file_uploader.run(renamed_files)
    # Method run returns an UploadResult object,
    # which contains a collection of uploaded files, divided into 4 categories
upload_result
# UploadResult(
# successful=[RemoteFile('/user/onetl/Report/file1.json')],
# failed=[FailedLocalFile('/local/onetl/Report/file2.json')],
# ignored=[],
# missing=[],
# )
# Raise exception if there are failed files, or there were no files in the local filesystem, or some input file is missing
upload_result.raise_if_failed() or upload_result.raise_if_empty() or upload_result.raise_if_missing()
# Success!
S3 → Postgres
~~~~~~~~~~~~~~~~
Read files directly from an S3 path, convert them to a DataFrame, transform it, and then write it to a database.
.. code:: bash
# install onETL and PySpark
pip install onetl[spark]
.. code:: python
# Import pyspark to initialize the SparkSession
from pyspark.sql import SparkSession
# import function to setup onETL logging
from onetl.log import setup_logging
# Import required connections
from onetl.connection import Postgres, SparkS3
# Import onETL classes to read files
from onetl.file import FileDFReader
from onetl.file.format import CSV
# Import onETL classes to write data
from onetl.db import DBWriter
# change logging level to INFO, and set up default logging format and handler
setup_logging()
# Initialize new SparkSession with Hadoop AWS libraries and Postgres driver loaded
maven_packages = SparkS3.get_packages(spark_version="3.5.3") + Postgres.get_packages()
spark = (
SparkSession.builder.appName("spark_app_onetl_demo")
.config("spark.jars.packages", ",".join(maven_packages))
.getOrCreate()
)
# Initialize S3 connection and check it
spark_s3 = SparkS3(
host="s3.test.com",
protocol="https",
bucket="my-bucket",
access_key="somekey",
secret_key="somesecret",
# Access bucket as s3.test.com/my-bucket
extra={"path.style.access": True},
spark=spark,
).check()
# >>> INFO:|SparkS3| Connection is available
# Describe file format and parsing options
csv = CSV(
delimiter=";",
header=True,
encoding="utf-8",
)
# Describe DataFrame schema of files
from pyspark.sql.types import (
DateType,
DoubleType,
IntegerType,
StringType,
StructField,
StructType,
TimestampType,
)
df_schema = StructType(
[
StructField("id", IntegerType()),
StructField("phone_number", StringType()),
StructField("region", StringType()),
StructField("birth_date", DateType()),
StructField("registered_at", TimestampType()),
StructField("account_balance", DoubleType()),
],
)
# Initialize file df reader
reader = FileDFReader(
connection=spark_s3,
source_path="/remote/tests/Report", # path on S3 there *.csv files are located
format=csv, # file format with specific parsing options
df_schema=df_schema, # columns & types
)
# Read files directly from S3 as Spark DataFrame
df = reader.run()
    # Check that the DataFrame schema is the same as expected
df.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)
# Apply any PySpark transformations
from pyspark.sql.functions import lit
df_to_write = df.withColumn("engine", lit("onetl"))
df_to_write.printSchema()
# root
# |-- id: integer (nullable = true)
# |-- phone_number: string (nullable = true)
# |-- region: string (nullable = true)
# |-- birth_date: date (nullable = true)
# |-- registered_at: timestamp (nullable = true)
# |-- account_balance: double (nullable = true)
# |-- engine: string (nullable = false)
# Initialize Postgres connection
postgres = Postgres(
host="192.169.11.23",
user="onetl",
password="somepassword",
database="mydb",
spark=spark,
)
# Initialize DBWriter
db_writer = DBWriter(
connection=postgres,
# write to specific table
target="public.my_table",
# with some writing options
options=Postgres.WriteOptions(if_exists="append"),
)
# Write DataFrame to Postgres table
db_writer.run(df_to_write)
# Success!