Scaling Computer Vision Models with Dataflow

Scaling Machine Learning models is hard and expensive. This article briefly introduces the Google Cloud service Dataflow and shows how it can be used to run predictions on millions of images in a serverless way.



By Pablo Soto, Founding Partner at Pento


 

We will briefly introduce the Google Cloud service Dataflow and show how it can be used to run predictions on millions of images in a serverless way: no cluster creation, no maintenance, and you only pay for what you use. We will start by explaining why we think this is important, then briefly introduce a couple of concepts, and finally go directly into a use case.

Scaling Machine Learning models is hard and expensive. It is costly mainly for two reasons: the infrastructure is expensive, and a slow experimentation pipeline leaves your ML team constantly waiting for results.

It is easy to underestimate how expensive it is to have an ML team waiting for results. Time is wasted, and people also get frustrated and end up losing motivation.

One way to optimize your ML experimentation process is to build and manage your own infrastructure. This is common in big companies, but it is often prohibitively expensive for smaller ones.

Traditional software development faced similar challenges years ago. Companies needed to scale, mainly driven by business needs (slow services, APIs, and websites don't scale). Cloud providers saw an opportunity to offer services that allow companies to scale without upfront costs. They approached the problem mainly in two ways: services that are easier to manage, and serverless services. The latter were particularly attractive to small companies that didn't have a big DevOps team or much money to pay for their own infrastructure.

A similar trend is happening in Machine Learning. This time it is driven by the scarcity and high cost of ML talent. As with software, small companies can't invest a huge amount of money in a custom, optimized ML infrastructure, because it requires a team of experts to build and maintain it.

Cloud providers saw a similar opportunity to add value by providing serverless solutions for ML. GCP, in particular, has done this for a while. GCP AI Platform offers different services for each stage of an ML project, but GCP also has other services that can help solve ML challenges.

 

What is Dataflow?

 
Dataflow is a fully managed processing service that uses Apache Beam as its programming model to define and execute pipelines. Dataflow is one of the many runners that Beam supports.

Apache Beam provides a portable API layer for building sophisticated data-parallel processing pipelines that may be executed across a diversity of execution engines, or runners.

Dataflow in particular has some interesting features:

  • Fully managed
  • Autoscaling
  • Easy implementation

 

What is Apache Beam?

 
As mentioned above, Beam is a programming model used to define and execute a processing pipeline. It provides an interface to create a processing pipeline that can be executed in multiple environments (such as Spark, Hadoop, Dataflow).

 

Use case: Extracting image features with a pre-trained ResNet50

 
As Computer Vision engineers, we often need to extract embeddings using a CNN model. Here we will present how to build a Dataflow pipeline that generates image feature embeddings using a pre-trained ResNet50. Our pipeline will consist of 3 simple steps:

  1. Load images
  2. Extract features
  3. Store features

To implement our pipeline we are going to use the Apache Beam Python SDK, Keras, Pillow, and Click. Because we will use non-Python dependencies, we have to structure our code as described here.

The first thing we need to do is create a pipeline configuration:

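import os
import pathlib

from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

if local:
    # Execute the pipeline on your local machine.
    runner_options = {
        "runner": "DirectRunner",
    }
else:
    # Execute the pipeline on Dataflow.
    runner_options = {
        "runner": "DataflowRunner",
        "temp_location": "...",
        "staging_location": "...",
        "max_num_workers": max_num_workers,
    }

options = PipelineOptions(
    project=project_id,
    job_name=job_name,
    region=region,
    **runner_options
)

# Pickle the main session so that global imports are available on the workers.
options.view_as(SetupOptions).save_main_session = True
# Point to the setup.py that installs our package and its dependencies.
options.view_as(SetupOptions).setup_file = os.path.join(
    pathlib.Path(__file__).parent.absolute(), "..", "setup.py")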


As you can see, the options change depending on whether we are executing the pipeline locally or on Dataflow.
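The local/Dataflow switch can also be wrapped in a small helper, which makes it easier to test. The sketch below is only an illustration: `build_runner_options` and its parameters are hypothetical names that are not part of the original code or of Apache Beam, the bucket paths are placeholders, and insisting on both Cloud Storage locations is a choice made for this example.

```python
def build_runner_options(local, temp_location=None, staging_location=None,
                         max_num_workers=None):
    """Return runner-specific keyword arguments for PipelineOptions.

    Hypothetical helper: the name and signature are assumptions made
    for this example, not part of Apache Beam.
    """
    if local:
        # DirectRunner executes the pipeline on the local machine.
        return {"runner": "DirectRunner"}

    # This sketch insists on both Cloud Storage locations, mirroring the
    # configuration shown in the article.
    required = (("temp_location", temp_location),
                ("staging_location", staging_location))
    missing = [name for name, value in required if not value]
    if missing:
        raise ValueError(f"missing Dataflow options: {', '.join(missing)}")

    options = {
        "runner": "DataflowRunner",
        "temp_location": temp_location,
        "staging_location": staging_location,
    }
    if max_num_workers is not None:
        options["max_num_workers"] = max_num_workers
    return options


local_options = build_runner_options(local=True)
dataflow_options = build_runner_options(
    local=False,
    temp_location="gs://example-bucket/tmp",         # placeholder path
    staging_location="gs://example-bucket/staging",  # placeholder path
    max_num_workers=10,
)
```

The returned dictionary can then be unpacked into `PipelineOptions(**options)` in the same way as `runner_options`.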

Then we need to create a Pipeline:

with beam.Pipeline(options=options) as p:
        ...


p is our pipeline. To add new steps to the pipeline, use the | operator as follows:

p = p | Task()


If you want to explicitly name the step, you can use the >> operator:

p = p | ("task_name" >> Task())
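
It may help to see why `|` and `>>` can be used this way: Python lets classes overload both operators. The following is a toy model in plain Python, not Beam's actual implementation; `Step` and `Collection` are hypothetical names invented for this illustration.

```python
class Step:
    """Toy stand-in for a transform: wraps a function and a label."""

    def __init__(self, fn, label=None):
        self.fn = fn
        self.label = label or fn.__name__

    def __rrshift__(self, label):
        # Called for `"label" >> step`, because str does not implement `>>`.
        return Step(self.fn, label)


class Collection:
    """Toy stand-in for the elements flowing through a pipeline."""

    def __init__(self, items, applied=()):
        self.items = list(items)
        self.applied = tuple(applied)  # labels of the steps applied so far

    def __or__(self, step):
        # Called for `collection | step`: apply the step to every element.
        return Collection(
            [step.fn(item) for item in self.items],
            self.applied + (step.label,),
        )


def double(x):
    return 2 * x


result = Collection([1, 2, 3]) | Step(double) | ("add_one" >> Step(lambda x: x + 1))
print(result.items)    # [3, 5, 7]
print(result.applied)  # ('double', 'add_one')
```

Unlike this toy, Beam does not run anything when `|` is applied; it only records the step in the pipeline graph, which the runner executes later.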


And now we need to specify the steps for our pipeline. There are multiple ways to implement them:

  • Built-in implementations: BigQuery connector, ReadFromText, etc.
  • Custom implementations: Map, Filter, ParDo

(p
 | "source" >> ReadFromText(input_path)
 | "load" >> beam.Map(load)
 | "extract" >> beam.Map(extract)
 | "store" >> beam.ParDo(store, output_path))


Here you can see we have added the three steps mentioned above. We read the paths to the images from a CSV file in Google Cloud Storage, then load the images, extract their embeddings, and store them in Google Cloud Storage.

In the load and extract steps we expect to receive one input and return one output, so we apply them with beam.Map. If you are dealing with steps that may emit zero or more outputs per input, you can use beam.ParDo instead, as we do for the store step, which writes to GCS and returns nothing.
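The difference between the two kinds of step can be illustrated without Beam. The sketch below is a plain-Python analogy under our own naming (`apply_map`, `apply_pardo`, and `keep_jpeg` are hypothetical helpers, not Beam APIs): a Map-style step produces exactly one output per input, while a ParDo-style step may produce none, one, or several.

```python
def apply_map(fn, elements):
    """One output per input, like a Map-style step."""
    return [fn(element) for element in elements]


def apply_pardo(fn, elements):
    """Zero or more outputs per input, like a ParDo-style step.

    `fn` returns an iterable (or None for "no output"); the results
    are flattened into a single list.
    """
    outputs = []
    for element in elements:
        produced = fn(element)
        if produced is not None:
            outputs.extend(produced)
    return outputs


def keep_jpeg(path):
    # Emits the path for JPEG files and nothing for everything else.
    if path.lower().endswith((".jpg", ".jpeg")):
        yield path


paths = ["a.jpg", "notes.txt", "b.JPEG"]
upper = apply_map(str.upper, paths)    # same length as the input
jpegs = apply_pardo(keep_jpeg, paths)  # may be shorter (or longer)
print(upper)  # ['A.JPG', 'NOTES.TXT', 'B.JPEG']
print(jpegs)  # ['a.jpg', 'b.JPEG']
```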

Now that the pipeline is defined, let's take a look at implementations.

 

Loading images

 
In this step, we need to download the image from GCS, load it as a Pillow Image, resize it, and convert it into a NumPy array. We download and resize in a single step because we want to reduce the data transferred between steps in the pipeline, thus decreasing the cost.

import io

import numpy as np
from apache_beam.io.gcp.gcsio import GcsIO
from PIL import Image

def load(path):
    """
    Receives an image path and returns a dictionary containing
    the image path and a resized version of the image as a np.array.
    """
    buf = GcsIO().open(path, mime_type="image/jpeg")
    img = Image.open(io.BytesIO(buf.read()))
    # Image.ANTIALIAS was removed in Pillow 10; Image.LANCZOS is the same filter.
    img = img.resize((IMAGE_HEIGHT, IMAGE_WIDTH), Image.LANCZOS)
    return {"path": path, "image": np.array(img)}


Beam provides a set of built-in connectors to handle I/O.
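To get a feeling for how much data the resize inside the load step saves, here is a back-of-the-envelope calculation. The numbers are assumptions for illustration only: the article does not state the source image size or the values of IMAGE_HEIGHT and IMAGE_WIDTH, so we assume a 12-megapixel RGB photo and a 224x224 model input.

```python
def raw_size_bytes(width, height, channels=3, bytes_per_channel=1):
    """Size of an uncompressed image array (uint8 by default)."""
    return width * height * channels * bytes_per_channel


# Hypothetical 12-megapixel source photo versus a 224x224 model input.
original = raw_size_bytes(4000, 3000)
resized = raw_size_bytes(224, 224)
reduction = original / resized

print(original)          # 36000000 bytes
print(resized)           # 150528 bytes
print(round(reduction))  # 239
```

Under these assumptions, each element passed to the next step is roughly 239 times smaller than the decoded original.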

 

Extracting features

 
In this step, we need to load the ResNet model and run the predictions. To avoid loading the model multiple times on the same worker, a singleton wrapper was added to the Extractor class. Once we extract the embedding, we add it to the item dictionary and, as we no longer need the full image in memory, we remove it from the dictionary.

def singleton(cls):
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance

@singleton
class Extractor:
    """
    Extract image embeddings using a pre-trained ResNet50 model.
    """
    def __init__(self):
        self.model = ResNet50(
            input_shape=(IMAGE_WIDTH, IMAGE_HEIGHT, 3),
            include_top=False,
            weights="imagenet",
            pooling=None,
        )

    def extract(self, image):
        return self.model.predict(np.expand_dims(image, axis=0))[0]

def extract(item):
    """
    Extracts the feature embedding from item["image"].
    """
    extractor = Extractor()
    item["embedding"] = extractor.extract(item["image"])
    # We no longer need the image, so remove it from the dictionary to free memory.
    del item["image"]
    return item
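
The effect of the `singleton` decorator above can be checked in isolation. In the sketch below, `ExpensiveModel` is a hypothetical stand-in for the real extractor; it only counts how many times its constructor runs.

```python
def singleton(cls):
    instances = {}

    def get_instance(*args, **kwargs):
        # Create the instance on the first call and reuse it afterwards.
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class ExpensiveModel:
    """Hypothetical stand-in for a model that is slow to load."""

    load_count = 0

    def __init__(self):
        # Count how many times the (pretend) expensive load happens.
        type(self).load_count += 1


first = ExpensiveModel()
second = ExpensiveModel()
print(first is second)   # True
print(first.load_count)  # 1
```

Note that the cache lives in the Python process, so the model is loaded once per worker process, not once per job. Also, after decoration the name refers to the `get_instance` function, so it can no longer be used with `isinstance`.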


 

Storing features

 
Now that we have the embedding, we just need to store it in GCS.

def store(item, output_path):
    """
    Store the image embedding item["embedding"] in GCS.
    """
    name = item["path"].split("/")[-1].split(".")[0]
    path = os.path.join(output_path, f"{name}.npy")

    # Serialize the embedding into an in-memory buffer.
    buf = io.BytesIO()
    np.save(buf, item["embedding"])

    # Closing the GCS file (here via the context manager) completes the upload.
    with beam.io.gcp.gcsio.GcsIO().open(path, mode="w") as fout:
        fout.write(buf.getvalue())
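
One detail worth knowing about the naming logic in `store`: splitting on the first dot truncates file names that contain extra dots, and `os.path.join` uses the local path separator. A possible alternative, shown here only as a sketch, is to build the name with `posixpath`. The helper name `embedding_path` and the bucket names are hypothetical.

```python
import posixpath


def embedding_path(image_path, output_path):
    """Build the output object name for an image's embedding.

    Hypothetical helper: mirrors the naming logic in `store`, but keeps
    dots inside the file name and always joins with "/".
    """
    file_name = posixpath.basename(image_path)
    stem, _extension = posixpath.splitext(file_name)
    return posixpath.join(output_path, f"{stem}.npy")


simple = embedding_path("gs://images/cats/001.jpg", "gs://embeddings/run1")
dotted = embedding_path("gs://images/cats/v1.2.photo.jpg", "gs://embeddings/run1")
print(simple)  # gs://embeddings/run1/001.npy
print(dotted)  # gs://embeddings/run1/v1.2.photo.npy
```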


Dataflow allows you to specify the machine type, number of workers, objective, etc., providing a lot of flexibility to find the right configuration for the task at hand. You can check all the options here.

Once you run your Dataflow pipeline, you will see logs about the status of the execution on your machine. Alternatively, you can use the Dataflow web UI. You can learn more about it in its documentation.

Dataflow UI

You can find the full code here.

Apache Beam also supports branching your pipeline. Let's say you want to create 2D visualizations of your embeddings. You can do this by branching your pipeline in the following way:

embeddings = (
    p
    | "source" >> ReadFromText(input_path)
    | "load" >> beam.Map(load)
    | "extract" >> beam.Map(extract))

# Both branches consume the same extracted embeddings.
(embeddings | "store" >> beam.ParDo(store, output_path))
(embeddings | "reduce_dim" >> beam.ParDo(reduce_dim))


This allows Beam to execute store and reduce_dim independently, providing more flexibility to optimize the pipeline, and therefore improving its performance.

 

Scale

 
Dataflow provides multiple configuration options that let you customize how you want to scale, in a really easy way. By default, it chooses values aimed at good performance. During the execution itself, Dataflow adjusts the configuration to optimize performance, for instance by changing the number of workers.

You can let Dataflow work out the best configuration each time you run the pipeline. But if you already know what your jobs need, such as the amount of RAM, whether they are CPU intensive, or the maximum throughput supported by your external services (e.g., your DB or API capacity), it is better to specify this information directly. This will help Dataflow converge faster on the right configuration, speeding up the execution and reducing costs.
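As an illustration of specifying such information directly, the sketch below collects a few worker settings in a dictionary and renders them as command-line style flags. The option names are the Beam Python SDK worker options as we understand them, so please check them against the pipeline options reference for your SDK version. The values are made up for the example and are not recommendations, and `to_flags` is a hypothetical helper.

```python
# Explicit worker settings, assuming a job that needs more memory per worker.
# The values are illustrative, not recommendations.
worker_options = {
    "machine_type": "n1-highmem-4",
    "num_workers": 5,        # initial number of workers
    "max_num_workers": 50,   # upper bound for autoscaling
    "autoscaling_algorithm": "THROUGHPUT_BASED",
    "disk_size_gb": 50,
}


def to_flags(options):
    """Render an options dict as command-line style flags."""
    return [f"--{name}={value}" for name, value in options.items()]


flags = to_flags(worker_options)
print(flags[0])  # --machine_type=n1-highmem-4
print(flags[2])  # --max_num_workers=50
```

The same dictionary could instead be merged into the keyword arguments passed to `PipelineOptions`.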

Dataflow should be enough to cover the scaling requirements of most small and medium-sized companies. As a reference, Dataflow is able to execute the use case presented above at a rate of 5k images per second, using up to 1k workers. Here is the list of Dataflow quotas, the main one being the 1k workers limit.
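These figures translate into a quick estimate of how long a job might take. The calculation below is a rough sketch: it assumes the quoted peak rate is sustained for the whole job and ignores worker startup and autoscaling ramp-up, so real runs will take longer. The 10 million image dataset is a hypothetical example.

```python
def estimate(num_images, images_per_second=5_000, workers=1_000):
    """Back-of-the-envelope estimate based on the figures quoted above."""
    seconds = num_images / images_per_second
    per_worker_rate = images_per_second / workers
    return seconds, per_worker_rate


# Hypothetical dataset of 10 million images.
seconds, per_worker_rate = estimate(10_000_000)
minutes = seconds / 60

print(seconds)            # 2000.0
print(per_worker_rate)    # 5.0 images per second per worker
print(round(minutes, 1))  # 33.3
```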

 

Summary

 
Apache Beam is a simple programming model that allows you to define ML steps in a structured way and run them in different environments. One of these environments is Dataflow, a Beam execution engine that is fully managed, supports autoscaling, and allows us to optimize for different objectives.

Dataflow is a good tool to run your ML models at scale without too much investment upfront or maintenance. It provides a simple API and has an active OSS community. In this article, we showed how simple it is to build an image embedding extractor using Apache Beam and scale it to millions of images. In this way you can save hundreds of dollars without worrying about the infrastructure, while speeding up your experimentation process.

 

Bio: Pablo Soto is a Founding Partner at Pento, machine learning specialists.

Original. Reposted with permission.
