4 from shutil import copyfile
 
   6 from pandas.testing import assert_frame_equal
 
   7 from io import StringIO
 
# with / without pwr DONE
# with / without url encode DONE
# with / without collapse user DONE
# with output to stdout DONE
# note that the persistence radius is 7 by default
# reading various file formats including
#        7z, gz, bz2, xml  DONE
# wikia and wikipedia data DONE
 
  19 class Test_Wikipedia(unittest.TestCase):
 
  21         if not os.path.exists("test_output"):
 
  22             os.mkdir("test_output")
 
  24         self.wiki = 'ikwiki-20180301-pages-meta-history'
 
  25         self.wikiq_out_name =  self.wiki + ".tsv"
 
  26         self.test_output_dir = os.path.join(".", "test_output")
 
  27         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
  29         self.infile = "{0}.xml.bz2".format(self.wiki)    
 
  30         self.base_call = "../wikiq {0} -o {1}"
 
  31         self.input_dir = "dumps"
 
  32         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
  33         self.baseline_output_dir = "baseline_output"
 
  35     def test_WP_url_encode(self):
 
  36         test_filename =  "url-encode_" + self.wikiq_out_name
 
  37         test_file = os.path.join(self.test_output_dir, test_filename)
 
  38         if os.path.exists(test_file):
 
  41         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  42         call = call + " --url-encode"
 
  43         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  46         copyfile(self.call_output, test_file)
 
  47         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
  49         # as a test let's make sure that we get equal data frames
 
  50         test = pd.read_table(test_file)
 
  51         baseline = pd.read_table(baseline_file)
 
  52         assert_frame_equal(test,baseline, check_like=True)
 
  54     def test_WP_namespaces(self):
 
  55         print(os.path.abspath('.'))
 
  56         test_filename =  "namespaces_" + self.wikiq_out_name
 
  57         test_file = os.path.join(self.test_output_dir, test_filename)
 
  58         if os.path.exists(test_file):
 
  61         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  62         call = call + " -n 0 -n 1"
 
  64         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  66         copyfile(self.call_output, test_file)
 
  67         baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)
 
  69         # as a test let's make sure that we get equal data frames
 
  70         test = pd.read_table(test_file)
 
  71         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
 
  72         self.assertEqual(num_wrong_ns, 0)
 
  73         baseline = pd.read_table(baseline_file)
 
  74         assert_frame_equal(test,baseline, check_like=True)
 
  76     def test_WP_revert_radius(self):
 
  77         print(os.path.abspath('.'))
 
  78         test_filename =  "revert_radius_" + self.wikiq_out_name
 
  79         test_file = os.path.join(self.test_output_dir, test_filename)
 
  80         if os.path.exists(test_file):
 
  83         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  84         call = call + " -n 0 -n 1 -rr 1"
 
  86         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  88         copyfile(self.call_output, test_file)
 
  89         baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)
 
  91         # as a test let's make sure that we get equal data frames
 
  92         test = pd.read_table(test_file)
 
  93         num_wrong_ns = sum(~ test.namespace.isin({0,1}))
 
  94         self.assertEqual(num_wrong_ns, 0)
 
  95         baseline = pd.read_table(baseline_file)
 
  96         assert_frame_equal(test,baseline, check_like=True)
 
 100 class Test_Basic(unittest.TestCase):
 
 103         if not os.path.exists("test_output"):
 
 104             os.mkdir("test_output")
 
 106         self.wiki = 'sailormoon'
 
 107         self.wikiq_out_name =  self.wiki + ".tsv"
 
 108         self.test_output_dir = os.path.join(".", "test_output")
 
 109         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
 111         self.infile = "{0}.xml.7z".format(self.wiki)
 
 112         self.base_call = "../wikiq {0} -o {1}"
 
 113         self.input_dir = "dumps"
 
 114         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 115         self.baseline_output_dir = "baseline_output"
 
 117     def test_noargs(self):
 
 119         test_filename =  "noargs_" + self.wikiq_out_name
 
 120         test_file = os.path.join(self.test_output_dir, test_filename)
 
 121         if os.path.exists(test_file):
 
 124         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 125         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 128         copyfile(self.call_output, test_file)
 
 130         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 132         test = pd.read_table(test_file)
 
 133         baseline = pd.read_table(baseline_file)
 
 134         assert_frame_equal(test,baseline, check_like=True)
 
 137     def test_collapse_user(self):
 
 138         test_filename =  "collapse-user_" + self.wikiq_out_name
 
 139         test_file = os.path.join(self.test_output_dir, test_filename)
 
 140         if os.path.exists(test_file):
 
 143         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 144         call = call + " --collapse-user"
 
 146         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 149         copyfile(self.call_output, test_file)
 
 151         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 152         test = pd.read_table(test_file)
 
 153         baseline = pd.read_table(baseline_file)
 
 154         assert_frame_equal(test,baseline, check_like=True)
 
 156     def test_pwr_segment(self):
 
 157         test_filename =  "persistence_segment_" + self.wikiq_out_name
 
 158         test_file = os.path.join(self.test_output_dir, test_filename)
 
 159         if os.path.exists(test_file):
 
 162         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 163         call = call + " --persistence segment"
 
 164         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 168         copyfile(self.call_output, test_file)
 
 170         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 172         test = pd.read_table(test_file)
 
 173         baseline = pd.read_table(baseline_file)
 
 174         assert_frame_equal(test,baseline, check_like=True)
 
 176     def test_pwr_legacy(self):
 
 177         test_filename =  "persistence_legacy_" + self.wikiq_out_name
 
 178         test_file = os.path.join(self.test_output_dir, test_filename)
 
 179         if os.path.exists(test_file):
 
 182         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 183         call = call + " --persistence legacy"
 
 184         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 188         copyfile(self.call_output, test_file)
 
 190         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 192         test = pd.read_table(test_file)
 
 193         baseline = pd.read_table(baseline_file)
 
 194         assert_frame_equal(test,baseline, check_like=True)
 
 197         test_filename =  "persistence_" + self.wikiq_out_name
 
 198         test_file = os.path.join(self.test_output_dir, test_filename)
 
 199         if os.path.exists(test_file): 
 
 202         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 203         call = call + " --persistence"
 
 204         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 208         copyfile(self.call_output, test_file)
 
 210         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 212         test = pd.read_table(test_file)
 
 213         baseline = pd.read_table(baseline_file)
 
 215         test = test.reindex(columns=sorted(test.columns))
 
 216         assert_frame_equal(test,baseline, check_like=True)
 
 219     def test_url_encode(self):
 
 220         test_filename =  "url-encode_" + self.wikiq_out_name
 
 222         test_file = os.path.join(self.test_output_dir, test_filename)
 
 223         if os.path.exists(test_file):
 
 226         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 227         call = call + " --url-encode"
 
 228         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 231         copyfile(self.call_output, test_file)
 
 232         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 233         test = pd.read_table(test_file)
 
 234         baseline = pd.read_table(baseline_file)
 
 236         test = test.reindex(columns=sorted(test.columns))
 
 237         assert_frame_equal(test,baseline, check_like=True)
 
 240 class Test_Malformed(unittest.TestCase):
 
 242         if not os.path.exists("test_output"):
 
 243             os.mkdir("test_output")
 
 245         self.wiki = 'twinpeaks'
 
 246         self.wikiq_out_name =  self.wiki + ".tsv"
 
 247         self.test_output_dir = os.path.join(".", "test_output")
 
 248         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name) 
 
 250         self.infile = "{0}.xml.7z".format(self.wiki)
 
 251         self.base_call = "../wikiq {0} -o {1}"
 
 252         self.input_dir = "dumps"
 
 253         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 256     def test_malformed_noargs(self):
 
 258         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 259         proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
 
 261         outs, errs = proc.communicate()
 
 262         errlines = str(errs).split("\\n")
 
 263         self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
 
 265 class Test_Stdout(unittest.TestCase):
 
 268         self.wiki = 'sailormoon'
 
 269         self.wikiq_out_name =  self.wiki + ".tsv"
 
 271         self.infile = "{0}.xml.7z".format(self.wiki)
 
 272         self.base_call = "../wikiq {0} --stdout"
 
 273         self.input_dir = "dumps"
 
 274         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 275         self.baseline_output_dir = "baseline_output"
 
 277     def test_noargs(self):
 
 279         call = self.base_call.format(self.input_file)
 
 281         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
 
 282         outs = proc.stdout.decode("utf8")
 
 284         test_file = "noargs_" + self.wikiq_out_name
 
 285         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
 287         test = pd.read_table(StringIO(outs))
 
 288         baseline = pd.read_table(baseline_file)
 
 289         assert_frame_equal(test,baseline, check_like=True)
 
 291 class Test_Regex(unittest.TestCase):
 
 294         self.wiki = 'regextest'
 
 295         self.wikiq_out_name = self.wiki + '.tsv'
 
 296         self.infile = "{0}.xml.bz2".format(self.wiki)
 
 298         self.input_dir = "dumps"
 
 299         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 301         if not os.path.exists("test_output"):
 
 302             os.mkdir("test_output")
 
 304         self.test_output_dir = os.path.join(".", "test_output")
 
 305         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
 306         # we have two base calls, one for checking inputs and the other for checking outputs
 
 307         self.base_call = "../wikiq {0}"
 
 308         self.base_call_outs = "../wikiq {0} -o {1}"
 
 310         self.baseline_output_dir = "baseline_output"
 
 312         # sample inputs for checking that bad inputs get terminated / test_regex_inputs
 
 313         self.bad_inputs_list = [
 
 316             #number of reg and number of labels do not match 
 
 317             "-RP 'NPO V' -RP THE -RPl testlabel",
 
 319             "-CP '(Tamil|Li)' -RPl testlabel",
 
 322             "-RP '\\b\\w{3}\\b' -RPl threeletters -CP '\\b\\w{3}\\b'"
 
 325         # sample inputs for checking the outcomes of good inputs / test_basic_regex
 
 326         self.good_inputs_list = [
 
 327             "-RP '\\b\\d{3}\\b' -RPl threedigits",
 
 328             "-RP 'TestCase' -RP 'page' -RPl testcases -RPl page_word",
 
 329             "-CP 'Chevalier' -CPl chev_com -RP 'welcome to Wikipedia' -RPl wiki_welcome -CP 'Warning' -CPl warning",
 
 330             "-CP 'WP:EVADE' -CPl wp_evade"         
 
 334         self.cap_inputs_list = [
 
 335             "-RP 'Li Chevalier' -RPl li_cheval -CP '(?P<letter>\\b[a-zA-Z]{3}\\b)|(?P<number>\\b\\d+\\b)|(?P<cat>\\bcat\\b)' -CPl three",
 
 336             "-CP '(?P<a>\\bTestCaseA\\b)|(?P<b>\\bTestCaseB\\b)|(?P<c>\\bTestCaseC\\b)|(?P<d>\\bTestCaseD\\b)' -CPl testcase -RP '(?P<npov>npov|NPOV)|(?P<neutral>neutral point of view)' -RPl npov"
 
 341     def test_regex_inputs(self):
 
 342         for input in self.bad_inputs_list:
 
 343             call = self.base_call.format(self.input_file)
 
 344             call = call + " --stdout " + input
 
 346             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
 
 347             stdout,stderr = proc.communicate()
 
 348             #print(proc.returncode)
 
 350             # we want to check that the bad inputs were caught and sys.exit is stopping the code
 
 351             print(stderr.decode("utf-8"))
 
 352             self.assertNotEqual(proc.returncode,0)
 
 354     def test_basic_regex(self):
 
 355         for i, input in enumerate(self.good_inputs_list):
 
 357             test_filename = "basic_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
 
 358             #print(test_filename)
 
 359             test_file = os.path.join(self.test_output_dir, test_filename)
 
 360             if os.path.exists(test_file):
 
 363             call = self.base_call_outs.format(self.input_file, self.test_output_dir)
 
 364             call = call + " " + input
 
 367             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
 
 369             copyfile(self.call_output, test_file)
 
 371             test = pd.read_table(test_file)
 
 373             baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 374             baseline = pd.read_table(baseline_file)
 
 375             assert_frame_equal(test, baseline, check_like=True)
 
 379     def test_capturegroup_regex(self):
 
 380         for i, input in enumerate(self.cap_inputs_list):
 
 381             test_filename = "capturegroup_{0}_{1}.tsv".format(self.wikiq_out_name[:-4], str(i))
 
 383             test_file = os.path.join(self.test_output_dir, test_filename)
 
 384             if os.path.exists(test_file):
 
 387             call = self.base_call_outs.format(self.input_file, self.test_output_dir)
 
 388             call = call + " " + input
 
 391             proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE,shell=True)
 
 394             copyfile(self.call_output, test_file)
 
 396             test = pd.read_table(test_file)
 
 398             baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 399             baseline = pd.read_table(baseline_file)
 
 400             assert_frame_equal(test, baseline, check_like=True)
 
 403 if __name__ == '__main__':