# Kubeflow Pipelines SDK kfp-kubernetes API Reference
The Kubeflow Pipelines SDK kfp-kubernetes Python library (part of the [Kubeflow Pipelines](https://www.kubeflow.org/docs/components/pipelines/) project) is an add-on to the [Kubeflow Pipelines SDK](https://kubeflow-pipelines.readthedocs.io/) that enables authoring Kubeflow pipelines with Kubernetes-specific features and concepts, such as:
* [Secrets](https://kubernetes.io/docs/concepts/configuration/secret/)
* [PersistentVolumeClaims](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims)
* [ImagePullPolicies](https://kubernetes.io/docs/concepts/containers/images/#image-pull-policy)
* [Ephemeral volumes](https://kubernetes.io/docs/concepts/storage/ephemeral-volumes/)
* [Node selectors](https://kubernetes.io/docs/concepts/scheduling-eviction/assign-pod-node/#nodeselector)
* [Tolerations](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/)
* [Labels and annotations](https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta)
* and more

Be sure to check out the full [API Reference](https://kfp-kubernetes.readthedocs.io/) for more details.
## Installation
The `kfp-kubernetes` package can be installed as a KFP SDK extra dependency.
```sh
# quoting keeps shells such as zsh from treating the brackets as a glob pattern
pip install "kfp[kubernetes]"
```
Or installed independently:
```sh
pip install kfp-kubernetes
```
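To confirm what ended up in the active environment after either install command, the standard library's `importlib.metadata` can be queried. This is a minimal sketch; the helper name `installed_version` is hypothetical and not part of `kfp` or `kfp-kubernetes`.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    # Report both the SDK and the add-on without importing either package.
    for name in ("kfp", "kfp-kubernetes"):
        print(name, installed_version(name) or "not installed")
```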
## Getting started
The following simple pipeline uses the kfp-kubernetes library to expose a pre-existing Secret as an environment variable in the task's container.
### Secret: As environment variable
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    import os
    print(os.environ['SECRET_VAR'])

@dsl.pipeline
def pipeline():
    task = print_secret()
    # map the Secret key 'password' to the environment variable SECRET_VAR
    kubernetes.use_secret_as_env(
        task,
        secret_name='my-secret',
        secret_key_to_env={'password': 'SECRET_VAR'},
    )
```
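For orientation, a `secret_key_to_env` mapping like the one above corresponds to `env` entries with a `valueFrom.secretKeyRef` in a Kubernetes container spec. The sketch below builds those entries as plain dictionaries. `secret_env_entries` is a hypothetical helper written for illustration only; it is not part of kfp-kubernetes and makes no claim about how the library produces its output internally.

```python
from typing import Dict, List


def secret_env_entries(secret_name: str,
                       secret_key_to_env: Dict[str, str]) -> List[dict]:
    """Build container `env` entries that read each key from a Secret."""
    return [
        {
            "name": env_name,
            "valueFrom": {
                "secretKeyRef": {"name": secret_name, "key": secret_key},
            },
        }
        for secret_key, env_name in secret_key_to_env.items()
    ]


entries = secret_env_entries("my-secret", {"password": "SECRET_VAR"})
print(entries)
```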
## Other examples
The following non-exhaustive examples show other ways to use the kfp-kubernetes library. Be sure to check out the full [API Reference](https://kfp-kubernetes.readthedocs.io/) for more details.
### Secret: As mounted volume
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    # A Secret volume is mounted as a directory with one file per key.
    # Assumption: 'my-secret' has a key named 'password'.
    with open('/mnt/my_vol/password') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(
        task,
        secret_name='my-secret',
        mount_path='/mnt/my_vol',
    )
```
### Secret: As optional source for a mounted volume
```python
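from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    # A Secret volume is mounted as a directory with one file per key.
    # Assumption: 'my-secret' has a key named 'password'.
    with open('/mnt/my_vol/password') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    # optional=True lets the pod start even if the Secret does not exist
    kubernetes.use_secret_as_volume(
        task,
        secret_name='my-secret',
        mount_path='/mnt/my_vol',
        optional=True,
    )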
```
### ConfigMap: As environment variable
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    import os
    print(os.environ['CM_VAR'])

@dsl.pipeline
def pipeline():
    task = print_config_map()
    # map the ConfigMap key 'foo' to the environment variable CM_VAR
    kubernetes.use_config_map_as_env(
        task,
        config_map_name='my-cm',
        config_map_key_to_env={'foo': 'CM_VAR'},
    )
```
### ConfigMap: As mounted volume
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    # A ConfigMap volume is mounted as a directory with one file per key.
    # Assumption: 'my-cm' has a key named 'foo'.
    with open('/mnt/my_vol/foo') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_config_map()
    kubernetes.use_config_map_as_volume(
        task,
        config_map_name='my-cm',
        mount_path='/mnt/my_vol',
    )
```
### ConfigMap: As optional source for a mounted volume
```python
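from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_config_map():
    # A ConfigMap volume is mounted as a directory with one file per key.
    # Assumption: 'my-cm' has a key named 'foo'.
    with open('/mnt/my_vol/foo') as f:
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_config_map()
    # optional=True lets the pod start even if the ConfigMap does not exist
    kubernetes.use_config_map_as_volume(
        task,
        config_map_name='my-cm',
        mount_path='/mnt/my_vol',
        optional=True,
    )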
```
### PersistentVolumeClaim: Dynamically create PVC, mount, then delete
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.component
def read_data():
    with open('/reused_data/file.txt') as f:
        print(f.read())

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteOnce'],
        size='5Gi',
        storage_class_name='standard',
    )

    task1 = make_data()
    # normally task sequencing is handled by data exchange via component inputs/outputs
    # but since data is exchanged via volume, we need to call .after explicitly to sequence tasks
    task2 = read_data().after(task1)

    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )

    # wait to delete the PVC until after task2 completes
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']).after(task2)
```
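The ordering constraints in this pipeline can be pictured as a small dependency graph: both tasks consume the PVC name, `task2` is sequenced after `task1`, and deletion waits for `task2`. The sketch below encodes those edges with the standard library's `graphlib` (Python 3.9+) to show the resulting execution order. The node labels are chosen for illustration, and this only models the ordering; it is not a description of how KFP schedules tasks.

```python
from graphlib import TopologicalSorter

# Map each step to the set of steps it must wait for.
dependencies = {
    "create_pvc": set(),
    "make_data": {"create_pvc"},                # mounts pvc1.outputs['name']
    "read_data": {"create_pvc", "make_data"},   # mounts the PVC and uses .after(task1)
    "delete_pvc": {"create_pvc", "read_data"},  # uses .after(task2)
}

# static_order() yields every step only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because the steps form a single chain, there is only one valid order: create, write, read, delete.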
### PersistentVolumeClaim: Create PVC on-the-fly tied to your pod's lifecycle
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.pipeline
def my_pipeline():
    task1 = make_data()
    # note that the created PVC is automatically cleaned up once the pod is gone
    # and cannot be shared between pods
    kubernetes.add_ephemeral_volume(
        task1,
        volume_name="my-pvc",
        mount_path="/data",
        access_modes=['ReadWriteOnce'],
        size='5Gi',
    )
```
### Pod Metadata: Add pod labels and annotations to the container pod's definition
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.add_pod_label(
        task,
        label_key='kubeflow.com/kfp',
        label_value='pipeline-node',
    )
    kubernetes.add_pod_annotation(
        task,
        annotation_key='run_id',
        annotation_value='123456',
    )
```
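Kubernetes restricts the syntax of label keys and values, so it can help to check them before compiling a pipeline. The sketch below is a simplified validator based on the documented Kubernetes rules (name segments and values of at most 63 characters that start and end with an alphanumeric character; an optional key prefix of at most 253 characters). The function names are hypothetical, and the prefix is only length-checked rather than validated as a full DNS subdomain.

```python
import re

# Alphanumeric at both ends; '-', '_', '.' and alphanumerics in between.
_NAME_RE = re.compile(r"[A-Za-z0-9]([-A-Za-z0-9_.]*[A-Za-z0-9])?")


def is_valid_label_value(value: str) -> bool:
    """Label values may be empty; otherwise they follow the name syntax."""
    if value == "":
        return True
    return len(value) <= 63 and _NAME_RE.fullmatch(value) is not None


def is_valid_label_key(key: str) -> bool:
    """Check an optional 'prefix/' followed by a required name segment."""
    prefix, sep, name = key.rpartition("/")
    if sep:
        # Simplification: only the prefix length and a single '/' are checked.
        if not (0 < len(prefix) <= 253) or "/" in prefix:
            return False
    return 0 < len(name) <= 63 and _NAME_RE.fullmatch(name) is not None


print(is_valid_label_key("kubeflow.com/kfp"), is_valid_label_value("pipeline-node"))
```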
### Kubernetes Field: Use Kubernetes Field Path as environment variable
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    kubernetes.use_field_path_as_env(
        task,
        env_name='KFP_RUN_NAME',
        field_path="metadata.annotations['pipelines.kubeflow.org/run_name']",
    )
```
### Timeout: Set timeout in seconds defined as pod spec's activeDeadlineSeconds
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    # the second argument is the timeout in seconds
    kubernetes.set_timeout(task, 20)
```
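Since the timeout is given in whole seconds, a small conversion helper can make longer durations easier to read. This is a sketch only: `timeout_seconds` is a hypothetical helper, not part of kfp-kubernetes. It rejects durations shorter than one second on the basis that Kubernetes documents `activeDeadlineSeconds` as a positive integer.

```python
from datetime import timedelta


def timeout_seconds(duration: timedelta) -> int:
    """Convert a duration to whole seconds, truncating any fraction."""
    seconds = int(duration.total_seconds())
    if seconds <= 0:
        raise ValueError(f"timeout must be at least one second, got {duration!r}")
    return seconds


print(timeout_seconds(timedelta(minutes=2)))
```

The result could then be passed as the second argument of `kubernetes.set_timeout` in the pipeline above.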
### ImagePullPolicy: One of "Always", "Never", "IfNotPresent"
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def simple_task():
    print("hello-world")

@dsl.pipeline
def pipeline():
    task = simple_task()
    kubernetes.set_image_pull_policy(task, "Always")
```
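Because the policy is passed as a plain string and the three values are case-sensitive, a typo such as `"always"` is easy to make. The sketch below validates a policy string up front with a standard-library `Enum`. `ImagePullPolicy` and `parse_image_pull_policy` are hypothetical names defined here for illustration and are not provided by kfp-kubernetes.

```python
from enum import Enum


class ImagePullPolicy(str, Enum):
    """The three image pull policies listed in the heading above."""
    ALWAYS = "Always"
    NEVER = "Never"
    IF_NOT_PRESENT = "IfNotPresent"


def parse_image_pull_policy(value: str) -> ImagePullPolicy:
    """Return the matching policy or raise a ValueError listing valid values."""
    try:
        return ImagePullPolicy(value)
    except ValueError:
        allowed = ", ".join(p.value for p in ImagePullPolicy)
        raise ValueError(
            f"unknown image pull policy {value!r}; expected one of: {allowed}"
        ) from None


print(parse_image_pull_policy("IfNotPresent").value)
```

The validated string, `parse_image_pull_policy("Always").value`, could then be passed to `kubernetes.set_image_pull_policy` as in the pipeline above.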
Raw data
{
"_id": null,
"home_page": "https://github.com/kubeflow/pipelines",
"name": "kfp-kubernetes",
"maintainer": null,
"docs_url": null,
"requires_python": ">=3.9.0",
"maintainer_email": null,
"keywords": null,
"author": "google",
"author_email": "kubeflow-pipelines@google.com",
"download_url": "https://files.pythonhosted.org/packages/78/92/5df7b43e06329132d15e43eaae75518b7e9b785cfa72c5cde1f902be88a3/kfp-kubernetes-1.4.0.tar.gz",
"platform": null,
"bugtrack_url": null,
"license": "Apache 2.0",
"summary": "Kubernetes platform configuration library and generated protos.",
"version": "1.4.0",
"project_urls": {
"Bug Tracker": "https://github.com/kubeflow/pipelines/issues",
"Documentation": "https://kfp-kubernetes.readthedocs.io/",
"Homepage": "https://github.com/kubeflow/pipelines",
"Source": "https://github.com/kubeflow/pipelines/tree/master/kubernetes_platform/python"
},
"split_keywords": [],
"urls": [
{
"comment_text": "",
"digests": {
"blake2b_256": "78925df7b43e06329132d15e43eaae75518b7e9b785cfa72c5cde1f902be88a3",
"md5": "ea1ee74c70cffcf497154cae57c97a02",
"sha256": "bbb2ce9230be2cbf8b344897a518cc8debe008e153277675b16fd4634e81f358"
},
"downloads": -1,
"filename": "kfp-kubernetes-1.4.0.tar.gz",
"has_sig": false,
"md5_digest": "ea1ee74c70cffcf497154cae57c97a02",
"packagetype": "sdist",
"python_version": "source",
"requires_python": ">=3.9.0",
"size": 43663,
"upload_time": "2024-11-14T22:17:56",
"upload_time_iso_8601": "2024-11-14T22:17:56.427664Z",
"url": "https://files.pythonhosted.org/packages/78/92/5df7b43e06329132d15e43eaae75518b7e9b785cfa72c5cde1f902be88a3/kfp-kubernetes-1.4.0.tar.gz",
"yanked": false,
"yanked_reason": null
}
],
"upload_time": "2024-11-14 22:17:56",
"github": true,
"gitlab": false,
"bitbucket": false,
"codeberg": false,
"github_user": "kubeflow",
"github_project": "pipelines",
"travis_ci": false,
"coveralls": false,
"github_actions": true,
"lcname": "kfp-kubernetes"
}