Etsy Icon>

Code as Craft

Conjecture: Scalable Machine Learning in Hadoop with Scalding main image

Conjecture: Scalable Machine Learning in Hadoop with Scalding



Predictive machine learning models are an important tool for many aspects of e-commerce.  At Etsy, we use machine learning as a component in a diverse set of critical tasks. For instance, we use predictive machine learning models to estimate click rates of items so that we can present high quality and relevant items to potential buyers on the site.  This estimation is particularly important when used for ranking our cost-per-click search ads, a substantial source of revenue. In addition to contributing to on-site experiences, we use machine learning as a component of many internal tools, such as routing and prioritizing our internal support e-mail queue.  By automatically categorizing and estimating an "urgency" for inbound support e-mails, we can assign support requests to the appropriate personnel and ensure that urgent requests are handled by staff more rapidly, helping to ensure a good customer experience.

To quickly develop these types of predictive models while making use of our MapReduce cluster, we decided to construct our own machine learning framework, open source under the name "Conjecture."  It consists of three main parts:

  1. Java classes which define the machine learning models and data types.
  2. Scala methods which perform MapReduce training using Scalding.
  3. PHP classes which use the produced models to make predictions in real-time on the web site.

This article is intended to give a brief introduction to predictive modeling, an overview of Conjecture’s capabilities, as well as a preview of what we are currently developing.

Predictive Models

The main goal in constructing a predictive model is to make a function which maps an input into a prediction.  The input can be anything - from the text of an email, to the pixels of an image, or the list of users who interacted with an item.  The predictions we produce currently are of two types: either a real value (such as a click rate) or a discrete label (such as the name of an inbox in which to direct an email).

The only prerequisite for constructing such a model is a source of training examples: pairs consisting of an input and its observed output.  In the case of click rate prediction these can be constructed by examining historic click rates of items.  For the classification of e-mails as urgent or not, we took the historic e-mails as the inputs, and an indicator of whether the support staff had marked the email as urgent or not after reading its contents.

Feature Representation

Having gathered the training data, then next step is to convert it into a representation which Conjecture can understand.  As is common in machine learning, we convert the raw input into a feature representation, which involves evaluating several "feature functions" of the input and constructing a feature vector from the results.  For example, to classify emails the feature functions are things like: the indicator of whether the word "account" is in the email, whether the email is from a registered user or not, whether the email is a follow up to an earlier email and so on.

Additionally, for e-mail classification we also included subsequences of words which appeared in the email. For example the feature representation of an urgent email which was from a registered user, and had the word "time" in the subject, and the string "how long will it take to receive our item?" in the body may look like:

