Data Science in the Cloud with Dask

Scaling large data analyses for data science and machine learning is growing in importance. Dask and Coiled are making it easy and fast for folks to do just that. Read on to find out how.



By Hugo Bowne-Anderson, Head of Data Science Evangelism and VP of Marketing at Coiled

The ability to scale large data analyses is growing rapidly in importance for data science and machine learning. Fortunately, tools like Dask and Coiled make it easy and fast to do just that.

Dask is a popular solution for scaling up analyses in Python and the PyData ecosystem, because it is designed to parallelize any PyData library and so works seamlessly with the whole host of PyData tools.

When you start working with a large dataset, the first step is to scale up your analysis to use all the cores of a single workstation.

After that, you might need to scale out your computation to a cluster in the cloud (AWS, Azure, Google Cloud Platform, and so on).

Read on and we will:

  • Use pandas to showcase a common pattern in data science workflows,
  • Use Dask to scale up that workflow and harness the cores of a single workstation, and
  • Demonstrate how to scale the workflow out to the cloud with Coiled Cloud.

You can find all the code here on GitHub.

Note: Before you get started, it’s important to think about whether scaling your computation is actually necessary. Consider making your pandas code more efficient before you jump in. For machine learning, you can check whether more data will actually improve your model by plotting learning curves before you begin.
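
For instance, such a check could be done with scikit-learn’s learning_curve. The dataset and model below are placeholders, not part of the taxi workflow; if the validation score has already flattened out, more data is unlikely to move the needle.

# A minimal sketch, assuming scikit-learn and matplotlib are installed;
# the toy dataset and model are placeholders, not part of this workflow
import numpy as np
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import learning_curve

X, y = make_classification(n_samples=5_000, random_state=0)

train_sizes, train_scores, val_scores = learning_curve(
    LogisticRegression(max_iter=1_000), X, y,
    train_sizes=np.linspace(0.1, 1.0, 5), cv=5,
)

plt.plot(train_sizes, val_scores.mean(axis=1), marker="o")
plt.xlabel("Training set size")
plt.ylabel("Cross-validated score")
plt.show()  # a curve that has flattened suggests more data won't help much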

 

PANDAS AND ETL: A COMMON PATTERN IN DATA SCIENCE

 
First, we’ll use pandas on an in-memory dataset to introduce a common data science pattern. This is a 700 MB subset of the NYC taxi dataset, which is about 10 GB in total.

We want to see the scaling shine, so we picked a relatively boring workflow: read in the data, massage it, create a new feature, and save the result to Parquet (not human-readable, but vastly more efficient than CSV).

# Import pandas and read in the first month of data
import pandas as pd
df = pd.read_csv("data_taxi/yellow_tripdata_2019-01.csv")

# Alter data types for efficiency
df = df.astype({
    "VendorID": "uint8",
    "passenger_count": "uint8",
    "RatecodeID": "uint8",
    "store_and_fwd_flag": "category",
    "PULocationID": "uint16",
    "DOLocationID": "uint16",    
})

# Create new feature in dataset: tip_ratio
df["tip_ratio"] = df.tip_amount / df.total_amount

# Write to parquet
df.to_parquet("data_taxi/yellow_tripdata_2019-01.parq")

 

This took roughly one minute on my laptop, a wait time we can tolerate for analysis (maybe).

Now we want to perform the same analysis on the entire dataset.

 

DASK: SCALING UP YOUR DATA SCIENCE

 
At about 10 GB, the full dataset is larger than the RAM on my laptop, so we can’t load it into memory.

Instead we could write a for loop:

from glob import glob

for filename in glob("data_taxi/yellow_tripdata_2019-*.csv"):
    df = pd.read_csv(filename)
    ...
    df.to_parquet(...)

 

However, this approach doesn’t take advantage of the multiple cores on my laptop, nor is it a particularly graceful solution. Enter Dask for single-machine parallelism.

After importing what we need from Dask, we spin up a local cluster and launch a Dask client:

from dask.distributed import LocalCluster, Client

# Spin up a local cluster with 4 worker processes and connect a client to it
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client

 

Then we import the Dask DataFrame module, lazily read in the data, and run the same ETL pipeline as we did with pandas before.

import dask.dataframe as dd

df = dd.read_csv(
    "data_taxi/yellow_tripdata_2019-*.csv",
    dtype={
        "RatecodeID": "float64",
        "VendorID": "float64",
        "passenger_count": "float64",
        "payment_type": "float64",
    },
)

# Alter data types for efficiency, using nullable pandas dtypes
# so missing values survive the cast
df = df.astype({
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",
})

# Create new feature in dataset: tip_ratio
df["tip_ratio"] = df.tip_amount / df.total_amount
# Write to Parquet
df.to_parquet("data_taxi/yellow_tripdata_2019.parq")

 

This took about 5 minutes on my laptop, which we’ll call tolerable (I guess). But if we wanted to do anything marginally more complex (which we commonly do!), the time would quickly increase.
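
For example, a slightly more involved computation on the same Dask DataFrame might look like the following sketch (a hypothetical example, not from the original post): the mean tip ratio per pickup location, computed in parallel across all of the monthly files.

# Hypothetical follow-up computation, continuing with the df from above
mean_tip_by_pickup = (
    df.groupby("PULocationID")["tip_ratio"]
      .mean()
      .compute()  # triggers the actual parallel computation
)
print(mean_tip_by_pickup.head())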

If I had access to a cluster on the cloud, now would be the time to utilize it!

But first, let’s reflect on what we’ve just worked out:

  • We used a Dask DataFrame, a large, virtual dataframe divided along the index into many pandas DataFrames
  • We’re working on a local cluster, made up of:
    • A scheduler (which organizes the work and sends the tasks to the workers) and
    • Workers, which compute those tasks
  • We’ve launched a Dask client, which is “the user-facing entry point for cluster users.”

In short: the client lives wherever you are writing your Python code, and it talks to the scheduler, passing it the tasks.
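
Here is a minimal sketch of that flow, separate from the taxi workflow (the function and values are placeholders): the client submits a task, the scheduler hands it to a worker, and the result comes back to wherever your code lives.

from dask.distributed import LocalCluster, Client

# Placeholder example, not part of the taxi workflow
cluster = LocalCluster(n_workers=4)
client = Client(cluster)

def square(x):
    return x ** 2

future = client.submit(square, 10)   # the client passes the task to the scheduler
print(future.result())               # a worker computes it; we get 100 back
print(client.scheduler_info()["workers"].keys())  # addresses of the workers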


COILED: SCALING OUT YOUR DATA SCIENCE

 
And now for what we’ve been waiting for - it’s time to burst to the cloud. If you have access to cloud resources (like AWS) and know how to configure Docker and Kubernetes, you could launch a Dask cluster in the cloud yourself. That would be time consuming, however.

Enter a handy alternative: Coiled, which we’ll use here. To follow along, I’ve signed into Coiled Cloud (compute is currently free during the Beta!), pip installed coiled, and authenticated. Feel free to do the same yourself.

pip install coiled --upgrade
coiled login   # redirects you to authenticate with github or google

 

We then perform our necessary imports, spin up a cluster (this takes roughly a minute), and instantiate our client:

import coiled
from dask.distributed import Client

# Spin up a 10-worker cluster on Coiled Cloud and connect a client to it
cluster = coiled.Cluster(n_workers=10)
client = Client(cluster)

 

Next we read in our data (this time from S3) and perform our analysis:

import dask.dataframe as dd

# Read data into a Dask DataFrame
df = dd.read_csv(
    "s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv",
    dtype={
        "RatecodeID": "float64",
        "VendorID": "float64",
        "passenger_count": "float64",
        "payment_type": "float64",
    },
    storage_options={"anon": True},
)

# Alter data types for efficiency
df = df.astype({
    "VendorID": "UInt8",
    "passenger_count": "UInt8",
    "RatecodeID": "UInt8",
    "store_and_fwd_flag": "category",
    "PULocationID": "UInt16",
    "DOLocationID": "UInt16",
})

# Create new feature in dataset: tip_ratio
df["tip_ratio"] = df.tip_amount / df.total_amount

# Write to Parquet
df.to_parquet("s3://hugo-coiled-tutorial/nyctaxi-2019.parq")

 

How long did this take on Coiled Cloud? 30 seconds. This is an order of magnitude less time than it took on my laptop, even for this relatively straightforward analysis.

It’s easy to see the power of doing this whole set of analyses in a single workflow: we never needed to switch contexts or environments, and it’s straightforward to go back to using Dask on my local workstation, or pandas, when we’re done. Cloud computing is great when you need it, but it can be a burden when you don’t. This experience was a lot less burdensome.
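
For instance, picking the results back up locally after the cloud run might look like the following sketch (it assumes s3fs is installed and that you have read access to the output bucket).

# Back on the local workstation: read the Parquet output lazily with Dask
# (assumes s3fs is installed and the bucket is readable with your credentials)
import dask.dataframe as dd

df_local = dd.read_parquet("s3://hugo-coiled-tutorial/nyctaxi-2019.parq")
print(df_local.tip_ratio.mean().compute())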

 

DO YOU NEED FASTER DATA SCIENCE?

 
You can get started on a Coiled cluster for free right now. Coiled also handles security, conda/docker environments, and team management, so you can get back to doing data science and focus on your job. Get started today on Coiled Cloud.

 
Bio: Hugo Bowne-Anderson is Head of Data Science Evangelism and VP of Marketing at Coiled (@CoiledHQ, LinkedIn). He has extensive experience as a data scientist, educator, evangelist, content marketer, and data strategy consultant, in both industry and basic research. He has taught data science at institutions such as Yale University and Cold Spring Harbor Laboratory, at conferences such as SciPy, PyCon, and ODSC, and with organizations such as Data Carpentry. He is committed to spreading data skills, access to data science tooling, and open source software, both for individuals and the enterprise.
