How to unzip the files stored in HDFS using Spark (Java)

List<String> list = jsc.wholeTextFiles(hdfsPath).keys().collect();
for (String string : list) {
    System.out.println(string);
}

Here I am getting the names of all the zip files. From here I am unable to proceed: how do I extract each file and store it into an HDFS path under a folder with the same name as the zip?

You can use something like the below. The only thing is that we need to collect with zipFilesRdd.collect().forEach before writing the contents into HDFS; map and flatMap give "task not serializable" at this point.

import java.io.IOException;
import java.net.URI;
import java.util.Scanner;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.input.PortableDataStream;

public void readWriteZipContents(String zipLoc, String hdfsBasePath) {
    JavaSparkContext jsc = new JavaSparkContext(new SparkContext(new SparkConf()));
    JavaPairRDD<String, PortableDataStream> zipFilesRdd = jsc.binaryFiles(zipLoc);
    // collect() brings the streams to the driver; map/flatMap fail here with "task not serializable"
    zipFilesRdd.collect().forEach(file -> {
        ZipInputStream zipStream = new ZipInputStream(file._2().open());
        ZipEntry zipEntry = null;
        try {
            while ((zipEntry = zipStream.getNextEntry()) != null) {
                String entryName = zipEntry.getName();
                if (!zipEntry.isDirectory()) {
                    // create the path in hdfs and write the entry's contents
                    Configuration configuration = new Configuration();
                    configuration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
                    configuration.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
                    FileSystem fs = FileSystem.get(URI.create("hdfs://localhost:8020"), configuration);
                    FSDataOutputStream hdfsfile = fs.create(new Path(hdfsBasePath + "/" + entryName));
                    // use a fresh Scanner per entry; a single Scanner reused across entries stops at the first end-of-entry
                    Scanner sc = new Scanner(zipStream);
                    while (sc.hasNextLine()) {
                        hdfsfile.writeBytes(sc.nextLine() + "\n"); // nextLine() strips the separator, add it back
                    }
                    hdfsfile.flush(); // flush before close, not after
                    hdfsfile.close();
                }
                zipStream.closeEntry();
            }
            zipStream.close();
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }
    });
}

Zip Files, Learn how to read data in Zip compressed files using Databricks. Hadoop does not have support for zip files as a compression codec, so unlike gzip they are not decompressed transparently by Spark; on Databricks you can fall back to the %sh magic command to unzip the file. I have a directory in HDFS which has many zipped files. I want to start Hadoop streaming and so need these files. How can I unzip them?

With gzip files, wholeTextFiles should gunzip everything automatically. With zip files, however, the only way I know is to use binaryFiles and to unzip the data by hand.
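To make the gzip half concrete, here is a minimal sketch (the path is made up, and an existing SparkContext sc is assumed, as in the snippet below); Hadoop's gzip codec is applied transparently:

// Hypothetical HDFS path; each value is already the gunzipped text of one .gz file.
val gzTexts = sc.wholeTextFiles("hdfs:///data/logs/*.gz")
gzTexts.keys.collect().foreach(println)

The zip case, by contrast, needs the manual approach below.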

sc
    .binaryFiles(hdfsDir)
    .mapValues(x => {
        val result = scala.collection.mutable.ArrayBuffer.empty[String]
        val zis = new ZipInputStream(x.open())
        var entry: ZipEntry = null
        while ({entry = zis.getNextEntry(); entry} != null) {
            // a fresh Scanner per entry, reading from the zip stream (not from the SparkContext sc)
            val scanner = new Scanner(zis)
            while (scanner.hasNextLine()) { result += scanner.nextLine() }
        }
        zis.close()
        result
    })

This gives you a pair RDD[(String, ArrayBuffer[String])] where the key is the name of the file on HDFS and the value is the unzipped content of the zip file (one line per element of the ArrayBuffer). If a given zip file contains more than one file, everything is aggregated. You may adapt the code to fit your exact needs; for instance, flatMapValues instead of mapValues would flatten everything into an RDD[(String, String)] to take advantage of Spark's parallelism, as sketched below.
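A rough sketch of that flatMapValues variant, assuming the same sc and hdfsDir as above (zipEntryLines is just an invented helper name wrapping the same unzip logic):

import java.util.Scanner
import java.util.zip.{ZipEntry, ZipInputStream}

import org.apache.spark.input.PortableDataStream

// Same unzip logic as above, factored into a helper so it can be passed to flatMapValues.
def zipEntryLines(stream: PortableDataStream): Seq[String] = {
  val lines = scala.collection.mutable.ArrayBuffer.empty[String]
  val zis = new ZipInputStream(stream.open())
  var entry: ZipEntry = null
  while ({entry = zis.getNextEntry(); entry} != null) {
    val scanner = new Scanner(zis)
    while (scanner.hasNextLine()) { lines += scanner.nextLine() }
  }
  zis.close()
  lines
}

// One (fileName, line) pair per line: an RDD[(String, String)] instead of RDD[(String, ArrayBuffer[String])].
val flattened = sc.binaryFiles(hdfsDir).flatMapValues(zipEntryLines)
flattened.take(10).foreach { case (file, line) => println(s"$file -> $line") }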

Note also that in the while condition, {entry = zis.getNextEntry(); entry} could be replaced by (entry = zis.getNextEntry()) in Java. In Scala, however, the result of an assignment is Unit, so that form would yield an infinite loop.

How to unzip a zipped file stored in Hadoop hdfs?, I have a zip file in HDFS. I want to unzip it. How do I do it?

I came up with this solution, written in Scala.

Tested with Spark 2 (version 2.3.0.cloudera2) and Scala 2.11.8.

def extractHdfsZipFile(source_zip : String, target_folder : String,
    sparksession : SparkSession) : Boolean = {

    val hdfs_config = sparksession.sparkContext.hadoopConfiguration
    val buffer = new Array[Byte](1024)

    /*
     .collect -> runs on the driver only; the hdfs Configuration is not serializable,
     so the extraction cannot be pushed into map/flatMap tasks.
    */
    sparksession.sparkContext.binaryFiles(source_zip).collect.
      foreach{ zip_file: (String, PortableDataStream) =>
        // iterate over zip_files
        val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
        var zip_entry: ZipEntry = null

        try {
          // iterate over all ZipEntry from ZipInputStream
          while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
            // skip directory
            if (!zip_entry.isDirectory()) {
              println(s"Extract File: ${zip_entry.getName()}, with Size: ${zip_entry.getSize()}")
              // create new hdfs file
              val fs : FileSystem = FileSystem.get(hdfs_config)
              val hdfs_file : FSDataOutputStream = fs.create(new Path(target_folder + "/" + zip_entry.getName()))

              var len : Int = 0
              // copy the current entry until read() reports end of entry (len <= 0)
              while({len = zip_stream.read(buffer); len > 0}) {
                hdfs_file.write(buffer, 0, len)
              }
              // flush and close hdfs_file (flush before close)
              hdfs_file.flush()
              hdfs_file.close()
            }
            zip_stream.closeEntry()
          }
          zip_stream.close()
        } catch {
          case zip : ZipException => {
            zip.printStackTrace()
            println("Please verify that you do not use compress type 9.")
            // for DEBUG rethrow; alternatively swallow the error and have the method return false
            throw zip
          }
          case e : Exception => {
            e.printStackTrace()
            // for DEBUG rethrow; alternatively swallow the error and have the method return false
            throw e
          }
        }
    }
    true
  }
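For completeness, a call site might look like this (the app name and the HDFS paths are invented for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("unzip-hdfs").getOrCreate()

// extract every entry of the archive into the target folder on HDFS
val ok = extractHdfsZipFile(
  source_zip    = "hdfs:///landing/archive.zip",
  target_folder = "hdfs:///extracted/archive",
  sparksession  = spark)

println(s"extraction finished: $ok")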

Writing to HDFS in Spark/Scala reading the zip files, Java Unzip File. To unzip a zip file, we need to read it with ZipInputStream and then read each ZipEntry one by one, using FileOutputStream to write the entries out to the file system. We also need to create the output directory if it doesn't exist, along with any nested directories present in the zip file.
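A minimal local-filesystem sketch of that recipe (written in Scala to match the answers above, but using only java.util.zip and java.io; the helper name and the paths are made up):

import java.io.{File, FileInputStream, FileOutputStream}
import java.util.zip.ZipInputStream

def unzipLocally(zipPath: String, outDir: String): Unit = {
  val zis = new ZipInputStream(new FileInputStream(zipPath))
  val buffer = new Array[Byte](4096)
  var entry = zis.getNextEntry
  while (entry != null) {
    val outFile = new File(outDir, entry.getName)
    if (entry.isDirectory) {
      outFile.mkdirs()                  // re-create directories listed in the archive
    } else {
      outFile.getParentFile.mkdirs()    // create any nested parent directories
      val out = new FileOutputStream(outFile)
      var len = zis.read(buffer)
      while (len > 0) { out.write(buffer, 0, len); len = zis.read(buffer) }
      out.close()
    }
    zis.closeEntry()
    entry = zis.getNextEntry
  }
  zis.close()
}

// e.g. unzipLocally("/tmp/archive.zip", "/tmp/extracted")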

Bash solution. In my case, I did not want to pipe-unzip the files since I was not sure of their content. Instead, I wanted to make sure all the files inside the zip archives would end up extracted on HDFS.



Comments
  • I would suggest going for native Java code to do the unzipping. Spark can help you read the files using wholeTextFiles.
  • java.lang.IllegalArgumentException: Wrong FS: hdfs://localhost:8020/Logs, expected: file:///
  • Where are you getting this exception? It seems like you are running Spark in local mode. Give the complete stack trace, and note at which line you get the exception.
  • We are using the Spark library via Maven. I am reading the zip file from the local machine and extracting it to HDFS.
  • OK, if you are reading the zip file from the local filesystem, then you need to give the path with "file:///", as the exception says, e.g. jsc.binaryFiles("file:///localLocation").
  • The local file can be read, but while saving the file to HDFS I get the error at FSDataOutputStream hdfsfile = FileSystem.get(jsc.hadoopConfiguration()).create(new Path(hdfsBasePath + "/" + entryName)); (see the sketch below).
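One way to avoid that "Wrong FS" mismatch is to bind the FileSystem to the HDFS URI explicitly instead of relying on the default (local) configuration. A sketch in Scala, matching the answers above (the NameNode address and output path are invented):

import java.net.URI

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// The FileSystem is now bound to hdfs://, so the Path resolves against HDFS
// rather than the local file:// scheme that caused the IllegalArgumentException.
val conf = new Configuration()
val fs = FileSystem.get(URI.create("hdfs://localhost:8020"), conf)
val out = fs.create(new Path("/Logs/extracted/part-0.txt"))
out.writeBytes("example line\n")
out.close()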