The book says DistributedCache makes it easy to put a file onto HDFS and then have every node pull it back out, but now I'm running into a problem. I suspect the paths used for putting the file in and taking it out are wrong, which leads to a NullPointerException. Could someone take a look at the "put" and "get" parts of the code and tell me how they should be changed? Thanks. (PS: the MP algorithm itself isn't important, no need to look at what it implements.)
Hadoop 0.20.2, pseudo-distributed environment.
The cached file's format looks like this: 1:1,1 2:1,2 3:1,3 .....
The in, out, and DistributedCache file paths are all of the form hdfs://localhost:9000/user/root/in/1/Matrix.
The code is as follows:

package hdp_t1;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.Hashtable;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Matrix {

    public static class MatrixMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] lines = value.toString().split(":", 2);
            context.write(new IntWritable(Integer.parseInt(lines[0])), new Text(lines[1]));
        }
    }

    public static class MatrixReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        // record the local matrix
        private Hashtable<IntWritable, String> localMatrix = new Hashtable<IntWritable, String>();

        public void reduce(IntWritable key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (Text val : values) {
                String line1 = val.toString();
                String[] lines1 = line1.split(",");
                String line2 = localMatrix.get(key);
                String[] lines2 = line2.split(",");
                for (int i = 0; i < lines1.length; i++) {
                    sum += Integer.parseInt(lines1[i]) * Integer.parseInt(lines2[i]);
                }
            }
            context.write(new Text(key.toString()), new IntWritable(sum));
        }

        public void setup(Context context) {
            try {
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
                // URI[] cacheFiles = DistributedCache.getCacheFiles(context.getConfiguration());
                if (null != cacheFiles && cacheFiles.length > 0) {
                    String line;
                    String[] tokens;
                    BufferedReader br = new BufferedReader(new FileReader(cacheFiles[0].toString()));
                    try {
                        while ((line = br.readLine()) != null) {
                            tokens = line.split(":", 2);
                            localMatrix.put(new IntWritable(Integer.parseInt(tokens[0])), tokens[1].toString());
                        }
                    } finally {
                        br.close();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        if (args.length != 3) {
            System.err.println("ERROR!!");
            System.exit(2);
        }
        Job job = new Job(conf, "Matrix");
        job.setJarByClass(Matrix.class);
        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);
        DistributedCache.addCacheFile(new Path(args[0]).toUri(), job.getConfiguration());
        // DistributedCache.addCacheFile(new URI("hdfs://localhost:9000/user/root/in/1/Matrix"), job.getConfiguration());
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[1]));
        FileOutputFormat.setOutputPath(job, new Path(args[2]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
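For reference, this is the "put"/"get" pattern I understand the book to describe, written up as a sketch only (not tested on my cluster). The assumption is that args[0] might be an unqualified path, so I qualify it against the default FileSystem before registering it, and on the read side I check for a null array from getLocalCacheFiles before opening the local copy; class and method names like CacheSketch, addCache, openCache are just placeholders, not from my code above. If the array is null, setup() silently skips loading localMatrix, and then line2 = localMatrix.get(key) in reduce() is null and line2.split(",") throws the NullPointerException.

// Sketch only: the "put" and "get" halves of the DistributedCache pattern,
// assuming Hadoop 0.20.2 and that the cache path argument may be unqualified.
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

public class CacheSketch {

    // "Put": register the file before submitting the job, using a fully
    // qualified URI (hdfs://host:port/...) so the tasks know which
    // filesystem to pull it from.
    static void addCache(String cacheArg, Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        FileSystem fs = FileSystem.get(conf);
        // e.g. turns "in/1/Matrix" into "hdfs://localhost:9000/user/root/in/1/Matrix"
        Path qualified = fs.makeQualified(new Path(cacheArg));
        DistributedCache.addCacheFile(qualified.toUri(), conf);
    }

    // "Get": inside setup(), read the LOCAL copy the framework has already
    // downloaded; fail loudly instead of letting a null array turn into a
    // NullPointerException later in reduce().
    static BufferedReader openCache(Configuration conf) throws IOException {
        Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
        if (localFiles == null || localFiles.length == 0) {
            throw new IOException("No file in the DistributedCache; was addCacheFile() called with a qualified URI?");
        }
        return new BufferedReader(new FileReader(localFiles[0].toString()));
    }
}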