Multiprocessing big CSVs does not return the expected number of rows

I'm trying to help someone out with something. I'm by no means an expert programmer, but what I'm trying to do is calculate a value from one CSV based on the year and ID from another CSV. The program works as intended if I statically set a smaller sample size for timing and testing purposes (amount_of_reviews works with a 180 MB CSV). But when I let it process ALL the data, I seem to be missing about 2000 of the expected 20245 results (perhaps one of the processes fails?). I am using multiprocessing to reduce the time the program takes to run. I will just post all my code here and hope someone with more experience can spot my mistake(s).

import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime
from ctypes import c_char_p

print (datetime.datetime.now())

with open('D:/temp/listings.csv', encoding="utf8") as f:
    reader = csv.reader(f)
    f.seek(0)

    idSet = set()
    for row in reader:
        idSet.add(row[0])

idList = list(idSet)
idList = sorted(idList)
listings = []

def amount_of_reviews_2019(id):
    total = 0
    with open('D:/temp/reviews.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(0)
        next(reader)

        for row in reader:
            if int(row[2][:4]) >= 2019 and row[0] == id:
                total = total + 1
        return total


def calc(id):
    with open('D:/temp/listings.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(1)
        listing = []
        for row in reader:
            if row[0] == id: 
                listing.append(row[0])
                listing.append(row[48])
                listing.append(row[49])
                listing.append(amount_of_reviews_2019(id))
        listings.append(listing)
        print(len(listings))

def format_csv(data, lock):
    with lock:
        with open('D:/temp/multiprocessing.csv', 'a+', newline='', encoding="utf8") as csvfile:
            filewriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            print(data)
            filewriter.writerows(data)
            #for y in data:
                #filewriter.writerow([y[0], y[1], y[2], y[3]])


def do(counter, lock):
    for id in idList:
        if counter.value < len(idList): #len(idList) = 20245 #When i put lets say 15 here I get all 15 expected results
            with counter.get_lock():
                counter.value += 1 #I am aware I skip the 0 index here
                print(counter.value)
            calc(idList[counter.value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()

    print(len(idList))
    sharedCounter = Value('i', 0)

    processes = []
    for i in range(os.cpu_count()):
        print('registering process %d' % i)
        processes.append(Process(target=do, args=(sharedCounter, lock)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print (datetime.datetime.now())

Diagnosis

Without having given it an in-depth look, I would say there are two main culprits here, and they both go hand-in-hand:

First, there's the repeated file parsing and iteration. You iterate over every ID in the "main loop", so 20,025 times. For each ID, you then read and iterate over the entire listings file (20,051 lines) and the entire reviews file (493,816 lines). That adds up to reading 20,025 × (20,051 + 493,816) = 10,290,186,675 lines of CSV.

Second, there's the multiprocessing itself. Even without an in-depth look, we can get a good idea of the problem just from the code. As we saw above, for each ID your program opens both CSV files. Having a bunch of processes that all need to read the same two files, 20,000 times in total, can't be good for performance. I wouldn't be entirely surprised if the code ran faster without the multiprocessing than with it. There's also the potential race condition mentioned by Daniel Junglas.


Solutions
1.

Alright, it's still a bit of a mess, but I just wanted to get something out there before the turn of the century. I will keep searching for a better solution. Depending on, amongst other things, how many listing IDs appear in reviews.csv but not in listings.csv, the ideal solution might be slightly different.

import numpy as np
import pandas as pd

listings_df = pd.read_csv('../resources/listings.csv', header=0, usecols=['id'], dtype={'id': str})

reviews_df = pd.read_csv('../resources/reviews.csv', header=0, parse_dates=['date'], dtype={'listing_id': str})

valid_reviews = reviews_df[reviews_df['date'] >= pd.Timestamp(year=2019, month=1, day=1)]

review_id_counts = valid_reviews['listing_id'].value_counts()

counts_res: pd.DataFrame = pd.merge(listings_df, review_id_counts, left_on='id', right_index=True, how='left').rename(columns={'listing_id': 'review_count'})
counts_res['review_count'] = counts_res['review_count'].fillna(0).astype(np.int64)

counts_res.to_csv(path_or_buf='../out/listing_review_counts.csv', index=False)

Runtime is around 1s, which means I did beat my target of 5 seconds or less. Yay :)

2.

This method uses a dictionary to count reviews and the standard csv module. Bear in mind that it will raise a KeyError if a review refers to a listing that is not in listings.csv (method 3 below does not have that problem).

import csv
import datetime

with open('../resources/listings.csv') as listings_file:
    reader = csv.DictReader(listings_file)
    listing_review_counts = dict.fromkeys((row['id'] for row in reader), 0)

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    for row in reader:
        rev_date = datetime.datetime.fromisoformat(row['date']).date()
        if rev_date >= cutoff_date:
            listing_review_counts[row['listing_id']] += 1

with open('../out/listing_review_counts_2.csv', 'w', newline='') as out_file:
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(listing_review_counts.items())
3.

This method uses collections.Counter and the standard csv module.

import collections as colls
import csv
import datetime

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    review_listing_counts = colls.Counter(
        (row['listing_id'] for row in reader if datetime.datetime.fromisoformat(row['date']).date() >= cutoff_date))

with open('../resources/listings.csv') as listings_file, \
        open('../out/listing_review_counts_3.csv', 'w', newline='') as out_file:
    reader = csv.DictReader(listings_file)
    listings_ids = (row['id'] for row in reader)

    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(((curr_id, review_listing_counts[curr_id]) for curr_id in listings_ids))

Let me know if you have any questions, if I should include some explanations, etc. :)

This code looks like it has a race condition:

    with counter.get_lock():
        counter.value += 1 #I am aware I skip the 0 index here
        print(counter.value)
    calc(idList[counter.value])

You increment the counter while holding a lock on it, fine. However, then in idList[counter.value] you query the value of the counter outside the lock. So another thread/process may have changed the counter in the meantime. In that case you will read an unexpected value from the counter. A safe way to write your code would be this:

    value = None
    with counter.get_lock():
        counter.value += 1 #I am aware I skip the 0 index here
        value = counter.value
    print(value)
    calc(idList[value])

EDIT: Here is a version of your code that has all race conditions removed (I believe) and also has the file I/O removed. It works correctly for me. Maybe you can add the file I/O back in piece by piece and see where things go wrong.

import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime

print (datetime.datetime.now())

idSet = set(range(20245))
idList = list(idSet)
idList = sorted(idList)
listings = []

# Note: this module-level Value is only shared with the children when processes
# are forked (the Unix default); with the 'spawn' start method each child would
# get its own copy and the final total would stay 0.
totalCounter = Value('i', 0)

def calc(id):
    listing = []
    listings.append(listing)

def format_csv(data, lock):
    with lock:
        totalCounter.value += len(data)

def do(counter, lock):
    for id in idList:
        value = None
        with counter.get_lock():
            if counter.value < len(idList):
                value = counter.value
                counter.value += 1
        if value is not None:
            calc(idList[value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()

    sharedCounter = Value('i', 0)

    processes = []
    for i in range(os.cpu_count()):
        processes.append(Process(target=do, args=(sharedCounter, lock)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print (datetime.datetime.now())
    print('len(idList): %d, total: %d' % (len(idList), totalCounter.value))

I would suggest using pandas to read the files (thanks Alexander), and then looping through the listings and summing all reviews that have that specific id and are from 2019 onwards:

import numpy as np
import pandas
import datetime
import time

listing_csv_filename = r'listings.csv'
reviews_csv_filename = r'reviews.csv'
start = time.time()
df_listing = pandas.read_csv(listing_csv_filename, delimiter=',', quotechar='"')
df_reviews = pandas.read_csv(reviews_csv_filename, delimiter=',', parse_dates=[1])
values = list()
valid_year = df_reviews['date'] >= datetime.datetime(2019, 1, 1)  # >= so 2019-01-01 itself counts, matching the original "year >= 2019" check
for id_num in df_listing['id']:
    valid = (df_reviews['listing_id'] == id_num) & valid_year
    values.append((id_num, np.sum(valid)))

print(values)
print(time.time() - start)

Comments
  • 20K rows is a very small number of rows. It may not even fill the CPU's cache. The cost of cross-process communication alone should negate any benefits. Even with multithreading, the cost may be higher than any benefits
  • amount_of_reviews works with a 180 MB CSV that it has to run through a lot of times. len(idList) = 20245. Currently it takes around 4 h to run, 20+ h without multiprocessing.
  • In that case, update the question and add all the relevant information. Don't force people to read the entire code to understand what's going on. In this case though, you could just load the data into a database, index it and start crunching.
  • In general, data parallelism works by splitting the data into partitions and having each worker crunch a single partition. No need for locking or synchronization in this case. CSVs can easily be partitioned along lines. When all the workers finish, a single process can collect their results (a short sketch of this idea follows after these comments).
  • BTW that's how map/reduce works: one step splits the inputs and maps them to workers, another collects the results and produces the final output.
  • Sorry for the late reply. Thank you so much. This makes all the difference. Which method would you normally prefer when doing something like this? I knew my program was inefficient, but you really showed me how bad it was and why. I will be trying to use numpy and pandas more in the future. Thanks again!
  • @imagine93 It really depends on the rest of the program. Pandas is great for any number of complex operations, and has no major downsides compared to the others, so I would probably go for that.
  • This makes a lot of sense. I will make this change and let it run overnight to test if this solved my initial problem. Thanks for your time
  • This change made sense but didn't solve the issue. I still got fewer rows returned (18589) instead of the expected 20245 (len(idList)).
  • Two things: 1. To simplify debugging you can check whether calc is invoked for each id. You don't actually have to execute the code in calc. As far as I can tell, the function will always add exactly one element to listings. If the function is not invoked for each id, or if there is an invocation that does not add an element, then you can backtrack from there. 2. The test counter.value < len(idList) should be inside the lock as well. Also, this for id in idList in function do() looks a bit fishy to me. You could try while True and break once counter.value == len(idList).
  • I have edited my answer to show what I think is a correct code that is as close as possible to your original code.
  • A few tips: you can have those two open() calls in a single with statement. Instead of calling readlines() and then iterating, just iterate over the file object itself; it should save time and memory. There is probably no disadvantage, and maybe even an advantage, to doing all those .replace() calls without first checking whether the substring is contained in the string. Why not just use Pandas for this? I haven't seen the CSV yet, but unless it's extremely screwed up, read_csv() should be fine, no? (A small illustration of the first two tips follows after these comments.)
  • Thanks for that, pandas read_csv seems to be a lot more robust.
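
The data-parallelism comment above can be made concrete with a minimal sketch: split reviews.csv into chunks, let each worker count its own chunk independently, and merge the partial counts at the end, with no shared counter and no lock. The file path and the 'listing_id'/'date' column names are taken from the answers above; the chunk size is an arbitrary choice.

import collections
import multiprocessing as mp

import pandas as pd

CUTOFF = pd.Timestamp(2019, 1, 1)

def count_chunk(chunk):
    # Count reviews from 2019 onwards per listing_id in one partition of the file.
    recent = chunk[chunk['date'] >= CUTOFF]
    return collections.Counter(recent['listing_id'])

if __name__ == '__main__':
    # Each chunk is an independent partition, so no locking or shared state is needed.
    reader = pd.read_csv('../resources/reviews.csv', parse_dates=['date'],
                         dtype={'listing_id': str}, chunksize=100_000)
    totals = collections.Counter()
    with mp.Pool() as pool:
        for partial in pool.imap(count_chunk, reader):
            totals.update(partial)
    print(len(totals), 'listings with reviews from 2019 onwards')

Listings with zero qualifying reviews are simply absent from totals, so they would still need to be filled in with 0 from listings.csv afterwards, as the solutions above do.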
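
And here is a small illustration of the first two tips in the comments (both files opened in a single with statement, and the csv readers iterated lazily instead of via readlines()), using the paths and column positions from the question:

import csv

with open('D:/temp/listings.csv', encoding='utf8') as listings_file, \
        open('D:/temp/reviews.csv', encoding='utf8') as reviews_file:
    listings_reader = csv.reader(listings_file)
    reviews_reader = csv.reader(reviews_file)
    next(listings_reader)  # skip the header rows
    next(reviews_reader)

    listing_ids = {row[0] for row in listings_reader}
    reviews_since_2019 = sum(1 for row in reviews_reader if int(row[2][:4]) >= 2019)

print(len(listing_ids), reviews_since_2019)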