code.communitydata.science - cdsc_reddit.git/commitdiff
changes from dirty branch.
author    Nathan TeBlunthuis <nathante@uw.edu>
          Thu, 18 May 2023 17:29:08 +0000 (10:29 -0700)
committer Nathan TeBlunthuis <nathante@uw.edu>
          Thu, 18 May 2023 17:29:08 +0000 (10:29 -0700)
clustering/Makefile
clustering/clustering_base.py
clustering/umap_hdbscan_clustering.py
datasets/comments_2_parquet_part2.py
datasets/job_script.sh
datasets/submissions_2_parquet_part1.py
datasets/submissions_2_parquet_part2.py

clustering/Makefile
index 559a85ca0f8d01dc02a6b951f26f1dce97c59dee..6f25a7d0f7c38954ea3e2032495812df2f2d0ad6 100644 (file)
@@ -4,7 +4,7 @@ similarity_data=/gscratch/comdata/output/reddit_similarity
 clustering_data=/gscratch/comdata/output/reddit_clustering
 kmeans_selection_grid=--max_iters=[3000] --n_inits=[10] --n_clusters=[100,500,1000,1250,1500,1750,2000]
 
-umap_hdbscan_selection_grid=--min_cluster_sizes=[2] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] --n_neighbors=[5,15,25,50,75,100] --learning_rate=[1] --min_dist=[0,0.1,0.25,0.5,0.75,0.9,0.99] --local_connectivity=[1] --densmap=[True,False] --n_components=[2,5,10]
+umap_hdbscan_selection_grid=--min_cluster_sizes=[2] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf] --n_neighbors=[5,15,25,50,75,100] --learning_rate=[1] --min_dist=[0,0.1,0.25,0.5,0.75,0.9,0.99] --local_connectivity=[1] --densmap=[True,False] --n_components=[2,5,10,15,25]
 
 hdbscan_selection_grid=--min_cluster_sizes=[2,3,4,5] --min_samples=[2,3,4,5] --cluster_selection_epsilons=[0,0.01,0.05,0.1,0.15,0.2] --cluster_selection_methods=[eom,leaf]
 affinity_selection_grid=--dampings=[0.5,0.6,0.7,0.8,0.95,0.97,0.99] --preference_quantiles=[0.1,0.3,0.5,0.7,0.9] --convergence_iters=[15]
 
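This hunk only widens the UMAP n_components sweep from [2,5,10] to [2,5,10,15,25]. As a rough, hypothetical illustration of what that does to the size of the selection grid (assuming, as the variable name suggests, that each bracketed list is one axis of a full factorial sweep; this is not the repo's actual argument parser):

# Hypothetical illustration only: treat each bracketed list in
# umap_hdbscan_selection_grid as one axis of a factorial sweep.
from itertools import product

grid = {
    "min_cluster_sizes": [2],
    "min_samples": [2, 3, 4, 5],
    "cluster_selection_epsilons": [0, 0.01, 0.05, 0.1, 0.15, 0.2],
    "cluster_selection_methods": ["eom", "leaf"],
    "n_neighbors": [5, 15, 25, 50, 75, 100],
    "learning_rate": [1],
    "min_dist": [0, 0.1, 0.25, 0.5, 0.75, 0.9, 0.99],
    "local_connectivity": [1],
    "densmap": [True, False],
    "n_components": [2, 5, 10, 15, 25],  # was [2, 5, 10] before this commit
}
print(sum(1 for _ in product(*grid.values())))  # 20160 candidate settings, up from 12096
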
clustering/clustering_base.py
index ced627d2863eb9448126c625020fb7aba530b21c..98a260e9aca97e455f1fb48201a2880ef20e332c 100644 (file)
@@ -21,9 +21,9 @@ class clustering_job:
         self.subreddits, self.mat = self.read_distance_mat(self.infile)
         self.clustering = self.call(self.mat, *self.args, **self.kwargs)
         self.cluster_data = self.process_clustering(self.clustering, self.subreddits)
-        self.score = self.silhouette()
         self.outpath.mkdir(parents=True, exist_ok=True)
         self.cluster_data.to_feather(self.outpath/(self.name + ".feather"))
+
         self.hasrun = True
         self.cleanup()
 
@@ -62,6 +62,7 @@ class clustering_job:
         else:
             score = None
             self.silsampout = None
+
         return score
 
     def read_distance_mat(self, similarities, use_threads=True):
@@ -81,9 +82,13 @@ class clustering_job:
         self.n_clusters = len(set(clusters))
 
         print(f"found {self.n_clusters} clusters")
-
         cluster_data = pd.DataFrame({'subreddit': subreddits,'cluster':clustering.labels_})
 
+
+        self.score = self.silhouette()
+        print(f"silhouette_score:{self.score}")
+
+
         cluster_sizes = cluster_data.groupby("cluster").count().reset_index()
         print(f"the largest cluster has {cluster_sizes.loc[cluster_sizes.cluster!=-1].subreddit.max()} members")
 
@@ -125,7 +130,7 @@ class twoway_clustering_job(clustering_job):
         self.after_run()
         self.cleanup()
 
-    def after_run():
+    def after_run(self):
         self.score = self.silhouette()
         self.outpath.mkdir(parents=True, exist_ok=True)
         print(self.outpath/(self.name+".feather"))
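Two fixes land in clustering_base.py: the silhouette computation moves out of run() and into process_clustering() (so the score is computed and printed next to the cluster summary), and twoway_clustering_job.after_run() gains the self parameter it was missing. The second one matters because after_run is invoked as a bound method; a stripped-down sketch of the failure mode (a stand-in class, not the real clustering_job hierarchy):

# Stand-in class, not the real twoway_clustering_job.
class job_sketch:
    def run(self):
        self.after_run()   # bound call: Python passes the instance implicitly

    # def after_run():         # old signature -> TypeError: after_run() takes
    #     ...                  # 0 positional arguments but 1 was given
    def after_run(self):       # signature as fixed in this commit
        self.score = None      # placeholder for self.silhouette()

job_sketch().run()
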
clustering/umap_hdbscan_clustering.py
index 5633d770391f06dd5488682b81cf03b6a6b4465e..cf4acbb0cd2887134f4717982f6ee855ebfd7038 100644 (file)
@@ -110,7 +110,7 @@ class umap_hdbscan_job(twoway_clustering_job):
         self.cluster_selection_method = hdbscan_args['cluster_selection_method']
 
     def after_run(self):
-        coords = self.step1.emedding_
+        coords = self.step1.embedding_
         self.cluster_data['x'] = coords[:,0]
         self.cluster_data['y'] = coords[:,1]
         super().after_run()
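The umap_hdbscan change is a one-character typo fix: a fitted umap.UMAP model exposes its low-dimensional coordinates as embedding_ (shape (n_samples, n_components)), so emedding_ raised an AttributeError as soon as after_run executed. A minimal sketch of the attribute access, assuming the umap-learn package and random data in place of the subreddit distance matrix:

# Minimal sketch, assuming umap-learn; random data stands in for the
# subreddit similarity/distance matrix used by the real job.
import numpy as np
import umap

X = np.random.rand(200, 50)
reducer = umap.UMAP(n_components=2).fit(X)
coords = reducer.embedding_            # (200, 2); reducer.emedding_ -> AttributeError
x, y = coords[:, 0], coords[:, 1]
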
datasets/comments_2_parquet_part2.py
index 1031c683f9ae6d94ca157f34832e2398b0af2e85..5b9a13107b97783dffab87ce6db6162322245883 100755 (executable)
@@ -9,7 +9,7 @@ from pyspark.sql import SparkSession
 spark = SparkSession.builder.getOrCreate()
 
 conf = pyspark.SparkConf().setAppName("Reddit submissions to parquet")
-conf = conf.set("spark.sql.shuffle.partitions",2000)
+conf = conf.set("spark.sql.shuffle.partitions",2400)
 conf = conf.set('spark.sql.crossJoin.enabled',"true")
 conf = conf.set('spark.debug.maxToStringFields',200)
 sc = spark.sparkContext
@@ -25,12 +25,13 @@ df = df.withColumn("Month",f.month(f.col("CreatedAt")))
 df = df.withColumn("Year",f.year(f.col("CreatedAt")))
 df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 
-df = df.repartition('subreddit')
-df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
-df2.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
+# df = df.repartition(1200,'subreddit')
+df2 = df.sort(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
+df2 = df2.sortWithinPartitions(["subreddit","CreatedAt","link_id","parent_id","Year","Month","Day"],ascending=True)
+# df2.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet", mode='overwrite', compression='snappy')
 
 
-df = df.repartition('author')
-df3 = df.sort(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3 = df3.sortWithinPartitions(["author","CreatedAt","subreddit","link_id","parent_id","Year","Month","Day"],ascending=True)
-df3.write.parquet("/gscratch/scrubbed/comdata/output/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
+#df = spark.read.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet")
+df = df.repartition(2400,'author','subreddit',"Year","Month","Day")
+df3 = df.sort(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
+df3 = df3.sortWithinPartitions(["author","subreddit","Year","Month","Day","CreatedAt","link_id","parent_id"],ascending=True)
+df3.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_author.parquet", mode='overwrite',compression='snappy')
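
The comments job raises spark.sql.shuffle.partitions to 2400 and replaces repartition('subreddit') / repartition('author'), which fall back to that session setting for the partition count, with an explicit count over a wider key; the per-subreddit write is commented out, so only the per-author output is produced. A minimal sketch of the pattern, not the full job (the read path is taken from the commented-out line above):

# Minimal sketch, not the full comments_2_parquet_part2.py job.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", 2400)

# Path from the commented-out line in the diff; the real job builds df upstream.
df = spark.read.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_subreddit.parquet")

# Explicit count + multi-column key pins the layout instead of relying on the
# session-wide shuffle setting; sortWithinPartitions keeps that layout intact.
df = df.repartition(2400, "author", "subreddit", "Year", "Month", "Day")
df = df.sortWithinPartitions(
    ["author", "subreddit", "Year", "Month", "Day", "CreatedAt", "link_id", "parent_id"],
    ascending=True)
df.write.parquet("/gscratch/scrubbed/comdata/reddit_comments_by_author.parquet",
                 mode="overwrite", compression="snappy")
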
datasets/job_script.sh
index 5b5a7d30ef47bfdd99e3a22ef50ed1de059b7e01..ca994d529e9ee2cf538782557f18b93cb82bc3bf 100755 (executable)
@@ -1,4 +1,6 @@
 #!/usr/bin/bash
+source ~/.bashrc
+echo $(hostname)
 start_spark_cluster.sh
-singularity exec  /gscratch/comdata/users/nathante/containers/nathante.sif spark-submit --master spark://$(hostname):7077 comments_2_parquet_part2.py 
-singularity exec /gscratch/comdata/users/nathante/containers/nathante.sif stop-all.sh
+spark-submit --verbose --master spark://$(hostname):43015 submissions_2_parquet_part2.py 
+stop-all.sh

datasets/submissions_2_parquet_part1.py
index 77ae09f33ca260261ea919cbf23bbe822aa940c4..d1a8a3d392bb1e75e24ff907a72f9710bc6d7b29 100755 (executable)
@@ -58,7 +58,7 @@ def parse_submission(post, names = None):
 def parse_dump(partition):
 
     N=10000
-    stream = open_fileset([f"/gscratch/comdata/raw_data/reddit_dumps/submissions/{partition}"])
+    stream = open_fileset([f"/gscratch/comdata/raw_data/submissions/{partition}"])
     rows = map(parse_submission,stream)
     schema = pa.schema([
         pa.field('id', pa.string(),nullable=True),
@@ -102,7 +102,7 @@ def parse_dump(partition):
 
         writer.close()
 
-def gen_task_list(dumpdir="/gscratch/comdata/raw_data/reddit_dumps/submissions"):
+def gen_task_list(dumpdir="/gscratch/comdata/raw_data/submissions"):
     files = list(find_dumps(dumpdir,base_pattern="RS_20*.*"))
     with open("submissions_task_list.sh",'w') as of:
         for fpath in files:

datasets/submissions_2_parquet_part2.py
index 3a586174113adaa5cf9d3f577c6bfc46aff9538a..7dc4f743d689facf931add34b028abe211bd807e 100644 (file)
@@ -29,14 +29,14 @@ df = df.withColumn("Day",f.dayofmonth(f.col("CreatedAt")))
 df = df.withColumn("subreddit_hash",f.sha2(f.col("subreddit"), 256)[0:3])
 
 # next we gotta resort it all.
-df = df.repartition("subreddit")
-df2 = df.sort(["subreddit","CreatedAt","id"],ascending=True)
+df = df.repartition(800,"subreddit","Year","Month")
+df2 = df.sort(["subreddit","Year","Month","CreatedAt","id"],ascending=True)
 df2 = df.sortWithinPartitions(["subreddit","CreatedAt","id"],ascending=True)
 df2.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_subreddit.parquet2", mode='overwrite',compression='snappy')
 
 
 # # we also want to have parquet files sorted by author then reddit. 
-df = df.repartition("author")
-df3 = df.sort(["author","CreatedAt","id"],ascending=True)
+df = df.repartition(800,"author","subreddit","Year","Month")
+df3 = df.sort(["author","Year","Month","CreatedAt","id"],ascending=True)
 df3 = df.sortWithinPartitions(["author","CreatedAt","id"],ascending=True)
 df3.write.parquet("/gscratch/comdata/output/temp/reddit_submissions_by_author.parquet2", mode='overwrite',compression='snappy')
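
The submissions resort follows the same idea with 800 partitions keyed on subreddit/Year/Month (and author/subreddit/Year/Month for the by-author output). The relevant distinction is that sort() triggers a global range shuffle, while sortWithinPartitions() only orders rows inside each existing partition, so the repartition layout survives into the write. A minimal, self-contained sketch with toy rows and a placeholder output path, not the repo's data or job:

# Minimal sketch: toy rows and a placeholder path, not the repo's data or output.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("AskReddit", 2015, 6, "2015-06-01", "id1"),
     ("politics", 2016, 1, "2016-01-02", "id2")],
    ["subreddit", "Year", "Month", "CreatedAt", "id"])

df = df.repartition(800, "subreddit", "Year", "Month")
# sort() would do a global range shuffle; sortWithinPartitions() only orders
# rows within each of the 800 partitions, preserving the partitioning above.
df2 = df.sortWithinPartitions(["subreddit", "Year", "Month", "CreatedAt", "id"], ascending=True)
df2.write.parquet("/tmp/reddit_submissions_by_subreddit_sketch.parquet",
                  mode="overwrite", compression="snappy")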
