crosstream

Name: crosstream
Version: 3.0.0
Home page: https://github.com/bertwagner/crosstream
Summary: A package for streaming cross-server dataset joins in memory
Author: Bert Wagner
Requires Python: >=3.6
Upload time: 2023-04-20 17:25:04
# crosstream 🐍🌊🤝

This Python package helps join large datasets across servers efficiently. It accomplishes this by streaming the data, allowing it to:
 - read each dataset only once
 - not need to store the complete datasets in memory

This is particularly helpful when your datasets are larger than the available memory on your machine or when you want to minimize network reads.

This package supports CSV and pyodbc datasources.
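The streaming hash-join idea behind this approach can be sketched in plain Python. This is a toy illustration of the technique, not the package's actual internals, and the helper and variable names are made up:

```python
from collections import defaultdict

def streaming_inner_hash_join(build_rows, probe_rows, build_key, probe_key):
    # Build phase: buffer only the (smaller) build side into a hash table.
    table = defaultdict(list)
    for row in build_rows:
        table[build_key(row)].append(row)
    # Probe phase: stream the (larger) probe side one row at a time,
    # so it is read exactly once and never fully held in memory.
    for row in probe_rows:
        for match in table.get(probe_key(row), ()):
            yield match, row

small = [(1, "a"), (2, "b")]
large = [(1, "x"), (3, "y"), (1, "z")]
pairs = list(streaming_inner_hash_join(small, large,
                                       lambda r: r[0], lambda r: r[0]))
# pairs == [((1, "a"), (1, "x")), ((1, "a"), (1, "z"))]
```

Only the build side is buffered; the probe side can be arbitrarily large because each of its rows is processed and discarded immediately.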

## Installation

From PyPI:
```
pip install crosstream
```

From source:

```
git clone https://github.com/bertwagner/crosstream.git
cd crosstream
pip install .
```

## Usage Examples

### Basic hash join

Ideally you want to make your smaller dataset the first dataset in your join: the first dataset is the one held in memory as the hash table, while the second is streamed past it row by row.

```
import crosstream as cs
import csv

file1 = 'small_dataset.csv'
file2 = 'large_dataset.csv'

# join using column indexes or column names
c1 = cs.read_csv(file1, True, [0, 1])
c2 = cs.read_csv(file2, True, ['col1', 'col2'])

# specify the output file
with open('joined_output.csv', 'w', newline='') as f:
    w = csv.writer(f)
    
    # write header column names
    w.writerow(c1.column_names + c2.column_names)

    for row_left, row_right in cs.inner_hash_join(c1, c2):
        # write matched results to our joined_output.csv
        w.writerow(row_left + row_right)
```

More examples can be found under the `tests` directory.

### Custom method for determining equality

By default, this package will join only if all values are equal.

If you want to perform a transformation on your data before comparing for equality, or use more complicated join equality logic, you can pass your own function via the `override_join_key_transform` argument to define how equality is determined:

```
# define a function for transforming join key data before it's hashed
def custom_join_key_transform(value):
    transformed = value.replace(' ', '')
    return transformed
```
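As a standalone check (the function is repeated here so the snippet runs on its own), this transform makes join keys whitespace-insensitive:

```python
def custom_join_key_transform(value):
    # Strip spaces so "New York" and "NewYork" produce the same join key
    return value.replace(' ', '')

key_a = custom_join_key_transform('New York')
key_b = custom_join_key_transform('NewYork')
# key_a == key_b == 'NewYork'
```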

And then pass that into the `inner_hash_join()` method:

```
...
for row_left, row_right in cs.inner_hash_join(c1, c2, override_join_key_transform=custom_join_key_transform):
...
```

### Custom method for processing matched hashes

By default this package returns tuples of the joined rows. If you want to customize what the output of your matched data looks like (perform transformations after a match is found), you can pass a function to the `override_process_matched_hashes` argument:

```
# define a function for performing additional transformations or adding additional outputs before the columns are returned
def custom_process_matched_hashes(bucket_row, probe_row, bucket_join_column_indexes, probe_join_column_indexes):
    # add a new column indicating the weight of each match is equal to 1
    weight = 1.0
    return tuple(bucket_row), tuple(probe_row), (weight,)
```
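Because the callback is plain Python, its return shape can be verified outside the library. The function is repeated here so the snippet is self-contained, and the example rows are made up:

```python
def custom_process_matched_hashes(bucket_row, probe_row,
                                  bucket_join_column_indexes,
                                  probe_join_column_indexes):
    # Append a constant weight column to every matched pair
    weight = 1.0
    return tuple(bucket_row), tuple(probe_row), (weight,)

left, right, extra = custom_process_matched_hashes([1, 'a'], [1, 'x'], [0], [0])
# left == (1, 'a'), right == (1, 'x'), extra == (1.0,)
```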

And then pass that into the `inner_hash_join()` method:

```
for row_left, row_right, weight in cs.inner_hash_join(c1, c2, override_process_matched_hashes=custom_process_matched_hashes):
    ...
```


## Tests

If you want to run the tests, you will need the SQLite ODBC driver installed:

```
apt-get install libsqliteodbc unixodbc
```

Then execute the tests:

```
pytest
```

And check for coverage:

```
pytest --cov-config=tests/.coveragerc --cov=crosstream --cov-report term-missing
```

            
