dask read_parquet with pyarrow memory blow up

I am using dask to write and read parquet. I write with the fastparquet engine and read with the pyarrow engine. My worker has 1 GB of memory. With fastparquet the memory usage is fine, but when I switch to pyarrow it blows up and causes the worker to restart. I have a reproducible example below that fails with pyarrow on a worker with a 1 GB memory limit. In reality my dataset is much bigger than this. The only reason for using pyarrow is the speed boost it gives while scanning compared to fastparquet (somewhere around 7x-8x).

dask : 0.17.1

pyarrow : 0.9.0.post1

fastparquet : 0.1.3

import dask.dataframe as dd
import numpy as np
import pandas as pd

size = 9900000
tmpdir = '/tmp/test/outputParquet1'

d = {'a': np.random.normal(0, 0.3, size=size).cumsum() + 50,
     'b': np.random.choice(['A', 'B', 'C'], size=size),
     'c': np.random.choice(['D', 'E', 'F'], size=size),
     'd': np.random.normal(0, 0.4, size=size).cumsum() + 50,
     'e': np.random.normal(0, 0.5, size=size).cumsum() + 50,
     'f': np.random.normal(0, 0.6, size=size).cumsum() + 50,
     'g': np.random.normal(0, 0.7, size=size).cumsum() + 50}
df = dd.from_pandas(pd.DataFrame(d), npartitions=200)
df.to_parquet(tmpdir, compression='snappy', write_index=True,
              engine='fastparquet')

#engine = 'pyarrow'      # fails due to worker restart
engine = 'fastparquet'   # works fine
df_partitioned = dd.read_parquet(tmpdir + "/*.parquet", engine=engine)
print(df_partitioned.count().compute())
df_partitioned.query("b == 'A'").count().compute()

Edit: My original setup has Spark jobs that write data in parallel into partitions using fastparquet, so the metadata file is created in the innermost partition rather than the parent directory. Hence I use glob paths instead of the parent directory (fastparquet is much faster reading from the parent directory, whereas pyarrow wins when scanning with a glob path).

I recommend selecting the columns you need in the read_parquet call

df = dd.read_parquet('/path/to/*.parquet', engine='pyarrow', columns=['b'])

This lets you efficiently read only the few columns you actually need rather than all of the columns at once.
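
For example, the query from the question only touches column b, so the whole read can be restricted to that one column. A minimal sketch, reusing tmpdir from the reproducer above:

import dask.dataframe as dd

tmpdir = '/tmp/test/outputParquet1'

# Only column 'b' is needed for the filtered count
df_b = dd.read_parquet(tmpdir + "/*.parquet", engine='pyarrow', columns=['b'])
print(df_b.query("b == 'A'").count().compute())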

Some timing results on my non-memory-restricted system:

With your example data

In [17]: df_partitioned = dd.read_parquet(tmpdir, engine='fastparquet')

In [18]: %timeit df_partitioned.count().compute()
2.47 s ± 114 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [19]: df_partitioned = dd.read_parquet(tmpdir, engine='pyarrow')

In [20]: %timeit df_partitioned.count().compute()
1.93 s ± 96.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

With columns b and c converted to categorical before writing

In [30]: df_partitioned = dd.read_parquet(tmpdir, engine='fastparquet')

In [31]: %timeit df_partitioned.count().compute()
1.25 s ± 83.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

In [32]: df_partitioned = dd.read_parquet(tmpdir, engine='pyarrow')

In [33]: %timeit df_partitioned.count().compute()
1.82 s ± 63 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

With fastparquet direct, single-threaded

In [36]: %timeit fastparquet.ParquetFile(tmpdir).to_pandas().count()
1.82 s ± 19 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

With 20 partitions instead of 200 (fastparquet, categories)

In [42]: %timeit df_partitioned.count().compute()
863 ms ± 78.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
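
For reference, the categorical conversion and coarser partitioning used in the timings above might look like the sketch below (assuming the same d and tmpdir as in the question; exact speedups will vary):

import dask.dataframe as dd
import pandas as pd

pdf = pd.DataFrame(d)  # d as defined in the question

# Store the low-cardinality string columns as categoricals before writing
pdf['b'] = pdf['b'].astype('category')
pdf['c'] = pdf['c'].astype('category')

# 20 larger partitions instead of 200 small ones
df = dd.from_pandas(pdf, npartitions=20)
df.to_parquet(tmpdir, compression='snappy', write_index=True,
              engine='fastparquet')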

You could also filter as you load the data, e.g. on a specific column:

df = dd.read_parquet('/path/to/*.parquet', engine='fastparquet', filters=[(COLUMN, 'operation', 'SOME_VALUE')])

Imagine operations like ==, >, <, and so on.
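
As a concrete sketch against the example data (same tmpdir as the reproducer; note that in these dask versions filters prune whole row groups from their min/max statistics, so they pay off most when the data is sorted or partitioned on the filter column; here b is random, so little may actually be skipped):

import dask.dataframe as dd

tmpdir = '/tmp/test/outputParquet1'

# Keep only row groups whose statistics may contain b == 'A',
# then apply the exact row-level filter on what remains
df_a = dd.read_parquet(tmpdir + "/*.parquet", engine='fastparquet',
                       filters=[('b', '==', 'A')])
print(df_a.query("b == 'A'").count().compute())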

Comments
  • Do you get the same experience if you write with pyarrow? Note that when you write with fastparquet, you get a metadata file, so you can read from tmpdir without the glob part, and it should go faster.
  • The metadata file is an issue for me, actually. The whole process works like this: partitions are written in parallel using Spark jobs, so the metadata file is created in the innermost partition, and hence I can't use the parent directory to read all the data. That's why I use a glob path (fastparquet is faster with a parent-directory load, but with a glob-path load pyarrow wins hands down). Is there a way to read fastparquet in dask without using the metadata file?
  • I tested different settings with varying partitions, with the following results: 1. fastparquet is faster with the parent-directory path (needs the metadata file, though), and its memory usage is fine, but it is slower with a glob-path read. 2. pyarrow is faster with a glob-path read, but a parent-directory read is difficult because of a schema mismatch (the column order is not the same, easily fixed while writing the df), and it does cause high memory usage.
  • I do need all the columns I store. But is this normal behaviour? My production environment has worker nodes configured with a 5 GB limit. fastparquet uses at most 3 GB, whereas pyarrow just blows through that.
  • You might try limiting the memory or the number of threads of the workers (see the sketch after these comments). I don't know how well the memory footprint of arrow has been characterized. The dataframe is 1.6 GB, so you are actually using too many partitions in the example.
  • My prod data is really huge: about 50,000,000 rows with 17 columns. I can't use the parent directory to load data (mentioned above), hence I have to make do with a glob-path read.
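
A sketch of capping worker memory and threads with dask.distributed, as suggested in the comments (the limits below are illustrative, not taken from this thread):

import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

# One thread per worker process and a hard memory cap, so a single
# pyarrow read cannot fan out across many concurrent tasks in one worker
cluster = LocalCluster(n_workers=2, threads_per_worker=1, memory_limit='1GB')
client = Client(cluster)

tmpdir = '/tmp/test/outputParquet1'
df = dd.read_parquet(tmpdir + "/*.parquet", engine='pyarrow')
print(df.count().compute())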