--- /dev/null
+#!/usr/bin/env python3
+
+'''
+Take a stratified sample of article quality labels.
+
+For now we just stratify by label type.
+Later we might add date.
+Later we might stratify by wikiproject too.
+
A key limitation of this approach is that we can only sample on the level of the page.
+We'd really like to be able to sample on the level of edit session.
+But that isn't possible because of how article assessments work.
+'''
+from itertools import islice, chain
+from pathlib import Path
+import pandas as pd
+import numpy as np
+random = np.random.RandomState(1968)
+import json
+import pyarrow.feather as feather
+import fire
+from collections import Counter
+from pyRemembeR import Remember
+from enum import IntEnum, unique
+from datetime import datetime
+from dataclasses import dataclass, asdict
+from multiprocessing import Pool
+from urllib.parse import unquote
+from pyspark.sql import functions as f
+from pyspark.sql import SparkSession, Window
+from pyspark.sql.functions import udf
+from pyspark.sql.types import StringType
+from numpy import dtype
+import csv
+
def _wikiq_tsv_to_parquet(dumpfile, outfile):
    '''Convert a single wikiq TSV dump file into a parquet file at outfile.'''
    # Fixed dtypes keep the schema consistent across dump files; pandas'
    # per-file type inference would otherwise yield mismatched columns.
    dtypes = {'anon': dtype('O'), 'articleid': dtype('int64'), 'deleted': dtype('bool'), 'editor': dtype('O'), 'editor_id': dtype('float64'), 'minor': dtype('bool'), 'namespace': dtype('int64'), 'revert': dtype('O'), 'reverteds': dtype('O'), 'revid': dtype('int64'), 'sha1': dtype('O'), 'text_chars': dtype('float64'), 'title': dtype('O')}

    print(dumpfile)
    # NOTE(review): error_bad_lines/warn_bad_lines are deprecated in
    # pandas >= 1.3 (replaced by on_bad_lines); kept for compatibility
    # with the pandas version this was written against.
    df = pd.read_csv(dumpfile,sep='\t',quoting=csv.QUOTE_NONE,error_bad_lines=False, warn_bad_lines=True,parse_dates=['date_time'],dtype=dtypes)

    df.to_parquet(outfile)

def _wikiq_tsv_to_parquet_job(dumpfile, outpath = Path("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet/")):
    '''Pool worker: derive the per-dump output filename and convert one file.'''
    outfile = outpath / (dumpfile.name + ".parquet")
    outpath.mkdir(parents=True, exist_ok=True)
    _wikiq_tsv_to_parquet(dumpfile, outfile)

def wikiq_to_parquet():
    '''
    Convert the wikiq TSV output to parquet, then normalize column types
    and repartition by articleid with Spark.

    BUG FIX: the conversion helpers were previously nested inside this
    function, but multiprocessing.Pool pickles the callable handed to
    imap_unordered and nested functions cannot be pickled, so every pool
    job would fail. The helpers are now private module-level functions.
    Dead local assignments (the unused outer outpath and the two unused
    dumpfile bindings) were also removed.
    '''
    path = Path("/gscratch/comdata/users/nathante/wikiqRunning/wikiq_output/")
    files = list(path.glob("*.tsv"))

    with Pool(28) as pool:
        jobs = pool.imap_unordered(_wikiq_tsv_to_parquet_job, files)
        # Drain the iterator so every conversion actually runs.
        list(jobs)

    spark = SparkSession.builder.getOrCreate()

    @udf(StringType())
    def decode_strip_udf(val):
        # Percent-decode and strip surrounding quotes; empty string for NULLs.
        if val is None:
            return ""
        else:
            return unquote(val).strip('\"')

    df = spark.read.parquet('/gscratch/comdata/output/wikiq_enwiki_20200301_nathante.parquet')

    # Normalize raw string columns to proper types: TRUE/FALSE -> boolean,
    # floats -> int, percent-encoded titles/editors -> decoded strings.
    df = df.withColumnRenamed("anon","anonRaw")
    df = df.withColumn("anon",f.when(f.col("anonRaw")=="TRUE",True).otherwise(False))
    df = df.drop("anonRaw")
    df = df.withColumnRenamed("text_chars","text_chars_raw")
    df = df.withColumn("text_chars",f.col("text_chars_raw").cast('int'))
    df = df.drop("text_chars_raw")
    df = df.withColumnRenamed("editor_id",'editor_id_raw')
    df = df.withColumn("editor_id",f.col("editor_id_raw").cast("int"))
    df = df.drop("editor_id_raw")
    df = df.withColumnRenamed("revert","revert_raw")
    df = df.withColumn("revert",f.when(f.col("revert_raw")=="TRUE",True).otherwise(False))
    df = df.drop("revert_raw")
    df = df.withColumnRenamed("title","title_raw")
    df = df.withColumn("title", decode_strip_udf(f.col("title_raw")))
    df = df.drop("title_raw")
    df = df.withColumnRenamed("editor","editor_raw")
    df = df.withColumn("editor", decode_strip_udf(f.col("editor_raw")))
    df = df.drop("editor_raw")
    # Partition by articleid so downstream per-article work is colocated.
    df = df.repartition(400,'articleid')
    df.write.parquet("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_partitioned.parquet",mode='overwrite')
