Taiwan Hadoop Forum

台灣 Hadoop 技術討論區
現在的時間是 2022-06-30, 03:31

所有顯示的時間為 UTC + 8 小時




發表新文章 回覆主題  [ 2 篇文章 ] 
發表人 內容
 文章主題 : 使用HIPI框架在hadoop上做圖像處理的時候出了問題,請各位大大指教
文章發表於 : 2013-10-21, 16:05 
離線

註冊時間: 2013-08-28, 23:08
文章: 11
因為HIPI本身提供了一個hib格式的文件,所以我想把所有的圖片文件全部合並為hib格式,使用的是HIPI自帶的example裏面的downloader例子,但是這個例子里的文件來源是網絡,而我的圖片在主機上放著,所以對源文件的map部份進行了修改,對比如下:
原文件:
代碼:
package hipi.examples.downloader;

import hipi.image.ImageHeader.ImageType;
import hipi.imagebundle.HipiImageBundle;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* A utility MapReduce program that takes a list of image URL's, downloads them, and creates
* a {@link hipi.imagebundle.HipiImageBundle} from them.
*
* When running this program, the user must specify 3 parameters. The first is the location
* of the list of URL's (one URL per line), the second is the output path for the HIB that will
* be generated, and the third is the number of nodes that should be used during the
* program's execution. This final parameter should be chosen with respect to the total
* bandwidth your particular cluster is able to handle. An example usage would be:
* <br /><br />
* downloader.jar /path/to/urls.txt /path/to/output.hib 10
* <br /><br />
* This program will automatically force 10 nodes to download the set of URL's contained in
* the input list, thus if your list contains 100,000 images, each node in this example will
* be responsible for downloading 10,000 images.
*
*/
public class Downloader extends Configured implements Tool{

   
   public static class DownloaderMapper extends Mapper<IntWritable, Text, BooleanWritable, Text>
   {
      private static Configuration conf;
      // This method is called on every node
      public void setup(Context jc) throws IOException
      {
         conf = jc.getConfiguration();
      }

      public void map(IntWritable key, Text value, Context context)
      throws IOException, InterruptedException
      {
         String temp_path = conf.get("downloader.outpath") + key.get() + ".hib.tmp";
         System.out.println("Temp path: " + temp_path);
         
         HipiImageBundle hib = new HipiImageBundle(new Path(temp_path), conf);
         hib.open(HipiImageBundle.FILE_MODE_WRITE, true);

         String word = value.toString();

         BufferedReader reader = new BufferedReader(new StringReader(word));
         String uri;
         int i = key.get();
         int iprev = i;
         while((uri = reader.readLine()) != null)         
         {
            if(i >= iprev+100) {
               hib.close();
               context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
               temp_path = conf.get("downloader.outpath") + i + ".hib.tmp";
               hib = new HipiImageBundle(new Path(temp_path), conf);
               hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
               iprev = i;
            }
            long startT=0;
            long stopT=0;      
            startT = System.currentTimeMillis();             

            try {
               String type = "";
               URLConnection conn;
               // Attempt to download
               context.progress();

               try {
                  URL link = new URL(uri);
                  System.err.println("Downloading " + link.toString());
                  conn = link.openConnection();
                  conn.connect();
                  type = conn.getContentType();
               } catch (Exception e)
               {
                  System.err.println("Connection error to image : " + uri);
                  continue;
               }

               if (type == null)
                  continue;

               if (type.compareTo("image/gif") == 0)
                  continue;

               if (type != null && type.compareTo("image/jpeg") == 0)
                  hib.addImage(conn.getInputStream(), ImageType.JPEG_IMAGE);
               
            } catch(Exception e)
            {
               e.printStackTrace();
               System.err.println("Error... probably cluster downtime");
               try
               {
                  Thread.sleep(1000);            
               } catch (InterruptedException e1)
               {
                  e1.printStackTrace();
               }
            }

            i++;
            
            // Emit success
            stopT = System.currentTimeMillis();
            float el = (float)(stopT-startT)/1000.0f;
            System.err.println("> Took " + el + " seconds\n");            
         }


         try
         {
            reader.close();
            hib.close();
            context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
         } catch (Exception e)
         {
            e.printStackTrace();
         }

      }
   }

   public static class DownloaderReducer extends Reducer<BooleanWritable, Text, BooleanWritable, Text> {

      private static Configuration conf;      
      public void setup(Context jc) throws IOException
      {
         conf = jc.getConfiguration();
      }

