Taiwan Hadoop Forum

Taiwan Hadoop technical discussion board
Subject: Runs fine from Eclipse, but errors out when run with the hadoop jar command
Posted: 2013-11-06, 22:22

Joined: 2013-03-22, 20:28
Posts: 26
I used Scanner to read values typed in by the user. Running it in Eclipse with Run on Hadoop gives the correct result.

But after packaging it into a jar and running it from the Ubuntu terminal with the hadoop jar command, something goes wrong... the values the user types in never make it into Hadoop, so Hadoop computes with the default value 0 instead.

The input file is:
1<tab>0.1,0.3
2<tab>0.5,0.2

With a=2 and b=3, the output should be:
1<tab>1.6,1.2
2<tab>1.5,2.1

But running it from the command line gives:
1<tab>0.4,0.0
2<tab>0.7,0.0

Is it that Scanner can't be used this way, or did I write something wrong?
Could someone please take a look for me? Thanks!

Here is my code:
Code:
package wordcount;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;

import java.util.Scanner;

public class WordCount {
   static String[] Path = new String[2];
   static int count = 0;
   static int a, b;

  public static class TokenizerMapper extends Mapper<Text, Text, Text, Text>{
    public void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
       String[] values = value.toString().split(",");
        String result = new String();
        double x = Double.valueOf(values[0]);
        double y = Double.valueOf(values[1]);
        double sum1 = x + y;
        double z = y*a;
        result = sum1 +","+ z;
        context.write(key, new Text(result));
    }
  }
 
  public static class IntSumReducer extends Reducer<Text,Text,Text,Text> {
    public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
       for (Text val : values) {
          String[] revalues = val.toString().split(",");
          String f_result = new String();
          double m = Double.valueOf(revalues[0]);
          double n = Double.valueOf(revalues[1]);
          double sum2 = m + n*2;
          double o = m*b;
          f_result = sum2 + "," + o;
          context.write(key, new Text(f_result));
          
       }

    }
  }

  public static void main(String[] args) throws Exception {

      /*System.out.println("Hi");
      for(int i=2; i<args.length; i++){
         System.out.println(args[i]);
      }

      a = Integer.valueOf(args[2]);
      b = Integer.valueOf(args[3]);*/
      
     Scanner scanner = new Scanner(System.in); // System.in reads the user's keyboard input
     System.out.println("please input a number for a");
     a = scanner.nextInt(); // read the number for a
     System.out.println("please input a number for b");
     b = scanner.nextInt(); // read the number for b
    
      System.out.println("a: " + a + " b: " + b);
      
      
    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    /*
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    */
     
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(KeyValueTextInputFormat.class);   
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }
}
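For reference, the arithmetic in this code works out like this on the first input record (1<tab>0.1,0.3):
Code:
map:    sum1 = 0.1 + 0.3 = 0.4,   z = 0.3 * a     (a = 2 -> z = 0.6;  a = 0 -> z = 0.0)
reduce: sum2 = sum1 + 2*z,        o = sum1 * b    (a = 2, b = 3 -> 1.6,1.2;  a = b = 0 -> 0.4,0.0)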




Subject: Re: Runs fine from Eclipse, but errors out when run with the hadoop jar command
Posted: 2013-11-12, 17:11

Joined: 2009-11-09, 19:52
Posts: 2897
OK, I got you.

Cause: in fully-distributed and pseudo-distributed mode, a and b only exist in main(); they are never passed on to the mapper and reducer.

This has to do with MapReduce's logical flow (I drew a diagram of it for class). When the job runs in local mode (e.g. standalone execution from Eclipse), the static a, b in the WordCount class and the a, b used inside reduce are on the same machine, in the same Java VM (java process), so the values you type in for a and b get through correctly.

But in fully-distributed and pseudo-distributed mode, the mapper and reducer code is executed by child Java processes spawned by the TaskTracker. Those are different JVMs: the static a, b exist only in the JVM running main(), and in the child JVMs a and b are never initialized, so they are presumably treated as 0.

Fix: use the Configuration object to pass the parameters.
Sample code: https://raw.github.com/jazzwang/hadoop_labs/master/lab013/src/TokenizerMapper.java
Sample run: http://trac.nchc.org.tw/cloud/wiki/III131019/Lab22

Quote:
caseSensitive = conf.getBoolean("wordcount.case.sensitive", true);


[attached image]
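A minimal sketch of that approach (the key names "wordcount.a"/"wordcount.b" are just illustrative; the linked TokenizerMapper.java shows the full version): set the values on the Configuration in main() before creating the Job, then read them back in setup(), which runs inside the child JVM.
Code:
// Driver (main): store the user-supplied values in the job Configuration
Configuration conf = new Configuration();
conf.setInt("wordcount.a", a);   // illustrative key names - pick your own
conf.setInt("wordcount.b", b);
Job job = new Job(conf, "word count");

// Mapper / Reducer: read the values back in setup()
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    a = context.getConfiguration().getInt("wordcount.a", 0);
    b = context.getConfiguration().getInt("wordcount.b", 0);
}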

Code:
jazz@vmm:~/Scanner$ hadoop version
Hadoop 1.0.4
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0 -r 1393290
Compiled by hortonfo on Wed Oct  3 05:13:58 UTC 2012
From source with checksum fe2baea87c4c81a2c505767f3f9b71f4

jazz@vmm:~/Scanner$ ant
Buildfile: /home/jazz/Scanner/build.xml

compile:
    [mkdir] Created dir: /home/jazz/Scanner/class
    [javac] /home/jazz/Scanner/build.xml:14: warning: 'includeantruntime' was not set, defaulting to build.sysclasspath=last; set to false for repeatable builds
    [javac] Compiling 1 source file to /home/jazz/Scanner/class

doc:
    [mkdir] Created dir: /home/jazz/Scanner/doc
  [javadoc] Generating Javadoc
  [javadoc] Javadoc execution
  [javadoc] Loading source file /home/jazz/Scanner/src/WordCount.java...
  [javadoc] Constructing Javadoc information...
  [javadoc] Standard Doclet version 1.6.0_26
  [javadoc] Building tree for all the packages and classes...
  [javadoc] Building index for all the packages and classes...
  [javadoc] Building index for all classes...

jar:
      [jar] Building jar: /home/jazz/Scanner/WordCount.jar

BUILD SUCCESSFUL
Total time: 2 seconds

jazz@vmm:~/Scanner$ cat input.txt
1   0.1,0.3
2   0.5,0.2

jazz@vmm:~/Scanner$ hadoop jar WordCount.jar input.txt output
please input a number for a
2
please input a number for b
3
a: 2 b: 3
13/11/12 17:08:31 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/11/12 17:08:31 INFO input.FileInputFormat: Total input paths to process : 1
13/11/12 17:08:31 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/12 17:08:31 INFO mapred.JobClient: Running job: job_local_0001
13/11/12 17:08:31 INFO util.ProcessTree: setsid exited with exit code 0
13/11/12 17:08:31 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@2a0ab444
13/11/12 17:08:32 INFO mapred.MapTask: io.sort.mb = 100
13/11/12 17:08:32 INFO mapred.MapTask: data buffer = 79691776/99614720
13/11/12 17:08:32 INFO mapred.MapTask: record buffer = 262144/327680
13/11/12 17:08:32 INFO mapred.MapTask: Starting flush of map output
13/11/12 17:08:32 INFO mapred.MapTask: Finished spill 0
13/11/12 17:08:32 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
13/11/12 17:08:32 INFO mapred.JobClient:  map 0% reduce 0%
13/11/12 17:08:34 INFO mapred.LocalJobRunner:
13/11/12 17:08:34 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
13/11/12 17:08:34 INFO mapred.Task:  Using ResourceCalculatorPlugin : org.apache.hadoop.util.LinuxResourceCalculatorPlugin@466355dc
13/11/12 17:08:34 INFO mapred.LocalJobRunner:
13/11/12 17:08:34 INFO mapred.Merger: Merging 1 sorted segments
13/11/12 17:08:35 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 26 bytes
13/11/12 17:08:35 INFO mapred.LocalJobRunner:
13/11/12 17:08:35 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
13/11/12 17:08:35 INFO mapred.LocalJobRunner:
13/11/12 17:08:35 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
13/11/12 17:08:35 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to output
13/11/12 17:08:35 INFO mapred.JobClient:  map 100% reduce 0%
13/11/12 17:08:37 INFO mapred.LocalJobRunner: reduce > reduce
13/11/12 17:08:37 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
13/11/12 17:08:38 INFO mapred.JobClient:  map 100% reduce 100%
13/11/12 17:08:38 INFO mapred.JobClient: Job complete: job_local_0001
13/11/12 17:08:38 INFO mapred.JobClient: Counters: 20
13/11/12 17:08:38 INFO mapred.JobClient:   File Output Format Counters
13/11/12 17:08:38 INFO mapred.JobClient:     Bytes Written=62
13/11/12 17:08:38 INFO mapred.JobClient:   FileSystemCounters
13/11/12 17:08:38 INFO mapred.JobClient:     FILE_BYTES_READ=8966
13/11/12 17:08:38 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=73344
13/11/12 17:08:38 INFO mapred.JobClient:   File Input Format Counters
13/11/12 17:08:38 INFO mapred.JobClient:     Bytes Read=20
13/11/12 17:08:38 INFO mapred.JobClient:   Map-Reduce Framework
13/11/12 17:08:38 INFO mapred.JobClient:     Map output materialized bytes=30
13/11/12 17:08:38 INFO mapred.JobClient:     Map input records=2
13/11/12 17:08:38 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/11/12 17:08:38 INFO mapred.JobClient:     Spilled Records=4
13/11/12 17:08:38 INFO mapred.JobClient:     Map output bytes=20
13/11/12 17:08:38 INFO mapred.JobClient:     Total committed heap usage (bytes)=514588672
13/11/12 17:08:38 INFO mapred.JobClient:     CPU time spent (ms)=0
13/11/12 17:08:38 INFO mapred.JobClient:     SPLIT_RAW_BYTES=98
13/11/12 17:08:38 INFO mapred.JobClient:     Combine input records=0
13/11/12 17:08:38 INFO mapred.JobClient:     Reduce input records=2
13/11/12 17:08:38 INFO mapred.JobClient:     Reduce input groups=2
13/11/12 17:08:38 INFO mapred.JobClient:     Combine output records=0
13/11/12 17:08:38 INFO mapred.JobClient:     Physical memory (bytes) snapshot=0
13/11/12 17:08:38 INFO mapred.JobClient:     Reduce output records=2
13/11/12 17:08:38 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=0
13/11/12 17:08:38 INFO mapred.JobClient:     Map output records=2

jazz@vmm:~/Scanner$ cat output/part-r-00000
1   1.6,1.2000000000000002
2   1.5,2.0999999999999996


Switching to pseudo-distributed mode:

Code:
jazz@vmm:~/Scanner$ hadoop fs -put input.txt input.txt

jazz@vmm:~/Scanner$ hadoop fs -rmr output
Deleted hdfs://localhost:9000/user/jazz/output

jazz@vmm:~/Scanner$ hadoop jar WordCount.jar input.txt output
please input a number for a
2
please input a number for b
3
a: 2 b: 3
13/11/12 17:12:03 INFO input.FileInputFormat: Total input paths to process : 1
13/11/12 17:12:03 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/11/12 17:12:03 WARN snappy.LoadSnappy: Snappy native library not loaded
13/11/12 17:12:04 INFO mapred.JobClient: Running job: job_201311121710_0002
13/11/12 17:12:05 INFO mapred.JobClient:  map 0% reduce 0%
13/11/12 17:12:31 INFO mapred.JobClient:  map 100% reduce 100%
13/11/12 17:12:36 INFO mapred.JobClient: Job complete: job_201311121710_0002
13/11/12 17:12:36 INFO mapred.JobClient: Counters: 29
13/11/12 17:12:36 INFO mapred.JobClient:   Job Counters
13/11/12 17:12:36 INFO mapred.JobClient:     Launched reduce tasks=1
13/11/12 17:12:36 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=11817
13/11/12 17:12:36 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
13/11/12 17:12:36 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
13/11/12 17:12:36 INFO mapred.JobClient:     Launched map tasks=1
13/11/12 17:12:36 INFO mapred.JobClient:     Data-local map tasks=1
13/11/12 17:12:36 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=10014
13/11/12 17:12:36 INFO mapred.JobClient:   File Output Format Counters
13/11/12 17:12:36 INFO mapred.JobClient:     Bytes Written=20
13/11/12 17:12:36 INFO mapred.JobClient:   FileSystemCounters
13/11/12 17:12:36 INFO mapred.JobClient:     FILE_BYTES_READ=30
13/11/12 17:12:36 INFO mapred.JobClient:     HDFS_BYTES_READ=126
13/11/12 17:12:36 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=43463
13/11/12 17:12:36 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=20
13/11/12 17:12:36 INFO mapred.JobClient:   File Input Format Counters
13/11/12 17:12:36 INFO mapred.JobClient:     Bytes Read=20
13/11/12 17:12:36 INFO mapred.JobClient:   Map-Reduce Framework
13/11/12 17:12:36 INFO mapred.JobClient:     Map output materialized bytes=30
13/11/12 17:12:36 INFO mapred.JobClient:     Map input records=2
13/11/12 17:12:36 INFO mapred.JobClient:     Reduce shuffle bytes=0
13/11/12 17:12:36 INFO mapred.JobClient:     Spilled Records=4
13/11/12 17:12:36 INFO mapred.JobClient:     Map output bytes=20
13/11/12 17:12:36 INFO mapred.JobClient:     CPU time spent (ms)=2760
13/11/12 17:12:36 INFO mapred.JobClient:     Total committed heap usage (bytes)=401997824
13/11/12 17:12:36 INFO mapred.JobClient:     Combine input records=0
13/11/12 17:12:36 INFO mapred.JobClient:     SPLIT_RAW_BYTES=106
13/11/12 17:12:36 INFO mapred.JobClient:     Reduce input records=2
13/11/12 17:12:36 INFO mapred.JobClient:     Reduce input groups=2
13/11/12 17:12:36 INFO mapred.JobClient:     Combine output records=0
13/11/12 17:12:36 INFO mapred.JobClient:     Physical memory (bytes) snapshot=345063424
13/11/12 17:12:36 INFO mapred.JobClient:     Reduce output records=2
13/11/12 17:12:36 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=1199955968
13/11/12 17:12:36 INFO mapred.JobClient:     Map output records=2

jazz@vmm:~/Scanner$ hadoop fs -cat output/part-r-00000
1   0.4,0.0
2   0.7,0.0

- Jazz


Subject: Re: Runs fine from Eclipse, but errors out when run with the hadoop jar command
Posted: 2013-11-14, 22:52

Joined: 2013-03-22, 20:28
Posts: 26
Thank you very much, jazz!!

But now I have another question. I added a static one-dimensional array named global.
global[0] stores the sum1 computed in map and global[1] stores z; they are then supposed to be passed to reduce and printed out.
But the printed values are all null...
It seems this array, just like before, doesn't make it into reduce. I now know how to pass plain parameters, but I've tried for a long time and still can't figure out how to pass an array. Could you give me another hint? :oops:

Here is the code:
Code:
package wordcount;

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Progressable;

import java.util.Scanner;

public class WordCount {
   static String[] Path = new String[2];
   static int count = 0;
   static int a, b;
   static Double[] global = new Double[2];

  public static class TokenizerMapper extends Mapper<Text, Text, Text, Text>{
    
     public void setup(Context context)
     {
        a = context.getConfiguration().getInt("wordcount.case.sensitive",0);
        b = context.getConfiguration().getInt("wordcount.case.sensitive2",0);
     }
    
    public void map(Text key, Text value, Context context)
          throws IOException, InterruptedException {
       String[] values = value.toString().split(",");
        String result = new String();
        double x = Double.valueOf(values[0]);
        double y = Double.valueOf(values[1]);
        double sum1 = x + y;
        double z = y*a;   
       
        global[0] = sum1;
        global[1] = z;
       
        result = sum1 +","+ z;
        context.write(key, new Text(result));
    }
  }
 
  public static class IntSumReducer extends Reducer<Text,Text,Text,Text> {
    
     public void setup(Context context)
     {
        a = context.getConfiguration().getInt("wordcount.case.sensitive",0);
        b = context.getConfiguration().getInt("wordcount.case.sensitive2",0);
     }
    
    public void reduce(Text key, Iterable<Text> values, Context context)
          throws IOException, InterruptedException {
       for (Text val : values) {
          String[] revalues = val.toString().split(",");
          String f_result = new String();
          double m = Double.valueOf(revalues[0]);
          double n = Double.valueOf(revalues[1]);
          double sum2 = m + n*2;
          double o = m*b;
          f_result = sum2 + "," + o + "," + global[0] + "," + global[1];
          context.write(key, new Text(f_result));
         
       }

    }
  }
 
  public static void main(String[] args) throws Exception {
     
    Configuration conf = new Configuration();
   
    conf.getInt("wordcount.case.sensitive", 0);
    conf.getInt("wordcount.case.sensitive2", 0);
   
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    /*
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    */   
   
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(KeyValueTextInputFormat.class);   
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);

  }
}

The command used:
Code:
/opt/hadoop/bin/hadoop jar /home/n4540/wordcount.jar -Dwordcount.case.sensitive=2 -Dwordcount.case.sensitive2=3 test2 output

Input file:
1<tab>0.1,0.3
2<tab>0.5,0.2

The result is:
Code:
1   1.6,1.2000000000000002,null,null
2   1.5,2.0999999999999996,null,null


Subject: Re: Runs fine from Eclipse, but errors out when run with the hadoop jar command
Posted: 2013-11-15, 23:59

Joined: 2009-11-09, 19:52
Posts: 2897
RED wrote:
Thank you very much, jazz!!
But now I have another question. I added a static one-dimensional array named global.
global[0] stores the sum1 computed in map and global[1] stores z; they are then supposed to be passed to reduce and printed out.
But the printed values are all null...
It seems this array, just like before, doesn't make it into reduce. I now know how to pass plain parameters, but I've tried for a long time and still can't figure out how to pass an array. Could you give me another hint? :oops:


Same reasoning as before: the map and reduce child processes are independent Java processes,
so whatever map writes into the global variable exists only in the mapper's JVM
and never shows up in the child JVM that runs reduce.

Fixes:
1. Some people use the HDFS FileSystem API to write a file in map for reduce to read (a rough sketch is below).
2. Others use memcache or a database to pass the values along.
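
A rough sketch of option 1, assuming a single pair of values is enough (the path /tmp/wordcount_global.txt and the reuse of global[] are purely illustrative): the mapper writes the values to HDFS in cleanup(), and the reducer reads them back in setup() through the FileSystem API.
Code:
// Mapper side: write the values out in cleanup(), after all map() calls have run
@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataOutputStream out = fs.create(new Path("/tmp/wordcount_global.txt"), true); // overwrite
    out.writeBytes(global[0] + "," + global[1] + "\n");
    out.close();
}

// Reducer side: read the file back in setup()
@Override
protected void setup(Context context) throws IOException, InterruptedException {
    FileSystem fs = FileSystem.get(context.getConfiguration());
    BufferedReader in = new BufferedReader(
            new InputStreamReader(fs.open(new Path("/tmp/wordcount_global.txt"))));
    String[] parts = in.readLine().split(",");
    global[0] = Double.valueOf(parts[0]);
    global[1] = Double.valueOf(parts[1]);
    in.close();
}

(This also needs the java.io.InputStreamReader import.) Note that with more than one map task each task would overwrite the same file, so in practice you would write one file per task attempt, and you have to make sure the file is written before the reduce task's setup() runs.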

- Jazz

