If You Can Write Functions, You Can Use Dask

This is the second article in an ongoing series on using Dask in practice. Each article in the series is simple enough for beginners but provides useful tips for real work. The first article in the series is about using LocalCluster.



By Hugo Shi, Founder of Saturn Cloud

I’ve been chatting with many data scientists who’ve heard of Dask, the Python framework for distributed computing, but don’t know where to start. They know that Dask can probably speed up many of their workflows by having them run in parallel across a cluster of machines, but the task of learning a whole new methodology can seem daunting. I’m here to tell you that you can start getting value from Dask without having to learn the entire framework. If you spend time waiting for notebook cells to execute, there’s a good chance Dask can save you time. Even if you only know how to write Python functions, you can take advantage of this without learning anything else! This blog post is a “how to use Dask without learning the whole thing” tutorial.

 

Dask, dataframes, bags, arrays, schedulers, workers, graphs, RAPIDS, oh no!

 
 
There is a lot of complicated content out there about Dask, which can be overwhelming. That’s because Dask can use a cluster of worker machines to do many cool things! But forget about all of that for now. This article focuses on simple techniques that can save you time, without requiring you to change much about how you work.
 

For loops and functions

 
 
Pretty much every data scientist has done something like this, where you have a set of dataframes stored in separate files and you use a for loop to read them all, do some logic, then combine them:

import pandas as pd

results = []
for file in files:
    df = pd.read_csv(file)

    ## begin genius algorithm

    brilliant_features = []
    for feature in features:
        brilliant_features.append(compute_brilliant_feature(df, feature))
    magical_business_insight = make_the_magic(brilliant_features)

    results.append(magical_business_insight)

 

Over time, you end up with more files, or the genius algorithm gets more complicated and takes longer to run. And you end up waiting. And waiting.


 

Step 1 is to encapsulate your code in a function. You want to wrap up the stuff that goes inside the for loop. This makes it easier to understand what the code is doing (converting a file to something useful via magic). More importantly, it makes it easier to use that code in ways other than a for loop.

def make_all_the_magics(file):
    df = pd.read_csv(file)
    brilliant_features = []
    for feature in features:
        brilliant_features.append(compute_brilliant_feature(df, feature))
    magical_business_insight = make_the_magic(brilliant_features)
    return magical_business_insight

results = []

for file in files:
    magical_business_insight = make_all_the_magics(file)
    results.append(magical_business_insight)

 

Step 2 is to parallelize it with Dask. Now, instead of using a for loop, where each iteration happens after the previous one, Dask will run them in parallel on a cluster. This should give us results far faster, and it’s only a few lines longer than the for-loop code!

from dask import delayed
from dask.distributed import Client

# same function but with a Dask delayed decorator
@delayed
def make_all_the_magics(file):
    df = pd.read_csv(file)
    brilliant_features = []
    for feature in features:
        brilliant_features.append(compute_brilliant_feature(df, feature))
    magical_business_insight = make_the_magic(brilliant_features)
    return magical_business_insight

results = []
for file in files:
    magical_business_insight = make_all_the_magics(file)
    results.append(magical_business_insight)

# new Dask code
c = Client()
results = c.compute(results, sync=True)

 

How it works:

  • The delayed decorator transforms your function. Now, when you call it, it isn’t evaluated. Instead, you get back a delayed object, which Dask can execute later.
  • Client().compute sends all those delayed objects to the Dask cluster, where they are evaluated in parallel! That’s it, you win!
  • Instantiating a Client automatically provisions a LocalCluster. This means that the Dask parallel workers are all processes on the same machine as the one calling Dask, which makes for a concise example. For real work, I recommend creating the LocalCluster separately in a terminal and connecting to it, as sketched below.
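
Here is a minimal, self-contained sketch of those three points. The toy function double and the commented-out scheduler address are hypothetical stand-ins; use whatever address your own cluster prints when you start it.

from dask import delayed
from dask.distributed import Client

@delayed
def double(x):
    return 2 * x

lazy = double(21)   # nothing runs yet; this is a Delayed object
print(lazy)         # Delayed('double-...')

c = Client()        # implicitly starts a LocalCluster on this machine
# c = Client("tcp://127.0.0.1:8786")  # or connect to a cluster you started separately
print(c.compute(lazy, sync=True))     # 42, evaluated on a worker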

 

Practical Topics

 
 
The above is where most Dask tutorials stop. I’ve used this approach with my own work and with numerous customers, and a couple of practical issues always arise. These next tips will take you from the textbook example above to methods that are more useful in practice, covering two topics that constantly come up: large objects and error handling.
 

Large Objects

 
In order to compute functions on a distributed cluster, the objects that the functions are called on need to be sent to the workers. This can lead to performance issues, since those objects have to be serialized (pickled) on your computer and sent over the network. Imagine you were processing gigabytes of data: you don’t want to have to transfer it each time a function runs on it. If you accidentally send large objects, you may see a message from Dask like this:

Consider scattering large objects ahead of time
with client.scatter to reduce scheduler burden and
keep data on workers

 

There are two ways to stop this from happening: you can send smaller objects to the workers so the burden isn’t so bad, or you can send each object to a worker only once, so you don’t keep having to make transfers.

Fix 1: send small objects when possible

This example is good because we’re sending a file path (a small string) instead of the dataframe.

# good, do this
results = []
for file in files:
    magical_business_insight = make_all_the_magics(file)
    results.append(magical_business_insight)

 

Below is what not to do, for two reasons: the CSV reading (expensive and slow) happens in the loop on your machine, so it isn’t parallel, and we’re now sending dataframes (which can be large) to the workers.

# bad, do not do this
results = []
for file in files:
    df = pd.read_csv(file)
    magical_business_insight = make_all_the_magics(df)
    results.append(magical_business_insight)

 

Oftentimes, code can be rewritten to change where data is being managed: either on the client or on the workers. Depending on your situation, there may be huge time savings in thinking through what your functions take as input and how data transfers can be minimized.

Fix 2: send objects only once

If you have to send a big object, don’t send it multiple times. For example, if I need to send a big model object in order to compute (assume make_all_the_magics has been extended to take the model as a second argument), simply passing it as a parameter serializes the model once per file:

# bad, do not do this
results = []
for file in files:
    # big model has to be sent to a worker each time the function is called
    magical_business_insight = make_all_the_magics(file, big_model)
    results.append(magical_business_insight)

 

I can tell Dask not to do that by scattering the model to the workers ahead of time. scatter sends the object to the workers once and returns a future that refers to it, so only that lightweight reference travels with each task:

# good, do this
results = []
big_model = c.scatter(big_model)  # send the model to the workers once, up front

for file in files:
    magical_business_insight = make_all_the_magics(file, big_model)
    results.append(magical_business_insight)

 

 

Handling Failure

 
As your computational tasks grow, you’ll often want to be able to power through failures. In this case, maybe 5% of my CSVs have bad data that I can’t handle. I’d like to process 95% of the CSVs successfully, but keep track of the failures so I can adjust my methods and try again.

The following loop does exactly that:

import traceback
from distributed.client import wait, FIRST_COMPLETED, ALL_COMPLETED

queue = c.compute(results)  # results is still the list of delayed objects built in the loop above
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

while queue:
    result = wait(queue, return_when=FIRST_COMPLETED)
    for future in result.done:
        index = futures_to_index[future]
        if future.status == 'finished':
            print(f'finished computation #{index}')
            results[index] = future.result()
        else:
            print(f'errored #{index}')
            try:
                future.result()
            except Exception as e:
                results[index] = e
                traceback.print_exc()
    queue = result.not_done

print(results)

 

Since this code looks fairly complicated at first glance, let’s break it down.

queue = c.compute(results)
futures_to_index = {fut: i for i, fut in enumerate(queue)}
results = [None for x in range(len(queue))]

 

We call compute on results, but since we’re not passing sync=True, we immediately get back futures, which represent computations that have not completed yet. We also create a mapping from each future to the nth input argument that generated it. Finally, we populate a list of results filled with Nones for now.

while queue:
    result = wait(queue, return_when=FIRST_COMPLETED)

 

Next, we wait for results, and we process them as they come in. When we wait on futures, they are separated into futures that are done and those that are not_done.

        if future.status == 'finished':
            print(f'finished computation #{index}')
            results[index] = future.result()

 

If the future is finished, then we print that we succeeded, and we store the result.

        else:
            print(f'errored #{index}')
            try:
                future.result()
            except Exception as e:
                results[index] = e
                traceback.print_exc()

 

Otherwise, we store the exception and print the stack trace.

    queue = result.not_done

 

Finally, we set the queue to those futures that have not yet been completed.
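
Once the loop finishes, results holds a mix of magical business insights and Exception objects, in the same order as files. A small follow-up sketch (assuming nothing beyond the loop above) separates the two so the failing files can be fixed and retried:

# successes are ordinary return values; failures were stored as Exception instances
successes = [r for r in results if not isinstance(r, Exception)]
failed_indices = [i for i, r in enumerate(results) if isinstance(r, Exception)]
failed_files = [files[i] for i in failed_indices]  # candidates for a retry

print(f"{len(successes)} succeeded, {len(failed_files)} failed")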
 

Conclusion

 
 
Dask can definitely save you time. If you spend time waiting for code to run, you should use these simple tips to parallelize your work. There are also many advanced things you can do with Dask, but this is a good starting point.

 
Bio: Hugo Shi is Founder of Saturn Cloud, the go-to cloud workspace to scale Python, collaborate, deploy jobs, and more.
