]> code.communitydata.science - cdsc_reddit.git/blobdiff - tf_reddit_comments.py
TF reddit comments.
[cdsc_reddit.git] / tf_reddit_comments.py
diff --git a/tf_reddit_comments.py b/tf_reddit_comments.py
new file mode 100644 (file)
index 0000000..010b759
--- /dev/null
@@ -0,0 +1,75 @@
+import pyarrow as pa
+import pyarrow.dataset as ds
+import pyarrow.parquet as pq
+from itertools import groupby, islice, chain
+import fire
+from collections import Counter
+import pandas as pd
+import os
+import datetime
+
+# compute term frequencies for comments in each subreddit by week
+def weekly_tf(partition):
+    dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
+    batches = dataset.to_batches(columns=['CreatedAt','subreddit','body'])
+
+    schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False),
+                        pa.field('term', pa.string(), nullable=False),
+                        pa.field('week', pa.date32(), nullable=False),
+                        pa.field('tf', pa.int64(), nullable=False)]
+    )
+
+    dfs = (b.to_pandas() for b in batches)
+
+    def add_week(df):
+        df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date
+        return(df)
+
+    dfs = (add_week(df) for df in dfs)
+
+    def iterate_rows(dfs):
+        for df in dfs:
+            for row in df.itertuples():
+                yield row
+
+    rows = iterate_rows(dfs)
+
+    subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week))
+
+    def tf_comments(subreddit_weeks):
+        for key, posts in subreddit_weeks:
+            subreddit, week = key
+            tfs = Counter([])
+
+            for post in posts:
+                tfs.update(post.body.split())
+
+            for term, tf in tfs.items():
+                yield [subreddit, term, week, tf]
+            
+    outrows = tf_comments(subreddit_weeks)
+
+    outchunksize = 10000
+
+    with pq.ParquetWriter("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
+        while True:
+            chunk = islice(outrows,outchunksize)
+            pddf = pd.DataFrame(chunk, columns=schema.names)
+            table = pa.Table.from_pandas(pddf,schema=schema)
+            if table.shape[0] == 0:
+                break
+            writer.write_table(table)
+
+        writer.close()
+
+
+def gen_task_list():
+    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
+    with open("tf_task_list",'w') as outfile:
+        for f in files:
+            if f.endswith(".parquet"):
+                outfile.write(f"python3 tf_reddit_comments.py weekly_tf {f}\n")
+
+if __name__ == "__main__":
+    fire.Fire({"gen_task_list":gen_task_list,
+               "weekly_tf":weekly_tf})

Community Data Science Collective || Want to submit a patch?