+
@unique
class WP10(IntEnum):
    """Ordinal WP10 article-quality classes; higher value = higher quality."""
    start = 1
    stub = 2
    c = 3
    b = 4
    a = 5
    ga = 6
    fa = 7

    @staticmethod
    def from_string(s):
        """Return the member named *s*, or None if *s* is not a known label."""
        return WP10.__members__.get(s)

    def to_string(self):
        """Inverse of from_string: the member's lowercase label name."""
        return self.name
+
+
@dataclass
class PageLabel:
    """A quality label applied to some page at a point in time."""
    timestamp: datetime
    wp10: WP10

    @staticmethod
    def from_json(obj):
        """Build a PageLabel from a parsed JSON dict; missing keys become None."""
        ts = obj.get('timestamp', None)
        if ts is not None:
            # Timestamps are serialized as compact YYYYMMDDHHMMSS strings.
            ts = datetime.strptime(ts, '%Y%m%d%H%M%S')
        return PageLabel(timestamp=ts,
                         wp10=WP10.from_string(obj.get('wp10')))

    @staticmethod
    def from_row(row):
        """Build a PageLabel from a dataframe row with .timestamp and .wp10."""
        return PageLabel(timestamp=row.timestamp, wp10=WP10(row.wp10))

    def to_json(self):
        """Serialize to the same JSON shape that from_json consumes."""
        d = asdict(self)
        ts, lab = self.timestamp, self.wp10
        if ts is not None:
            d['timestamp'] = ts.strftime('%Y%m%d%H%M%S')
        if lab is not None:
            d['wp10'] = lab.to_string()
        return json.dumps(d)
+
@dataclass
class TalkPageLabel(PageLabel):
    """A quality label attached to an article's talk page by a wikiproject."""
    dump_talk_page_title: str
    talk_page_id: int
    project: str

    @staticmethod
    def from_json(obj):
        """Build from a parsed JSON dict, delegating shared fields to PageLabel."""
        res = PageLabel.from_json(obj)

        return TalkPageLabel(dump_talk_page_title=obj.get('dump_talk_page_title',None),
                             talk_page_id=obj.get('talk_page_id',None),
                             project=obj.get("project",None),
                             **asdict(res)
                             )

    @staticmethod
    def from_row(row):
        """Build from a dataframe row, delegating shared fields to PageLabel."""
        res = PageLabel.from_row(row)
        # BUG FIX: the original was missing the comma after row.project, so
        # `row.project **asdict(res)` parsed as exponentiation — from_row
        # raised TypeError and the inherited timestamp/wp10 kwargs were
        # never passed at all.
        return TalkPageLabel(dump_talk_page_title = row.dump_talk_page_title,
                             talk_page_id = row.talk_page_id,
                             project = row.project,
                             **asdict(res))
+
+
+
@dataclass
class ArticlePageLabel(PageLabel):
    '''class representing labels to a page'''
    title: str
    articleid: int
    revid: int

    @staticmethod
    def from_json(obj):
        """Build from a parsed JSON dict, delegating shared fields to PageLabel."""
        res = PageLabel.from_json(obj)

        # BUG FIX: the original omitted `revid`, which has no default on
        # the dataclass, so this constructor always raised TypeError.
        return ArticlePageLabel(title=obj.get('title',None),
                                articleid=obj.get('articleid',None),
                                revid=obj.get('revid',None),
                                **asdict(res)
                                )

    @staticmethod
    def from_row(row):
        """Build from a dataframe row, delegating shared fields to PageLabel."""
        res = PageLabel.from_row(row)
        return ArticlePageLabel(title = row.title,
                                articleid = row.articleid,
                                revid = row.revid,
                                **asdict(res))