"label": {"value" : 1.0},
  "vector" : {
    "subject___time" : 1.0,
    "body___will::it::take" : 1.0,
    "body___long::will": 1.0,
    "body___to::receive::our" : 1.0,
    "is_registered___true" : 1.0,

We make use of a sparse feature representation, which is a mapping of string feature names to double values.  We use a modified GNU trove hashmap to store this information while being memory efficient.

Many machine learning frameworks store features in a purely numerical format. While storing information as, for instance, an array of doubles is far more compact than our "string-keyed vectors", model interpretation and introspection becomes much more difficult. The choice to store the names of features along with their numeric value allows us to easily inspect for any weirdness in our input, and quickly iterate on models by finding the causes of problematic predictions.

Model Estimation

Once a dataset has been assembled, we can estimate the optimal predictive model.  In essence we are trying to find a model that makes predictions which tend to agree with the observed outcomes in the training set.  The statistical theory surrounding "Empirical Risk Minimization" tells us that under some mild conditions, we may expect a similar level of accuracy from the model when applied to unseen data.

Conjecture, like many current libraries for performing scalable machine learning, leverages a family of techniques known as online learning. In online learning, the model processes the labeled training examples one at a time, making an update to the underlaying prediction function after each observation.  While the online learning paradigm isn't compatible with every machine learning technique, it is a natural fit for several important  classes of machine learning models such as logistic regression and large margin classification, both of which are implemented in Conjecture.

Since we often have datasets with millions of training examples, processing them sequentially on a single machine is unfeasible. Some form of parallelization is required. However, traditionally the online learning framework does not directly lend itself to a parallel method for model training. Recent research into parallelized online learning in Hadoop gives us a way to perform sequential updates of many models in parallel across many machines, each separate process consuming a fraction of the total available training data. These "sub-models" are aggregated into a single model that can be used to make predictions or, optionally, feed back into another "round" of the process for the purpose of further training. Theoretical results tell us, that when performed correctly, this process will result in a reliable predictive model, similar to what would be generated had there been no parallelization. In practice, we find the models converge to an accurate state quickly, after a few iterations.

 Conjecture provides an implementation for parallelized online optimization of machine learning models in Scalding,  a Scala wrapper for Cascading -- a package which plans workflows consisting of several MapReduce jobs each.  Scalding also abstracts over the key-value pairs required by MapReduce, and permits arbitrary n-ary tuples to be used as the data elements. In Conjecture, we train a separate model on each mapper using online updates, by making use of the map-side aggregation functionality of Cascading. Here, in each mapper process, data is grouped by common keys and a reduce function is applied. Data is then sent across the network to reduce nodes which complete the aggregation. This map-side aggregation is conceptually the same as the combiners of MapReduce, though the work resides in the same process as the map task itself.  The key to our distributed online machine learning algorithm is in the definition of appropriate reduce operations so that map-side aggregators will implement as much of the learning as possible.  The alternative -- shuffling the examples to reducers processes across the MapReduce cluster and performing learning there -- is slower as it requires much more communication.

During training, the mapper processes consume an incoming stream of training instances. The mappers take these examples and emit pairs consisting of the example and an "empty" (untrained) predictive model. This sequence of  pairs is passed to the aggregator, where the actual online learning is performed. The aggregator process implements a reduce function which takes two such pairs and produces a single pair with an updated model.  Call the pairs a and b, each with a member model and a member called example. Pairs "rolled up" by this reduce process always contain a model, and an example which has not yet been used for training.  When the reduce operation is consuming two models which both have some training, they are merged (e.g., by summing or averaging the parameter values) otherwise, we continue to train whichever model has some training already.  Due to the order in which Cascading calls the reduce function in the aggregator, we end up building a single model on each machine, these are then shuffled to a single reducer where they are merged.  Finally we can update the final model on the one labeled example which it has not yet seen.  Note that the reduce function we gave is not associative -- the order in which the pairs are processed will affect the output model.  However, this approach is robust in that it will produce a useful model irrespective of the ordering. The logic for the reduce function is:

train_reduce(a,b) = {
  // neither pair has a trained model,
  // update a model on one example,
  // emit that model and the other example 
  if(a.model.isEmpty && b.model.isEmpty) {  
    (b.model.train(a.example), b.example)
  // one model is trained, other isn't.
  // update trained model, emit that
  // and the other example
  if(!a.model.isEmpty && b.model.isEmpty) {
    (a.model.train(b.example), a.example)
  // mirroring second case
  if(a.model.isEmpty && !b.model.isEmpty) {
    (b.model.train(a.example), b.example)
  // both models are partially trained. 
  // update one model, merge that model
  // with the other. emit with
  // unobserved example
  if(!a.model.isEmpty && !b.model.isEmpty) {
    (b.model.merge(a.model.train(a.example)), b.example)

In conjecture, we extend this basic approach, consuming one example at a time, so that we can perform “mini batch” training. This is a variation of online learning where a small set of training examples are all used at once to perform a single update. This leads to more flexibility in the types of models we can train, and also lends better statistical properties to the resulting models (for example by reducing variance in the gradient estimates in the case of logistic regression). What’s more it comes at no increase in computational complexity over the standard training method. We implement mini-batch training by amending the reduce function to construct lists of examples, only performing the training when sufficiently many examples have been aggregated.

Model Evaluation

We implement a parallelized cross validation){:target="_blank"} so that we can get estimates of the performance of the output models on unseen inputs.  This involves splitting the data up into a few parts (called "folds" in the literature), training multiple models, each of which is not trained on one of the folds, and then testing each model on the fold which it didn’t see during training.  We consider several evaluation metrics for the models, such as accuracy, the area under the ROC curve, and so on.  Since this procedure yields one set of performance metrics for each fold, we take the appropriately weighted means of these, and the observed variance also gives an indication of the reliability of the estimate.  Namely if the accuracy of the classification is similar across all the folds then we may anticipate a similar level of accuracy on unobserved inputs.  On the other hand if there is a great discrepancy between the performance on different folds then it suggests that the mean will be an unreliable estimate of future performance, and possibly that either more data is needed or the model needs some more feature engineering.

Inevitably, building useful machine learning models requires iteration-- things seldom work very well right out of the box. Often this iteration involves inspecting a model, developing an understanding as to why a model operates in the way that it does, and then fixing any unusual or undesired behavior. Leveraging our detailed data representation, we have developed tools enabling the manual  examination of the learned models.  Such inspection should only be carried out for debugging purposes, or to ensure that features were correctly constructed, and not to draw conclusions about the data.  It is impossible to draw valid conclusions about parameter values without knowing their associated covariance matrix -- which we do not handle since it is presumably too large.

Applying the Models

As part of the model training, we output a JSON-serialized version of the model. This allows us to load the models into any platform we’d like to use. In our case, we want to use our models on our web servers, which use PHP. To accomplish this, we deploy our model file (a single JSON string encoding the internal model data structures) to the servers where we instantiate it using PHP’s json_decode() function. We also provide utility functions in PHP to process model inputs into the same feature representations used in Java/Scala, ensuring that a model is correctly applied. An example of a json-encoded conjecture model is below:

  "argString": "--zero_class_prob 0.5 --model mira --out_dir contact_seller 
                --date 2014_05_24 --folds 5 --iters 10 
                --input contact_seller/instances 
                --final_thresholding 0.001 --hdfs",
  "exponentialLearningRateBase": 1.0,
  "initialLearningRate": 0.1,
  "modelType": "MIRA",
  "param": {
    "freezeKeySet": false,
    "vector": {
      "__bias__": -0.16469815089457646,
      "subject___time": -0.01698080417481483,
      "body___will::it::take": 0.05834880357927012,
      "body___long::will": 0.0818060174986067991,
      "is_registered___true": -0.002215130480164454,

Currently the JSON-serialized models are stored in a git repository and deployed to web servers via Deployinator (see the Code as Craft post about it here). This architecture was chosen so that we could use our established infrastructure to deploy files from git to the web servers to distribute our models. We gain the ability to quickly prototype and iterate on our work, while also gaining the ability to revert to previous versions of a model at will. Our intention is to move to a system of automated nightly updates, rather than continue with the manually controlled Deployinator process.

Deployinator broadcasts models to all the servers running code that could reference Conjecture models, including the web hosts, and our cluster of gearman workers that perform asynchronous tasks, as well as utility boxes which are used to run cron jobs and ad hoc jobs. Having the models local to the code that’s referencing them avoids network overhead associated with storing models in databases; the process of reading and deserializing models, then making predictions is extremely fast.

Conclusions and Future Work

The initial release of Conjecture shares some of Etsy’s tooling for building classification and regression models at massive scale using Hadoop. This infrastructure is well-tested and practical, we’d like to get it in the hands of the community as soon as possible. However, this release represents only a fraction of the machine learning tooling that we use to power many features across the site.  Future releases of Conjecture will include tools for building cluster models and infrastructure for building recommender systems on implicit feedback data in Scalding. Finally, we will release "web code" written in PHP and other languages that can consume Conjecture models and make predictions efficiently in a live production environment.