Integrating Spark with scikit-learn, visualizing eigenvectors, and fun!

Three topics in this post, to make up for the long hiatus!


Apache Spark’s MLlib has built-in support for many machine learning algorithms, but not everything of course. But one can nicely integrate scikit-learn (sklearn) functions to work inside of Spark, distributedly, which makes things very efficient. That’s what I’m going to be talking about here.

As a practical example, let’s consider k-Nearest-Neighbors (k-NN). Spark’s MLlib doesn’t have built-in support for this, but scikit-learn does.

So let’s talk about sklearn for a minute. If you have a large number of points, say a million or more, and you want to obtain nearest neighbors for all of them (as may be the case with a k-NN-based recommender system), sklearn’s NearestNeighbors on a single machine can be hard to work with. The fit() method isn’t what takes a long time, it’s subsequently producing the results for the large number of queries with kneighbors() that is expensive:

In the most straightforward deployment, if you try to send kneighbors() all point vectors in a single large matrix and ask it to come up with nearest neighbors for all of them in one fell swoop, it quickly exhausts the RAM and brings the machine to a crawl. Alternatively, the batch iteration method that I mentioned before is a good solution: after performing the initial fit, you can break the large matrix into chunks and obtain their neighbors chunk by chunk. This eases memory consumption, but can take a long time.

There are of course approximate nearest-neighbor implementations such as Spotify’s Annoy. In my use case, Annoy actually did worse than sklearn’s exact neighbors, because Annoy does not have built-in support for matrices: if you want to evaluate nearest neighbors for n query points, you have to loop through each of your n queries one at a time, whereas sklearn’s k-NN implementation can take in a single matrix containing many query points and return nearest neighbors for all of them at a blow, relatively quickly. Your mileage may vary. I’ll talk about Annoy again a little later.

To summarize the problem:

  • sklearn has good support for k-NN; Spark doesn’t.
  • sklearn’s k-NN fit() isn’t a problem
  • sklearn’s k-NN kneighbors() is a computational bottleneck for large data sets; is a good candidate for parallelization

This is where Spark comes in. All we have to do is insert kneighbors() into a Spark map function after setting the stage for it. This is especially neat if you’re already working in Spark and/or if your data is already in HDFS to begin with, as is commonly the case.

Below is a simplified Python (PySpark) code snippet to make this approach clear:

# Imports
from pyspark import SparkConf, SparkContext
from sklearn.neighbors import NearestNeighbors

# Let's say we already have a Spark object containing
# all our vectors, called myvecs

# Create kNN tree locally, and broadcast
myvecscollected = myvecs.collect()
knnobj = NearestNeighbors().fit(myvecscollected)
bc_knnobj = sc.broadcast(knnobj)

# Get neighbors for each point, distributedly
results = x: bc_knnobj.value.kneighbors(x))

Boom! That’s all you need. The key point in the above code is that we were able to pass sklearn’s NearestNeighbors’ kneighbors() method inside of Spark’s map(), which means that it can be parallel-y and nicely handled by Spark.

(You can do the same thing using Annoy instead of sklearn, except that instead of broadcasting the Annoy object to workers, you need to serialize it to a file and distribute the file to workers instead. This code shows you how.)

In my use case, harnessing Spark to distribute my sklearn code brought my runtime down from hours to minutes!

Update: between the time I first considered this problem and now, there has also emerged a Spark package for distributing sklearn functionality over Spark, as well as a more comprehensive integration called sparkit-learn. So there are several solutions available now. I still like the approach shown above for its simplicity, and for not requiring any extraneous code.


A beautiful interactive visualization of eigenvectors, courtesy of the wonderful folks at Setosa.

The thing that I love about this viz is that it doesn’t just show how eigenvectors are computed, it gives you an intuition for what they mean.


Lastly, and just for fun: Is it Pokemon or Big Data? ☺


Minibatch learning for large-scale data, using scikit-learn

Let’s say you have a data set with a million or more training points (“rows”). What’s a reasonable way to implement supervised learning?

One approach, of course, is to only use a subset of the rows. This has its merits, but there may be various reasons why you want to use the entire available data. What then?

Andy Müller created an excellent cheat sheet, thumbnailed below, showing which machine learning techniques are likely to work best in different situations (clickable version here). It’s obviously not meant to be a rigid rule, but it’s still a good place to start answering the question above, or most similar questions.


What we see from the above is that our situation points us towards Stochastic Gradient Descent (SGD) regression or classification.

Why SGD? The problem with standard (usually gradient-descent-based) regression/classification implementations, support vector machines (SVMs), random forests etc is that they do not effectively scale to the data size we are talking, because of the need to load all the data into memory at once and/or nonlinear computation time. SGD, however, can deal with large data sets effectively by breaking up the data into chunks and processing them sequentially, as we will see shortly; this is often called minibatch learning. The fact that we only need to load one chunk into memory at a time makes it useful for large-scale data, and the fact that it can work iteratively allows it to be used for online learning as well. SGD can be used for regression or classification with any regularization scheme (ridge, lasso, etc) and any loss function (squared loss, logistic loss, etc).

What is SGD? It’s been explained very nicely by Andrew Ng in his Coursera class (Week 10: Large Scale Machine Learning), and Léon Bottou has a somewhat more in-depth tutorial on it. Their explanations are excellent, and there’s no point in my duplicating them, so I’ll move on to implementation using Python and the scikit-learn (sklearn) library.

The key feature of sklearn’s SGDRegressor and SGDClassifier classes that we’re interested in is the partial_fit() method; this is what supports minibatch learning. Whereas other estimators need to receive the entire training data in one go, there is no such necessity with the SGD estimators. One can, for instance, break up a data set of a million rows into a thousand chunks, then successively execute partial_fit() on each chunk. Each time one chunk is complete, it can be thrown out of memory and the next one loaded in, so memory needs are limited to the size of one chunk, not the entire data set.

(It’s worth mentioning that the SGD estimators are not the only ones in sklearn that support minibatch learning; a variety of others are listed here. One can use this approach with any of them.)

Finally, the use of a generator in Python makes this easy to implement.

Below is a piece of simplified Python code for instructional purposes showing how to do this. It uses a generator called ‘batcherator’ to yield chunks one at a time, to be iteratively trained on using partial_fit() as described above.

from sklearn.linear_model import SGDRegressor

def iter_minibatches(chunksize):
    # Provide chunks one by one
    chunkstartmarker = 0
    while chunkstartmarker < numtrainingpoints:
        chunkrows = range(chunkstartmarker,chunkstartmarker+chunksize)
        X_chunk, y_chunk = getrows(chunkrows)
        yield X_chunk, y_chunk
        chunkstartmarker += chunksize

def main():
    batcherator = iter_minibatches(chunksize=1000)
    model = SGDRegressor()

    # Train model
    for X_chunk, y_chunk in batcherator:
        model.partial_fit(X_chunk, y_chunk)

    # Now make predictions with trained model
    y_predicted = model.predict(X_test)

We haven’t said anything about the getrows() function in the code above, since it pretty much depends on the specifics of where the data resides. Common situations might involve the data being stored on disk, stored in distributed fashion, obtained from an interface etc.

Also, while this simplistic code calls SGDRegressor with default arguments, this may not be the best thing to do. It is best to carry out careful cross-validation to determine the best hyperparameters to use, especially for regularization. There is a bunch more practical info on using sklearn’s SGD estimators here.

Hopefully this post, and the links within, give you enough info to get started. Happy large-scale learning!