In this post, we'll show how to read multiple CSV files in parallel with Python and Pandas. Each file is read into a temporary DataFrame and appended to a single combined CSV file.
Another example of parallelisation is available here: Pandas Easy Parallelization with df.iterrows() or For Loop
You can see the full code below, followed by all the steps with explanations:
from multiprocessing import Pool
from zipfile import ZipFile
import pandas as pd
import tarfile

def process_archive(csv_file):
    try:
        # read one CSV member from the open archive and append it to the combined file
        df_temp = pd.read_csv(zip_file.open(csv_file))
        df_temp.to_csv('data/all_files.csv', mode='a', header=False)
    except Exception:
        print(csv_file + '\n')

zip_file = 'data/41.zip'
zip_file = ZipFile(zip_file)
zip_files = {text_file.filename
             for text_file in zip_file.infolist()
             if text_file.filename.endswith('.csv')}

p = Pool(12)
p.map(process_archive, zip_files)
Step 1: Read all file names from the archive
First, we are going to collect all file names from the ZIP archive into a set. Only .csv
files will be read from the archive file:
zip_file = ZipFile(zip_file)
zip_files = {text_file.filename
             for text_file in zip_file.infolist()
             if text_file.filename.endswith('.csv')}
results in:
- 41/file1.csv
- 41/file2.csv
Note: if you work with a tar.gz
file, then you need to change how the archive is opened and processed:
zip_file = tarfile.open(zip_file, "r:gz")
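A minimal sketch of the tar.gz variant (the archive path and the helper names are my assumptions, not part of the original code):

```python
import tarfile
import pandas as pd

def list_csv_members(tar_path):
    # collect the names of all .csv members, mirroring the ZipFile/infolist step
    with tarfile.open(tar_path, 'r:gz') as tar:
        return {member.name for member in tar.getmembers()
                if member.name.endswith('.csv')}

def process_tar_member(tar_path, csv_name, out_path):
    # tar.extractfile returns a file-like object that pd.read_csv accepts
    with tarfile.open(tar_path, 'r:gz') as tar:
        df_temp = pd.read_csv(tar.extractfile(csv_name))
        df_temp.to_csv(out_path, mode='a', header=False)
```

As with the ZIP version, each member is appended to the combined output file.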
Step 2: Read the archived files with a method
We need a method which is going to be used for the parallel execution. It will read a CSV file from the archive and append it to a new CSV file:
def process_archive(csv_file):
    try:
        df_temp = pd.read_csv(zip_file.open(csv_file))
        df_temp.to_csv('data/all_files.csv', mode='a', header=False)
    except Exception:
        print(csv_file + '\n')
I've noticed that for huge numbers of small files - i.e. 100,000+ CSV files combined with a high number of parallel processes - errors are raised.
Note: The arguments - mode='a', header=False
ensure that we are in appending mode and that the header row is not rewritten for every file.
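If you want the combined file to start with a header row, one option is to write it once before the parallel run; a minimal sketch, where the column names are placeholders for the columns of your CSV files:

```python
import os
import pandas as pd

# hypothetical column names; replace them with the columns of your CSV files
columns = ['col1', 'col2', 'col3']

# write only the header row, truncating any previous output file
os.makedirs('data', exist_ok=True)
pd.DataFrame(columns=columns).to_csv('data/all_files.csv', mode='w', header=True, index=False)
```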
Step 3: Process multiple CSV files in parallel
Finally, we perform the parallel processing: we pass the set of file names from Step 1 to the method defined in Step 2. This is also the step where we define the number of parallel processes:
p = Pool(12)
p.map(process_archive, zip_files)
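Appending to one output file from many processes can interleave writes. An alternative sketch is to return the DataFrames from the workers and concatenate them once in the parent process (the helpers read_member and combine_archive are my additions, not part of the original code):

```python
from multiprocessing import Pool
from zipfile import ZipFile
import pandas as pd

def read_member(args):
    # open the archive inside the worker so every process is self-contained
    zip_path, csv_name = args
    with ZipFile(zip_path) as zf:
        return pd.read_csv(zf.open(csv_name))

def combine_archive(zip_path, out_path, processes=12):
    # list the .csv members, read them in parallel, then write the result once
    with ZipFile(zip_path) as zf:
        csv_names = [n for n in zf.namelist() if n.endswith('.csv')]
    with Pool(processes) as p:
        frames = p.map(read_member, [(zip_path, n) for n in csv_names])
    pd.concat(frames, ignore_index=True).to_csv(out_path, index=False)
```

For the archive above this would be called as combine_archive('data/41.zip', 'data/all_files.csv').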
Conclusion
The parallel processing of the CSV files speeds up the overall processing. Another benefit of this technique is that disk space is saved, since the files are read directly from the archive without extracting them first.