+ if self.output_parquet is True:
+ self.flush_parquet_buffer()
+ self.pq_writer.close()
+
+ else:
+ output_file.close()
+
+
def write_parquet_row(self, rev_data):
    """Buffer one revision record for Parquet output.

    Normalizes the string flags ``deleted``/``minor``/``anon`` (emitted
    upstream as ``"TRUE"``/``"FALSE"``) into real booleans, appends the
    record to ``self.parquet_buffer``, and flushes the buffer to the
    Parquet writer once ``self.parquet_buffer_size`` rows have accumulated.

    :param rev_data: dict of field name -> value for a single revision;
        mutated in place (flag fields are replaced with bools).
    """
    # Collapse the three copy-pasted conversions into one loop;
    # anything other than the exact string "TRUE" maps to False,
    # matching the original ternary behavior.
    for flag in ('deleted', 'minor', 'anon'):
        if flag in rev_data:
            rev_data[flag] = rev_data[flag] == "TRUE"

    self.parquet_buffer.append(rev_data)

    # Flush in batches to bound memory use.
    if len(self.parquet_buffer) >= self.parquet_buffer_size:
        self.flush_parquet_buffer()
+
def flush_parquet_buffer(self):
    """Write all buffered revision records to the Parquet file.

    Converts ``self.parquet_buffer`` (list of dicts) to an Arrow table
    via pandas, lazily creates the ParquetWriter on first flush (using
    the first batch's inferred schema extended with the regex-scanner
    columns), writes the batch, and clears the buffer.
    """
    outtable = pd.DataFrame.from_records(self.parquet_buffer)
    outtable = pa.Table.from_pandas(outtable)

    if self.pq_writer is None:
        schema = outtable.schema
        for regex_schema in self.regex_schemas:
            # BUG FIX: pyarrow.Schema.append returns a NEW schema rather
            # than mutating in place; the original discarded the result,
            # so the regex columns never made it into the file schema.
            schema = schema.append(regex_schema)

        self.pq_writer = pq.ParquetWriter(self.output_file, schema, flavor='spark')

    self.pq_writer.write_table(outtable)

    # BUG FIX: reset the buffer after writing. Without this, every
    # subsequent flush re-writes all previously flushed rows (duplicated
    # output) and the buffer grows without bound.
    self.parquet_buffer = []