From: Nathan TeBlunthuis
Date: Mon, 2 Nov 2020 05:50:44 +0000 (-0800)
Subject: Merge branch 'master' of code:cdsc_reddit into master
X-Git-Url: https://code.communitydata.science/cdsc_reddit.git/commitdiff_plain/9075a8153c6a900cce0ffb28c95da09211956ff9?hp=4c78f2c527b5734deb980cf6305f7f396be38b66

Merge branch 'master' of code:cdsc_reddit into master
---

diff --git a/check_comments_shas.py b/check_comments_shas.py
old mode 100644
new mode 100755
index a2bc89b..199261c
--- a/check_comments_shas.py
+++ b/check_comments_shas.py
@@ -5,8 +5,10 @@ import requests
 from os import path
 import hashlib
 
-shasums = requests.get("https://files.pushshift.io/reddit/comments/sha256sums.txt").text
+shasums1 = requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
+shasums2 = requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text
+shasums = shasums1 + shasums2
 
 dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
 
 for l in shasums.strip().split('\n'):
diff --git a/checkpoint_parallelsql.sbatch b/checkpoint_parallelsql.sbatch
new file mode 100644
index 0000000..1975802
--- /dev/null
+++ b/checkpoint_parallelsql.sbatch
@@ -0,0 +1,22 @@
+#!/bin/bash
+## parallel_sql_job.sh
+#SBATCH --job-name=tf_subreddit_comments
+## Allocation Definition
+#SBATCH --account=comdata-ckpt
+#SBATCH --partition=ckpt
+## Resources
+## Nodes. This should always be 1 for parallel-sql.
+#SBATCH --nodes=1
+## Walltime (12 hours)
+#SBATCH --time=12:00:00
+## Memory per node
+#SBATCH --mem=32G
+#SBATCH --cpus-per-task=4
+#SBATCH --ntasks=1
+module load parallel_sql
+
+#Put here commands to load other modules (e.g. matlab etc.)
+#Below command means that parallel_sql will get tasks from the database
+#and run them on the node (in parallel). So a 16 core node will have
+#16 tasks running at one time.
+parallel-sql --sql -a parallel --exit-on-term --jobs 4
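check_comments_shas.py now concatenates the monthly and the new daily checksum lists before verifying the local dumps. The verification loop itself is outside this hunk; the following is a minimal sketch of how such a loop typically looks, assuming the checksum file uses the usual "digest filename" layout and a streaming-hash helper (file_sha256 is a name introduced here, not from the script):

import hashlib
import requests
from os import path

dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/comments"
shasums = (requests.get("https://files.pushshift.io/reddit/comments/sha256sum.txt").text
           + requests.get("https://files.pushshift.io/reddit/comments/daily/sha256sum.txt").text)

def file_sha256(fpath, blocksize=2**20):
    # stream the file through hashlib so multi-GB dumps need not fit in memory
    h = hashlib.sha256()
    with open(fpath, 'rb') as f:
        for block in iter(lambda: f.read(blocksize), b''):
            h.update(block)
    return h.hexdigest()

for line in shasums.strip().split('\n'):
    digest, fname = line.split()          # assumed layout: "<hex digest>  <filename>"
    fpath = path.join(dumpdir, fname)
    if path.isfile(fpath) and file_sha256(fpath) != digest:
        print(f"checksum mismatch: {fname}")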
diff --git a/comments_2_parquet.sh b/comments_2_parquet.sh
index 096fa06..e9818c1 100755
--- a/comments_2_parquet.sh
+++ b/comments_2_parquet.sh
@@ -1,5 +1,6 @@
 #!/usr/bin/env bash
+## needs to be run by hand since I don't have a nice way of waiting on a parallel-sql job to complete
 
 echo "#!/usr/bin/bash" > job_script.sh
 echo "source $(pwd)/../bin/activate" >> job_script.sh
 echo "python3 $(pwd)/comments_2_parquet_part1.py" >> job_script.sh
diff --git a/comments_2_parquet_part2.py b/comments_2_parquet_part2.py
index 7b17251..62580ac 100755
--- a/comments_2_parquet_part2.py
+++ b/comments_2_parquet_part2.py
@@ -26,4 +26,4 @@ df2.write.parquet("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet
 df = df.repartition('author')
 df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
 df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite')
+df3.write.parquet("/gscratch/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
diff --git a/helper.py b/helper.py
index b401cad..af87f71 100644
--- a/helper.py
+++ b/helper.py
@@ -40,6 +40,8 @@ def open_input_file(input_filename):
         cmd = ["xzcat",'-dk', '-T 20',input_filename]
     elif re.match(r'.*\.zst',input_filename):
         cmd = ['zstd','-dck', input_filename]
+    elif re.match(r'.*\.gz',input_filename):
+        cmd = ['gzip','-dc', input_filename]
     try:
         input_file = Popen(cmd, stdout=PIPE).stdout
     except NameError as e:
diff --git a/idf_authors.py b/idf_authors.py
new file mode 100644
index 0000000..379de5a
--- /dev/null
+++ b/idf_authors.py
@@ -0,0 +1,43 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/")
+
+max_subreddit_week_authors = df.groupby(['subreddit','week']).max('tf')
+max_subreddit_week_authors = max_subreddit_week_authors.withColumnRenamed('max(tf)','sr_week_max_tf')
+
+df = df.join(max_subreddit_week_authors, ['subreddit','week'])
+
+df = df.withColumn("relative_tf", df.tf / df.sr_week_max_tf)
+
+# group by author / week
+idf = df.groupby(['author','week']).count()
+
+idf = idf.withColumnRenamed('count','idf')
+
+# output: author | week | df
+#idf.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
+
+# collect the dictionary to make a pydict of authors to indexes
+authors = idf.select('author').distinct()
+authors = authors.withColumn('author_id',f.monotonically_increasing_id())
+
+
+# map authors to indexes in the tfs and the idfs
+df = df.join(authors,on='author')
+
+idf = idf.join(authors,on='author')
+
+# join on subreddit/author/week to create tf/dfs indexed by author
+df = df.join(idf, on=['author_id','week','author'])
+
+# agg authors by subreddit to make sparse tf/df vectors
+df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
+
+df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('author_id','tf_idf')).alias('tfidf_maps'))
+
+df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
+
+# output: subreddit | week | tf/df
+df.write.parquet('/gscratch/comdata/users/nathante/test_tfidf_authors.parquet',mode='overwrite',compression='snappy')
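Both idf_authors.py above and the idf_comments.py that follows normalize raw counts by the largest tf observed in each subreddit-week before combining them with document frequencies. A small, self-contained sketch of that normalization step on toy data (column names follow the scripts; the toy rows are invented for illustration):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

# toy rows standing in for the tf parquet: counts per subreddit-week
df = spark.createDataFrame(
    [("askreddit", "2019-01-07", "cat", 10),
     ("askreddit", "2019-01-07", "dog", 40),
     ("science",   "2019-01-07", "cat", 5)],
    ["subreddit", "week", "term", "tf"])

# largest tf in each subreddit-week, as in idf_comments.py / idf_authors.py
max_tf = (df.groupBy("subreddit", "week").max("tf")
            .withColumnRenamed("max(tf)", "sr_week_max_tf"))

df = (df.join(max_tf, ["subreddit", "week"])
        .withColumn("relative_tf", f.col("tf") / f.col("sr_week_max_tf")))

df.show()
# e.g. ("askreddit", "cat") gets relative_tf 10/40 = 0.25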
diff --git a/idf_comments.py b/idf_comments.py
new file mode 100644
index 0000000..d29be80
--- /dev/null
+++ b/idf_comments.py
@@ -0,0 +1,58 @@
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp")
+
+max_subreddit_week_terms = df.groupby(['subreddit','week']).max('tf')
+max_subreddit_week_terms = max_subreddit_week_terms.withColumnRenamed('max(tf)','sr_week_max_tf')
+
+df = df.join(max_subreddit_week_terms, ['subreddit','week'])
+
+df = df.withColumn("relative_tf", df.tf / df.sr_week_max_tf)
+
+# group by term / week
+idf = df.groupby(['term','week']).count()
+
+idf = idf.withColumnRenamed('count','idf')
+
+# output: term | week | df
+#idf.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
+
+# collect the dictionary to make a pydict of terms to indexes
+terms = idf.select('term').distinct()
+terms = terms.withColumn('term_id',f.monotonically_increasing_id())
+
+
+# print('collected terms')
+
+# terms = [t.term for t in terms]
+# NTerms = len(terms)
+# term_id_map = {term:i for i,term in enumerate(sorted(terms))}
+
+# term_id_map = spark.sparkContext.broadcast(term_id_map)
+
+# print('term_id_map is broadcasted')
+
+# def map_term(x):
+#     return term_id_map.value[x]
+
+# map_term_udf = f.udf(map_term)
+
+# map terms to indexes in the tfs and the idfs
+df = df.join(terms,on='term')
+
+idf = idf.join(terms,on='term')
+
+# join on subreddit/term/week to create tf/dfs indexed by term
+df = df.join(idf, on=['term_id','week','term'])
+
+# agg terms by subreddit to make sparse tf/df vectors
+df = df.withColumn("tf_idf",df.relative_tf / df.sr_week_max_tf)
+
+df = df.groupby(['subreddit','week']).agg(f.collect_list(f.struct('term_id','tf_idf')).alias('tfidf_maps'))
+
+df = df.withColumn('tfidf_vec', f.map_from_entries('tfidf_maps'))
+
+# output: subreddit | week | tf/df
+df.write.parquet('/gscratch/comdata/users/nathante/test_tfidf.parquet',mode='overwrite',compression='snappy')
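idf_comments.py replaces the commented-out broadcast-dictionary approach with a join against a small DataFrame of distinct terms keyed by monotonically_increasing_id(). A minimal sketch of that indexing step (toy data; note the generated ids are only guaranteed unique and increasing, not consecutive):

from pyspark.sql import SparkSession, functions as f

spark = SparkSession.builder.getOrCreate()

tf = spark.createDataFrame(
    [("askreddit", "cat", 10), ("askreddit", "dog", 40), ("science", "cat", 5)],
    ["subreddit", "term", "tf"])

# one row per distinct term, with a numeric id assigned by Spark
terms = (tf.select("term").distinct()
           .withColumn("term_id", f.monotonically_increasing_id()))

# joining the id table back onto the tf table indexes every row by term_id,
# avoiding the driver-side dict + broadcast that the commented-out code sketches
tf = tf.join(terms, on="term")
tf.show()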
diff --git a/pull_pushshift_comments.sh b/pull_pushshift_comments.sh
index 243e464..3f6d2c9 100755
--- a/pull_pushshift_comments.sh
+++ b/pull_pushshift_comments.sh
@@ -4,8 +4,11 @@ user_agent='nathante teblunthuis '
 output_dir='/gscratch/comdata/raw_data/reddit_dumps/comments'
 base_url='https://files.pushshift.io/reddit/comments/'
 
-wget -r --no-parent -A 'RC_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
-wget -r --no-parent -A 'RC_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
-wget -r --no-parent -A 'RC_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
+wget -r --no-parent -A 'RC_201*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url
+wget -r --no-parent -A 'RC_201*.xz' -U $user_agent -P $output_dir -nd -nc $base_url
+wget -r --no-parent -A 'RC_201*.zst' -U $user_agent -P $output_dir -nd -nc $base_url
 
-./check_comment_shas.py
+# starting in 2020 we use daily dumps not monthly dumps
+wget -r --no-parent -A 'RC_202*.gz' -U $user_agent -P $output_dir -nd -nc $base_url/daily/
+
+./check_comments_shas.py
diff --git a/pull_pushshift_submissions.sh b/pull_pushshift_submissions.sh
index 5a53c87..99d89be 100755
--- a/pull_pushshift_submissions.sh
+++ b/pull_pushshift_submissions.sh
@@ -11,4 +11,4 @@ wget -r --no-parent -A 'RS_20*.bz2' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
 wget -r --no-parent -A 'RS_20*.xz' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
 wget -r --no-parent -A 'RS_20*.zst' -U $user_agent -P $output_dir -nd -nc $base_url/old_v1_data/
 
-./check_submissions_shas.py
+./check_submission_shas.py
diff --git a/run_tf_jobs.sh b/run_tf_jobs.sh
new file mode 100755
index 0000000..0e7d5dd
--- /dev/null
+++ b/run_tf_jobs.sh
@@ -0,0 +1,8 @@
+#!/usr/bin/env bash
+module load parallel_sql
+source ./bin/activate
+python3 tf_comments.py gen_task_list
+psu --del --Y
+cat tf_task_list | psu --load
+
+for job in $(seq 1 50); do sbatch checkpoint_parallelsql.sbatch; done;
diff --git a/sort_tf_comments.py b/sort_tf_comments.py
new file mode 100644
index 0000000..abb097e
--- /dev/null
+++ b/sort_tf_comments.py
@@ -0,0 +1,13 @@
+#!/usr/bin/env python3
+
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession
+
+spark = SparkSession.builder.getOrCreate()
+df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/")
+
+df = df.repartition(2000,'term')
+df = df.sort(['term','week','subreddit'])
+df = df.sortWithinPartitions(['term','week','subreddit'])
+
+df.write.parquet("/gscratch/comdata/users/nathante/reddit_tfidf_test_sorted_tf.parquet_temp",mode='overwrite',compression='snappy')
diff --git a/submissions_2_parquet.sh b/submissions_2_parquet.sh
index 4ec4354..f133069 100644
--- a/submissions_2_parquet.sh
+++ b/submissions_2_parquet.sh
@@ -1,10 +1,8 @@
 #!/usr/bin/env bash
+## this should be run manually since we don't have a nice way to wait on parallel_sql jobs
 
-echo "!#/usr/bin/bash" > job_script.sh
-echo "source $(pwd)/../bin/activate" >> job_script.sh
-echo "python3 $(pwd)/submissions_2_parquet_part1.py" >> job_script.sh
-
-srun -p comdata -A comdata --nodes=1 --mem=120G --time=48:00:00 job_script.sh
+./parse_submissions.sh
 
 start_spark_and_run.sh 1 $(pwd)/submissions_2_parquet_part2.py
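sort_tf_comments.py repartitions the term-frequency table by term and then sorts within each partition so that downstream per-term scans touch few files and read sequentially. A small sketch of the same pattern (toy data; the 2000-partition figure in the script is tuned to the real dataset and shrunk here, and the output path is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("cat", "2019-01-07", "askreddit", 10),
     ("dog", "2019-01-07", "science", 3),
     ("cat", "2019-01-14", "science", 7)],
    ["term", "week", "subreddit", "tf"])

# all rows for a given term land in the same partition...
df = df.repartition(4, "term")
# ...and are stored in term/week/subreddit order inside each partition,
# which keeps per-term reads local when the parquet files are scanned later
df = df.sortWithinPartitions(["term", "week", "subreddit"])

df.write.parquet("/tmp/tf_sorted_example.parquet", mode="overwrite", compression="snappy")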
diff --git a/submissions_2_parquet_part1.py b/submissions_2_parquet_part1.py
index 23b1200..16d1988 100755
--- a/submissions_2_parquet_part1.py
+++ b/submissions_2_parquet_part1.py
@@ -4,7 +4,6 @@
 # 1. from gz to arrow parquet (this script)
 # 2. from arrow parquet to spark parquet (submissions_2_parquet_part2.py)
 
-import json
 from datetime import datetime
 from multiprocessing import Pool
 from itertools import islice
@@ -12,19 +11,23 @@ from helper import find_dumps, open_fileset
 import pandas as pd
 import pyarrow as pa
 import pyarrow.parquet as pq
+import simdjson
+import fire
+import os
+
+parser = simdjson.Parser()
 
 def parse_submission(post, names = None):
     if names is None:
         names = ['id','author','subreddit','title','created_utc','permalink','url','domain','score','ups','downs','over_18','has_media','selftext','retrieved_on','num_comments','gilded','edited','time_edited','subreddit_type','subreddit_id','subreddit_subscribers','name','is_self','stickied','quarantine','error']
 
     try:
-        post = json.loads(post)
-    except (json.decoder.JSONDecodeError, UnicodeDecodeError) as e:
+        post = parser.parse(post)
+    except (ValueError) as e:
         # print(e)
         # print(post)
         row = [None for _ in names]
-        row[-1] = "json.decoder.JSONDecodeError|{0}|{1}".format(e,post)
+        row[-1] = "Error parsing json|{0}|{1}".format(e,post)
         return tuple(row)
 
     row = []
@@ -55,55 +58,61 @@ def parse_submission(post, names = None):
         row.append(post[name])
 
     return tuple(row)
 
-dumpdir = "/gscratch/comdata/raw_data/reddit_dumps/submissions"
-
-files = list(find_dumps(dumpdir))
-
-pool = Pool(28)
-
-stream = open_fileset(files)
-
-N = 100000
-
-rows = pool.imap_unordered(parse_submission, stream, chunksize=int(N/28))
-
-schema = pa.schema([
-    pa.field('id', pa.string(),nullable=True),
-    pa.field('author', pa.string(),nullable=True),
-    pa.field('subreddit', pa.string(),nullable=True),
-    pa.field('title', pa.string(),nullable=True),
-    pa.field('created_utc', pa.timestamp('ms'),nullable=True),
-    pa.field('permalink', pa.string(),nullable=True),
-    pa.field('url', pa.string(),nullable=True),
-    pa.field('domain', pa.string(),nullable=True),
-    pa.field('score', pa.int64(),nullable=True),
-    pa.field('ups', pa.int64(),nullable=True),
-    pa.field('downs', pa.int64(),nullable=True),
-    pa.field('over_18', pa.bool_(),nullable=True),
-    pa.field('has_media',pa.bool_(),nullable=True),
-    pa.field('selftext',pa.string(),nullable=True),
-    pa.field('retrieved_on', pa.timestamp('ms'),nullable=True),
-    pa.field('num_comments', pa.int64(),nullable=True),
-    pa.field('gilded',pa.int64(),nullable=True),
-    pa.field('edited',pa.bool_(),nullable=True),
-    pa.field('time_edited',pa.timestamp('ms'),nullable=True),
-    pa.field('subreddit_type',pa.string(),nullable=True),
-    pa.field('subreddit_id',pa.string(),nullable=True),
-    pa.field('subreddit_subscribers',pa.int64(),nullable=True),
-    pa.field('name',pa.string(),nullable=True),
-    pa.field('is_self',pa.bool_(),nullable=True),
-    pa.field('stickied',pa.bool_(),nullable=True),
-    pa.field('quarantine',pa.bool_(),nullable=True),
-    pa.field('error',pa.string(),nullable=True)])
-
-with pq.ParquetWriter("/gscratch/comdata/output/reddit_submissions.parquet_temp",schema=schema,compression='snappy',flavor='spark') as writer:
-    while True:
-        chunk = islice(rows,N)
-        pddf = pd.DataFrame(chunk, columns=schema.names)
-        table = pa.Table.from_pandas(pddf,schema=schema)
-        if table.shape[0] == 0:
-            break
-        writer.write_table(table)
-
-    writer.close()
-
+def parse_dump(partition):
+
+    N=10000
+    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
+    rows = map(parse_submission,stream)
+    schema = pa.schema([
+        pa.field('id', pa.string(),nullable=True),
+        pa.field('author', pa.string(),nullable=True),
+        pa.field('subreddit', pa.string(),nullable=True),
+        pa.field('title', pa.string(),nullable=True),
+        pa.field('created_utc', pa.timestamp('ms'),nullable=True),
+        pa.field('permalink', pa.string(),nullable=True),
+        pa.field('url', pa.string(),nullable=True),
+        pa.field('domain', pa.string(),nullable=True),
+        pa.field('score', pa.int64(),nullable=True),
+        pa.field('ups', pa.int64(),nullable=True),
+        pa.field('downs', pa.int64(),nullable=True),
+        pa.field('over_18', pa.bool_(),nullable=True),
+        pa.field('has_media',pa.bool_(),nullable=True),
+        pa.field('selftext',pa.string(),nullable=True),
+        pa.field('retrieved_on', pa.timestamp('ms'),nullable=True),
+        pa.field('num_comments', pa.int64(),nullable=True),
+        pa.field('gilded',pa.int64(),nullable=True),
+        pa.field('edited',pa.bool_(),nullable=True),
+        pa.field('time_edited',pa.timestamp('ms'),nullable=True),
+        pa.field('subreddit_type',pa.string(),nullable=True),
+        pa.field('subreddit_id',pa.string(),nullable=True),
+        pa.field('subreddit_subscribers',pa.int64(),nullable=True),
+        pa.field('name',pa.string(),nullable=True),
+        pa.field('is_self',pa.bool_(),nullable=True),
+        pa.field('stickied',pa.bool_(),nullable=True),
+        pa.field('quarantine',pa.bool_(),nullable=True),
+        pa.field('error',pa.string(),nullable=True)])
+
+    if not os.path.exists("/gscratch/comdata/output/temp/reddit_submissions.parquet/"):
+        os.mkdir("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
+
+    with pq.ParquetWriter(f"/gscratch/comdata/output/temp/reddit_submissions.parquet/{partition}",schema=schema,compression='snappy',flavor='spark') as writer:
+        while True:
+            chunk = islice(rows,N)
+            pddf = pd.DataFrame(chunk, columns=schema.names)
+            table = pa.Table.from_pandas(pddf,schema=schema)
+            if table.shape[0] == 0:
+                break
+            writer.write_table(table)
+
+        writer.close()
+
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
+    files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
+    with open("parse_submissions_task_list",'w') as of:
+        for fpath in files:
+            partition = os.path.split(fpath)[1]
+            of.write(f'python3 submissions_2_parquet_part1.py parse_dump {partition}\n')
+
+if __name__ == "__main__":
+    fire.Fire({'parse_dump':parse_dump,
+               'gen_task_list':gen_task_list})
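parse_submission now uses a reusable pysimdjson Parser and catches ValueError instead of the json module's exceptions. A minimal illustration of that parse-or-placeholder pattern follows; the field list is shortened, the sample lines are invented, and the assumption that missing fields raise KeyError mirrors the script's own field loop rather than anything shown in this hunk:

import simdjson

parser = simdjson.Parser()
names = ['id', 'author', 'subreddit', 'error']   # shortened field list for illustration

def parse_line(line):
    row = []
    try:
        post = parser.parse(line)
    except ValueError as e:
        # keep the row, but record the parse failure in the trailing 'error' column
        row = [None for _ in names]
        row[-1] = "Error parsing json|{0}|{1}".format(e, line)
        return tuple(row)
    for name in names[:-1]:
        try:
            row.append(post[name])
        except KeyError:
            row.append(None)
    row.append(None)   # no error
    return tuple(row)

print(parse_line('{"id": "abc", "author": "someone", "subreddit": "test"}'))
print(parse_line('not json at all'))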
diff --git a/submissions_2_parquet_part2.py b/submissions_2_parquet_part2.py
index b88764b..3a58617 100644
--- a/submissions_2_parquet_part2.py
+++ b/submissions_2_parquet_part2.py
@@ -17,7 +17,7 @@ conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sqlContext = pyspark.SQLContext(sc)
 
-df = spark.read.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet")
+df = spark.read.parquet("/gscratch/comdata/output/temp/reddit_submissions.parquet/")
 
 df = df.withColumn("subreddit_2", f.lower(f.col('subreddit')))
 df = df.drop('subreddit')
@@ -32,13 +32,11 @@ df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
 df = df.repartition("subreddit")
 df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
-df2.write.parquet("/gscratch/comdata/output/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
+df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
 
 # we also want to have parquet files sorted by author then reddit.
 df = df.repartition("author")
 df3 = df.sort(["author","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
-df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
-
-os.remove("/gscratch/comdata/output/reddit_submissions.parquet_temp")
+df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
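Both submissions_2_parquet_part1.py above and tf_comments.py below stream rows into Arrow by slicing a generator with islice, building a pandas chunk, and stopping when a slice comes back empty. A self-contained sketch of that loop under toy assumptions (invented schema and rows, shrunken chunk size, illustrative output path):

from itertools import islice
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

schema = pa.schema([pa.field('subreddit', pa.string(), nullable=True),
                    pa.field('score', pa.int64(), nullable=True)])

rows = (("askreddit", i) for i in range(10))   # stand-in for the parsed-dump generator
chunksize = 4

with pq.ParquetWriter("/tmp/chunked_example.parquet", schema=schema,
                      compression='snappy', flavor='spark') as writer:
    while True:
        chunk = islice(rows, chunksize)                      # take the next slice lazily
        pddf = pd.DataFrame(chunk, columns=schema.names)     # materialize it as pandas
        table = pa.Table.from_pandas(pddf, schema=schema)
        if table.shape[0] == 0:                              # empty slice => generator exhausted
            break
        writer.write_table(table)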
df = df.repartition("author") df3 = df.sort(["author","CreatedAt","id"],ascending=True) df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True) -df3.write.parquet("/gscratch/comdata/output/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy') - -os.remove("/gscratch/comdata/output/reddit_submissions.parquet_temp") +df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy') diff --git a/tf_comments.py b/tf_comments.py new file mode 100755 index 0000000..cb3b628 --- /dev/null +++ b/tf_comments.py @@ -0,0 +1,191 @@ +#!/usr/bin/env python3 +import pyarrow as pa +import pyarrow.dataset as ds +import pyarrow.parquet as pq +from itertools import groupby, islice, chain +import fire +from collections import Counter +import pandas as pd +import os +import datetime +import re +from nltk import wordpunct_tokenize, MWETokenizer, sent_tokenize +from nltk.corpus import stopwords +from nltk.util import ngrams +import string +from random import random + +# remove urls +# taken from https://stackoverflow.com/questions/3809401/what-is-a-good-regular-expression-to-match-a-url +urlregex = re.compile(r"[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)") + +# compute term frequencies for comments in each subreddit by week +def weekly_tf(partition, mwe_pass = 'first'): + dataset = ds.dataset(f'/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/{partition}', format='parquet') + + if not os.path.exists("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/"): + os.mkdir("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/") + + if not os.path.exists("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/"): + os.mkdir("/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/") + + ngram_output = partition.replace("parquet","txt") + + if os.path.exists(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}"): + os.remove(f"/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}") + + batches = dataset.to_batches(columns=['CreatedAt','subreddit','body','author']) + + + schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False), + pa.field('term', pa.string(), nullable=False), + pa.field('week', pa.date32(), nullable=False), + pa.field('tf', pa.int64(), nullable=False)] + ) + + author_schema = pa.schema([pa.field('subreddit', pa.string(), nullable=False), + pa.field('author', pa.string(), nullable=False), + pa.field('week', pa.date32(), nullable=False), + pa.field('tf', pa.int64(), nullable=False)] + ) + + dfs = (b.to_pandas() for b in batches) + + def add_week(df): + df['week'] = (df.CreatedAt - pd.to_timedelta(df.CreatedAt.dt.dayofweek, unit='d')).dt.date + return(df) + + dfs = (add_week(df) for df in dfs) + + def iterate_rows(dfs): + for df in dfs: + for row in df.itertuples(): + yield row + + rows = iterate_rows(dfs) + + subreddit_weeks = groupby(rows, lambda r: (r.subreddit, r.week)) + + if mwe_pass != 'first': + mwe_dataset = pd.read_feather(f'/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather') + mwe_dataset = mwe_dataset.sort_values(['phrasePWMI'],ascending=False) + mwe_phrases = list(mwe_dataset.phrase) + mwe_phrases = [tuple(s.split(' ')) for s in mwe_phrases] + mwe_tokenizer = MWETokenizer(mwe_phrases) + mwe_tokenize = mwe_tokenizer.tokenize + + else: + mwe_tokenize = MWETokenizer().tokenize + + def 
+    def remove_punct(sentence):
+        new_sentence = []
+        for token in sentence:
+            new_token = ''
+            for c in token:
+                if c not in string.punctuation:
+                    new_token += c
+            if len(new_token) > 0:
+                new_sentence.append(new_token)
+        return new_sentence
+
+    stopWords = set(stopwords.words('english'))
+
+    # we follow the approach described in datta, phelan, adar 2017
+    def my_tokenizer(text):
+        # remove stopwords, punctuation, urls, lower case
+        # lowercase
+        text = text.lower()
+
+        # remove urls
+        text = urlregex.sub("", text)
+
+        # sentence tokenize
+        sentences = sent_tokenize(text)
+
+        # wordpunct_tokenize
+        sentences = map(wordpunct_tokenize, sentences)
+
+        # remove punctuation
+        sentences = map(remove_punct, sentences)
+
+        # remove sentences with fewer than 3 tokens
+        sentences = filter(lambda sentence: len(sentence) > 2, sentences)
+
+        # datta et al. select relatively common phrases from the reddit corpus, but they don't really explain how. We'll try that in a second phase.
+        # they say that they extract 1-4 grams from 10% of the sentences and then find phrases that appear often relative to the original terms
+        # here we take a 10 percent sample of sentences
+        if mwe_pass == 'first':
+            sentences = list(sentences)
+            for sentence in sentences:
+                if random() <= 0.1:
+                    grams = list(chain(*map(lambda i : ngrams(sentence,i),range(1,5))))
+                    with open(f'/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/{ngram_output}','a') as gram_file:
+                        for ng in grams:
+                            gram_file.write(' '.join(ng) + '\n')
+                for token in sentence:
+                    if token not in stopWords:
+                        yield token
+
+        else:
+            # remove stopWords
+            sentences = map(mwe_tokenize, sentences)
+            sentences = map(lambda s: filter(lambda token: token not in stopWords, s), sentences)
+            for sentence in sentences:
+                for token in sentence:
+                    yield token
+
+    def tf_comments(subreddit_weeks):
+        for key, posts in subreddit_weeks:
+            subreddit, week = key
+            tfs = Counter([])
+            authors = Counter([])
+            for post in posts:
+                tokens = my_tokenizer(post.body)
+                tfs.update(tokens)
+                authors.update([post.author])
+
+            for term, tf in tfs.items():
+                yield [True, subreddit, term, week, tf]
+
+            for author, tf in authors.items():
+                yield [False, subreddit, author, week, tf]
+
+    outrows = tf_comments(subreddit_weeks)
+
+    outchunksize = 10000
+
+    with pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test.parquet_temp/{partition}",schema=schema,compression='snappy',flavor='spark') as writer, pq.ParquetWriter(f"/gscratch/comdata/users/nathante/reddit_tfidf_test_authors.parquet_temp/{partition}",schema=author_schema,compression='snappy',flavor='spark') as author_writer:
+
+        while True:
+
+            chunk = islice(outrows,outchunksize)
+            chunk = (c for c in chunk if c[1] is not None)
+            pddf = pd.DataFrame(chunk, columns=["is_token"] + schema.names)
+            author_pddf = pddf.loc[pddf.is_token == False, schema.names]
+            pddf = pddf.loc[pddf.is_token == True, schema.names]
+            author_pddf = author_pddf.rename({'term':'author'}, axis='columns')
+            author_pddf = author_pddf.loc[:,author_schema.names]
+
+            table = pa.Table.from_pandas(pddf,schema=schema)
+            author_table = pa.Table.from_pandas(author_pddf,schema=author_schema)
+            if table.shape[0] == 0:
+                break
+            writer.write_table(table)
+            author_writer.write_table(author_table)
+
+        writer.close()
+        author_writer.close()
+
+
+def gen_task_list(mwe_pass='first'):
+    files = os.listdir("/gscratch/comdata/output/reddit_comments_by_subreddit.parquet/")
+    with open("tf_task_list",'w') as outfile:
+        for f in files:
+            if f.endswith(".parquet"):
outfile.write(f"./tf_comments.py weekly_tf --mwe-pass {mwe_pass} {f}\n") + +if __name__ == "__main__": + fire.Fire({"gen_task_list":gen_task_list, + "weekly_tf":weekly_tf}) diff --git a/top_comment_phrases.py b/top_comment_phrases.py new file mode 100644 index 0000000..031cba5 --- /dev/null +++ b/top_comment_phrases.py @@ -0,0 +1,58 @@ +from pyspark.sql import functions as f +from pyspark.sql import Window +from pyspark.sql import SparkSession +import numpy as np + +spark = SparkSession.builder.getOrCreate() +df = spark.read.text("/gscratch/comdata/users/nathante/reddit_comment_ngrams_10p_sample/") + +df = df.withColumnRenamed("value","phrase") + +# count phrase occurrances +phrases = df.groupby('phrase').count() +phrases = phrases.withColumnRenamed('count','phraseCount') +phrases = phrases.filter(phrases.phraseCount > 10) + + +# count overall +N = phrases.select(f.sum(phrases.phraseCount).alias("phraseCount")).collect()[0].phraseCount + +print(f'analyzing PMI on a sample of {N} phrases') +logN = np.log(N) +phrases = phrases.withColumn("phraseLogProb", f.log(f.col("phraseCount")) - logN) + +# count term occurrances +phrases = phrases.withColumn('terms',f.split(f.col('phrase'),' ')) +terms = phrases.select(['phrase','phraseCount','phraseLogProb',f.explode(phrases.terms).alias('term')]) + +win = Window.partitionBy('term') +terms = terms.withColumn('termCount',f.sum('phraseCount').over(win)) +terms = terms.withColumnRenamed('count','termCount') +terms = terms.withColumn('termLogProb',f.log(f.col('termCount')) - logN) + +terms = terms.groupBy(terms.phrase, terms.phraseLogProb, terms.phraseCount).sum('termLogProb') +terms = terms.withColumnRenamed('sum(termLogProb)','termsLogProb') +terms = terms.withColumn("phrasePWMI", f.col('phraseLogProb') - f.col('termsLogProb')) + +# join phrases to term counts + + +df = terms.select(['phrase','phraseCount','phraseLogProb','phrasePWMI']) + +df = df.sort(['phrasePWMI'],descending=True) +df = df.sortWithinPartitions(['phrasePWMI'],descending=True) +df.write.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/",mode='overwrite',compression='snappy') + +df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet/") + +df.write.csv("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.csv/",mode='overwrite',compression='none') + +df = spark.read.parquet("/gscratch/comdata/users/nathante/reddit_comment_ngrams_pwmi.parquet") +df = df.select('phrase','phraseCount','phraseLogProb','phrasePWMI') + +# choosing phrases occurring at least 3500 times in the 10% sample (35000 times) and then with a PWMI of at least 3 yeids about 65000 expressions. +# +df = df.filter(f.col('phraseCount') > 3500).filter(f.col("phrasePWMI")>3) +df = df.toPandas() +df.to_feather("/gscratch/comdata/users/nathante/reddit_multiword_expressions.feather") +df.to_csv("/gscratch/comdata/users/nathante/reddit_multiword_expressions.csv")