/**
* @project hbase-helper
* @author Renaud Delbru [ 14 May ]
* @link http://renaud.delbru.fr/
* @copyright Copyright (C) , All rights reserved.
*/
package org.sindice.mapreduce.example;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.openrdf.model.Resource;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.rio.StatementHandler;
import org.openrdf.rio.StatementHandlerException;
import org.openrdf.rio.ntriples.NTriplesParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Example job to count all the predicates used in sindice.
* @author robful
*
*/
public class PredicateCount {
/**
 * Timing counters. The mapper adds elapsed milliseconds to
 * {@code PROCESS_TRIPLES_TIME} and {@code WRITE_COUNTS_TIME};
 * {@code PARSE_JSON_TIME} is not referenced in this file.
 */
enum Time { PARSE_JSON_TIME, PROCESS_TRIPLES_TIME, WRITE_COUNTS_TIME }

/**
 * Per-record counters. {@code ALL} is incremented for every record handed
 * to the mapper and {@code SKIPPED} when processing a record fails;
 * {@code ADDED} is not referenced in this file.
 */
enum Document { SKIPPED, ADDED, ALL }

/** Counters for records that are dropped because a required field is absent. */
enum Record { NO_URL, NO_EXPLICIT_CONTENT }

/** Counters for the number of explicit and implicit triples seen. */
enum Triples { EXPLICIT, IMPLICIT }

/** Logger shared by the job driver and its nested mapper. */
protected static final Logger logger =
    LoggerFactory.getLogger(PredicateCount.class);

/** Constant count of one. */
private static final IntWritable ONE = new IntWritable(1);
public static class PredicateCounter
extends Mapper, Text, IntWritable> {
private int counter = 0;
private long timer = 0;
@Override
protected void setup(final Context context)
throws IOException, InterruptedException {
counter = 0;
timer = 0;
}
@Override
public void map(final LongWritable key,
final Map mapContainingOneSindiceRecord, final Context context)
throws IOException, InterruptedException {
counter += 1;
context.getCounter(Document.ALL).increment(1);
long start = System.currentTimeMillis();
String url = null;
try {
// ensure the map has a url.
url = (String) mapContainingOneSindiceRecord.get("url");
if(url == null){
context.getCounter(Record.NO_URL).increment(1);
return;
}
// we are interested in explicit and implicit content.
// explicit are the statements declared,
// implicit are the statements inferred.
List explicitContent =
(List) mapContainingOneSindiceRecord.get("explicit_content");
List implicitContent =
(List) mapContainingOneSindiceRecord.get("implicit_content");
// Skip this record if there are no explicit triples
// (old data in sindice to be cleaned...)
if(explicitContent.size() ==0){
logger.warn("No explicit_content for "+url);
context.getCounter(Record.NO_EXPLICIT_CONTENT).increment(1);
return;
}
// update some counters about how many triples.
context.getCounter(Triples.EXPLICIT).increment(explicitContent.size());
context.getCounter(Triples.IMPLICIT).increment(implicitContent.size());
// convert the list of triples into a parse-able string.
String nTriples = toNTripleString(explicitContent);
// maintain a local count to minimise output from this job
// this will be updated as the triples are parsed from the ntriples string.
final Map predicateCount = new HashMap();
// create a parser to receive the parsed triples.
NTriplesParser x = new NTriplesParser();
x.setStatementHandler( new StatementHandler(){
@Override
public void handleStatement(Resource s, URI p, Value o)
throws StatementHandlerException {
final String predicateUri = p.getURI();
// count how many instances of each predicate are encountered.
Integer count = predicateCount.get(predicateUri);
if(count == null) count = 0;
count+=1;
predicateCount.put(predicateUri, count);
}});
// parse the triples, updating the predicate count map.
x.parse(new StringReader(nTriples), url);
// debug information about timing.
context.getCounter(
Time.PROCESS_TRIPLES_TIME).increment(System.currentTimeMillis() - start);
start = System.currentTimeMillis();
// write out the number of times each predicate was encountered.
for(String predicateUri : predicateCount.keySet()){
context.write(
new Text(predicateUri), new IntWritable(predicateCount.get(predicateUri)));
}
// debug information about timing.
context.getCounter(
Time.WRITE_COUNTS_TIME).increment(System.currentTimeMillis() - start);
context.progress();
}catch(InterruptedException e){
logger.warn("Document "+key+" skipped "+(url==null?"":url), e);
throw e; // bubble up the interruption.
}catch (final Exception e) {
logger.warn("Document "+key+" skipped "+(url==null?"":url), e);
context.getCounter(Document.SKIPPED).increment(1);
return;
}
timer += System.currentTimeMillis() - start;
if (counter % 1000 == 1) {
logger.info("Processed {} documents. Current: {}", counter, url);
final double throughput = (int)(1000d / (timer / 1000d));
logger.info("Update Throughtput: {} docs/second", throughput);
timer = 0;
}
}
/**
* Currently the triples returned in the sindice object for
* explicit_content or implicit_content
* might be either a single String or a list of Strings.
* @param o
* @return Either the single string of input,
* or the joined up lines of the array.
*/
private String toNTripleString(List triples) {
StringBuilder sb = new StringBuilder();
for(String t : triples){
sb.append(t);
}
return sb.toString();
}
@Override
protected void cleanup(final Context context)
throws IOException, InterruptedException {
logger.info("Commiting hbase write buffer");
}
}
/**
* Reducer taken from Hadoop WordCount example.
* @author robful
*
*/
public static class IntSumReducer
extends Reducer {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
/**
 * Builds the job that counts the predicates.
 *
 * The job reads its input with {@code JSonLineInputFormat}, maps with
 * {@code PredicateCounter}, combines and reduces with
 * {@code IntSumReducer} and writes {@code (Text, IntWritable)} pairs as a
 * sequence file. The job is configured but not submitted.
 *
 * @param conf The current configuration.
 * @param inputPath The path the job reads from.
 * @param outputPath The path the job writes to.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmitablePredicateCountJob(
    final Configuration conf, final Path inputPath, final Path outputPath)
    throws IOException {
  final String jobName = "PredicateCount-" + System.currentTimeMillis();
  final Job countJob = new Job(conf, jobName);
  countJob.setJarByClass(PredicateCount.class);

  // input side
  countJob.setInputFormatClass(JSonLineInputFormat.class);

  // map side
  countJob.setMapOutputKeyClass(Text.class);
  countJob.setMapOutputValueClass(IntWritable.class);
  countJob.setMapperClass(PredicateCounter.class);

  // the same summing class pre-aggregates on the map side and reduces
  countJob.setCombinerClass(IntSumReducer.class);
  countJob.setReducerClass(IntSumReducer.class);

  // output side
  countJob.setOutputKeyClass(Text.class);
  countJob.setOutputValueClass(IntWritable.class);
  countJob.setOutputFormatClass(SequenceFileOutputFormat.class);

  FileInputFormat.addInputPath(countJob, inputPath);
  FileOutputFormat.setOutputPath(countJob, outputPath);
  return countJob;
}
/**
 * Builds the job that orders the predicate counts.
 *
 * The job reads a sequence file, swaps key and value with
 * {@code InverseMapper} so that the count becomes the key, and uses a
 * single reduce task so that a single output file is written. No reducer
 * or output format class is set here. The job is configured but not
 * submitted.
 *
 * @param conf The current configuration.
 * @param inputPath The path holding the unsorted counts.
 * @param outputPath The path the sorted counts are written to.
 * @return The newly created job.
 * @throws IOException When setting up the job fails.
 */
public static Job createSubmitableSortCountJob(
    final Configuration conf, final Path inputPath, final Path outputPath)
    throws IOException {
  final Job job = new Job(conf, "PredicateCount-sort-"+System.currentTimeMillis());
  job.setJarByClass(PredicateCount.class);
  job.setInputFormatClass(SequenceFileInputFormat.class);
  job.setMapperClass(InverseMapper.class);
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(Text.class);
  // Declare the final output types explicitly, as the count job does; they
  // are the same as the map output types because no reducer class is set.
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(Text.class);
  job.setNumReduceTasks(1); // write a single file
  FileInputFormat.addInputPath(job, inputPath);
  FileOutputFormat.setOutputPath(job, outputPath);
  return job;
}
/**
 * Prints the command line usage to standard output.
 *
 * The two arguments named here are the ones {@code run} reads: the input
 * path first, then the output path.
 */
private void printUsage() {
  System.out.println("Usage : PredicateCount <input path> <output path>");
}
/**
 * Runs the predicate count job and, if it succeeds, the sort job.
 *
 * The counts are first written to {@code args[1] + "-unsorted"}; the sort
 * job reads them from there and writes its result to {@code args[1]}.
 *
 * @param args {@code args[0]} is the input path, {@code args[1]} the
 *     output path
 * @return 0 when both jobs complete successfully, 1 when the arguments are
 *     missing or a job fails
 * @throws Exception when setting up or running a job fails
 */
public int run(final String[] args) throws Exception {
  // Both paths are required: args[1] is read below, so one argument is
  // not enough.
  if (args == null || args.length < 2) {
    this.printUsage();
    return 1;
  }
  final Configuration conf = new Configuration();
  conf.set("mapred.child.java.opts", "-server -XX:+UseParallelGC -Xmx1024m");
  conf.setInt("io.file.buffer.size", 32 * 1024);
  conf.setInt("mapred.linerecordreader.maxlength",10000000); // skip anything larger

  final Path inputPath = new Path(args[0]);
  final Path outputPathUnsortedResults = new Path(args[1]+"-unsorted");
  final Path outputPath = new Path(args[1]);

  final Job job = createSubmitablePredicateCountJob(
      conf, inputPath, outputPathUnsortedResults);
  final Job sortJob = createSubmitableSortCountJob(
      conf, outputPathUnsortedResults, outputPath);

  // The sort job consumes the output of the count job, so it only runs
  // when the count job succeeded.
  if (!job.waitForCompletion(true)) {
    return 1;
  }
  return sortJob.waitForCompletion(true) ? 0 : 1;
}
/**
 * Command line entry point: runs the jobs and exits the JVM with the
 * status returned by {@code run}.
 *
 * @param args the command line arguments, passed on to {@code run}
 * @throws Exception when {@code run} fails
 */
public static void main(final String[] args) throws Exception {
  final PredicateCount driver = new PredicateCount();
  final int exitCode = driver.run(args);
  System.exit(exitCode);
}
}