Machine Learning in Dask

In this piece, we’ll see how we can use Dask to work with large datasets on our local machines.



Processing a couple of gigabytes of data on your laptop is usually an uphill task unless the machine has plenty of RAM and compute power.

Even so, data scientists still have to find ways around this problem. Common hacks include tweaking Pandas to process huge datasets in chunks, buying a GPU machine, or purchasing compute power on the cloud. This piece focuses on another option: Dask.

Dask and Python

Dask is a flexible library for parallel computing in Python. It’s built to integrate nicely with other open-source projects such as NumPy, Pandas, and scikit-learn. In Dask, Dask arrays are the equivalent of NumPy arrays, Dask DataFrames the equivalent of Pandas DataFrames, and Dask-ML the equivalent of scikit-learn.

These similarities make it very easy to adopt Dask into your workflow. The advantage of using Dask is that you can scale computations to multiple cores on your computer. This enables you to work on large datasets that don’t fit into memory. It also aids in speeding up computations that would ordinarily take a long time.
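As a quick illustration of how closely the API tracks NumPy, here’s a minimal sketch using dask.array (the array shape and chunk size are arbitrary):

import dask.array as da

# A 10,000 x 10,000 random array, split into 1,000 x 1,000 chunks
# that can be processed in parallel.
x = da.random.random((10000, 10000), chunks=(1000, 1000))

# The syntax mirrors NumPy, but this only builds a task graph.
y = (x + x.T).mean(axis=0)

# compute() triggers the actual parallel computation.
result = y.compute()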

 

Dask DataFrames

When loading huge data, Dask will usually read in a sample of it in order to infer the data types. This can lead to issues if a given column contains more than one data type. To avoid type errors, it’s good practice to declare the data types beforehand. Dask is able to load huge files by cutting them up into chunks, as defined by the blocksize parameter.

import dask.dataframe as dd

data_types = {'column1': str, 'column2': float}
df = dd.read_csv("data.csv", dtype=data_types, blocksize=64000000)


 

Commands on a Dask DataFrame are mostly the same as in Pandas. For example, getting the head and tail works identically:


df.head()
df.tail()


Operations on the DataFrame run lazily. This means they aren’t actually computed until the compute() function is called.

df.isnull().sum().compute()


Since the data is loaded in partitions, some Pandas functions that require a global sort, such as sort_values(), will fail. The workaround is to use the nlargest() function.
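For example, instead of sorting the whole DataFrame, you can ask for the top rows directly (here using the column2 column declared earlier):

# Fetch the 10 rows with the largest values in column2,
# without a full global sort.
top_rows = df.nlargest(10, 'column2').compute()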

 

Dask Clusters

Parallel computation is key in Dask, as it allows one to run computations on multiple cores. Dask provides a single-machine scheduler that works on one computer but does not scale beyond it, and a distributed scheduler that can scale to multiple machines.

Using dask.distributed requires that you set up a Client. This should be the first thing you do if you intend to use dask.distributed in your analysis. It offers low latency, data locality, and data sharing between workers, and it’s easy to set up.

from dask.distributed import Client

# With no arguments, Client() starts a local cluster and connects to it.
client = Client()


Using dask.distributed is advantageous even on a single machine, because it offers some diagnostic features via a dashboard.
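For instance, once the Client is running, recent versions of dask.distributed expose the dashboard’s address via the dashboard_link attribute:

print(client.dashboard_link)  # e.g. http://127.0.0.1:8787/status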

If you don’t declare a Client, you’ll be using the single-machine scheduler by default. It provides parallelism on a single computer by using processes or threads.
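As a minimal sketch, you can choose between the thread-based and process-based single-machine schedulers through Dask’s configuration:

import dask

# Use the process-based scheduler instead of the default threads.
dask.config.set(scheduler='processes')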

 

Dask ML

Dask also enables you to perform machine learning training and prediction in a parallel manner. The goal of Dask-ML is to offer scalable machine learning. When you declare n_jobs=-1 in scikit-learn, you can run your computations in parallel. Dask utilizes this capability to let you distribute that compute across a cluster. This is done with the help of joblib, a package that allows for parallelism and pipelining in Python. Using Dask-ML, you can implement scikit-learn models as well as models from other libraries such as XGBoost.

This is what a simple implementation would look like.

First, import train_test_split as usual for splitting your data into training and testing sets (here, X and y can be Dask arrays or DataFrames).

from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)


Next, import the model you’d like to use and instantiate it.

from sklearn.ensemble import RandomForestClassifier
model = RandomForestClassifier(verbose=1)


You then have to import joblib to enable parallel computation.

import joblib


Next, run your training and prediction using the parallel backend.

with joblib.parallel_backend('dask'):
    model.fit(X_train, y_train)
    predictions = model.predict(X_test)



 

Limitations and Memory Usage

Dask can’t parallelize the work within an individual task; each task runs as a single unit. Its workers are Python processes that inherit all the advantages and disadvantages of Python computations. Care should also be taken when working in a distributed environment to ensure data security and privacy.

Dask has a central scheduler that tracks data on the worker nodes across the cluster. The scheduler also controls when data is freed: once a task completes, its result is cleared from memory to make room for other tasks, unless a client still needs it or it’s important for ongoing computations, in which case it’s held in memory.
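If you do want an intermediate result kept in cluster memory for reuse, you can pin it explicitly. A minimal sketch, assuming a running Client and the DataFrame df from earlier:

# persist() starts the computation in the background and keeps
# the resulting partitions in worker memory for reuse.
df = df.persist()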

Another limitation is that Dask doesn’t implement all of the functions in Pandas. The Pandas interface is large, so Dask doesn’t replicate it in its entirety. This means that trying some of those missing operations in Dask can be an uphill climb. Also, operations that are slow in Pandas are slow in Dask too.

 

When you wouldn’t need a Dask DataFrame

 
In the following situations, you may not need Dask:

  • When there are Pandas functions you need that haven’t been implemented in Dask.
  • When your data fits perfectly into your computer’s memory.
  • When your data isn’t in tabular form. In this case, try dask.bag or dask.array (see the sketch after this list).
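As a minimal sketch of the non-tabular case, here’s dask.bag reading newline-delimited JSON records (the file pattern and field name are illustrative):

import json
import dask.bag as db

# Read text files in parallel, one record per line.
records = db.read_text('data/*.json').map(json.loads)

# Count how often each value of the 'category' field occurs.
counts = records.pluck('category').frequencies().compute()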

 

Final Thoughts

In this article, we’ve seen how we can use Dask to work with a huge dataset on our local machine or in a distributed manner. We’ve seen that Dask is easy to adopt thanks to its familiar syntax, and that it can scale to thousands of cores.

We’ve also seen that we can use it in machine learning for training and running predictions. You can learn more by checking out these presentations in the official documentation:

Presentations On Dask - Dask 2.15.0 documentation

 
Bio: Derrick Mwiti is a data analyst, a writer, and a mentor. He is driven by delivering great results in every task, and is a mentor at Lapid Leaders Africa.

Original. Reposted with permission.
