Sensei Map Reduce

A tutorial on how to implement basic Sensei map reduce jobs.

Intent

Extending the Sensei query functionality is not easy. At a minimum you would need to implement your own FacetHandler, which requires at least a week of ramping up on the Bobo architecture: one needs to understand how collectors, comparators, facets, scoring functions, and explanations work. None of this is easy to grasp, especially because the code is performance critical and our team had to weigh readability against writing extremely efficient code (reusing arrays, avoiding autoboxing, polymorphism, object creation, etc.). Moreover, you cannot control how the results produced by a facet handler at the segment level are merged at the partition and cluster node levels. That's why we didn't include group-by on multiple columns and aggregation functions in the first release.

Our team believes that Sensei map reduce will help users overcome these difficulties, making it easy to implement business intelligence on top of Sensei with sub-second latency.

Interface

/**
 * By implementing this interface, one can access the Sensei segment data, produce intermediate results, and aggregate
 * those results at the partition, node, and cluster levels. This is much simpler than implementing your own facet
 * handler, and it also lets you enhance the Bobo/Sensei merging logic.
 *
 * @param <MapResult> the type of the intermediate results produced by map and merged by combine
 * @param <ReduceResult> the type of the final result produced by reduce
 */
public interface SenseiMapReduce<MapResult extends Serializable, ReduceResult extends Serializable> extends Serializable {
  /**
   * Initializes the map reduce job. The argument corresponds to the "parameters" object of the "mapReduce" element
   * in the JSON request, e.g.
   * "mapReduce":{"function":"com.senseidb.search.req.mapred.functions.MaxMapReduce","parameters":{"column":"groupid"}}
   */
  public void init(JSONObject params);
  /**
   * The map function. The docIds array contains valid document ids at indexes 0 to docIdCount - 1;
   * all entries at array indexes >= docIdCount should be ignored.
   * One can get a document's uid by calling uids[docId].
   * @param docIds
   * @param docIdCount
   * @param uids
   * @param accessor is used to get a field's values for a given document
   * @param facetCountsAccessor gives access to the facet counts
   * @return arbitrary map function results
   */
  public MapResult map(IntArray docIds, int docIdCount, long[] uids, FieldAccessor accessor, FacetCountAccessor facetCountsAccessor);
  /**
   * Merges map result objects to reduce memory and serialization costs. If this method does not actually merge the
   * map results, there is a high chance of an OutOfMemoryError when a significant number of documents is indexed.
   * The combinerStage parameter indicates whether the merge happens at the partition or at the node level.
   * @param mapResults
   * @return the merged map results
   */
  public List<MapResult> combine(List<MapResult> mapResults, CombinerStage combinerStage);
  /**
   * Reduces the merged map results into the final result.
   * @param combineResults
   * @return the reduce result
   */
  public ReduceResult reduce(List<MapResult> combineResults);
  /**
   * Converts the result of the reduce function into a JSONObject, so that it can be sent back to the client.
   * @param reduceResult
   * @return the JSON representation of the reduce result
   */
  public JSONObject render(ReduceResult reduceResult);
}

Example

Let's implement a count group-by on multiple columns.

package com.senseidb.search.req.mapred;

import java.io.Serializable;
import java.util.*;

import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

public class CountGroupByMapReduce implements SenseiMapReduce<HashMap<String, IntContainer>, ArrayList<GroupedValue>> {
  private String[] columns;

  public void init(JSONObject params) {
    try {
      // the "columns" parameter lists the columns to group by
      JSONArray columnsJson = params.getJSONArray("columns");
      columns = new String[columnsJson.length()];
      for (int i = 0; i < columnsJson.length(); i++) {
        columns[i] = columnsJson.getString(i);
      }
    } catch (JSONException ex) {
      throw new RuntimeException(ex);
    }
  }

Next, let's create a helper function that maps the column values of a document into a compound key:

private String getKey(String[] columns, FieldAccessor fieldAccessor, int docId) {
    StringBuilder key = new StringBuilder(fieldAccessor.getString(columns[0], docId));
    for (int i = 1; i < columns.length; i++) {
      key.append(":").append(fieldAccessor.get(columns[i], docId).toString());
    }
    return key.toString();
  }

For the sake of performance, let's store the intermediate counts in a mutable container wrapping a primitive int, so that incrementing a counter doesn't allocate a new object:

public static class IntContainer implements Serializable {
    public int value;
    public IntContainer(int value) {
      this.value = value;
    }
    public IntContainer add(int value) {
      this.value += value;
      return this;
    }
  }

Now we're ready to implement the map function:

public HashMap<String, IntContainer> map(IntArray docIds, int docIdCount, long[] uids, FieldAccessor accessor, FacetCountAccessor facetCountsAccessor) {
    HashMap<String, IntContainer> ret = new HashMap<String, IntContainer>();
    for (int i = 0; i < docIdCount; i++) {
      String key = getKey(columns, accessor, docIds.get(i));
      IntContainer count = ret.get(key);
      if (count == null) {
        ret.put(key, new IntContainer(1));
      } else {
        count.add(1);
      }
    }
    return ret;
  }

And the combine function, which merges the intermediate results at the partition and node levels:

@Override
  public List<HashMap<String, IntContainer>> combine(List<HashMap<String, IntContainer>> mapResults, CombinerStage combinerStage) {
    HashMap<String, IntContainer> ret = mapResults.get(0);
    for (int i = 1; i < mapResults.size(); i++) {
      HashMap<String, IntContainer> map = mapResults.get(i);
      for (Map.Entry<String, IntContainer> entry : map.entrySet()) {
        IntContainer count = ret.get(entry.getKey());
        if (count != null) {
          count.add(entry.getValue().value);
        } else {
          ret.put(entry.getKey(), entry.getValue());
        }
      }
    }
    return java.util.Arrays.asList(ret);
  }

In the reduce function we do the same merging as in the combine method, and we also sort the results:

public ArrayList<GroupedValue> reduce(List<HashMap<String, IntContainer>> combineResults) {
    HashMap<String, IntContainer> retMap = combineResults.get(0);
    for (int i = 1; i < combineResults.size(); i++) {
      HashMap<String, IntContainer> map = combineResults.get(i);
      for (Map.Entry<String, IntContainer> entry : map.entrySet()) {
        IntContainer count = retMap.get(entry.getKey());
        if (count != null) {
          count.add(entry.getValue().value);
        } else {
          retMap.put(entry.getKey(), entry.getValue());
        }
      }
    }
    ArrayList<GroupedValue> ret = new ArrayList<GroupedValue>();
    for (Map.Entry<String, IntContainer> entry : retMap.entrySet()) {
      ret.add(new GroupedValue(entry.getKey(), entry.getValue().value));
    }
    Collections.sort(ret);
    return ret;
  }
  public static class GroupedValue implements Comparable<GroupedValue> {
    String key;
    int value;
    public GroupedValue(String key, int value) {
      this.key = key;
      this.value = value;
    }
    @Override
    public int compareTo(GroupedValue o) {
      // sort by count, highest first
      return o.value - value;
    }
  }

The last step is to render the results back to JSON:

public JSONObject render(ArrayList<GroupedValue> reduceResult) {
    try {
      List<JSONObject> ret = new ArrayList<JSONObject>();
      for (GroupedValue grouped : reduceResult) {
        ret.add(new JSONObject().put(grouped.key, grouped.value));
      }
      return new JSONObject().put("groupedCounts", new JSONArray(ret));
    } catch (JSONException ex) {
      throw new RuntimeException(ex);
    }
  }

Running the example

The implemented class needs to be on the Sensei classpath. You can add your jar to the <<conf.dir>>/ext directory; on startup, Sensei adds all the jars found in that directory to the classpath.
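For example, if your job is packaged in a jar named my-mapred-jobs.jar (a hypothetical name), it would be deployed as:

<<conf.dir>>/ext/my-mapred-jobs.jar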

Let's start the car demo example described at http://senseidb.github.com/sensei/gettingStarted.html. Open localhost:8080 and send the following query (the filter limits the data set to all the cars that are not gold):

{
 "size":0,
 "filter": {"terms": {"color": {"excludes": ["gold"],"includes": [],"operator": "or"}}},
 "mapReduce": {
  "function": "com.senseidb.search.req.mapred.CountGroupByMapReduce",
  "parameters": {"columns": ["groupid","color"]}
 }
}

In the mapReduceResult section of the response you should see the grouped counts.
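For illustration, given the render function above, the relevant part of the response might look roughly like the following. The compound keys and counts here are made up, and the layout of the surrounding response follows the standard Sensei result format:

{
 "mapReduceResult": {
  "groupedCounts": [
   {"1:red": 247},
   {"3:blue": 198},
   {"2:white": 164}
  ]
 }
}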

Additional benefits of Sensei map reduce:

Map reduce is fully integrated into the Sensei query: you can specify queries, filters, and selections, and the map reduce job will run only on the data set they filter.
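For instance, a request along the following lines (a hypothetical sketch reusing the MaxMapReduce function mentioned above; the query and selections elements are ordinary Sensei request sections) would compute the maximum year only over the red cars matching the text query:

{
 "query": {"query_string": {"query": "cool"}},
 "selections": [{"terms": {"color": {"values": ["red"], "excludes": [], "operator": "or"}}}],
 "mapReduce": {
  "function": "com.senseidb.search.req.mapred.functions.MaxMapReduce",
  "parameters": {"column": "year"}
 }
}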

BQL integration:

Map reduce functions can be executed from BQL. Functions such as sum, avg, count, min, and max can be specified in the select clause, optionally in conjunction with a group by clause. For example:

select sum(cost), avg(year) where tag='automatic' group by groupid, color
select distinctCount(year) where tag='manual'

One can also invoke an arbitrary map reduce function using the EXECUTE clause:

select from Sensei where color = 'pink' execute(com.senseidb.search.req.mapred.functions.CustomMapReduce, year:2001, myMap:{'limit':1000, 'groupBy':'year'})