      public void reduce(BooleanWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
      {
         if(key.get()){
            FileSystem fileSystem = FileSystem.get(conf);
            HipiImageBundle hib = new HipiImageBundle(new Path(conf.get("downloader.outfile")), conf);
            hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
            for (Text temp_string : values) {
               Path temp_path = new Path(temp_string.toString());
               HipiImageBundle input_bundle = new HipiImageBundle(temp_path, conf);
               hib.append(input_bundle);
               
               Path index_path = input_bundle.getPath();
               Path data_path = new Path(index_path.toString() + ".dat");
               System.out.println("Deleting: " + data_path.toString());
               fileSystem.delete(index_path, false);
               fileSystem.delete(data_path, false);
               
               context.write(new BooleanWritable(true), new Text(input_bundle.getPath().toString()));
               context.progress();
            }
            hib.close();
         }
      }
   }


   public int run(String[] args) throws Exception
   {   

      // Read in the configuration file
      if (args.length < 3)
      {
         System.out.println("Usage: downloader <input file> <output file> <nodes>");
         System.exit(0);
      }

      // Setup configuration
      Configuration conf = new Configuration();

      String inputFile = args[0];
      String outputFile = args[1];
      int nodes = Integer.parseInt(args[2]);

      String outputPath = outputFile.substring(0, outputFile.lastIndexOf('/')+1);
      System.out.println("Output HIB: " + outputPath);
      
      
      conf.setInt("downloader.nodes", nodes);
      conf.setStrings("downloader.outfile", outputFile);
      conf.setStrings("downloader.outpath", outputPath);

      Job job = new Job(conf, "downloader");
      job.setJarByClass(Downloader.class);
      job.setMapperClass(DownloaderMapper.class);
      job.setReducerClass(DownloaderReducer.class);

      // Set formats
      job.setOutputKeyClass(BooleanWritable.class);
      job.setOutputValueClass(Text.class);       
      job.setInputFormatClass(DownloaderInputFormat.class);

      //*************** IMPORTANT ****************\\
      job.setMapOutputKeyClass(BooleanWritable.class);
      job.setMapOutputValueClass(Text.class);
      FileOutputFormat.setOutputPath(job, new Path(outputFile + "_output"));

      DownloaderInputFormat.setInputPaths(job, new Path(inputFile));

      job.setNumReduceTasks(1);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
      return 0;
   }

   public static void createDir(String path, Configuration conf) throws IOException {
      Path output_path = new Path(path);

      FileSystem fs = FileSystem.get(conf);

      if (!fs.exists(output_path)) {
         fs.mkdirs(output_path);
      }
   }

   public static void main(String[] args) throws Exception {
      int res = ToolRunner.run(new Downloader(), args);
      System.exit(res);
   }
}


我修改後的文件:
代碼:
package downloader;

import hipi.image.ImageHeader.ImageType;
import hipi.imagebundle.HipiImageBundle;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.File;
//import java.net.URL;
//import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
* A utility MapReduce program that takes a list of image URL's, downloads them, and creates
* a {@link hipi.imagebundle.HipiImageBundle} from them.
*
* When running this program, the user must specify 3 parameters. The first is the location
* of the list of URL's (one URL per line), the second is the output path for the HIB that will
* be generated, and the third is the number of nodes that should be used during the
* program's execution. This final parameter should be chosen with respect to the total
* bandwidth your particular cluster is able to handle. An example usage would be:
* <br /><br />
* downloader.jar /path/to/urls.txt /path/to/output.hib 10
* <br /><br />
* This program will automatically force 10 nodes to download the set of URL's contained in
* the input list, thus if your list contains 100,000 images, each node in this example will
* be responsible for downloading 10,000 images.
*
*/
public class Downloader extends Configured implements Tool{

   
   public static class DownloaderMapper extends Mapper<IntWritable, Text, BooleanWritable, Text>
   {
      private static Configuration conf;
      // This method is called on every node
      public void setup(Context jc) throws IOException
      {
         conf = jc.getConfiguration();
      }

