3

I am giving .tar.bz2, .gz and .tar.gz files as input after changing the properties in mapred-site.xml. None of the above seems to have worked. What I assume is happening is that the records read as input by Hadoop go out of sequence, i.e. one column of the input is a string and the other is an integer, but while reading from the compressed file, because of some out-of-sequence data, at some point Hadoop reads the string part as an integer and throws a number format exception. I'm just a noob. I want to know whether there is a problem in the configuration or in my code.

The properties in core-site.xml are

<property>
  <name>io.compression.codecs</name>
   <value>org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.SnappyCodec</value>
   <description>A list of the compression codec classes that can be used for compression/decompression.</description>
</property>

properties in mapred-site.xml are

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>

<property>
   <name>mapred.map.output.compression.codec</name>
   <value>org.apache.hadoop.io.compress.BZip2Codec</value>
</property>
<property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
</property>

This is my Code

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;        
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;

public class MySort{
    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable Marks = new IntWritable();
    private Text name = new Text();
        String one,two;
    int num;
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
        one=tokenizer.nextToken();
        name.set(one);
        if(tokenizer.hasMoreTokens())
            two=tokenizer.nextToken();
        num=Integer.parseInt(two);
        Marks.set(num);
        context.write(name, Marks);
        }
    }
    } 

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
        sum += val.get();
        }
        context.write(key, new IntWritable(sum));
    }
    }

    public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    //  conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");

    //  conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
    //  conf.setBoolean("mapreduce.map.output.compress",true);
    conf.setBoolean("mapred.output.compress",true);
    //conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
    //conf.setBoolean("mapreduce.map.output.compress",true);
    conf.set("mapred.output.compression.type", "BLOCK");     
    //conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
    //      conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
    conf.setClass("mapred.map.output.compression.codec", BZip2Codec.class, CompressionCodec.class);
        Job job = new Job(conf, "mysort");
    job.setJarByClass(org.myorg.MySort.class);
    job.setJobName("mysort");
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    //  FileInputFormat.setCompressInput(job,true);
    FileOutputFormat.setCompressOutput(job, true);
    //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    //  conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString()); 

    FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    job.waitForCompletion(true);
    }

}

These are all the commands, put together in a makefile

# Generate the data set, compress it, run the MySort job and fetch the result.
#
# The input is compressed with plain bzip2 rather than tar: Hadoop's codecs only
# decompress, they do not unpack archives, so a .tar.bz2 hands the tar header
# (e.g. "0ustar") to the mapper as if it were text.
run:    all
	-sudo ./a.out
	sudo chmod 777 -R Data
# -k keeps Data/data.txt, -f overwrites a stale Data/data.txt.bz2.
	sudo bzip2 -kf Data/data.txt
	sudo javac -classpath /home/hduser/12115_Select_Query/hadoop-core-1.1.2.jar -d mysort MySort.java
	sudo jar -cvf mysort.jar -C mysort/ .
	-hadoop fs -rmr MySort/output
	-hadoop fs -rmr MySort/input
	hadoop fs -mkdir MySort/input
	hadoop fs -put Data/data.txt.bz2 MySort/input
	hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output
# The job writes BZip2-compressed output, so the part file carries a .bz2 suffix.
	-sudo rm -f /home/hduser/Out/sort.txt /home/hduser/Out/sort.txt.bz2
	hadoop fs -copyToLocal MySort/output/part-r-00000.bz2 /home/hduser/Out/sort.txt.bz2
	bunzip2 -f /home/hduser/Out/sort.txt.bz2
	sudo gedit /home/hduser/Out/sort.txt

# Build the data generator.
all:    rdata.c
	-sudo rm a.out
	-gcc rdata.c -o a.out

exec:   run

.PHONY: exec run

Command:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/output

Here is the output:

Java HotSpot(TM) 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/06/25 11:20:28 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/25 11:20:28 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
14/06/25 11:20:29 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
14/06/25 11:20:29 INFO input.FileInputFormat: Total input paths to process : 1
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: number of splits:1
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.output.compress is deprecated. Instead, use mapreduce.output.fileoutputformat.compress
14/06/25 11:20:29 INFO Configuration.deprecation: mapred.map.output.compression.codec is deprecated. Instead, use mapreduce.map.output.compress.codec
14/06/25 11:20:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1403675322820_0001
14/06/25 11:20:30 INFO impl.YarnClientImpl: Submitted application application_1403675322820_0001
14/06/25 11:20:30 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1403675322820_0001/
14/06/25 11:20:30 INFO mapreduce.Job: Running job: job_1403675322820_0001
14/06/25 11:20:52 INFO mapreduce.Job: Job job_1403675322820_0001 running in uber mode : false
14/06/25 11:20:52 INFO mapreduce.Job:  map 0% reduce 0%
14/06/25 11:21:10 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_0, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:29 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_1, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:21:49 INFO mapreduce.Job: Task Id : attempt_1403675322820_0001_m_000000_2, Status : FAILED
Error: java.lang.NumberFormatException: For input string: "0ustar"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Integer.parseInt(Integer.java:580)
    at java.lang.Integer.parseInt(Integer.java:615)
    at org.myorg.MySort$Map.map(MySort.java:36)
    at org.myorg.MySort$Map.map(MySort.java:23)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:764)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)

