#!/usr/bin/env python3
+import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
-import pandas as pd
import os
import datetime
import re
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
ngram_output = partition.replace("parquet","txt")
- if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
- os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+ if mwe_pass == 'first':
+ if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+ os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
while True:
chunk = islice(outrows,outchunksize)
- chunk = (c for c in chunk if c.subreddit is not None)
+ chunk = (c for c in chunk if c[1] is not None)
pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
-
author_pddf = pddf.loc[pddf.is_token == False, schema.names]
pddf = pddf.loc[pddf.is_token == True, schema.names]
author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
author_pddf = author_pddf.loc[:,author_schema.names]
-
table = pa.Table.from_pandas(pddf,schema=schema)
author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
- if table.shape[0] == 0:
+ do_break = True
+
+ if table.shape[0] != 0:
+ writer.write_table(table)
+ do_break = False
+ if author_table.shape[0] != 0:
+ author_writer.write_table(author_table)
+ do_break = False
+
+ if do_break:
break
- writer.write_table(table)
- author_writer.write_table(author_table)
-
+
writer.close()
author_writer.close()
with open("tf_task_list",'w') as outfile:
for f in files:
if f.endswith(".parquet"):
- outfile.write(f"python3 tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
+ outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n")
if __name__ == "__main__":
fire.Fire({"gen_task_list":gen_task_list,