      public void map(IntWritable key, Text value, Context context)
      throws IOException, InterruptedException
      {
         String temp_path = conf.get("downloader.outpath") + key.get() + ".hib.tmp";
         System.out.println("Temp path: " + temp_path);
         
         HipiImageBundle hib = new HipiImageBundle(new Path(temp_path), conf);
         hib.open(HipiImageBundle.FILE_MODE_WRITE, true);

         String word = value.toString();

         BufferedReader reader = new BufferedReader(new StringReader(word));
         String uri;
         int i = key.get();
         int iprev = i;
         while((uri = reader.readLine()) != null)         
         {
            if(i >= iprev+100) {
               hib.close();
               context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
               System.out.println(hib.getPath().toString());
               temp_path = conf.get("downloader.outpath") + i + ".hib.tmp";
               hib = new HipiImageBundle(new Path(temp_path), conf);
               hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
               iprev = i;
               System.out.println(iprev);
            }
            long startT=0;
            long stopT=0;      
            startT = System.currentTimeMillis();             

            //try {
               //String type = "";
               //URLConnection conn;
               // Attempt to download
               //context.progress();

               try {
                        File file=new File(uri);
                        FileInputStream fis=new FileInputStream(file);
                  hib.addImage(fis, ImageType.JPEG_IMAGE);
               } catch (Exception e)
               {
                  System.err.println("Connection error to image : " + uri);
                  continue;
               }

               //if (type == null)
                  //continue;

               //if (type.compareTo("image/gif") == 0)
                  //continue;

               //if (type != null && type.compareTo("image/jpeg") == 0)
                  //hib.addImage(conn.getInputStream(), ImageType.JPEG_IMAGE);
               
            //} catch(Exception e)
            //{
               //e.printStackTrace();
            //   System.err.println("Error... probably cluster downtime");
            //   try
            //   {
            //      Thread.sleep(1000);            
            //   } catch (InterruptedException e1)
            //   {
            //      e1.printStackTrace();
               //}
            //}

            i++;
            System.out.println(i);
            System.out.println(uri);
            // Emit success
            stopT = System.currentTimeMillis();
            float el = (float)(stopT-startT)/1000.0f;
            System.err.println("> Took " + el + " seconds\n");            
         }


         try
         {
            reader.close();
            hib.close();
            context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
            System.out.println(hib.getPath().toString());
         } catch (Exception e)
         {
            e.printStackTrace();
         }
         
      }
   }

   public static class DownloaderReducer extends Reducer<BooleanWritable, Text, BooleanWritable, Text> {

      private static Configuration conf;      
      public void setup(Context jc) throws IOException
      {
         conf = jc.getConfiguration();
      }

      //System.out.print(conf.get("downloader.outfile"));
      public void reduce(BooleanWritable key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException
      {
         System.out.println(new Path(conf.get("downloader.outfile")));
         System.out.println(key.toString());
         if(key.get()){
            FileSystem fileSystem = FileSystem.get(conf);
            HipiImageBundle hib = new HipiImageBundle(new Path(conf.get("downloader.outfile")), conf);
            hib.open(HipiImageBundle.FILE_MODE_WRITE, true);
            for (Text temp_string : values) {
               Path temp_path = new Path(temp_string.toString());
               System.out.println(temp_path);
               HipiImageBundle input_bundle = new HipiImageBundle(temp_path, conf);
               hib.append(input_bundle);
               
               Path index_path = input_bundle.getPath();
               Path data_path = new Path(index_path.toString() + ".dat");
               System.out.println("Deleting: " + data_path.toString());
               fileSystem.delete(index_path, false);
               fileSystem.delete(data_path, false);
               
               System.out.println(temp_path);
               //context.write(new BooleanWritable(true), new Text(input_bundle.getPath().toString()));
               //context.progress();
            }
            hib.close();
            context.write(new BooleanWritable(true), new Text(hib.getPath().toString()));
            //context.progress();
         }
      }
   }


   public int run(String[] args) throws Exception
   {   

      // Read in the configuration file
      if (args.length < 3)
      {
         System.out.println("Usage: downloader <input file> <output file> <nodes>");
         System.exit(0);
      }

      // Setup configuration
      Configuration conf = new Configuration();

      String inputFile = args[0];
      String outputFile = args[1];
      int nodes = Integer.parseInt(args[2]);

      String outputPath = outputFile.substring(0, outputFile.lastIndexOf('/')+1);
      System.out.println("Output HIB: " + outputPath);
      
      
      conf.setInt("downloader.nodes", nodes);
      conf.setStrings("downloader.outfile", outputFile);
      conf.setStrings("downloader.outpath", outputPath);

      Job job = new Job(conf, "downloader");
      job.setJarByClass(Downloader.class);
      job.setMapperClass(DownloaderMapper.class);
      job.setReducerClass(DownloaderReducer.class);

      // Set formats
      job.setOutputKeyClass(BooleanWritable.class);
      job.setOutputValueClass(Text.class);       
      job.setInputFormatClass(DownloaderInputFormat.class);

      //*************** IMPORTANT ****************\\
      job.setMapOutputKeyClass(BooleanWritable.class);
      job.setMapOutputValueClass(Text.class);
      FileOutputFormat.setOutputPath(job, new Path(outputFile + "_output"));

      DownloaderInputFormat.setInputPaths(job, new Path(inputFile));

      job.setNumReduceTasks(1);
      System.exit(job.waitForCompletion(true) ? 0 : 1);
      return 0;
   }

