Analyze Web of Data

This is an example for developers who wish to create and run analysis jobs against exports from Sindice.

A sample project to get started can be downloaded from here.

The project can be built using Maven.
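For example, assuming the standard Maven lifecycle applies to this project (the exact goal may vary for your setup):
mvn package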

To use this project within Eclipse, first run the following command to generate the Eclipse project files:
mvn eclipse:eclipse

The Sindice export consists of a set of compressed files containing one record per line. Each record is formatted in JSON using the same schema as the Sindice cache. See the 'Response Format' section here:
http://sindice.com/developers/cacheapi
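
For orientation, a record looks roughly like the following (an illustrative sketch only, wrapped over several lines here for readability; the authoritative field list is the cache API schema linked above, but the fields used by the example job are url, explicit_content and implicit_content):

{"url": "http://example.org/doc",
 "explicit_content": ["<http://example.org/doc#me> <http://xmlns.com/foaf/0.1/name> \"Alice\" ."],
 "implicit_content": ["<http://example.org/doc#me> <http://www.w3.org/1999/02/22-rdf-syntax-ns#type> <http://xmlns.com/foaf/0.1/Person> ."]}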

This project contains a reader for the Sindice exports and an example MapReduce job for processing the dump.

The example job simply counts the occurrences of each predicate within Sindice. See:

src/main/java/org/sindice/mapreduce/example/PredicateCount.java
/**
 * @project hbase-helper
 * @author Renaud Delbru [ 14 May  ]
 * @link http://renaud.delbru.fr/
 * @copyright Copyright (C) , All rights reserved.
 */
package org.sindice.mapreduce.example;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.rio.StatementHandler;
import org.openrdf.rio.StatementHandlerException;
import org.openrdf.rio.ntriples.NTriplesParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Example job to count all the predicates used in sindice.
 * @author robful
 *
 */
public class PredicateCount {
  static enum Time { PARSE_JSON_TIME, PROCESS_TRIPLES_TIME, WRITE_COUNTS_TIME }
  static enum Document { SKIPPED, ADDED, ALL }
  static enum Record {NO_URL,NO_EXPLICIT_CONTENT}
  static enum Triples {EXPLICIT,IMPLICIT}
  protected static final transient
  Logger logger = LoggerFactory.getLogger(PredicateCount.class);
  private final static IntWritable ONE = new IntWritable(1);
  public static class PredicateCounter
  extends Mapper<LongWritable, Map<String, Object>, Text, IntWritable> {
    private int                  counter = 0;
    private long                 timer = 0;
    @Override
    protected void setup(final Context context)
    throws IOException, InterruptedException {
      counter = 0;
      timer = 0;
    }
    @Override
    public void map(final LongWritable key,
        final Map<String, Object> mapContainingOneSindiceRecord, final Context context)
        throws IOException, InterruptedException {
      counter += 1;
      context.getCounter(Document.ALL).increment(1);
      long start = System.currentTimeMillis();
      String url = null;
      try {
        // ensure the map has a url.
        url = (String) mapContainingOneSindiceRecord.get("url");
        if(url == null){
          context.getCounter(Record.NO_URL).increment(1);
          return;
        }
        // we are interested in explicit and implicit content.
        // explicit are the statements declared, 
        // implicit are the statements inferred.
        List<String> explicitContent =
          (List<String>) mapContainingOneSindiceRecord.get("explicit_content");
        List<String> implicitContent =
          (List<String>) mapContainingOneSindiceRecord.get("implicit_content");
        // Skip this record if there are no explicit triples 
        // (old data in sindice to be cleaned...)
        if(explicitContent == null || explicitContent.isEmpty()){
          logger.warn("No explicit_content for "+url);
          context.getCounter(Record.NO_EXPLICIT_CONTENT).increment(1);
          return;
        }
        // update some counters about how many triples.
        context.getCounter(Triples.EXPLICIT).increment(explicitContent.size());
        context.getCounter(Triples.IMPLICIT).increment(implicitContent.size());
        // convert the list of triples into a parse-able string.
        String nTriples = toNTripleString(explicitContent);
        // maintain a local count to minimise output from this job
        // this will be updated as the triples are parsed from the ntriples string.
        final Map<String, Integer> predicateCount = new HashMap<String, Integer>();
        // create a parser to receive the parsed triples.
        NTriplesParser parser = new NTriplesParser();
        parser.setStatementHandler( new StatementHandler(){
          @Override
          public void handleStatement(Resource s, URI p, Value o)
          throws StatementHandlerException {
            final String predicateUri = p.getURI();
            // count how many instances of each predicate are encountered.
            Integer count = predicateCount.get(predicateUri);
            if(count == null) count = 0;
            count += 1;
            predicateCount.put(predicateUri, count);
          }});
        // parse the triples, updating the predicate count map.
        parser.parse(new StringReader(nTriples), url);
        // debug information about timing.
        context.getCounter(
          Time.PROCESS_TRIPLES_TIME).increment(System.currentTimeMillis() - start);
        start = System.currentTimeMillis();
        // write out the number of times each predicate was encountered.
        for(String predicateUri : predicateCount.keySet()){
          context.write(
            new Text(predicateUri), new IntWritable(predicateCount.get(predicateUri)));
        }
        // debug information about timing.
        context.getCounter(
        	Time.WRITE_COUNTS_TIME).increment(System.currentTimeMillis() - start);
        context.progress();
      }catch(InterruptedException e){
        logger.warn("Document "+key+" skipped "+(url==null?"":url), e);
        throw e; // bubble up the interruption.
      }catch (final Exception e) {
        logger.warn("Document "+key+" skipped "+(url==null?"":url), e);
        context.getCounter(Document.SKIPPED).increment(1);
        return;
      }
      timer += System.currentTimeMillis() - start;
      if (counter % 1000 == 1) {
        logger.info("Processed {} documents. Current: {}", counter, url);
        final double throughput = (int)(1000d / (timer / 1000d));
        logger.info("Update Throughtput: {} docs/second", throughput);
        timer = 0;
      }
    }
    /**
     * Currently the triples returned in the sindice object for 
     * explicit_content or implicit_content
     * might be either a single String or a list of Strings.
     * @param triples the lines of the content field
     * @return The joined up lines of the list, ready to be parsed as N-Triples.
     */
    private String toNTripleString(List triples) {
        StringBuilder sb = new StringBuilder();
        for(String t : triples){
          sb.append(t);
          sb.append('\n'); // keep statements newline-separated for the N-Triples parser
        }
        return sb.toString();
    }
    @Override
    protected void cleanup(final Context context)
    throws IOException, InterruptedException {
      logger.info("Commiting hbase write buffer");
    }
  }
  /**
   * Reducer taken from Hadoop WordCount example.
   * @author robful
   *
   */
  public static class IntSumReducer 
  extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
    ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  /**
   * Sets up the actual job.
   *
   * @param conf  The current configuration.
   * @param args  The command line parameters.
   * @return The newly created job.
   * @throws IOException When setting up the job fails.
   */
  public static Job createSubmitablePredicateCountJob(
  final Configuration conf, final Path inputPath, final Path outputPath)
  throws IOException {
    final Job job = new Job(conf, "PredicateCount-"+System.currentTimeMillis());
    job.setJarByClass(PredicateCount.class);
    job.setInputFormatClass(JSonLineInputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);
    job.setMapperClass(PredicateCounter.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    return job;
  }
  public static Job createSubmitableSortCountJob(
    final Configuration conf, final Path inputPath, final Path outputPath)
  throws IOException {
    final Job job = new Job(conf, "PredicateCount-sort-"+System.currentTimeMillis());
    job.setJarByClass(PredicateCount.class);
    job.setInputFormatClass(SequenceFileInputFormat.class);
    job.setMapperClass(InverseMapper.class);
    job.setNumReduceTasks(1);                 // write a single output file
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, inputPath);
    FileOutputFormat.setOutputPath(job, outputPath);
    return job;
  }
  private void printUsage() {
    System.out.println("Usage : PredicateCount  " );
  }
  public int run(final String[] args) throws Exception {
    if (args.length < 2) {
      this.printUsage();
      return 1;
    }
    final Configuration conf = new Configuration();
    conf.set("mapred.child.java.opts", "-server -XX:+UseParallelGC -Xmx1024m");
    conf.setInt("io.file.buffer.size", 32 * 1024);
    conf.setInt("mapred.linerecordreader.maxlength",10000000); // skip anything larger
    Path outputPathUnsortedResults = new Path(args[1]+"-unsorted");
    Path outputPath = new Path(args[1]);
    final Job job = createSubmitablePredicateCountJob(
      conf, new Path(args[0]),outputPathUnsortedResults);
    final Job sortJob = createSubmitableSortCountJob(
      conf, outputPathUnsortedResults, outputPath);
    if(job.waitForCompletion(true)){
      return sortJob.waitForCompletion(true) ? 0 : 1;
    }else{
      return 1;
    }
  }
  public static void main(final String[] args) throws Exception {
    System.exit(new PredicateCount().run(args));
  }
}

The example can be run within your IDE using the test class:

src/test/java/org/sindice/mapreduce/example/PredicateCountTest.java
package org.sindice.mapreduce.example;
import static org.junit.Assert.*;
import java.util.Iterator;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;
import org.sindice.mapreduce.example.PredicateCount;
public class PredicateCountTest extends BaseHadoopTestCase{
  @Test
  public void foo() throws Exception{
    FileSystem fs = FileSystem.get(localConf);
    Path input = localFileToPath("src/test/data/json/sample.json.gz").getParent();
    Path countOutput = new Path("target/test/output-count");
    Path sortOutput = new Path("target/test/output-sort");
    fs.delete(countOutput, true);
    fs.delete(sortOutput, true);
    Job countJob = PredicateCount.
		createSubmitablePredicateCountJob(localConf, input , countOutput);
    assertTrue("count job failed",countJob.waitForCompletion(true));
    CounterGroup group = countJob.getCounters().
		getGroup("org.apache.hadoop.mapred.Task$Counter");
    assertEquals("Wrong number of map input records",5,
		group.findCounter("MAP_INPUT_RECORDS").getValue());
    assertEquals("Wrong number of map output records",35,
		group.findCounter("MAP_OUTPUT_RECORDS").getValue());
    assertEquals("Wrong number of reduce output records",8,
		group.findCounter("REDUCE_OUTPUT_RECORDS").getValue());
    final Job sortJob = PredicateCount.
		createSubmitableSortCountJob(localConf, countOutput, sortOutput);
    assertTrue("sort job failed",sortJob.waitForCompletion(true));
  }
}
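
Alternatively, assuming the standard Maven Surefire setup, the same test can be run from the command line:
    mvn test -Dtest=PredicateCountTest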

To develop your own MapReduce job, use PredicateCount as an example and create your own test cases to verify that your job behaves as expected; a minimal mapper skeleton is sketched below.
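
A custom job typically only needs its own Mapper over the Sindice records plus a job setup mirroring createSubmitablePredicateCountJob. The following is an illustrative sketch only: the class name and the host-counting logic are placeholders, and it assumes the JSonLineInputFormat from this project delivers each record as a Map keyed by the JSON field names.

package org.sindice.mapreduce.example;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * Illustrative skeleton for a custom analysis job: counts documents per host.
 * The class name and logic are placeholders, not part of the sample project.
 */
public class HostCount
extends Mapper<LongWritable, Map<String, Object>, Text, IntWritable> {
  private final static IntWritable ONE = new IntWritable(1);

  @Override
  public void map(final LongWritable key,
      final Map<String, Object> record, final Context context)
      throws IOException, InterruptedException {
    // each value is one record of the export, parsed from JSON into a Map
    final String url = (String) record.get("url");
    if (url == null) {
      return; // skip records without a document URL
    }
    try {
      // emit the host part of the document URL with a count of one
      context.write(new Text(new URL(url).getHost()), ONE);
    } catch (MalformedURLException e) {
      // skip documents whose URL cannot be parsed
    }
  }
}

The job setup would be the same as in createSubmitablePredicateCountJob, substituting the mapper class; the existing IntSumReducer can be reused as combiner and reducer.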

To run your MapReduce job on a live Hadoop cluster, first create an assembly file containing the required classes, then use the hadoop jar command. Three steps are involved.

  1. Create an assembly file containing the required classes.
    mvn assembly:assembly
    
  2. Obtain the Sindice export from the Sindice team and load it into HDFS.
    ~/hadoop/bin/hadoop fs -copyFromLocal sindice-export sindice-export
    
  3. Run your Hadoop job against the Sindice export
    ~/hadoop/bin/hadoop jar target/sindice-map-reduce-example-assembly.jar \
      org.sindice.mapreduce.example.PredicateCount sindice-export output-folder
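
Once both jobs have completed, the sorted counts can be inspected directly from HDFS. Assuming the default text output format and the single reduce task configured above, something like the following should show the count/predicate pairs:
    ~/hadoop/bin/hadoop fs -cat output-folder/part-r-00000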