1 vote

I have multiple compressed files, and each compressed file contains 8 XML files of 5-10 KB. I took this data for testing purposes; the live data has thousands of XML files. I wrote a map-only program to uncompress the compressed files:

        for (FileStatus status : status_list) {
            this.unzip(status.getPath().toString(), DestPath, fs);
        }

This method creates the output file and copies the uncompressed data into it:

    FSDataOutputStream out = fs.create(new Path(filePath));
    byte[] bytesIn = new byte[BUFFER_SIZE];
    int read = 0;
    // copy the uncompressed bytes from the zip stream into the new HDFS file
    while ((read = zipIn.read(bytesIn)) != -1) {
        out.write(bytesIn, 0, read);
    }
    out.flush();
    out.close();

When the mapper tries to write multiple files, each with a different name, Hadoop returns a LeaseExpiredException:

15/09/26 19:53:46 INFO mapreduce.Job: Task Id : attempt_1443265405944_0005_m_000000_0, Status : FAILED
Error: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease on /test_poc/x/injection_00001 (163rd copy).xml (inode 400006): File does not exist. [Lease.  Holder: DFSClient_attempt_1443265405944_0005_m_000000_0_-657768289_1, pendingcreates: 1]
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3431)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3236)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3074)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3034)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:723)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:492)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:969)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043)

    at org.apache.hadoop.ipc.Client.call(Client.java:1476)
    at org.apache.hadoop.ipc.Client.call(Client.java:1407)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
    at com.sun.proxy.$Proxy12.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:418)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
    at com.sun.proxy.$Proxy13.addBlock(Unknown Source)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1430)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1226)
    at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:449)

I don't know how to handle multiple compressed files without using the loop. I wrote the MapReduce program with the MR2 API, and I am running Hadoop 2.7.1 in pseudo-distributed mode. Any pointers?

It's not entirely clear to me what is contained in the zipIn stream. You've said that each zip file contains multiple XML files, but your logic appears to consume zipIn completely. Can you show more of your code, with a single file and with multiple files? – ThrawnCA
Are you uncompressing it in the setup method of the mapper? – Sachin Janani

2 Answers

0 votes

Assuming that zipIn is a java.util.zip.ZipInputStream, shouldn't you be iteratively calling getNextEntry instead of reading bytes?
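
A minimal sketch of that entry-by-entry loop, assuming zipIn is a ZipInputStream opened over one archive and fs is the HDFS FileSystem handle from the question (unzipAllEntries, destDir, and the buffer size are illustrative names and values, not the asker's code):

    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch: walk the archive entry by entry and write each contained
    // XML file to its own HDFS file under destDir.
    void unzipAllEntries(ZipInputStream zipIn, FileSystem fs, String destDir)
            throws IOException {
        final int BUFFER_SIZE = 4096;
        ZipEntry entry;
        while ((entry = zipIn.getNextEntry()) != null) {   // one entry per XML file
            FSDataOutputStream out = fs.create(new Path(destDir, entry.getName()));
            byte[] bytesIn = new byte[BUFFER_SIZE];
            int read;
            while ((read = zipIn.read(bytesIn)) != -1) {   // -1 at the end of this entry, not the archive
                out.write(bytesIn, 0, read);
            }
            out.close();        // close each HDFS file before moving to the next entry
            zipIn.closeEntry();
        }
    }

Each call to getNextEntry() positions the stream at the start of the next XML file, and read() then stops at the end of that entry rather than at the end of the whole archive.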

0 votes

I resolved this issue after making some changes to my code. In the first part of the code, I was trying to unzip all the zip files from the mapper, whereas each map task should only process its own input split. A Hadoop basic, which I forgot during implementation.
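
For anyone landing here, a rough sketch of the split-oriented shape this can take, assuming the job input is a text file listing one zip path per line (for example fed through NLineInputFormat so each task receives a single line); UnzipMapper and the dest.path configuration key are hypothetical names, and unzip() is the method from the question:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    // Sketch: each map() call handles exactly one zip path from its own
    // input split instead of looping over every file in the directory.
    public class UnzipMapper extends Mapper<LongWritable, Text, NullWritable, NullWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            FileSystem fs = FileSystem.get(conf);
            String zipPath = value.toString().trim();      // one zip file per input line
            unzip(zipPath, conf.get("dest.path"), fs);     // unzip() as shown in the question
        }
    }

With this layout the framework hands each task its own slice of the file list, which avoids several attempts creating and writing the same HDFS paths.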