# tf-yarn

- Version: 0.7.0
- Summary: Distributed TensorFlow or PyTorch on a YARN cluster
- Home page: https://github.com/criteo/tf-yarn
- Maintainer: Criteo
- Upload time: 2023-11-03 16:04:24
- Requires Python: >=3.7
- Keywords: tensorflow, pytorch, yarn
- Requirements: cluster-pack (>=0.2.19, <3.0.0), skein (>=0.8, <0.9), tensorboard, tensorflow_io, pyarrow, psutil

![tf-yarn](https://github.com/criteo/tf-yarn/blob/master/skein.png?raw=true)

tf-yarn is a Python library we have built at Criteo for training PyTorch and TensorFlow models on a Hadoop/YARN cluster. An introductory blog post can be found [here](https://medium.com/criteo-labs/train-tensorflow-models-on-yarn-in-just-a-few-lines-of-code-ba0f354f38e3).

It supports single- and multi-worker training and several distribution strategies, and it runs on CPUs or GPUs with just a few lines of code.


# Prerequisites

tf-yarn requires Python ≥ 3.7 (the `requires_python` constraint published with version 0.7.0).


# Installation

Note that tf-yarn does not directly depend on the ML frameworks it supports (TensorFlow, torch ...). That way, installing tf-yarn does not pull in torch for TensorFlow users, and vice versa. You must therefore install the ML framework(s) you use separately (`pip install tensorflow`, `pip install torch` ...).
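
Since the frameworks are installed separately, it can be useful to check which ones are importable in the environment you plan to ship. The snippet below is a minimal sketch that only uses the standard library; the helper name `installed_frameworks` is hypothetical and not part of tf-yarn.

```python
import importlib.util


def installed_frameworks(candidates=("tensorflow", "torch")):
    """Return the subset of `candidates` that can be imported in this environment."""
    return [name for name in candidates if importlib.util.find_spec(name) is not None]


if __name__ == "__main__":
    print("Available ML frameworks:", installed_frameworks())
```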


## Install with Pip

```bash
$ pip install tf-yarn
```


## Install from source

```bash
$ git clone https://github.com/criteo/tf-yarn
$ cd tf-yarn
$ pip install .
```


# TensorFlow prerequisites

Supported versions: [1.15.0 to 2.2.0].
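
As an illustration, the supported range above can be checked programmatically. This is a sketch under simple assumptions: the helper `is_supported_tf_version` is hypothetical, it expects a plain `major.minor.patch` string, and it does not handle suffixes such as release candidates.

```python
def is_supported_tf_version(version, minimum=(1, 15, 0), maximum=(2, 2, 0)):
    """Check whether a 'major.minor.patch' version string lies in [minimum, maximum]."""
    # Keep only the first three numeric components.
    parts = tuple(int(p) for p in version.split(".")[:3])
    return minimum <= parts <= maximum


print(is_supported_tf_version("2.1.0"))
```

In practice you would pass `tf.__version__` to the helper.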

Make sure TensorFlow works with HDFS by setting up all the environment variables as described [here](https://docs.w3cub.com/tensorflow~guide/deploy/hadoop).

You can run the `check_hadoop_env` script, which is installed with tf-yarn, to check that your setup is OK:

```
$ check_hadoop_env
# You should see something like
# INFO:tf_yarn.bin.check_hadoop_env:results will be written in /home/.../shared/Dev/tf-yarn/check_hadoop_env.log
# INFO:tf_yarn.bin.check_hadoop_env:check_env: True
# INFO:tf_yarn.bin.check_hadoop_env:write dummy file to hdfs hdfs://root/tmp/a1df7b99-fa47-4a86-b5f3-9bc09019190f/hello_tf_yarn.txt
# INFO:tf_yarn.bin.check_hadoop_env:check_local_hadoop_tensorflow: True
# INFO:root:Launching remote check
# ...
# INFO:tf_yarn.bin.check_hadoop_env:remote_check: True
# INFO:tf_yarn.bin.check_hadoop_env:Hadoop setup: OK
```
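
Before launching `check_hadoop_env`, a quick local look at the environment variables can save a round trip to the cluster. The sketch below is not a replacement for the script: the helper name is hypothetical, and the variable list is an assumption based on the TensorFlow-on-Hadoop guide linked above, so adjust it to your cluster.

```python
import os

# Assumed list of variables from the TensorFlow-on-Hadoop guide; adjust as needed.
REQUIRED_VARS = ("JAVA_HOME", "HADOOP_HDFS_HOME", "LD_LIBRARY_PATH", "CLASSPATH")


def missing_hadoop_vars(env=None):
    """Return the required variables that are unset or empty in `env`."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    print("Missing variables:", missing_hadoop_vars())
```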


# Quick start

Distributing the training of a model with tf-yarn can be decomposed into two steps:
1. Describe your experiment: write the code that will be executed by the workers involved in the training. This includes the instantiation of the model to train, the training dataset (optionally the validation dataset) and the training loop.
2. Run your experiment: execute your code on yarn.

Refer to the part dedicated to your ML framework (TensorFlow, PyTorch ...) for a detailed description of these two steps.


## TensorFlow

tf-yarn supports the Keras API and the Estimator API (which was the only high-level API of the first TensorFlow releases).


### Describe your experiment


#### Keras API

A Keras experiment is described by an instance of `tf_yarn.tensorflow.KerasExperiment` composed of the following elements:

- model: compiled Keras model to train
- model_dir: HDFS directory where the model will be checkpointed
- train_params: training parameters that will be provided to `model.fit`. This does not include the training examples (input and target data)
- input_data_fn: function returning the input data (features only) to train the model on
- target_data_fn: function returning the target data (labels only) to train the model on
- validation_data_fn: function returning the data to evaluate the model on

Example:

```python
import tensorflow as tf
from tf_yarn.tensorflow import KerasExperiment

# HVD_SIZE, my_callbacks and hdfs_dir must be defined by your own code.

def input_data_fn():
    dataset = ...  # build your training dataset here
    # Parentheses are required to chain method calls across lines.
    return (
        dataset
        .shuffle(1000)
        .batch(128)
        .repeat()
    )

def validation_data_fn():
    dataset = ...  # build your validation dataset here
    return (
        dataset
        .shuffle(1000)
        .batch(128)
    )

model = tf.keras.Sequential()
...
opt = tf.keras.optimizers.Adadelta(1.0 * HVD_SIZE)
model.compile(loss='sparse_categorical_crossentropy',
              optimizer=opt,
              metrics=['accuracy'])
# Everything in train_params is forwarded to model.fit.
train_params = {
    "steps_per_epoch": 100,
    "callbacks": my_callbacks
}

experiment = KerasExperiment(
    model=model,
    model_dir=hdfs_dir,
    train_params=train_params,
    input_data_fn=input_data_fn,
    target_data_fn=None,
    validation_data_fn=validation_data_fn
)
```
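
To make the role of each field more concrete, here is a hedged sketch of how the fields of a Keras experiment could be forwarded to `model.fit`. It is not tf-yarn's actual implementation: `RecordingModel` and `fit_from_experiment` are hypothetical names, and the stand-in model only records its arguments so that the example runs without TensorFlow.

```python
class RecordingModel:
    """Stand-in for a compiled Keras model; only records how fit() is called."""

    def fit(self, x=None, y=None, **kwargs):
        self.call = {"x": x, "y": y, **kwargs}
        return self.call


def fit_from_experiment(model, train_params, input_data_fn,
                        target_data_fn=None, validation_data_fn=None):
    """Hypothetical illustration of forwarding the experiment fields to model.fit."""
    x = input_data_fn()
    # When the input already yields (features, labels), no separate target is needed.
    y = target_data_fn() if target_data_fn is not None else None
    validation_data = validation_data_fn() if validation_data_fn is not None else None
    return model.fit(x, y, validation_data=validation_data, **train_params)


call = fit_from_experiment(
    RecordingModel(),
    train_params={"steps_per_epoch": 100},
    input_data_fn=lambda: "train-dataset",
    validation_data_fn=lambda: "validation-dataset",
)
print(call)
```

The point of the sketch is that `train_params` only carries keyword arguments of `model.fit`, while the data itself comes from the `*_data_fn` functions.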


#### Estimator API

The experiment is described by an instance of `tf_yarn.tensorflow.Experiment` composed of the following elements:
- estimator: the model to train
- train_spec: an instance of [tf.estimator.TrainSpec](https://www.tensorflow.org/api_docs/python/tf/estimator/TrainSpec)
- eval_spec: an instance of [tf.estimator.EvalSpec](https://www.tensorflow.org/api_docs/python/tf/estimator/EvalSpec)

```python
import tensorflow as tf
from tf_yarn.tensorflow import Experiment

# model_fn and input_fn are your own functions.
estimator = tf.estimator.Estimator(model_fn=model_fn)
train_spec = tf.estimator.TrainSpec(input_fn, max_steps=1)
eval_spec = tf.estimator.EvalSpec(input_fn, steps=1)
experiment = Experiment(estimator, train_spec, eval_spec)
```


### Run your experiment

To run your experiment on yarn, simply call `tf_yarn.tensorflow.run_on_yarn`. The only mandatory parameter is `experiment_fn`, which must be a function that takes no parameters and returns the `tf_yarn.tensorflow.KerasExperiment` or `tf_yarn.tensorflow.Experiment` object describing your experiment.

```python
from tf_yarn.tensorflow import run_on_yarn, KerasExperiment

def experiment_fn():
    ...
    return KerasExperiment(
        model=model,
        model_dir=hdfs_dir,
        train_params=train_params,
        input_data_fn=input_data_fn,
        target_data_fn=None,
        validation_data_fn=validation_data_fn
    )

run_on_yarn(
    experiment_fn
)
```
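
Because `experiment_fn` takes no parameters, hyperparameters have to be bound beforehand. One possible way, sketched below with the standard library, is `functools.partial`. The `build_experiment` function is a hypothetical placeholder; a real one would return a `KerasExperiment` or an `Experiment`.

```python
import functools
import inspect


def build_experiment(learning_rate, batch_size):
    """Hypothetical builder; a real one would return a KerasExperiment or Experiment."""
    return {"learning_rate": learning_rate, "batch_size": batch_size}


# Bind the hyperparameters now so that the resulting callable takes no argument.
experiment_fn = functools.partial(build_experiment, learning_rate=0.01, batch_size=128)

# Check that no parameter is left for the caller to supply.
required = [
    p for p in inspect.signature(experiment_fn).parameters.values()
    if p.default is inspect.Parameter.empty
]
print(required, experiment_fn())
```

A closure or a `lambda` would work equally well; `partial` simply keeps the bound values easy to inspect.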

The default distribution strategy is [ParameterServerStrategy](https://www.tensorflow.org/tutorials/distribute/parameter_server_training), which belongs to the group of asynchronous distribution strategies.
Although this distribution strategy works very well with the Estimator API, we did not manage to make it work with the Keras API (with TensorFlow <= 2.2). We therefore advise Keras users to use [Horovod with Gloo](https://github.com/criteo/tf-yarn/blob/master/docs/HorovodWithGloo.md) to distribute the training. Note that Horovod with Gloo is a synchronous distribution strategy based on all-reduce ops:

```python
from tf_yarn.tensorflow import run_on_yarn, KerasExperiment

def experiment_fn():
    ...
    return KerasExperiment(
        model=model,
        model_dir=hdfs_dir,
        train_params=train_params,
        input_data_fn=input_data_fn,
        target_data_fn=None,
        validation_data_fn=validation_data_fn
    )

run_on_yarn(
    experiment_fn,
    custom_task_module="tf_yarn.tensorflow.tasks.gloo_allred_task"
)
```
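
To illustrate what "synchronous, based on all-reduce" means, here is a toy, in-process sketch: every worker contributes its gradients, and every worker ends up with the same average before applying the update. The function name is hypothetical, and a real all-reduce performs this exchange over the network rather than in a single process.

```python
def all_reduce_mean(gradients_per_worker):
    """Average gradients element-wise across workers, as a synchronous all-reduce would."""
    n_workers = len(gradients_per_worker)
    return [sum(values) / n_workers for values in zip(*gradients_per_worker)]


# Three workers, each holding gradients for the same two parameters.
worker_gradients = [[1.0, 4.0], [2.0, 5.0], [3.0, 6.0]]
averaged = all_reduce_mean(worker_gradients)
print(averaged)  # [2.0, 5.0]
```

By contrast, with an asynchronous strategy such as parameter servers, workers push updates without waiting for each other.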


## PyTorch


### Describe your experiment

A PyTorch experiment is described by an instance of `tf_yarn.pytorch.PytorchExperiment` composed of the following elements:

- model: model to train
- main_fn: main function run to train the model. It is executed by every worker involved in the training and must accept these inputs: the model to train, the train dataloader, the device (cpu:0, cpu:1, cuda:0, cuda:1 ...) allocated to the worker for the training, and the rank (worker id).
- train_dataset: training dataset (instance of `torch.utils.data.Dataset`, `webdataset.WebDataset` or `webdataset.DataPipeline`).
- dataloader_args: parameters (batch size, number of workers, collate function ...) passed to the dataloader used to load and iterate over the training dataset. Instance of `tf_yarn.pytorch.DataLoaderArgs`.
- tensorboard_hdfs_dir: HDFS directory where tensorboard results will be written at the end of the training.
- ddp_args: DistributedDataParallel parameters. Refer to the [PyTorch documentation](https://pytorch.org/docs/stable/_modules/torch/nn/parallel/distributed.html#DistributedDataParallel). Instance of `tf_yarn.pytorch.DistributedDataParallelArgs`.

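```python
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from tf_yarn.pytorch import PytorchExperiment, DataLoaderArgs

def main_fn(
    model: torch.nn.Module,
    trainloader: torch.utils.data.dataloader.DataLoader,
    device: str,
    rank: int
):
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
    for epoch in range(10):
        # Let the sampler shuffle differently at each epoch.
        trainloader.sampler.set_epoch(epoch)
        for i, (inputs, labels) in enumerate(trainloader, 0):
            # Move the batch to the device allocated to this worker.
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            loss = loss_fn(model(inputs), labels)
            loss.backward()
            optimizer.step()

transform = transforms.Compose(
    [transforms.ToTensor(), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))]
)
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=True, transform=transform)

# model is the torch.nn.Module to train, defined by your own code.
experiment = PytorchExperiment(
    model=model,
    main_fn=main_fn,
    train_dataset=trainset,
    dataloader_args=DataLoaderArgs(batch_size=4, num_workers=2)
)
```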


### Run your experiment

To run your experiment on yarn, simply call the method `tf_yarn.pytorch.run_on_yarn`. The only mandatory parameters are:
- experiment_fn: must be a function accepting no parameter and returning your object `tf_yarn.pytorch.PytorchExperiment` which describes your experiment.
- task_specs: describe yarn resources that you want to use for your experiment.

```python
from tf_yarn.pytorch import run_on_yarn, PytorchExperiment, DataLoaderArgs, TaskSpec
from tf_yarn.topologies import NodeLabel


def experiment_fn():
    ...
    return PytorchExperiment(
        model=model,
        main_fn=main_fn,
        train_dataset=trainset,
        dataloader_args=DataLoaderArgs(batch_size=4, num_workers=2)
    )


run_on_yarn(
    experiment_fn,
    task_specs={
        "worker": TaskSpec(memory=48 * 2 ** 10, vcores=48, instances=2, nb_proc_per_worker=2,
                           label=NodeLabel.GPU)
    }
)
```

The distribution strategy is [DistributedDataParallel](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html) which belongs to the family of synchronous distribution strategies.
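
With data-parallel training, each process typically works on its own shard of the dataset, and the shuffling changes with the epoch (hence the `set_epoch` call in `main_fn`). The sketch below illustrates that idea in plain Python, in the spirit of `torch.utils.data.DistributedSampler` but simplified (no padding when the dataset size is not divisible by the number of processes). The helper name is hypothetical, and computing the number of processes as `instances * nb_proc_per_worker` is an assumption about how the `TaskSpec` above is interpreted.

```python
import random


def shard_indices(dataset_size, world_size, rank, epoch, seed=0):
    """Return the sample indices one process would visit during one epoch."""
    indices = list(range(dataset_size))
    # Every rank uses the same seed, so all ranks agree on the permutation.
    random.Random(seed + epoch).shuffle(indices)
    # Each rank keeps every world_size-th index, starting at its own rank.
    return indices[rank::world_size]


world_size = 2 * 2  # assumption: instances * nb_proc_per_worker
shards = [shard_indices(12, world_size, rank, epoch=0) for rank in range(world_size)]
print(shards)
```

Together, the shards cover every sample exactly once per epoch, and no two processes see the same sample.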


# run_on_yarn

The method run_on_yarn exposes several parameters that let you configure the yarn job that will be created to train your model on yarn:

- `pyenv_zip_path`: path to an archive of your python environment that will be used by executors to run your experiment. It can be a zipped conda env or a pex archive.
If your python environment differs between CPU and GPU, you can provide a dictionary from `tf_yarn.topologies.NodeLabel` to a python environment. Example:

```python
from tf_yarn import NodeLabel
...
run_on_yarn(
    ...,
    pyenv_zip_path={
        NodeLabel.CPU: "viewfs://root/path/to/env-cpu",
        NodeLabel.GPU: "viewfs://root/path/to/env-gpu"
    }
)
```

If no archive is provided, tf-yarn will automatically package your active python environment in a pex.


- `task_specs`: used to define the resources that you need for your experiment. Dictionary from task names (ps, worker, chief, evaluator, tensorboard ...) to `tf_yarn.topologies.TaskSpec`. Example:

```python
from tf_yarn import TaskSpec, NodeLabel
...
run_on_yarn(
    ...,
    task_specs={
        "worker": TaskSpec(memory=48*2**10, vcores=48, instances=2, label=NodeLabel.GPU),
        "tensorboard": TaskSpec(memory=16*2**10, vcores=4, instances=1, label=NodeLabel.CPU)
    }
)
```

In this example, we request 2 executors with GPUs, 48 vcores and 48 GiB of memory each for the workers, and 1 executor with 4 vcores and 16 GiB of memory for tensorboard.
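
The arithmetic behind these numbers can be spelled out as follows. This is a small sketch assuming that an integer `memory` value is expressed in MiB, which is what the example above implies (`48*2**10` for 48 GiB). The `request` dictionary is a hypothetical summary of the task specs, not a tf-yarn structure.

```python
def gib_to_mib(gib):
    """Convert GiB to MiB (1 GiB = 2**10 MiB)."""
    return gib * 2 ** 10


# Hypothetical summary of the request above: task name -> (instances, memory in GiB, vcores).
request = {"worker": (2, 48, 48), "tensorboard": (1, 16, 4)}

total_mib = sum(n * gib_to_mib(mem) for n, mem, _ in request.values())
total_vcores = sum(n * vcores for n, _, vcores in request.values())
print(total_mib, total_vcores)  # 114688 100
```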


- `files`: local files or directories to upload on the executors. Dictionary from target location (on executor) to local location (on your local machine). Target locations must be relative to the executor root directory. Note that the executor root directory is appended to ``PYTHONPATH``. Therefore, any listed Python module will be importable.
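
As an illustration of the `files` mapping, the sketch below builds such a dictionary and checks that every target location is relative. The helper `check_files_mapping` and the paths are hypothetical; tf-yarn may perform its own validation.

```python
import posixpath


def check_files_mapping(files):
    """Raise ValueError if a target location is not relative to the executor root."""
    for target in files:
        normalized = posixpath.normpath(target)
        if posixpath.isabs(normalized) or normalized == ".." or normalized.startswith("../"):
            raise ValueError(f"target must be relative to the executor root: {target!r}")
    return files


# Keys are locations on the executor, values are paths on the local machine.
files = check_files_mapping({
    "my_package": "/home/user/project/my_package",
    "config/train.yaml": "/home/user/project/config/train.yaml",
})
print(files)
```

The resulting dictionary would then be passed as `run_on_yarn(..., files=files)`.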


- `env`: environment variables to set on executors. Dictionary from variable name to variable value. Example:

```python
run_on_yarn(
    ...,
    env={"HADOOP_CONF_DIR": "/etc/hadoop/conf.all"}
)
```


- `queue`: yarn queue to schedule your job in. Example:

```python
run_on_yarn(
    ...,
    queue="ml-gpu"
)
```


- `acls`: configures the application-level Access Control Lists (ACLs). Optional, defaults to all-access ACLs. See [ACLs](https://jcrist.github.io/skein/specification.html#acls) for details.


- `file_systems`: `skein`, the library underlying `tf_yarn`, automatically acquires a delegation token for `fs.defaultFS` on security-enabled clusters. This should be enough for most use cases. However, if your experiment needs to access data on namenodes other than the default one, you have to list them explicitly in the `file_systems` argument. This instructs `skein` to acquire a delegation token for these namenodes in addition to `fs.defaultFS`:

```python
run_on_yarn(
    ...,
    file_systems=["hdfs://preprod"]
)
```

- `nb_retries`: number of times the yarn application is retried in case of failure

- `name`: name of the yarn application


# Model evaluation

This feature is not supported for PyTorch.

Model training and model evaluation can be run independently. To do so, you must
use the parameter `custom_task_module` of `run_on_yarn`.

To run model training without evaluation:
```python
run_on_yarn(
    ...,
    task_specs={
        "chief": TaskSpec(memory="2 GiB", vcores=4),
        "worker": TaskSpec(memory="2 GiB", vcores=4, instances=2),
        "ps": TaskSpec(memory="2 GiB", vcores=8),
        "tensorboard": TaskSpec(memory="2 GiB", vcores=1)
    }
)
```

To run model evaluation:
```python
run_on_yarn(
    ...,
    task_specs={
        "evaluator": TaskSpec(memory="2 GiB", vcores=1)
    },
    custom_task_module="tf_yarn.tasks.evaluator_task"
)
```

# Examples

Please refer to the various examples available in [examples](https://github.com/criteo/tf-yarn/tree/master/examples).


# Other documentation

[MLflow](https://www.mlflow.org/docs/latest/quickstart.html) can be used to track experiments.
More information [here](https://github.com/criteo/tf-yarn/blob/master/docs/MLflow.md).

[Tensorboard](https://github.com/criteo/tf-yarn/blob/master/docs/Tensorboard.md) can be spawned in a separate container during training.

Two alternatives to TensorFlow's distribution strategies are available:
[Horovod with gloo](https://github.com/criteo/tf-yarn/blob/master/docs/HorovodWithGloo.md) and [tf-collective-all-reduce](https://github.com/criteo/tf-collective-all-reduce).



            
