5 from shutil import copyfile
 
   8 # with / without pwr DONE
 
   9 # with / without url encode DONE
 
  10 # with / without collapse user DONE
 
# with output to stdout DONE
 
  12 # note that the persistence radius is 7 by default
 
  13 # reading various file formats including
 
  14 #        7z, gz, bz2, xml  DONE
 
  15 # wikia and wikipedia data DONE
 
  18 class Test_Wikipedia(unittest.TestCase):
 
  20         if not os.path.exists("test_output"):
 
  21             os.mkdir("test_output")
 
  23         self.wiki = 'ikwiki-20180301-pages-meta-history'
 
  24         self.wikiq_out_name =  self.wiki + ".tsv"
 
  25         self.test_output_dir = os.path.join(".", "test_output")
 
  26         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
  28         self.infile = "{0}.xml.bz2".format(self.wiki)    
 
  29         self.base_call = "../wikiq {0} -o {1}"
 
  30         self.input_dir = "dumps"
 
  31         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
  32         self.baseline_output_dir = "baseline_output"
 
  34     def test_WP_url_encode(self):
 
  35         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  36         call = call + " --url-encode"
 
  37         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  39         test_file =  "url-encode_" + self.wikiq_out_name
 
  40         copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
  41         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
  43         test_lines = open(os.path.join(self.test_output_dir,test_file))
 
  44         baseline_lines = open(baseline_file)
 
  45         for test, baseline in zip(test_lines, baseline_lines):
 
  46             self.assertEqual(test,baseline)
 
  49         baseline_lines.close()
 
  52 class Test_Basic(unittest.TestCase):
 
  55         if not os.path.exists("test_output"):
 
  56             os.mkdir("test_output")
 
  58         self.wiki = 'sailormoon'
 
  59         self.wikiq_out_name =  self.wiki + ".tsv"
 
  60         self.test_output_dir = os.path.join(".", "test_output")
 
  61         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
  63         self.infile = "{0}.xml.7z".format(self.wiki)
 
  64         self.base_call = "../wikiq {0} -o {1}"
 
  65         self.input_dir = "dumps"
 
  66         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
  67         self.baseline_output_dir = "baseline_output"
 
  69     def test_noargs(self):
 
  71         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  72         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  74         test_file =  "noargs_" + self.wikiq_out_name
 
  75         copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
  77         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
  79         test_lines = open(os.path.join(self.test_output_dir,test_file))
 
  80         baseline_lines = open(baseline_file)
 
  81         for test, baseline in zip(test_lines, baseline_lines):
 
  82             self.assertEqual(test,baseline)
 
  85         baseline_lines.close()
 
  87     def test_collapse_user(self):
 
  88         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  89         call = call + " --collapse-user"
 
  91         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  94         test_file =  "collapse-user_" + self.wikiq_out_name
 
  95         copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
  97         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
  99         test_lines = open(os.path.join(self.test_output_dir,test_file))
 
 100         baseline_lines = open(baseline_file)
 
 101         for test, baseline in zip(test_lines, baseline_lines):
 
 102             self.assertEqual(test,baseline)
 
 105         baseline_lines.close()
 
 108         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 109         call = call + " --persistence"
 
 110         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 113         test_file =  "persistence_" + self.wikiq_out_name
 
 114         copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
 116         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
 118         test_lines = open(os.path.join(self.test_output_dir,test_file))
 
 119         baseline_lines = open(baseline_file)
 
 120         for test, baseline in zip(test_lines, baseline_lines):
 
 121             self.assertEqual(test,baseline)
 
 124         baseline_lines.close()
 
 126     def test_url_encode(self):
 
 127         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 128         call = call + " --url-encode"
 
 129         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 131         test_file =  "url-encode_" + self.wikiq_out_name
 
 132         copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
 133         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
 135         test_lines = open(os.path.join(self.test_output_dir,test_file))
 
 136         baseline_lines = open(baseline_file)
 
 137         for test, baseline in zip(test_lines, baseline_lines):
 
 138             self.assertEqual(test,baseline)
 
 141         baseline_lines.close()
 
 143 class Test_Malformed(unittest.TestCase):
 
 146         if not os.path.exists("test_output"):
 
 147             os.mkdir("test_output")
 
 149         self.wiki = 'twinpeaks'
 
 150         self.wikiq_out_name =  self.wiki + ".tsv"
 
 151         self.test_output_dir = os.path.join(".", "test_output")
 
 152         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
 154         self.infile = "{0}.xml.7z".format(self.wiki)
 
 155         self.base_call = "../wikiq {0} -o {1}"
 
 156         self.input_dir = "dumps"
 
 157         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 160     def test_malformed_noargs(self):
 
 162         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 163         proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
 
 165         outs, errs = proc.communicate()
 
 166         errlines = str(errs).split("\\n")
 
 167         self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
 
 169 class Test_Stdout(unittest.TestCase):
 
 172         self.wiki = 'sailormoon'
 
 173         self.wikiq_out_name =  self.wiki + ".tsv"
 
 175         self.infile = "{0}.xml.7z".format(self.wiki)
 
 176         self.base_call = "../wikiq {0} --stdout"
 
 177         self.input_dir = "dumps"
 
 178         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 179         self.baseline_output_dir = "baseline_output"
 
 181     def test_noargs(self):
 
 183         call = self.base_call.format(self.input_file)
 
 184         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
 
 185         outs = proc.stdout.decode('utf-8')
 
 187         test_file = "noargs_" + self.wikiq_out_name
 
 188         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
 190         test_lines = outs.splitlines(True)
 
 191         baseline_lines = open(baseline_file)
 
 192         for test, baseline in zip(test_lines, baseline_lines):
 
 193             self.assertEqual(test,baseline)
 
 195         # test_file =  "noargs_" + self.wikiq_out_name
 
 196         # copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
 198         # baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
 200         # test_lines = open(os.path.join(self.test_output_dir,test_file))
 
 201         # baseline_lines = open(baseline_file)
 
 202         # for test, baseline in zip(test_lines, baseline_lines):
 
 203         #     self.assertEqual(test,baseline)
 
 205         # baseline_lines.close()
 
 208 if __name__ == '__main__':