Dealing with a large gzipped file in Spark

I have a large (about 85 GB compressed) gzipped file from s3 that I am trying to process with Spark on AWS EMR (right now with an m4.xlarge master instance and two m4.10xlarge core instances each with a 100 GB EBS volume). I am aware that gzip is a non-splittable file format, and I've seen it suggested that one should repartition the compressed file because Spark initially gives an RDD with one partition. However, after doing

scala> val raw = spark.read.format("com.databricks.spark.csv").
     | options(Map("delimiter" -> "\\t", "codec" -> "org.apache.hadoop.io.compress.GzipCodec")).
     | load("s3://path/to/file.gz").
     | repartition(sc.defaultParallelism * 3)
raw: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [_c0: string, _c1: string ... 48 more fields]
scala> raw.count()

and taking a look at the Spark application UI, I still see only one active executor (the other 14 are dead) with one task, and the job never finishes (or at least I've not waited long enough for it to).

  • What is going on here? Can someone help me understand how Spark is working in this example?
  • Should I be using a different cluster configuration?
  • Unfortunately, I have no control over the mode of compression, but is there an alternative way of dealing with such a file?

If the file format is not splittable, then there's no way to avoid reading the file in its entirety on one core. In order to parallelize work, you have to know how to assign chunks of work to different computers. In the gzip case, suppose you divide it up into 128M chunks. The nth chunk depends on the (n-1)th chunk's position information to know how to decompress, which in turn depends on the (n-2)th chunk, and so on down to the first.
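To see that dependency concretely, here is a small self-contained Python sketch (the sample.txt.gz name and its contents are made up for illustration, not part of the question): decompressing a gzip stream from byte 0 works, but starting at an arbitrary offset does not, because the decoder has neither a header nor the state built up by everything before that point.

import gzip
import zlib

# Build a small gzip file so the example is self-contained
# ("sample.txt.gz" and its contents are made up for illustration).
with gzip.open("sample.txt.gz", "wb") as f:
    f.write(b"some repetitive line of text\n" * 100000)

with open("sample.txt.gz", "rb") as f:
    raw = f.read()

# Starting at byte 0 works: the stream begins with a gzip header.
whole = zlib.decompressobj(wbits=16 + zlib.MAX_WBITS).decompress(raw)
print(len(whole), "bytes recovered from the full stream")

# Starting at an arbitrary offset fails: the middle of the stream has no
# header, and its back-references depend on everything decoded before it.
try:
    zlib.decompressobj(wbits=16 + zlib.MAX_WBITS).decompress(raw[len(raw) // 2:])
except zlib.error as err:
    print("cannot start mid-stream:", err)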

If you want to parallelize, you need to make this file splittable. One way is to unzip it and process it uncompressed, or you can unzip it, split it into several files (one file for each parallel task you want), and gzip each file.
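As a rough sketch of that unzip-split-recompress step in plain Python (the file names and the lines-per-chunk value are placeholders, not from the answer; the zcat | split approach mentioned in the comments below achieves the same thing):

import gzip
from itertools import islice

LINES_PER_CHUNK = 1_000_000  # placeholder chunk size

# Stream the one big gzip file and rewrite it as many smaller gzip files,
# so that each output file can become its own Spark task.
with gzip.open("file.gz", "rt") as src:
    part = 0
    while True:
        chunk = list(islice(src, LINES_PER_CHUNK))
        if not chunk:
            break
        with gzip.open(f"part-{part:05d}.gz", "wt") as dst:
            dst.writelines(chunk)
        part += 1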

I have faced this problem, and here is the solution.

The best way to approach it is to unzip the .gz file before the Spark batch runs, and then point Spark at the uncompressed file so that it can read it in parallel.

Code to unzip the .gz file.

import gzip
import shutil

# Decompress file.txt.gz into file.txt by streaming, so the whole file is
# never held in memory at once.
with gzip.open('file.txt.gz', 'rb') as f_in, open('file.txt', 'wb') as f_out:
    shutil.copyfileobj(f_in, f_out)
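Once the uncompressed (or re-split) data is in place, Spark can parallelize the read normally. A minimal PySpark sketch, with the path and delimiter as placeholders:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The uncompressed file is splittable, so Spark assigns many tasks to the read
# instead of one ("file.txt" and the tab delimiter are placeholders).
df = spark.read.option("delimiter", "\t").csv("file.txt")
print(df.rdd.getNumPartitions())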

Spark cannot parallelize reading a single gzip file.

The best you can do is split it into chunks that are gzipped.

However, Spark is really slow at reading gzip files. You can do this to speed it up:

import gzip  # needed on the workers so each task can decompress its own files

file_names_rdd = sc.parallelize(list_of_files, 100)
lines_rdd = file_names_rdd.flatMap(lambda name: gzip.open(name).readlines())

Going through Python this way is about twice as fast as the native Spark gzip reader.
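One caveat with this approach: if the source files differ a lot in size, the partitions of lines_rdd will be uneven. A small follow-up sketch (assuming the lines_rdd built above) that rebalances before the heavy processing:

# Spread the decompressed lines evenly across the cluster before the expensive
# work; the factor of 3 per core is only a starting point, not a rule.
balanced_rdd = lines_rdd.repartition(sc.defaultParallelism * 3)
print(balanced_rdd.count())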

Comments
  • I was under the impression that Spark is decompressing the file first before repartitioning it. Is this not the case? What are the four links that I provided talking about, then?
  • Yes, Spark is decompressing the file first in its entirety (80G on one core) before it can shuffle it to increase parallelism.
  • Okay, thank you. Do you think my cluster will even be able to handle this task? If so, if I want to decompress the whole file, repartition it, and then do further processing, do you think setting spark.dynamicAllocation.enabled=true will ensure that I get one executor (with as much memory as possible) to do the decompression and then more executors (with less memory but many cores) after to do the processing?
  • This is something you don't need to (and shouldn't) do in Spark. Just do something like zcat file | split -l 1000000 to produce many new files, then recompress each one and go from there.
  • So will this split the contents of the .gz files equally across the nodes if I further repartition lines_rdd?
  • To make it equal you may need to repartition, unless your source gzips are all of similar size.