Google Cloud Dataflow is useful for processing large amounts of data, but processing files inside zip archives is a bit tricky. By design, Dataflow does not distribute the zip files in a bucket across multiple workers. Instead, it allocates a single worker to unzip all the files and process them. That is not an optimal solution for the amount of data we need to handle.
There is an alternative way to do this:
- Put all the zip file paths in a text file. Note: the following approach assumes all the zip files live under a GCS path.
- Give the text file name to the pipeline.
- The first transform reads each line, expands it against the bucket, builds the absolute GCS path for every zip file it finds, and outputs those paths to the next stage in the pipeline.
- The next stage takes each GCS file path and, without caring about the file type, unzips the input zip file and hands the contents over to the next stage in the pipeline.
- Now, instead of one worker, you have N workers handling the zip files.
Here is the code that resolves the absolute GCS paths:
```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.util.List;

public class GetGCSUtilPath extends DoFn<String, String> {

    private static final Log log = LogFactory.getLog(GetGCSUtilPath.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            GcsUtil.GcsUtilFactory factory = new GcsUtil.GcsUtilFactory();
            GcsUtil util = factory.create(c.getPipelineOptions());
            String dataFolder = c.element();
            log.warn("Processing data folder - " + dataFolder);
            // Expand the wildcard (e.g. gs://bucket/path/*.zip) into concrete object paths.
            List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(dataFolder));
            for (GcsPath gcsp : gcsPaths) {
                c.output(gcsp.toString());
            }
        } catch (Exception ex) {
            log.error("Unable to process the file " + c.element(), ex);
        }
    }
}
```
Each line of the text file (the file we gave to the first step of the pipeline) comes in as input, and the output is the absolute path of each zip file in the GCS bucket.
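If you want to sanity-check the expansion before wiring up the full job, the DoFn can be exercised on its own. The following is a minimal sketch (the wildcard path is just an example, and it assumes GCS credentials are available to the runner):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class ExpandSmokeTest {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        p.apply(Create.of("gs://rchandra/input/bucket1/*.zip")) // example wildcard line
         .apply(ParDo.of(new GetGCSUtilPath()))
         .apply(ParDo.of(new DoFn<String, Void>() {
             @ProcessElement
             public void processElement(ProcessContext c) {
                 // Each element is one concrete zip object path.
                 System.out.println(c.element());
             }
         }));
        p.run().waitUntilFinish();
    }
}
```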
The following code unzips each file, reads it as a string, and hands it over to the next step in the pipeline:
```java
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import java.io.BufferedInputStream;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.channels.SeekableByteChannel;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ProcessZipFolder extends DoFn<String, String> {

    private static final Log log = LogFactory.getLog(ProcessZipFolder.class);
    // Beam metric counting how many files were emitted downstream.
    private static final Counter TOTAL_JSON_FILES_COUNT =
            Metrics.counter(ProcessZipFolder.class, "total_json_files");

    @ProcessElement
    public void processElement(ProcessContext c) {
        String p = c.element();
        GcsUtil.GcsUtilFactory factory = new GcsUtil.GcsUtilFactory();
        GcsUtil u = factory.create(c.getPipelineOptions());
        byte[] buffer = new byte[50 * 1024];
        ZipInputStream zis = null;
        ZipEntry ze;
        try {
            // Open the GCS object as a stream and wrap it in a ZipInputStream.
            SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
            InputStream is = Channels.newInputStream(sek);
            BufferedInputStream bis = new BufferedInputStream(is);
            zis = new ZipInputStream(bis);
            ze = zis.getNextEntry();
            while (ze != null) {
                StringBuilder zipContents = new StringBuilder();
                log.info("Unzipping file " + ze.getName());
                int len;
                while ((len = zis.read(buffer)) > 0) {
                    zipContents.append(new String(buffer, 0, len, "UTF-8"));
                }
                log.info("File " + p + " is opened successfully.");
                ze = zis.getNextEntry();
                // Emit the contents of one zip entry as a single element.
                c.output(zipContents.toString());
                TOTAL_JSON_FILES_COUNT.inc();
            }
        } catch (Exception e) {
            log.error("An exception processing the zip file - " + c.element(), e);
        } finally {
            try {
                if (zis != null) {
                    zis.closeEntry();
                    zis.close();
                }
            } catch (Exception ex) {
                log.error("Unable to close the zip file: " + c.element(), ex);
            }
        }
    }
}
```

Finally, wire them all together:

```java
Pipeline pipeline = Pipeline.create(pipelineOptions);
try {
    pipeline.apply("Get Zip File folders",
                TextIO.read().from("gs://rchandra/input/listofzipfiles.txt"))
            .apply("Get GCS Path for each zip file", ParDo.of(new GetGCSUtilPath()))
            .apply("Reading Zip Files", ParDo.of(new ProcessZipFolder()))
            .apply("Process Json Files", ParDo.of(new ProcessJsonData()))
            .apply("Writing results",
                TextIO.write().withSuffix(".csv").withNumShards(10).to("gs://output/"));
    pipeline.run();
} catch (Exception ex) {
    log.error("Exception while running the zip files", ex);
}
```

The code above reads the zip files, unzips the JSON files inside them, processes them, and finally writes the output as CSV files to the given output location.

A sample "listofzipfiles.txt" can look like this:

```
gs://rchandra/input/bucket1/*.zip
gs://rchandra/input/bucket2/*.zip
gs://rchandra/input/bucket3/*.zip
gs://rchandra/input/bucket4/*.zip
gs://rchandra/input/bucket5/*.zip
```
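The ProcessJsonData transform referenced in the wiring above is not shown here; what it does depends entirely on the shape of your JSON. As a minimal sketch only (the field names "id" and "name" and the CSV layout are hypothetical placeholders, and it assumes Gson is on the classpath), it could parse each JSON string and emit one CSV line:

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Hypothetical sketch: convert each JSON document into one CSV line.
public class ProcessJsonData extends DoFn<String, String> {

    private static final Log log = LogFactory.getLog(ProcessJsonData.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            JsonObject json = new JsonParser().parse(c.element()).getAsJsonObject();
            String id = json.get("id").getAsString();     // placeholder field
            String name = json.get("name").getAsString(); // placeholder field
            c.output(id + "," + name);
        } catch (Exception ex) {
            log.error("Unable to parse JSON document", ex);
        }
    }
}
```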