import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
# Compute term frequencies for the comments in each subreddit, aggregated by week.
def weekly_tf(partition, mwe_pass = 'first'):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
# Compute term frequencies for the comments in each subreddit, aggregated by week.
def weekly_tf(partition, mwe_pass = 'first'):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
- if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
- os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+ if mwe_pass == 'first':
+ if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+ os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
pddf = pddf.loc[pddf.is_token == True, schema.names]
author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
author_pddf = author_pddf.loc[:,author_schema.names]
pddf = pddf.loc[pddf.is_token == True, schema.names]
author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
author_pddf = author_pddf.loc[:,author_schema.names]
table = pa.Table.from_pandas(pddf,schema=schema)
author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
table = pa.Table.from_pandas(pddf,schema=schema)
author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
- if table.shape[0] == 0:
+ do_break = True
+
+ if table.shape[0] != 0:
+ writer.write_table(table)
+ do_break = False
+ if author_table.shape[0] != 0:
+ author_writer.write_table(author_table)
+ do_break = False
+
+ if do_break: