HIPI provides its own HIB bundle format, so I want to merge all of my image files into a single HIB. I started from the downloader example bundled with HIPI, but that example fetches its images from the network, while my images sit on the local host, so I modified the map part of the source file. The comparison is below:
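In short, the only functional change is inside map(): the HTTP download is replaced by a local-file read. Here is a condensed excerpt of the swap, using only identifiers that appear in the two full listings below:
Code:
// Original: fetch the image over HTTP and add the response stream to the bundle
URL link = new URL(uri);
URLConnection conn = link.openConnection();
conn.connect();
if ("image/jpeg".equals(conn.getContentType()))
hib.addImage(conn.getInputStream(), ImageType.JPEG_IMAGE);

// Modified: open the image directly from the local filesystem
File file = new File(uri);
FileInputStream fis = new FileInputStream(file);
hib.addImage(fis, ImageType.JPEG_IMAGE);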
Original file:
Code:
package hipi.examples.downloader;
import hipi.image.ImageHeader.ImageType;
import hipi.imagebundle.HipiImageBundle;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.net.URL;
import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A utility MapReduce program that takes a list of image URL's, downloads them, and creates
* a {@link hipi.imagebundle.HipiImageBundle} from them.
*
* When running this program, the user must specify 3 parameters. The first is the location
* of the list of URL's (one URL per line), the second is the output path for the HIB that will
* be generated, and the third is the number of nodes that should be used during the
* program's execution. This final parameter should be chosen with respect to the total
* bandwidth your particular cluster is able to handle. An example usage would be:
* <br /><br />
* downloader.jar /path/to/urls.txt /path/to/output.hib 10
* <br /><br />
* This program will automatically force 10 nodes to download the set of URL's contained in
* the input list, thus if your list contains 100,000 images, each node in this example will
* be responsible for downloading 10,000 images.
*
*/
public class Downloader extends Configured implements Tool{
public static class DownloaderMapper extends Mapper<IntWritable, Text, BooleanWritable, Text>
{
private static Configuration conf;
// This method is called on every node
public void setup(Context jc) throws IOException
{
conf = jc.getConfiguration();
}
public void map(IntWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String temp_path = conf.get("downloader.outpath") + key.get() + ".hib.tmp";
System.out.println("Temp path: " + temp_path);
HipiImageBundle hib = new HipiImageBundle(new Path(temp_path), conf);
hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
String word = value.toString();
BufferedReader reader = new BufferedReader(new StringReader(word));
String uri;
int i = key.get();
int iprev = i;
while((uri = reader.readLine()) != null)
{
if(i >= iprev+100) {
hib.close();
context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
temp_path = conf.get("downloader.outpath") + i + ".hib.tmp";
hib = new HipiImageBundle(new Path(temp_path), conf);
hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
iprev = i;
}
long startT=0;
long stopT=0;
startT = System.currentTimeMillis();
try {
String type = "";
URLConnection conn;
// Attempt to download
context.progress();
try {
URL link = new URL(uri);
System.err.println("Downloading " + link.toString());
conn = link.openConnection();
conn.connect();
type = conn.getContentType();
} catch (Exception e)
{
System.err.println("Connection error to image : " + uri);
continue;
}
if (type == null)
continue;
if (type.compareTo("image/gif") == 0)
continue;
if (type != null && type.compareTo("image/jpeg") == 0)
hib.addImage(conn.getInputStream(), ImageType.JPEG_IMAGE);
} catch(Exception e)
{
e.printStackTrace();
System.err.println("Error... probably cluster downtime");
try
{
Thread.sleep(1000);
} catch (InterruptedException e1)
{
e1.printStackTrace();
}
}
i++;
// Emit success
stopT = System.currentTimeMillis();
float el = (float)(stopT-startT)/1000.0f;
System.err.println("> Took " + el + " seconds\n");
}
try
{
reader.close();
hib.close();
context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
} catch (Exception e)
{
e.printStackTrace();
}
}
}
public static class DownloaderReducer extends Reducer<BooleanWritable, Text, BooleanWritable, Text> {
private static Configuration conf;
public void setup(Context jc) throws IOException
{
conf = jc.getConfiguration();
}
public void reduce(BooleanWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
if(key.get()){
FileSystem fileSystem = FileSystem.get(conf);
HipiImageBundle hib = new HipiImageBundle(new Path(conf.get("downloader.outfile")), conf);
hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
for (Text temp_string : values) {
Path temp_path = new Path(temp_string.toString());
HipiImageBundle input_bundle = new HipiImageBundle(temp_path, conf);
hib.append(input_bundle);
Path index_path = input_bundle.getPath();
Path data_path = new Path(index_path.toString() + ".dat");
System.out.println("Deleting: " + data_path.toString());
fileSystem.delete(index_path, false);
fileSystem.delete(data_path, false);
context.write(new BooleanWritable(true), new Text(input_bundle.getPath().toString()));
context.progress();
}
hib.close();
}
}
}
public int run(String[] args) throws Exception
{
// Read in the configuration file
if (args.length < 3)
{
System.out.println("Usage: downloader <input file> <output file> <nodes>");
System.exit(0);
}
// Setup configuration
Configuration conf = new Configuration();
String inputFile = args[0];
String outputFile = args[1];
int nodes = Integer.parseInt(args[2]);
String outputPath = outputFile.substring(0, outputFile.lastIndexOf('/')+1);
System.out.println("Output HIB: " + outputPath);
conf.setInt("downloader.nodes", nodes);
conf.setStrings("downloader.outfile", outputFile);
conf.setStrings("downloader.outpath", outputPath);
Job job = new Job(conf, "downloader");
job.setJarByClass(Downloader.class);
job.setMapperClass(DownloaderMapper.class);
job.setReducerClass(DownloaderReducer.class);
// Set formats
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(DownloaderInputFormat.class);
//*************** IMPORTANT ****************\\
job.setMapOutputKeyClass(BooleanWritable.class);
job.setMapOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(outputFile + "_output"));
DownloaderInputFormat.setInputPaths(job, new Path(inputFile));
job.setNumReduceTasks(1);
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}
public static void createDir(String path, Configuration conf) throws IOException {
Path output_path = new Path(path);
FileSystem fs = FileSystem.get(conf);
if (!fs.exists(output_path)) {
fs.mkdirs(output_path);
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Downloader(), args);
System.exit(res);
}
}
My modified file:
Code:
package downloader;
import hipi.image.ImageHeader.ImageType;
import hipi.imagebundle.HipiImageBundle;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.File;
//import java.net.URL;
//import java.net.URLConnection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
* A utility MapReduce program that takes a list of image URL's, downloads them, and creates
* a {@link hipi.imagebundle.HipiImageBundle} from them.
*
* When running this program, the user must specify 3 parameters. The first is the location
* of the list of URL's (one URL per line), the second is the output path for the HIB that will
* be generated, and the third is the number of nodes that should be used during the
* program's execution. This final parameter should be chosen with respect to the total
* bandwidth your particular cluster is able to handle. An example usage would be:
* <br /><br />
* downloader.jar /path/to/urls.txt /path/to/output.hib 10
* <br /><br />
* This program will automatically force 10 nodes to download the set of URL's contained in
* the input list, thus if your list contains 100,000 images, each node in this example will
* be responsible for downloading 10,000 images.
*
*/
public class Downloader extends Configured implements Tool{
public static class DownloaderMapper extends Mapper<IntWritable, Text, BooleanWritable, Text>
{
private static Configuration conf;
// This method is called on every node
public void setup(Context jc) throws IOException
{
conf = jc.getConfiguration();
}
public void map(IntWritable key, Text value, Context context)
throws IOException, InterruptedException
{
String temp_path = conf.get("downloader.outpath") + key.get() + ".hib.tmp";
System.out.println("Temp path: " + temp_path);
HipiImageBundle hib = new HipiImageBundle(new Path(temp_path), conf);
hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
String word = value.toString();
BufferedReader reader = new BufferedReader(new StringReader(word));
String uri;
int i = key.get();
int iprev = i;
while((uri = reader.readLine()) != null)
{
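// Every 100 images, close the current temporary bundle, emit its path,
// and start a new *.hib.tmp, so each mapper writes fixed-size batches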
if(i >= iprev+100) {
hib.close();
context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
System.out.println(hib.getPath().toString());
temp_path = conf.get("downloader.outpath") + i + ".hib.tmp";
hib = new HipiImageBundle(new Path(temp_path), conf);
hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
iprev = i;
System.out.println(iprev);
}
long startT=0;
long stopT=0;
startT = System.currentTimeMillis();
//try {
//String type = "";
//URLConnection conn;
// Attempt to download
//context.progress();
try {
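// Modified part: read the image straight from the local filesystem
// instead of downloading it from a URL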
File file=new File(uri);
FileInputStream fis=new FileInputStream(file);
hib.addImage(fis, ImageType.JPEG_IMAGE);
} catch (Exception e)
{
System.err.println("Connection error to image : " + uri);
continue;
}
//if (type == null)
//continue;
//if (type.compareTo("image/gif") == 0)
//continue;
//if (type != null && type.compareTo("image/jpeg") == 0)
//hib.addImage(conn.getInputStream(), ImageType.JPEG_IMAGE);
//} catch(Exception e)
//{
//e.printStackTrace();
// System.err.println("Error... probably cluster downtime");
// try
// {
// Thread.sleep(1000);
// } catch (InterruptedException e1)
// {
// e1.printStackTrace();
//}
//}
i++;
System.out.println(i);
System.out.println(uri);
// Emit success
stopT = System.currentTimeMillis();
float el = (float)(stopT-startT)/1000.0f;
System.err.println("> Took " + el + " seconds\n");
}
try
{
reader.close();
hib.close();
context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
System.out.println(hib.getPath().toString());
} catch (Exception e)
{
e.printStackTrace();
}
}
}
public static class DownloaderReducer extends Reducer<BooleanWritable, Text, BooleanWritable, Text> {
private static Configuration conf;
public void setup(Context jc) throws IOException
{
conf = jc.getConfiguration();
}
//System.out.print(conf.get("downloader.outfile"));
public void reduce(BooleanWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
System.out.println(new Path(conf.get("downloader.outfile")));
System.out.println(key.toString());
if(key.get()){
FileSystem fileSystem = FileSystem.get(conf);
HipiImageBundle hib = new HipiImageBundle(new Path(conf.get("downloader.outfile")), conf);
hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
for (Text temp_string : values) {
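// Append each mapper's temporary bundle to the final HIB, then delete
// the temporary index file and its companion .dat data file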
Path temp_path = new Path(temp_string.toString());
System.out.println(temp_path);
HipiImageBundle input_bundle = new HipiImageBundle(temp_path, conf);
hib.append(input_bundle);
Path index_path = input_bundle.getPath();
Path data_path = new Path(index_path.toString() + ".dat");
System.out.println("Deleting: " + data_path.toString());
fileSystem.delete(index_path, false);
fileSystem.delete(data_path, false);
System.out.println(temp_path);
//context.write(new BooleanWritable(true), new Text(input_bundle.getPath().toString()));
//context.progress();
}
hib.close();
context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
//context.progress();
}
}
}
public int run(String[] args) throws Exception
{
// Read in the configuration file
if (args.length < 3)
{
System.out.println("Usage: downloader <input file> <output file> <nodes>");
System.exit(0);
}
// Setup configuration
Configuration conf = new Configuration();
String inputFile = args[0];
String outputFile = args[1];
int nodes = Integer.parseInt(args[2]);
String outputPath = outputFile.substring(0, outputFile.lastIndexOf('/')+1);
System.out.println("Output HIB: " + outputPath);
conf.setInt("downloader.nodes", nodes);
conf.setStrings("downloader.outfile", outputFile);
conf.setStrings("downloader.outpath", outputPath);
Job job = new Job(conf, "downloader");
job.setJarByClass(Downloader.class);
job.setMapperClass(DownloaderMapper.class);
job.setReducerClass(DownloaderReducer.class);
// Set formats
job.setOutputKeyClass(BooleanWritable.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(DownloaderInputFormat.class);
//*************** IMPORTANT ****************\\
job.setMapOutputKeyClass(BooleanWritable.class);
job.setMapOutputValueClass(Text.class);
FileOutputFormat.setOutputPath(job, new Path(outputFile + "_output"));
DownloaderInputFormat.setInputPaths(job, new Path(inputFile));
job.setNumReduceTasks(1);
System.exit(job.waitForCompletion(true) ? 0 : 1);
return 0;
}
public static void createDir(String path, Configuration conf) throws IOException {
Path output_path = new Path(path);
FileSystem fs = FileSystem.get(conf);
if (!fs.exists(output_path)) {
fs.mkdirs(output_path);
}
}
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Downloader(), args);
System.exit(res);
}
}
Then I ran it on the cluster to test. Since this downloader's job is to merge many small files into a single HIB, the map stage writes one *.hib.tmp file for every 100 images. I have 1912 image files in total, so 20 hib.tmp files were produced. But things go wrong at the reduce stage: through testing I found that when there is only one tmp file (i.e., fewer than 100 input images), the reduce runs normally, but with multiple tmp files the reduce will not run and reports an array-index-out-of-bounds error:
Code:
java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.hadoop.io.WritableComparator.readInt(WritableComparator.java:158)
at org.apache.hadoop.io.BooleanWritable$Comparator.compare(BooleanWritable.java:103)
at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:120)
at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:650)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:262)
13/10/18 05:45:09 INFO mapred.JobClient: Job complete: job_local_0001
From my analysis of the error message, I suspect the BooleanWritable key in the reduce is the problem: it can reduce once, but fails the second time. I cannot figure out the cause, so I do not know whether this diagnosis is right or whether the problem is somewhere else entirely, and I do not know how to fix it. Any advice would be much appreciated.
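One detail worth noting from the stack trace: the raw comparator for BooleanWritable ends up in WritableComparator.readInt(), i.e., it reads 4 bytes, while a serialized BooleanWritable occupies only a single byte. The snippet below is my own untested probe (not part of the HIPI example) to check whether the comparator registered in the local Hadoop version overruns a 1-byte key buffer in the same way:
Code:
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.WritableComparator;

// Standalone probe: hand the registered raw comparator a buffer holding
// exactly one serialized BooleanWritable(true), which is the single byte 0x01.
// If compare() calls readInt() and runs past the end of the buffer, it should
// throw the same "ArrayIndexOutOfBoundsException: 1" as the reduce stage above.
public class BooleanKeyCheck {
    public static void main(String[] args) {
        byte[] raw = { 1 };
        WritableComparator cmp = WritableComparator.get(BooleanWritable.class);
        System.out.println(cmp.compare(raw, 0, raw.length, raw, 0, raw.length));
    }
}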
Other information:
(1) My run arguments in Eclipse: /home/hadoop/image/data111.txt /home/hadoop/image/output 2 — the input file location, the output file location, and the number of nodes (this needs to match your own setup; my environment is built on virtual machines with one master and two slaves, so I entered 2). A command-line equivalent is sketched after the listing below.
(2) The format of my input file is as follows:
/home/hadoop/image/1001.jpg
/home/hadoop/image/1002.jpg
/home/hadoop/image/1003.jpg
/home/hadoop/image/1004.jpg
/home/hadoop/image/1005.jpg
/home/hadoop/image/1006.jpg
........
and so on. See the attachment for details.
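For reference, the same run outside Eclipse would look roughly like this, assuming the job is packaged as downloader.jar (the jar name here just follows the usage example in the Javadoc above):
Code:
hadoop jar downloader.jar /home/hadoop/image/data111.txt /home/hadoop/image/output 2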