
Simplify Dask Deployments with Tanzu Kubernetes Grid and Helm

Machine Learning (ML) can greatly enhance cloud-native applications to make them proactive and intelligent. But this power comes at a cost: ML is very compute-intensive and often requires distributed compute platforms to train models at the speed of the business.

In this article, I demonstrate how to deploy Dask (parallel compute for ML workloads) on Tanzu Kubernetes Grid (TKG) clusters in a simple and reproducible manner — as easily as installing a Python environment. Running Dask on TKG clusters helps development teams train their models faster by harnessing available datacenter compute power.

Why Dask?

Dask natively scales Python. It provides advanced parallelism for analytics, enabling performance at scale for many data-science and analytics tools.

Dask offers three critical advantages to data-science and IT ops teams:

  • Dask is implemented in Python and natively extends a significant portion of the NumPy, Pandas, and scikit-learn APIs. This translates into a minimal learning curve for most data scientists (see the short example after this list).
  • Dask can handle large datasets (TBs or more) by distributing the workload among Kubernetes (K8s) pods, allowing data-science teams to harness the stability and abundance of computing power available in the datacenter. 
  • Dask can be deployed and updated on K8s using Helm charts, which simplifies IT ops tasks, such as deployment, security, configuration management, and updates. Helm charts provide the means to deliver Dask clusters on the fly with one command-line instruction.
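
To illustrate the first point, here is a minimal sketch (with hypothetical file paths and column names) of how Dask mirrors the Pandas API while splitting the work across partitions:

import dask.dataframe as dd

# Build a lazy, partitioned data frame from many CSV files at once
df = dd.read_csv('data/events-*.csv')

# The same groupby/aggregation syntax as Pandas, but the work is distributed
# across partitions and only executed when .compute() is called
daily_totals = df.groupby('day')['amount'].sum().compute()

On a laptop, this runs on a local thread pool; pointed at the cluster deployed below, the same code runs across the Dask worker pods.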

At VMware, we continue enhancing Tanzu Kubernetes Grid (TKG) to run all types of containerized workloads (ML included), orchestrated by K8s. Below, I explain the steps our IT ops and data scientists use to deploy Dask on a TKG cluster that resides in an on-premises datacenter (such as VMware Cloud Foundation) or in a public cloud (such as VMware Cloud on AWS, Azure, or GCP).

1)   Set up ML-ready cloud infrastructure

The beauty of K8s is that it allows you to run containerized applications on any virtual or cloud infrastructure. Nevertheless, you have to consider that technology companies continuously develop compute, networking, and storage accelerators that speed up ML workloads. For example, VMware and Intel have developed the Hybrid Cloud Analytics Solution (HCAS), which provides a reference architecture with the hardware and software components that you need to stand up a hybrid cloud infrastructure ready for ML. So, if you plan to expand or renew your IT infrastructure for ML, you may use HCAS as a reference point.

2)   Make the Tanzu Kubernetes Grid (TKG) K8s cluster Dask-ready

Most of the time, a K8s cluster running in a datacenter will provide more extensive and predictable computing power than a personal computer. By running Dask clusters in the datacenter, data scientists can free up their personal computers for other types of work. Moreover, heavy computations (such as ML training) run faster on servers optimized for ML workloads.

Creating K8s clusters is very easy with Tanzu. (Get the details in this Create Tanzu Kubernetes Clusters document). If you’d like to try Dask on K8s on a single machine (such as a laptop), I recommend checking out MicroK8s or Docker Desktop.

If you have a K8s cluster available, here are the steps you’ll need to deploy Dask on it:

Create a K8s namespace to keep your Dask pods organized. For example, create a namespace called "dask":

kubectl create namespace dask

Define the RBAC objects required to deploy the Dask Helm chart. When using TKG, you'll have to define a new cluster role to deploy the Helm chart on your K8s cluster. Below, you can see the role.yaml file that defines such a role. I suggest reading the full details about RBAC for Tanzu K8s clusters and how to tighten the RBAC security level to meet your organization's policies.

role.yaml

apiVersion: policy/v1beta1
kind: PodSecurityPolicy
metadata:
  name: kubeapps-psp
spec:
  privileged: true
  seLinux:
    rule: RunAsAny
  supplementalGroups:
    rule: RunAsAny
  runAsUser:
    rule: RunAsAny
  fsGroup:
    rule: RunAsAny
  volumes:
    - '*'
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: kubeapps-clusterrole
rules:
  - apiGroups:
      - policy
    resources:
      - podsecuritypolicies
    verbs:
      - use
    resourceNames:
      - kubeapps-psp
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: kubeapps-clusterrole
  namespace: dask
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: kubeapps-clusterrole
subjects:
  - apiGroup: rbac.authorization.k8s.io
    kind: Group
    name: system:serviceaccounts
  - kind: ServiceAccount
    name: default
    namespace: dask

Next, apply the RBAC definition:

# Mind the namespace indicated by the "-n dask" option
kubectl -n dask apply -f role.yaml

Optional: pull the Dask container images to your local registry. There are cases where IT ops deploys a local registry, such as Harbor, that secures artifacts with policies and role-based access control, ensures images are scanned and free from vulnerabilities, and signs images as trusted. For example, at VMware, we use Harbor, so I need to pull the Dask images from Docker Hub into Harbor before my K8s clusters can pull them. If you prefer to pull directly from Docker Hub, you can skip this step.

Here is an example of how to pull the Dask images tagged as 2021.5.1. Change the domain name to match the fully qualified name of your organization:

# Pull the image for Dask workers and scheduler
docker pull harbor.vmware.com/cache/daskdev/dask:2021.5.1

# Pull the image for the jupyter lab dev env
docker pull harbor.vmware.com/cache/daskdev/dask-notebook:2021.5.1

Notice that you need two images: one for the Dask scheduler and workers, and another for the Jupyter Lab pod that the Helm chart deploys. I strongly recommend that your data-science personnel use that Jupyter Lab environment for development to ensure that Python library versions match across the client, scheduler, and workers. Version alignment is essential to prevent API compatibility issues.
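
One practical way to confirm that alignment, once the cluster from the steps below is up and running, is Dask's built-in version check. Here is a minimal sketch; the scheduler address is illustrative:

from dask.distributed import Client

# Connect to the Dask scheduler (its address is obtained after deploying the Helm chart)
client = Client('tcp://10.186.148.60:8786')

# Compare Python and core library versions across this client, the scheduler, and all
# workers; check=True raises an error if required packages do not match
client.get_versions(check=True)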

Optional: Customize the Dask Helm chart's values.yaml file. The values.yaml file controls how Dask gets deployed, and I encourage you to get familiar with the settings it exposes. Here is the version of values.yaml that I used to deploy Dask in my environment:

# nameOverride: dask
# fullnameOverride: dask

scheduler:
  name: scheduler  # Dask scheduler name.
  enabled: true  # Enable/disable scheduler.
  image:
    # Container image repository.
    repository: "daskdev/dask"  
    tag: 2021.5.1  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    # Container image [pull secrets]
    # https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-
    # private-registry/
    #  - name: regcred
    pullSecrets:  
  replicas: 1  # Number of schedulers (should always be 1).
  # Scheduler service type. `LoadBalancer` exposes it  outside your cluster
  # serviceType: "NodePort"
  # serviceType: "LoadBalancer"
  serviceType: "LoadBalancer" 
  # Some cloud providers allow you to specify the loadBalancerIP 
  # when using the `LoadBalancer` service type. 
  # If your cloud does not support it this option will be ignored.  
  loadBalancerIP: null
  servicePort: 8786 # Scheduler service internal port.
  serviceAnnotations: {} # Scheduler service annotations.
  extraArgs: [] # Extra CLI arguments to be passed to the scheduler
    # - --preload
    # - scheduler-setup.py
  # Scheduler pod resources. See `values.yaml` for example values
  resources: {}
  #  limits:
  #    cpu: 1.8
  #    memory: 6G
  #  requests:
  #    cpu: 1.8
  #    memory: 6G
  tolerations: []  # Tolerations.
  affinity: {}  # Container affinity.
  nodeSelector: {}  # Node Selector.
  securityContext: {}  # Security Context.
  # serviceAccountName: ""
  metrics:
    # Enable scheduler metrics. Pip package [prometheus-client]
    # https://pypi.org/project/prometheus-client/ 
    # should be present on scheduler
    enabled: false 
    serviceMonitor:
      enabled: false  # Enable scheduler servicemonitor.
      # Deploy servicemonitor in different namespace, e.g. monitoring.
      namespace: "" 
      # Select  namespaces the Endpoints objects are discovered from.
      namespaceSelector: {} 
      # Default: scrape .Release.Namespace only
      # To scrape all, use the following:
      # namespaceSelector:
      #   any: true
      interval: 30s # Interval at which metrics should be scraped.
      jobLabel: "" # The label to use to retrieve the job name from.
      # TargetLabels transfers labels on the K8s Service onto the target
      targetLabels: [] 
      # MetricRelabelConfigs to apply to samples before ingestion.
      metricRelabelings: [] 

webUI:
  name: webui  # Dask webui name.
  servicePort: 80 # webui service internal port.
  ingress:
    enabled: false  # Enable ingress.
    tls: false  # Ingress should use TLS.
    # secretName: dask-scheduler-tls
    hostname: dask-ui.example.com  # Ingress hostname.
    annotations:  # Ingress annotations. See `values.yaml` for example values
      # kubernetes.io/ingress.class: "nginx"
      # secretName: my-tls-cert
      # kubernetes.io/tls-acme: "true"

worker:
  name: worker  # Dask worker name.
  image:
    # Container image repository.
    repository: "daskdev/dask"  
    tag: 2021.5.1  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    # Dask worker command. E.g `dask-cuda-worker` for GPU worker.
    dask_worker: "dask-worker"  
    # Container image [pull secrets]
    # https://kubernetes.io/docs/tasks/configure-pod-container/pull-image
    # -private-registry/
    #  - name: regcred
    pullSecrets: 
  replicas: 3  # Number of workers.
  strategy:
    type: RollingUpdate  # Strategy used to replace old Pods with new ones.
  # connect to already existing scheduler, deployed not by this chart.  
  custom_scheduler_url: null
  default_resources:  # overwritten by resource limits if they exist
    cpu: 1  # Default CPU (DEPRECATED use `resources`).
    memory: "4GiB"  # Default memory (DEPRECATED use `resources`).
  env:  # Environment variables. See `values.yaml` for example values.
  #  - name: EXTRA_APT_PACKAGES
  #    value: build-essential openssl
  #  - name: EXTRA_CONDA_PACKAGES
  #    value: numba xarray -c conda-forge
    - name: EXTRA_PIP_PACKAGES
      value: s3fs mimesis dask-ml xgboost pyarrow seaborn hvplot --upgrade
  extraArgs: [] # Extra CLI arguments to be passed to the worker
    # - --preload
    # - worker-setup.py
  resources: {}  # Worker pod resources. See `values.yaml` for example values
  #  limits:
  #    cpu: 1
  #    memory: 3G
  #    nvidia.com/gpu: 1
  #  requests:
  #    cpu: 1
  #    memory: 3G
  #    nvidia.com/gpu: 1
  # Worker pod volumes and volume mounts.
  # mounts.volumes follows the Kubernetes API v1 Volume spec;
  # mounts.volumeMounts follows the Kubernetes API v1 VolumeMount spec.
  mounts: {} 
  #  volumes:
  #    - name: data
  #      emptyDir: {}
  #  volumeMounts:
  #    - name: data
  #      mountPath: /data
  annotations: {}  # Annotations
  tolerations: []  # Tolerations.
  affinity: {}  # Container affinity.
  nodeSelector: {}  # Node Selector.
  securityContext: {}  # Security Context.
  # serviceAccountName: ""
  # port: ""
  portDashboard: 8790 # Worker dashboard and metrics port.
  # This option overrides "--nthreads" on workers, which defaults to 
  # resources.limits.cpu / default_resources.limits.cpu
  # use it if you need to limit the # of threads used by multicore workers
  # or to make workers with non-whole-number cpu limits
  # threads_per_worker: 1
  metrics:
    # Enable workers metrics. Pip package [prometheus-client]
    # https://pypi.org/project/prometheus-client should be present on workers
    enabled: false 
    podMonitor:
      enabled: false # Enable workers podmonitor
      namespace: "" # Deploy podmonitor in different namespace
      # Selector to select which namespaces the 
      # Endpoints objects are discovered from.
      namespaceSelector: {} 
      # Default: scrape .Release.Namespace only
      # To scrape all, use the following:
      # namespaceSelector:
      #   any: true
      interval: 30s # Interval at which metrics should be scraped.
      jobLabel: "" # The label to use to retrieve the job name from.
      # PodTargetLabels transfers labels on the K8S Pod onto the target.
      podTargetLabels: [] 
      # MetricRelabelConfigs to apply to samples before ingestion.
      metricRelabelings: [] 

jupyter:
  name: jupyter  # Jupyter name.
  enabled: true  # Enable/disable the bundled Jupyter notebook.
  # Create RBAC service account and role to allow Jupyter pod 
  # to scale worker pods and access logs.
  rbac: true  
  image:
    # Container image repository.
    repository: "daskdev/dask-notebook"  
    tag: 2021.5.1  # Container image tag.
    pullPolicy: IfNotPresent  # Container image pull policy.
    # Container image [pull secrets]
    # https://kubernetes.io/docs/tasks/configure-pod-container/
    # pull-image-private-registry/
    pullSecrets:  
    #  - name: regcred
    #
  replicas: 1  # Number of notebook servers.
  # Scheduler service type. `LoadBalancer` exposes it outside of your cluster
  serviceType: "LoadBalancer" 
  # serviceType: "NodePort"
  # serviceType: "LoadBalancer"
  servicePort: 80  # Jupyter service internal port.
  # Password hash. The default hash corresponds to the password `dask`.
  password: 'sha1:aae8550c0a44:9507d45e087d5ee481a5ce9f4f16f37a0867318c'
  env:  # Environment variables. See `values.yaml` for example values.
  #  - name: EXTRA_CONDA_PACKAGES
  #    value: "numba xarray -c conda-forge"
    - name: EXTRA_PIP_PACKAGES
      value: s3fs mimesis dask-ml xgboost pyarrow seaborn hvplot --upgrade
  command: null  # Container command.
  args:  # Container arguments.
  #  - "start.sh"
  #  - "jupyter"
  #  - "lab"
  extraConfig: |-
    # Extra Jupyter config goes here
    # E.g
    # c.NotebookApp.port = 8888
  resources: {} # Jupyter pod resources. See `values.yaml` for example values
  #  limits:
  #    cpu: 2
  #    memory: 6G
  #  requests:
  #    cpu: 2
  #    memory: 6G
  # Jupyter pod volumes and volume mounts.
  # mounts.volumes follows the Kubernetes API v1 Volume spec;
  # mounts.volumeMounts follows the Kubernetes API v1 VolumeMount spec.
  mounts: {} 
  #  volumes:
  #    - name: data
  #      emptyDir: {}
  #  volumeMounts:
  #    - name: data
  #      mountPath: /data
  tolerations: []  # Tolerations.
  affinity: {}  # Container affinity.
  nodeSelector: {}  # Node Selector.
  securityContext: {}  # Security Context.
  serviceAccountName: "dask-jupyter"  # Service account for use with RBAC
  ingress:
    enabled: false  # Enable ingress.
    tls: false  # Ingress should use TLS.
    # secretName: dask-jupyter-tls
    hostname: dask-jupyter.example.com  # Ingress hostname.
    annotations:  # Ingress annotations. See `values.yaml` for example values
      # kubernetes.io/ingress.class: "nginx"
      # secretName: my-tls-cert
      # kubernetes.io/tls-acme: "true"

For the purposes of this article, please pay special attention to the following settings:

You need to define the list of pip packages to be installed both in the workers and in the Jupyter client. Here is a list of packages I find convenient to include with Dask:

  • s3fs and pyarrow to read and write S3-compatible storage from the cluster, and mimesis to generate synthetic test data
  • dask-ml to get access to Dask ML modules
  • seaborn and hvplot for plotting capabilities that work with Dask data frames, similar to what’s available for Pandas.

Here is the code block. Make sure the worker pods and the Jupyter Lab client pod use the same list:

- name: EXTRA_PIP_PACKAGES
  value: s3fs mimesis dask-ml pyarrow seaborn hvplot --upgrade
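
These extra packages get pip-installed when each pod starts. If you want to double-check that they actually landed on every worker, one option (a sketch that reuses the illustrative scheduler address from later in this article) is to run a small function on all workers:

from dask.distributed import Client
import importlib.metadata as md

client = Client('tcp://10.186.148.60:8786')  # illustrative scheduler address

def extra_package_versions():
    # Report the installed version of each extra package on this worker
    return {pkg: md.version(pkg) for pkg in ('s3fs', 'dask-ml', 'pyarrow', 'hvplot')}

# client.run executes the function once per worker and returns a dict keyed by worker address
print(client.run(extra_package_versions))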

If you are using a private container registry, such as Harbor, you need to change the repository and tag entries to match your environment’s setup. For example:

# Image for worker pods and scheduler
repository: "harbor.vmware.com/cache/daskdev/dask" 
tag: 2021.5.1

# Image for the jupyter-lab pod
repository: "harbor.vmware.com/cache/daskdev/dask-notebook" 
tag: 2021.5.1

You may want to expose Dask services using a K8s LoadBalancer service. You can do that by setting every “serviceType” entry to “LoadBalancer”:

serviceType: "LoadBalancer" # Scheduler service type…

Deploy the Helm chart

Now you’re ready to deploy the Dask Helm chart. You can find the list of available charts at https://helm.dask.org/. In this article, we’ll be deploying the Dask chart version 2021.5.1 with Helm 3.

Add the Dask repo and update the repos list:

helm repo add dask https://helm.dask.org
helm repo update

Install the Helm chart version of your preference. For example:

helm install --namespace dask --version 2021.5.1 dask-2021-5-1 dask/dask -f values.yaml

Note that “dask-2021-5-1” is the release name, and it’s up to you to pick it. Helm prints the deployment details, similar to the output below:

NAME: dask-2021-5-1
LAST DEPLOYED: Wed Jun 2 12:30:25 2021
NAMESPACE: dask
STATUS: deployed
REVISION: 1
TEST SUITE: None
NOTES:
Thank you for installing DASK, released at name: dask-2021-5-1.

To learn more about the release, try:

 $ helm status dask-2021-5-1  # information about running pods and this message
 $ helm get dask-2021-5-1     # get full Kubernetes specification

This release includes a Dask scheduler, 3 Dask workers, and 1 Jupyter server.

The Jupyter notebook server and Dask scheduler expose external services to which you can connect to manage notebooks, or connect directly to the Dask cluster. You can get these addresses by running the following:

 export DASK_SCHEDULER=$(kubectl get svc --namespace dask dask-2021-5-1-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
 export DASK_SCHEDULER_UI_IP=$(kubectl get svc --namespace dask dask-2021-5-1-scheduler -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
 export DASK_SCHEDULER_PORT=8786
 export DASK_SCHEDULER_UI_PORT=80

 export JUPYTER_NOTEBOOK_IP=$(kubectl get svc --namespace dask dask-2021-5-1-jupyter -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
 export JUPYTER_NOTEBOOK_PORT=80

 echo tcp://$DASK_SCHEDULER:$DASK_SCHEDULER_PORT -- Dask Client connection
 echo http://$DASK_SCHEDULER_UI_IP:$DASK_SCHEDULER_UI_PORT -- Dask dashboard
 echo http://$JUPYTER_NOTEBOOK_IP:$JUPYTER_NOTEBOOK_PORT -- Jupyter notebook

You can verify the deployment status with the following command. Your K8s cluster might need several minutes before all pods show a status of “Running.”

kubectl -n dask get pods
# NAME                                     READY STATUS   RESTARTS AGE
# dask-2021-5-1-jupyter-7b56d4cdc7-q8jsn   1/1   Running  0        4d23h
# dask-2021-5-1-scheduler-5d8f7d7c64-2cv7m 1/1   Running  0        4d23h
# dask-2021-5-1-worker-7b9b866f6d-7w8d5    1/1   Running  0        4d23h
# dask-2021-5-1-worker-7b9b866f6d-fs9l6    1/1   Running  0        4d23h
# dask-2021-5-1-worker-7b9b866f6d-t8pgr    1/1   Running  0        4d23h

You can verify the status of the network services (LoadBalancer) that expose the external IPs for the Jupyter server and the scheduler as below:

kubectl -n dask get service
# dask-2021-5-1-scheduler  LoadBalancer  10.110.99.16   10.186.148.60   8786:32425/TCP, 80:31939/TCP

In the “EXTERNAL-IP” column, you will find the addresses that matter to the Dask cluster user:

  • The Jupyter Lab IP in this case is 10.186.148.188. Users should point their browsers to that IP (using http) and use the “dask” default password to log in to the dev environment.
  • The scheduler address in this case is 10.186.148.60:8786 (mind the port). Specify this address in your Python scripts; run the following lines of code to use this specific cluster:
import dask
from distributed import Client

client = Client('10.186.148.60:8786')
  • Finally, the IP:port combination 10.186.148.60:80 gives users access to the Dask dashboard, where they can monitor the execution of the tasks the cluster is performing. Dask offers many views that are useful for monitoring task flow and workload distribution among the workers. (A quick smoke test for the new cluster follows this list.)
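
As a quick smoke test (a sketch using the illustrative addresses above), you can connect to the scheduler, list the registered workers, and run a small distributed computation:

import dask.array as da
from dask.distributed import Client

client = Client('10.186.148.60:8786')  # scheduler EXTERNAL-IP and port from above

# List the worker pods currently registered with the scheduler
print(client.scheduler_info()['workers'].keys())

# A small computation executed across the worker pods
x = da.random.random((10_000, 10_000), chunks=(1_000, 1_000))
print(x.mean().compute())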

Optional: Configure an S3-compatible storage service on Kubernetes to use with Dask.

The Dask client and workers need access to shared storage in order to work on the same data files. Dask supports multiple remote data options. You can easily configure an S3-compatible service using Minio, which integrates with VMware vSAN and K8s (Tanzu). The details about configuring Minio in a K8s cluster can be found here.

Once Minio is configured, you can access S3 storage buckets with Python code similar to the following:

import dask.dataframe as dd

# Configure the S3 service access dict
storage_options = {
    "key": "your-S3-access-key",
    "secret": "your-S3-secret-key",
    "client_kwargs": {
        "endpoint_url": "http://minio-server-name:9000",
        "verify": False,
    }
}

# Read a .csv file into a data frame
df = dd.read_csv('s3://s3-bucket/data-file.csv',
                 storage_options=storage_options)

# Serialize a data frame using the parquet format
df.to_parquet('s3://s3-bucket', engine='pyarrow',
              storage_options=storage_options)

# Load a data frame from parquet files
df = dd.read_parquet('s3://s3-bucket', engine='pyarrow',
                     storage_options=storage_options)

NOTE:

  • Experiments showed us that reading from parquet files is between 4x and 10x faster than using .csv files.
  • Consider loading the data frame from parquet files prior to the exploratory and ML training tasks, as sketched below.
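
For instance, here is a sketch that reuses the storage_options dict from above: load the parquet-backed data frame once, persist the partitions in the workers' distributed memory, and reuse it for repeated exploration and training steps.

import dask.dataframe as dd

# Load the parquet files and keep the partitions in the workers' memory
df = dd.read_parquet('s3://s3-bucket', engine='pyarrow',
                     storage_options=storage_options).persist()

# Subsequent operations reuse the in-memory partitions instead of re-reading from S3
print(df.describe().compute())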

This is all you need to get Dask running on K8s! It’s as simple as installing a Python environment.

Future work

There are a couple of other upcoming blog articles around Dask:

  • For data scientists: An end-to-end model development example that illustrates how to use Dask for data loading, exploration, and transformation for a mid-size dataset. The example will also cover training linear and neural models, along with their respective hyperparameter tuning.
  • For IT ops teams: How to use VMware Kubeapps to deploy a Dask Helm chart on a TKG cluster using only a few clicks, instead of command-line steps. Deploying Dask with Kubeapps is easier and faster.