   public static void createDir(String path, Configuration conf) throws IOException {
      Path output_path = new Path(path);

      FileSystem fs = FileSystem.get(conf);

      if (!fs.exists(output_path)) {
         fs.mkdirs(output_path);
      }
   }

   public static void main(String[] args) throws Exception {
      int res = ToolRunner.run(new Downloader(), args);
      System.exit(res);
   }
}


然後上機運行測試,因為這個downloader的功能是合併眾多的小文件為一個hib,在map階段每一百個輸出為一個*.hib.tmp的文件,我的文件一共有1912個,所以輸出了20個hib.tmp,但是在reduce的時候出問題了,通過測試發現,只有一個tmp文件(也就是輸入圖片文件個數小於100)的時候,reduce正常執行,但是多個tmp文件的時候,reduce就是無法運行,報數組下標越界錯誤:
代碼:
java.lang.ArrayIndexOutOfBoundsException: 1
   at org.apache.hadoop.io.WritableComparator.readInt(WritableComparator.java:158)
   at org.apache.hadoop.io.BooleanWritable$Comparator.compare(BooleanWritable.java:103)
   at org.apache.hadoop.mapreduce.ReduceContext.nextKeyValue(ReduceContext.java:120)
   at org.apache.hadoop.mapreduce.ReduceContext.nextKey(ReduceContext.java:92)
   at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:175)
   at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:650)
   at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:418)
   at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:262)
13/10/18 05:45:09 INFO mapred.JobClient: Job complete: job_local_0001

经过对错误信息的分析,我觉得是reduce里的BooleanWritable类型的key出问题了,只能reduce一次,第二次的时候就出问题,想不出来原因不知道判斷的對不對,亦或是別的地方的問題?不知道如何修改,望各位大大指教,其他信息:
(1)我在Eclipse裏面的參數設置:/home/hadoop/image/data111.txt /home/hadoop/image/output 2,分別是輸入文件位置,輸出文件位置,節點數(需要根據自己的實際配置情況來輸入,我的是在虛擬機上搭建的環境,所以一台master,兩台slave,所以填了2)
(2)我的輸入文件的格式如下:
/home/hadoop/image/1001.jpg
/home/hadoop/image/1002.jpg
/home/hadoop/image/1003.jpg
/home/hadoop/image/1004.jpg
/home/hadoop/image/1005.jpg
/home/hadoop/image/1006.jpg
........
依此類推,詳見附件


附加檔案:
檔案註釋: 输入文件
data111.txt [54.15 KiB]
被下載 329 次
回頂端
 個人資料 E-mail  
 
 文章主題 : Re: 使用HIPI框架在hadoop上做圖像處理的時候出了問題,請各位大大指教
文章發表於 : 2013-10-24, 22:26 
離線

註冊時間: 2013-08-28, 23:08
文章: 11
已經搞定了,確定是BooleanWritable的問題,hadoop裏面的BooleanWritable有問題,需要修改compare裏面的內容,將裏面的內容替換為:
代碼:
return compareBytes(b1, s1, l1, b2, s2, l2);

因為編譯的時候總出問題,所以我自己建了一個BooleanWritable.java文件,同時還要導入WritableComparable和WritableComparator相關的包,否則會報錯。


回頂端
 個人資料 E-mail  
 
顯示文章 :  排序  
發表新文章 回覆主題  [ 2 篇文章 ] 

所有顯示的時間為 UTC + 8 小時


誰在線上

正在瀏覽這個版面的使用者:沒有註冊會員 和 2 位訪客


不能 在這個版面發表主題
不能 在這個版面回覆主題
不能 在這個版面編輯您的文章
不能 在這個版面刪除您的文章
不能 在這個版面上傳附加檔案

搜尋:
前往 :  
cron
Powered by phpBB © 2000, 2002, 2005, 2007 phpBB Group
正體中文語系由 竹貓星球 維護製作