Sunday, July 21, 2019

Handling zip files in Google Cloud Dataflow

Google Cloud Dataflow is useful for processing large amounts of data, but processing the files inside zip archives is a bit tricky. By design, Dataflow doesn't distribute the zip files in a bucket across multiple workers. Instead, it allocates a single worker to unzip all the files and process them, which is not optimal for the amount of data we need to handle.

There is an alternative way to do this:
  1. Put all the zip file names in a text file. Note: the following model assumes all the zip files are on a GCS path.
  2. Give the text file name to the pipeline.
  3. The first transform reads each zip file pattern, expands it into absolute GCS file paths, and outputs those paths to the next step in the pipeline.
  4. The next step takes each GCS file path and, without caring about the file type, unzips the file and hands the contents over to the next step in the pipeline.
  5. Now, instead of one worker, you have N workers handling the zip files.
The code for assigning the absolute GCS path:

import java.util.List;

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.GcsUtil;
import org.apache.beam.sdk.util.gcsfs.GcsPath;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class GetGCSUtilPath extends DoFn<String, String> {
    private static final Log log = LogFactory.getLog(GetGCSUtilPath.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            GcsUtil.GcsUtilFactory factory = new GcsUtil.GcsUtilFactory();
            GcsUtil util = factory.create(c.getPipelineOptions());
            String dataFolder = c.element();
            log.warn("Processing data folder - " + dataFolder);
            // Expand the pattern (e.g. gs://bucket/path/*.zip) into the
            // list of matching GCS paths.
            List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(dataFolder));
            for (GcsPath gcsp : gcsPaths) {
                c.output(gcsp.toString());
            }
        } catch (Exception ex) {
            log.error("Unable to process the file " + c.element(), ex);
        }
    }
}


Each line of the text file (the one we gave to the first step of the pipeline) comes in as an input element, and the output is the absolute GCS path of every zip file matching that line.

The following code unzips each file, reads its contents as a string, and then hands it over to the next step in the pipeline:

@ProcessElement
public void processElement(ProcessContext c) {
    String p = c.element();
    GcsUtil.GcsUtilFactory factory = new GcsUtil.GcsUtilFactory();
    GcsUtil u = factory.create(c.getPipelineOptions());
    byte[] buffer = new byte[50 * 1024];
    ZipInputStream zis = null;
    try {
        // Open the GCS object as a channel and wrap it in a ZipInputStream.
        SeekableByteChannel sek = u.open(GcsPath.fromUri(p));
        InputStream is = Channels.newInputStream(sek);
        BufferedInputStream bis = new BufferedInputStream(is);
        zis = new ZipInputStream(bis);
        ZipEntry ze = zis.getNextEntry();
        while (ze != null) {
            StringBuilder zipContents = new StringBuilder();
            log.info("Unzipping file " + ze.getName());
            int len;
            while ((len = zis.read(buffer)) > 0) {
                zipContents.append(new String(buffer, 0, len, "UTF-8"));
            }
            log.info("File " + p + " is opened successfully.");
            c.output(zipContents.toString());
            TOTAL_JSON_FILES_COUNT.inc();
            ze = zis.getNextEntry();
        }
    } catch (Exception e) {
        log.error("An exception processing the zip file - " + c.element(), e);
    } finally {
        try {
            if (zis != null) {
                zis.closeEntry();
                zis.close();
            }
        } catch (Exception ex) {
            log.error("Unable to close the zip file: " + c.element(), ex);
        }
    }
}
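The core ZipInputStream loop above can be exercised outside of Dataflow. The sketch below (class and entry names are made up for illustration) builds a small archive in memory and reads every entry back as a string using the same buffered loop, which makes the pattern easy to test locally before wiring it into a DoFn:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipReadSketch {

    // Builds a two-entry zip archive in memory so the example needs no files.
    static byte[] buildSampleZip() throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bos)) {
            zos.putNextEntry(new ZipEntry("a.json"));
            zos.write("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
            zos.putNextEntry(new ZipEntry("b.json"));
            zos.write("{\"id\":2}".getBytes(StandardCharsets.UTF_8));
            zos.closeEntry();
        }
        return bos.toByteArray();
    }

    // Reads every entry into an entry-name -> contents map, using the same
    // buffered ZipInputStream loop as the DoFn in the post.
    static Map<String, String> readEntries(byte[] zipBytes) throws Exception {
        Map<String, String> contents = new LinkedHashMap<>();
        try (ZipInputStream zis = new ZipInputStream(new ByteArrayInputStream(zipBytes))) {
            byte[] buffer = new byte[50 * 1024];
            ZipEntry ze;
            while ((ze = zis.getNextEntry()) != null) {
                StringBuilder sb = new StringBuilder();
                int len;
                while ((len = zis.read(buffer)) > 0) {
                    sb.append(new String(buffer, 0, len, StandardCharsets.UTF_8));
                }
                contents.put(ze.getName(), sb.toString());
            }
        }
        return contents;
    }

    public static void main(String[] args) throws Exception {
        for (Map.Entry<String, String> e : readEntries(buildSampleZip()).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

One caveat worth noting: decoding each buffer chunk separately can split a multi-byte UTF-8 sequence across two chunks; for ASCII-only JSON this is harmless, but for arbitrary text a streaming Reader would be safer.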
Finally, wire them all together:
Pipeline pipeline = Pipeline.create(pipelineOptions);
try {
    pipeline.apply("Get Zip File folders",
            TextIO.read().from("gs://rchandra/input/listofzipfiles.txt"))
        .apply("Get GCS Path for each zip file", ParDo.of(new GetGCSUtilPath()))
        .apply("Reading Zip Files", ParDo.of(new ProcessZipFolder()))
        .apply("Process Json Files", ParDo.of(new ProcessJsonData()))
        .apply("Writing results",
            TextIO.write().withSuffix(".csv").withNumShards(10).to("gs://output/"));
    pipeline.run();
} catch (Exception ex) {
    log.error("Exception while running the zip files");
}
The above code reads the zip files, unzips the JSON files inside them, processes them, and finally writes the output as CSV files to the given output location.
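The ProcessJsonData transform isn't shown in this post. As a purely hypothetical illustration of its JSON-to-CSV core, the sketch below pulls named fields out of a flat JSON record with a regex (the class name, field names, and sample record are all made up); a real pipeline would use a proper JSON library such as Jackson or Gson instead:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonToCsvSketch {

    // Naively extracts "field": "value" or "field": number pairs from a flat
    // JSON object and joins them into one CSV line. Missing fields become
    // empty columns. Not a real JSON parser - illustration only.
    static String jsonToCsv(String json, List<String> fields) {
        List<String> values = new ArrayList<>();
        for (String field : fields) {
            Pattern p = Pattern.compile(
                "\"" + Pattern.quote(field) + "\"\\s*:\\s*\"?([^\",}]*)\"?");
            Matcher m = p.matcher(json);
            values.add(m.find() ? m.group(1) : "");
        }
        return String.join(",", values);
    }

    public static void main(String[] args) {
        String record = "{\"id\": 42, \"name\": \"alice\"}";
        System.out.println(jsonToCsv(record, List.of("id", "name")));
    }
}
```

Each string emitted by the unzip step would pass through a function like this, producing the CSV lines that TextIO.write() shards to the output bucket.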
A sample "listofzipfiles.txt" can look like this:
gs://rchandra/input/bucket1/*.zip
gs://rchandra/input/bucket2/*.zip
gs://rchandra/input/bucket3/*.zip
gs://rchandra/input/bucket4/*.zip
gs://rchandra/input/bucket5/*.zip

Saturday, July 13, 2019

Adding line numbers to a large number of entries in a file

After a looooong time...

I had this requirement of creating a sequence file for a Map-Reduce job.  I had a large set of files (some 1000s) in a folder and needed to create a keys.txt for the sequence file creator.

At first I thought I would have to go the painful route of writing a Python module to get the list of files, add an index value while walking through the list, and then save the result in a file named keys.txt.

Then I found out the good old editor vi can easily do this job for me.  Here are the steps:

1) Go to the folder where you have the large set of files.
2) Use ls -1a > keys.txt to get just the file names.  One caveat: this will add ., .., and keys.txt to the file list, and we need to remove them.
3) Open keys.txt in vi.
4) Remove the 3 lines containing the entries mentioned in 2).
5) In vi command mode, enter the following:

%s/^/\=printf('%d,', line('.'))

and press enter.

The above command will add the line number for each line. For example:
1,1.zip
2,2.zip
...
...
...
500000,500000.zip
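If vi isn't handy, or the step needs to run unattended, the same numbering can also be scripted. Here is a minimal sketch in Java (the class name and the temp-folder demo in main are made up for illustration) that produces the same lineNumber,fileName lines:

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class KeysFileWriter {

    // Builds "lineNumber,fileName" lines for every regular file in a folder,
    // mirroring what the vi substitution produces. Sorted so numbering is
    // deterministic; . and .. never appear, unlike with ls -1a.
    static List<String> numberedFileNames(Path folder) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(folder)) {
            for (Path p : ds) {
                if (Files.isRegularFile(p)) {
                    names.add(p.getFileName().toString());
                }
            }
        }
        Collections.sort(names);
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            lines.add((i + 1) + "," + names.get(i));
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Demo on a throwaway folder with two empty files.
        Path dir = Files.createTempDirectory("keysdemo");
        Files.createFile(dir.resolve("1.zip"));
        Files.createFile(dir.resolve("2.zip"));
        List<String> lines = numberedFileNames(dir);
        Files.write(dir.resolve("keys.txt"), lines);
        lines.forEach(System.out::println);
    }
}
```

A nice side effect of listing files programmatically is that steps 2) through 4) above (stripping ., .., and keys.txt from the listing) disappear entirely.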

And that is all folks....