14/06/25 11:22:10 INFO mapreduce.Job:  map 100% reduce 100%
14/06/25 11:22:10 INFO mapreduce.Job: Job job_1403675322820_0001 failed with state FAILED due to: Task failed task_1403675322820_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0

14/06/25 11:22:10 INFO mapreduce.Job: Counters: 9
    Job Counters 
        Failed map tasks=4
        Launched map tasks=4
        Other local map tasks=3
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=69797
        Total time spent by all reduces in occupied slots (ms)=0
        Total time spent by all map tasks (ms)=69797
        Total vcore-seconds taken by all map tasks=69797
        Total megabyte-seconds taken by all map tasks=71472128

I have also tried using this:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar   -Dmapred.output.compress=true   -Dmapred.compress.map.output=true   -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec   -Dmapred.reduce.tasks=0   -input MySort/input/data.txt   -output MySort/zip1

It is successful in creating compressed files

hadoop fs -ls MySort/zip1

Found 3 items
-rw-r--r--   1 hduser supergroup          0 2014-06-25 10:43 MySort/zip1/_SUCCESS
-rw-r--r--   1 hduser supergroup   42488018 2014-06-25 10:43 MySort/zip1/part-00000.bz2
-rw-r--r--   1 hduser supergroup   42504084 2014-06-25 10:43 MySort/zip1/part-00001.bz2

and then running this:

hadoop jar mysort.jar org.myorg.MySort MySort/input/ MySort/zip1

It still doesn't work. Is there something that I am missing here?

It works fine when I run it without using compressed file bz2 and directly passing it the text file Data/data.txt i.e uploading it to MySort/input in hdfs (hadoop fs -put Data/data.txt MySort/input).

Any help is appreciated.

2
  • Error seems to be happening here: num=Integer.parseInt(two); Debug it.
    – Ramanan
    Commented Jun 25, 2014 at 6:49
  • @RamananR It works fine when I run it without using compressed file bz2 and directly passing it the text file Data/data.txt. There is no problem with that part of the code num=Integer.parseInt(two);. There is probably some error in configurations for passing it compressed files. Commented Jun 25, 2014 at 6:55

1 Answer 1

2

I found a workaround for this: I used ToolRunner.

package org.myorg;

import java.io.IOException;
import java.util.*;
import org.apache.hadoop.util.NativeCodeLoader;        
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.io.compress.GzipCodec;        
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class ToolMapReduce extends Configured implements Tool 
{


    public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> 
    {
        private final static IntWritable Marks = new IntWritable();
        private Text name = new Text();
        String one,two;
        int num;
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException 
        {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) 
            {
            one=tokenizer.nextToken();
            name.set(one);
            if(tokenizer.hasMoreTokens())
                two=tokenizer.nextToken();
            num=Integer.parseInt(two);
            Marks.set(num);
            context.write(name, Marks);
            }
        }
    } 

    public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> 
    {

        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
        throws IOException, InterruptedException 
        {
            int sum = 0;
            for (IntWritable val : values) 
            {
            sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception  
    {
        int res = ToolRunner.run(new Configuration(), new ToolMapReduce(), args);
        System.exit(res);
    }

    public int run(String[] args) throws Exception
    {   

        Configuration conf = this.getConf();
        //Configuration conf = new Configuration();
        //conf.setOutputFormat(SequenceFileOutputFormat.class); 
        //SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); 
        //SequenceFileOutputFormat.setCompressOutput(conf, true); 
        //conf.set("mapred.output.compress","true");
        //  conf.set("mapred.output.compression","org.apache.hadoop.io.compress.SnappyCodec");

        //conf.set("mapred.output.compression.codec","org.apache.hadoop.io.compress.SnappyCodec");
        //  conf.set("mapreduce.job.inputformat.class", "com.wizecommerce.utils.mapred.TextInputFormat");

        //  conf.set("mapreduce.job.outputformat.class", "com.wizecommerce.utils.mapred.TextOutputFormat");
        //  conf.setBoolean("mapreduce.map.output.compress",true);
        conf.setBoolean("mapred.output.compress",true);
        //conf.setBoolean("mapreduce.output.fileoutputformat.compress",false);
        //conf.setBoolean("mapreduce.map.output.compress",true);
        conf.set("mapred.output.compression.type", "BLOCK");     
        //conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
        //      conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
        Job job = new Job(conf, "mysort");
        job.setJarByClass(org.myorg.ToolMapReduce.class);
        //job.setJarByClass(org.myorg.MySort.class);
        job.setJobName("mysort");
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        //  FileInputFormat.setCompressInput(job,true);
        FileOutputFormat.setCompressOutput(job, true);
        //FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        //  conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString()); 

        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
        //job.waitForCompletion(true);
    }


}

Not the answer you're looking for? Browse other questions tagged or ask your own question.