+
# NOTE(review): interactive/debugging leftovers — module-level globals that
# mirror main()'s keyword defaults (presumably from running this in a REPL).
# They are not read by any code in this file.
infiles="enwiki-20200301-pages-meta-history*.xml-p*.7z_article_labelings.json"; samplesize=5000*7
+
def main(infiles="enwiki-20200301-pages-meta-history*.xml-p*.7z_article_labelings.json", samplesize=5000*7):
    '''
    Take a stratified sample of article quality labels, stratified by WP10 label.

    :param infiles: glob pattern, relative to ./data, matching the JSON-lines
        talk-page labeling dumps.
    :param samplesize: total target number of sampled pages, split evenly
        across the 7 WP10 label classes.

    Side effects: writes data/article_quality_data.parquet,
    data/20200301_article_labelings_sample.feather and
    data/20200301_article_labelings_sample.json, and records summary
    statistics via pyRemembeR.
    '''
    path = Path('data')
    infiles = path.glob(infiles)

    pool = Pool(28)

    # One lazy stream of JSON lines over all matched input files.
    # NOTE(review): the file handles opened here are never explicitly closed.
    lines = chain(* map(lambda f: open(f,'r'), infiles))

    # Parse and normalize the label records in parallel.
    parsed = pool.imap_unordered(json.loads, lines, chunksize=int(1e3))
    formatted = pool.imap_unordered(TalkPageLabel.from_json, parsed, chunksize=int(1e3))
    dicted = pool.imap_unordered(asdict,formatted, chunksize=int(1e3))

    # data frame of the latest labels.
    df = pd.DataFrame(dicted)

    # Only labels assigned on or before 2019-01-01 are eligible.
    df = df.loc[df.timestamp <= datetime(2019,1,1)]

    # For each talk page keep the rows carrying the highest quality label ...
    groups = df.groupby(["talk_page_id"])
    max_labels = groups.wp10.max().reset_index()

    df2 = pd.merge(df,max_labels,on=['talk_page_id','wp10'],how='right')
    # ... break ties by the most recent timestamp ...
    last_timestamp = df2.groupby(['talk_page_id']).timestamp.max().reset_index()

    df2 = pd.merge(df2, last_timestamp, on=['talk_page_id','timestamp'], how='right')
    # ... then by the first project seen.
    # NOTE(review): first_project is a Series indexed by talk_page_id (no
    # reset_index()), so merging with on=['talk_page_id','project'] looks
    # like it would not find 'talk_page_id' as a column — confirm against
    # the pandas version in use.
    first_project = df2.groupby(['talk_page_id']).project.first()
    df2 = pd.merge(df2, first_project,on=['talk_page_id','project'], how='right')

    tpid = df2

    #.wp10.max().reset_index()
    # Drop labels whose talk page title was not found in the dump.
    tpid = tpid.loc[~tpid.dump_talk_page_title.isna()]

    # pick out just the samples we want.
    spark = SparkSession.builder.getOrCreate()

    sparkdf = spark.read.parquet("/gscratch/comdata/output/wikiq_enwiki_20200301_nathante_partitioned.parquet")

    # Make the timestamps timezone-aware before handing them to Spark.
    tpid['timestamp'] = tpid['timestamp'].dt.tz_localize('utc')
    labels = spark.createDataFrame(tpid)
    talks = sparkdf.filter(sparkdf.namespace==1)      # talk-page revisions
    articles = sparkdf.filter(sparkdf.namespace==0)   # article revisions

    # labels = labels.join(talks,on=[labels.talk_page_id == talks.articleid],how='left_outer')

    # Attach each label to the revision history of its talk page.
    talks = talks.join(labels,on=[labels.talk_page_id == talks.articleid])

    #talks.filter(talks.wp10==7).select('talk_page_id').distinct().count()

    # timediff <= 0 means the revision is at or before the label timestamp ...
    talks = talks.withColumn('timediff', f.datediff(talks.timestamp, talks.date_time))

    talks = talks.filter(talks.timediff <= 0)

    # ... and the max timediff per talk page picks the closest such revision.
    win = Window.partitionBy("talk_page_id")
    talks = talks.withColumn('best_timediff', f.max('timediff').over(win))
    talks = talks.filter(talks.timediff == talks.best_timediff)

    # Strip the namespace prefix ("Talk:") to recover the article title.
    talks = talks.withColumn('article_title',f.substring_index(f.col("title"),':',-1))
    talks = talks.select(['article_title','wp10',f.col('timestamp').alias('timestamp'),'talk_page_id']).distinct()

    articles = articles.join(talks,on=[talks.article_title == articles.title])

    # Same trick on the article side: the latest non-revert revision at or
    # before the label timestamp.
    articles = articles.withColumn('timediff', f.datediff(articles.timestamp, articles.date_time))
    articles = articles.filter(articles.timediff <= 0)

    win2 = Window.partitionBy("articleid")
    articles = articles.filter(f.col("revert")==False)
    articles = articles.withColumn('best_timediff', f.max('timediff').over(win2))
    articles = articles.filter(articles.timediff == articles.best_timediff)
    articles = articles.select(['revid','timestamp','wp10','articleid','title'])

    # Several revisions can share a date; f.first breaks ties arbitrarily.
    articles = articles.groupby(['timestamp','wp10','articleid','title']).agg(f.first(f.col("revid")).alias("revid"))

    articles.write.parquet("data/article_quality_data.parquet",mode='overwrite')

    tpid = pd.read_parquet("data/article_quality_data.parquet")

    # we want to sample /pages/ not /labels/.
    # so we need to do a /full/ groupby pages.
    # this is why we have a lot of RAM!
    # we need the number of
    label_counts = {}
    sample_page_ids = {}
    label_max_samplesize = int(samplesize / len(WP10))
    sample_chunks = []

    # Stratified sampling: up to label_max_samplesize pages per label; a
    # label with fewer pages than that is taken in full.
    for lab in WP10:
        print(lab)
        page_ids = tpid.loc[tpid.wp10==lab].articleid
        label_counts[lab] = len(page_ids)
        print(lab,label_counts)
        if(label_counts[lab] <= label_max_samplesize):
            sample_page_ids[lab] = page_ids
        else:
            sample_page_ids[lab] = random.choice(page_ids,label_max_samplesize,replace=False)

        # get the labels for each sampled article
        sample_data_lab = tpid.loc[(tpid.articleid.isin(sample_page_ids[lab]))]

        sample_chunks.append(sample_data_lab)

    remember = Remember(f='remember_sample_quality_labels.RDS')

    remember(label_max_samplesize, 'label_max_samplesize')


    # Note that different wikiprojects can have different labels
    sample = pd.concat(sample_chunks,ignore_index=True)

    # Count each article's 2019 revisions for the remembered summary table.
    revisions_per_article = sparkdf.filter(sparkdf.namespace==0).select(['revid','articleid','date_time','title'])
    revisions_per_article = revisions_per_article.filter(f.col("date_time") >= datetime(2019,1,1))
    revisions_per_article = revisions_per_article.filter(f.col("date_time") <= datetime(2019,12,31))
    revisions_per_article = revisions_per_article.groupby(["articleid",'title']).count().toPandas()

    # Titles in the dump are percent-encoded and quoted; normalize them.
    revisions_per_article['title'] = revisions_per_article.title.apply(lambda s: unquote(s).strip('\"'))

    revisions_per_article = pd.merge(revisions_per_article,tpid,left_on='articleid',right_on='articleid')

    # Aggregate revision counts per quality class.
    revisions_per_class = revisions_per_article.groupby('wp10').agg({'count':'sum'}).reset_index()
    revisions_per_class['wp10'] = revisions_per_class.wp10.apply(lambda s: WP10(s).to_string())

    label_counts = pd.DataFrame({'wp10':map(lambda x: x.to_string(),label_counts.keys()),'n_articles':label_counts.values()})
    label_counts = pd.merge(label_counts,revisions_per_class,left_on='wp10',right_on='wp10')
    label_counts = label_counts.rename(columns={'count':'n_revisions'})

    remember(label_counts, 'label_sample_counts')

    sample.to_feather("data/20200301_article_labelings_sample.feather")

    sample = pd.read_feather("data/20200301_article_labelings_sample.feather")
    sample_counts = sample.articleid.groupby(sample.wp10).count().reset_index()
    remember(sample_counts,'sample_counts')

    # Serialize the sample as JSON lines of ArticlePageLabel records.
    sample_labels = sample.apply(ArticlePageLabel.from_row,axis=1)
    sample_labels = map(PageLabel.to_json, sample_labels)

    with open("data/20200301_article_labelings_sample.json",'w') as of:
        of.writelines((l + '\n' for l in sample_labels))

    pool.close()
+
if __name__ == "__main__":
    # Expose main() as a CLI (e.g. --infiles=... --samplesize=...) via python-fire.
    fire.Fire(main)
+