conf = spark.sparkContext.getConf()
# outfile = '/gscratch/comdata/users/nathante/test_similarities_500.feather'; min_df = None; included_subreddits=None; similarity_threshold=0;
-def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500, exclude_phrases=True):
+def author_cosine_similarities(outfile, min_df = None, included_subreddits=None, similarity_threshold=0, topN=500):
'''
Compute similarities between subreddits based on tf-idf vectors of author comments
'''
print(outfile)
- print(exclude_phrases)
- tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet_test1/part-00000-107cee94-92d8-4265-b804-40f1e7f1aaf2-c000.snappy.parquet')
+ tfidf = spark.read.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet')
if included_subreddits is None:
included_subreddits = list(islice(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"),topN))
sim_dist = sim_dist.repartition(1)
sim_dist.write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
- spark.stop()
+
#instead of toLocalMatrix(), why not read the entries and put them straight into numpy?
sim_entries = pd.read_parquet(output_parquet)
df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+
+ spark.stop()
df['subreddit_id_new'] = df['subreddit_id_new'] - 1
df = df.sort_values('subreddit_id_new').reset_index(drop=True)
df = df.set_index('subreddit_id_new')
return similarities
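# A sketch of the "straight into numpy" idea from the comment above: pivot the
# (i, j, value) similarity entries into a dense matrix labeled by subreddit.
# The helper below and its column names (i, j, value, as produced by
# CoordinateMatrix.entries.toDF()) are assumptions, not part of this change.
def entries_to_dense(sim_entries, subreddit_index):
    import numpy as np
    import pandas as pd
    n = len(subreddit_index)
    mat = np.zeros((n, n))
    mat[sim_entries.i.values.astype(int), sim_entries.j.values.astype(int)] = sim_entries.value.values
    return pd.DataFrame(mat, index=subreddit_index.subreddit.values, columns=subreddit_index.subreddit.values)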
if __name__ == '__main__':
- fire.Fire(term_cosine_similarities)
+ fire.Fire(author_cosine_similarities)
#SBATCH --mem=32G
#SBATCH --cpus-per-task=4
#SBATCH --ntasks=1
+#SBATCH -D /gscratch/comdata/users/nathante/cdsc-reddit
+source ./bin/activate
module load parallel_sql
-
+echo $(which perl)
+conda list pyarrow
+which python3
#Put here commands to load other modules (e.g. matlab etc.)
#Below command means that parallel_sql will get tasks from the database
#and run them on the node (in parallel). So a 16 core node will have 16 tasks running in parallel.
#!/usr/bin/env bash
echo "#!/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
+#echo "source $(pwd)/../bin/activate" >> job_script.sh
echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 --pty job_script.sh
import pyarrow as pa
import pyarrow.parquet as pq
-globstr_base = "/gscratch/comdata/reddit_dumps/comments/RC_20*"
-
def parse_comment(comment, names= None):
if names is None:
names = ["id","subreddit","link_id","parent_id","created_utc","author","ups","downs","score","edited","subreddit_type","subreddit_id","stickied","is_submitter","body","error"]
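    # A rough sketch (an assumption; the real body is elided here) of what this parser
    # presumably does: decode the JSON line, pull out each named field, and record any
    # decoding error message in the trailing 'error' slot, e.g.
    #   obj = json.loads(comment)
    #   return tuple(obj.get(k) for k in names[:-1]) + (None,)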
# conf = sc._conf.setAll([('spark.executor.memory', '20g'), ('spark.app.name', 'extract_reddit_timeline'), ('spark.executor.cores', '26'), ('spark.cores.max', '26'), ('spark.driver.memory','84g'),('spark.driver.maxResultSize','0'),('spark.local.dir','/gscratch/comdata/spark_tmp')])
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
+dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments/"
-files = list(find_dumps(dumpdir, base_pattern="RC_20*.*"))
+files = list(find_dumps(dumpdir, base_pattern="RC_20*"))
pool = Pool(28)
stream = open_fileset(files)
-N = 100000
+N = int(1e4)
rows = pool.imap_unordered(parse_comment, stream, chunksize=int(N/28))
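# imap_unordered streams parsed rows back as soon as any worker finishes them;
# chunksize hands each of the 28 workers roughly N/28 comments per batch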
pa.field('error', pa.string(), nullable=True),
])
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_comments.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
- while True:
- chunk = islice(rows,N)
- pddf = pd.DataFrame(chunk, columns=schema.names)
- table = pa.Table.from_pandas(pddf,schema=schema)
- if table.shape[0] == 0:
- break
- writer.write_table(table)
+from pathlib import Path
+p = Path("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+
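+# make sure the output directory exists and contains no stale part files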
+if not p.is_dir():
+ if p.exists():
+ p.unlink()
+ p.mkdir()
+
+else:
+ list(map(Path.unlink,p.glob('*')))
+
+part_size = int(1e7)
+part = 1
+n_output = 0
+writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
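+# stream rows into numbered part files, rotating to a fresh ParquetWriter roughly
+# every part_size rows so no single output file grows too large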
+while True:
+    if n_output > part_size:
+        # close the finished part so its footer is written before starting the next one
+        writer.close()
+
+        part = part + 1
+        n_output = 0
+
+ writer = pq.ParquetWriter(f"/gscratch/comdata/output/reddit_comments.parquet_temp2/part_{part}.parquet",schema=schema,compression='snappy',flavor='spark')
+
+ n_output += N
+ chunk = islice(rows,N)
+ pddf = pd.DataFrame(chunk, columns=schema.names)
+ table = pa.Table.from_pandas(pddf,schema=schema)
+ if table.shape[0] == 0:
+ break
+ writer.write_table(table)
+
-    writer.close()
+writer.close()
spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")
+df = spark.read.parquet("/gscratch/comdata/output/reddit_comments.parquet_temp2")
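# lowercase the subreddit names, then write the comments twice: once repartitioned
# and sorted by subreddit, once repartitioned and sorted by author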
df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
df = df.drop('subreddit')
df = df.withColumnRenamed('subreddit_2','subreddit')
df = df.repartition('subreddit')
df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+df2.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_subreddit.parquet_new", mode='overwrite', compression='snappy')
df = df.repartition('author')
df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+df3.write.parquet("/gscratch/comdata/users/nathante/reddit_comments_by_author.parquet_new", mode='overwrite',compression='snappy')
fname, ext = path.splitext(fpath)
dumpext[fname].append(ext)
- ext_priority = ['.zst','.xz','.bz2']
+ ext_priority = ['.zst','.xz','.bz2','.gz']
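    # when the same dump exists in more than one compression format, keep a single
    # copy, preferring formats in the order listed above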
for base, exts in dumpext.items():
ext = [ext for ext in ext_priority if ext in exts][0]
import fire
from itertools import islice
from pathlib import Path
-from similarities_helper import build_cosine_similarities
+from similarities_helper import cosine_similarities
spark = SparkSession.builder.getOrCreate()
conf = spark.sparkContext.getConf()
sim_dist.entries.toDF().write.parquet(str(output_parquet),mode='overwrite',compression='snappy')
- spark.stop()
-
#instead of toLocalMatrix(), why not read the entries and put them straight into numpy?
sim_entries = pd.read_parquet(output_parquet)
df = tfidf.select('subreddit','subreddit_id_new').distinct().toPandas()
+ spark.stop()
df['subreddit_id_new'] = df['subreddit_id_new'] - 1
df = df.sort_values('subreddit_id_new').reset_index(drop=True)
df = df.set_index('subreddit_id_new')
#!/usr/bin/env python3
+import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from itertools import groupby, islice, chain
import fire
from collections import Counter
-import pandas as pd
import os
import datetime
import re
# compute term frequencies for comments in each subreddit by week
def weekly_tf(partition, mwe_pass = 'first'):
dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet')
-
if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"):
os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/")
ngram_output = partition.replace("parquet","txt")
- if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
- os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
+ if mwe_pass == 'first':
+ if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"):
+ os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}")
batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author'])
pddf = pddf.loc[pddf.is_token == True, schema.names]
author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
author_pddf = author_pddf.loc[:,author_schema.names]
-
table = pa.Table.from_pandas(pddf,schema=schema)
author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
- if table.shape[0] == 0:
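+            # write whichever of the term and author tables is non-empty; stop the
+            # loop only once both come back empty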
+ do_break = True
+
+ if table.shape[0] != 0:
+ writer.write_table(table)
+ do_break = False
+ if author_table.shape[0] != 0:
+ author_writer.write_table(author_table)
+ do_break = False
+
+ if do_break:
break
- writer.write_table(table)
- author_writer.write_table(author_table)
-
+
writer.close()
author_writer.close()
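# A toy sketch of the per-subreddit weekly counting that weekly_tf performs at scale,
# relying on the Counter and datetime imports above. The whitespace tokenization and
# week bucketing here are illustrative assumptions, not the code used above.
def toy_weekly_tf(comments):
    counts = Counter()
    for c in comments:
        week_start = c['CreatedAt'].date() - datetime.timedelta(days=c['CreatedAt'].weekday())
        for term in c['body'].lower().split():
            counts[(c['subreddit'], week_start, term)] += 1
    return counts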
from pyspark.sql import SparkSession
from similarities_helper import build_tfidf_dataset
-## TODO: need to exclude automoderator / bot posts.
-## TODO: need to better handle hyperlinks.
spark = SparkSession.builder.getOrCreate()
-df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/part-00000-d61007de-9cbe-4970-857f-b9fd4b35b741-c000.snappy.parquet")
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp")
include_subs = set(open("/gscratch/comdata/users/nathante/cdsc-reddit/top_25000_subs_by_comments.txt"))
include_subs = {s.strip('\n') for s in include_subs}
+
+# remove [deleted] and AutoModerator (TODO remove other bots)
df = df.filter(df.author != '[deleted]')
df = df.filter(df.author != 'AutoModerator')
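# One way the remaining TODO might be handled (an assumption, not part of this change):
# drop authors whose names end in "bot", e.g.
#   df = df.filter(~df.author.rlike('(?i)bot$'))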
df = build_tfidf_dataset(df, include_subs, 'author')
-df.cache()
-
df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf_authors.parquet',mode='overwrite',compression='snappy')
+
+spark.stop()
df = build_tfidf_dataset(df, include_subs, 'term')
df.write.parquet('/gscratch/comdata/users/nathante/subreddit_tfidf.parquet',mode='overwrite',compression='snappy')
+spark.stop()
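# For reference, the usual tf-idf weighting these datasets are built around; the exact
# variant is defined in similarities_helper.build_tfidf_dataset and may differ:
#   tf(t, s)     = number of times term (or author) t appears in subreddit s
#   idf(t)       = log(N_subreddits / number of subreddits containing t)
#   tfidf(t, s)  = tf(t, s) * idf(t)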