4 from shutil import copyfile
 
   6 from pandas.util.testing import assert_frame_equal
 
   7 from io import StringIO
 
   9 # with / without pwr DONE
 
  10 # with / without url encode DONE
 
  11 # with / without collapse user DONE
 
  12 # with output to sdtout DONE
 
  13 # note that the persistence radius is 7 by default
 
  14 # reading various file formats including
 
  15 #        7z, gz, bz2, xml  DONE
 
  16 # wikia and wikipedia data DONE
 
  19 class Test_Wikipedia(unittest.TestCase):
 
  21         if not os.path.exists("test_output"):
 
  22             os.mkdir("test_output")
 
  24         self.wiki = 'ikwiki-20180301-pages-meta-history'
 
  25         self.wikiq_out_name =  self.wiki + ".tsv"
 
  26         self.test_output_dir = os.path.join(".", "test_output")
 
  27         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
  29         self.infile = "{0}.xml.bz2".format(self.wiki)    
 
  30         self.base_call = "../wikiq {0} -o {1}"
 
  31         self.input_dir = "dumps"
 
  32         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
  33         self.baseline_output_dir = "baseline_output"
 
  35     def test_WP_url_encode(self):
 
  36         test_filename =  "url-encode_" + self.wikiq_out_name
 
  37         test_file = os.path.join(self.test_output_dir, test_filename)
 
  38         if os.path.exists(test_file):
 
  41         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  42         call = call + " --url-encode"
 
  43         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  46         copyfile(self.call_output, test_file)
 
  47         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
  49         # as a test let's make sure that we get equal data frames
 
  50         test = pd.read_table(test_file)
 
  51         baseline = pd.read_table(baseline_file)
 
  52         assert_frame_equal(test,baseline)
 
  55 class Test_Basic(unittest.TestCase):
 
  58         if not os.path.exists("test_output"):
 
  59             os.mkdir("test_output")
 
  61         self.wiki = 'sailormoon'
 
  62         self.wikiq_out_name =  self.wiki + ".tsv"
 
  63         self.test_output_dir = os.path.join(".", "test_output")
 
  64         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
  66         self.infile = "{0}.xml.7z".format(self.wiki)
 
  67         self.base_call = "../wikiq {0} -o {1}"
 
  68         self.input_dir = "dumps"
 
  69         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
  70         self.baseline_output_dir = "baseline_output"
 
  72     def test_noargs(self):
 
  74         test_filename =  "noargs_" + self.wikiq_out_name
 
  75         test_file = os.path.join(self.test_output_dir, test_filename)
 
  76         if os.path.exists(test_file):
 
  79         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  80         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
  83         copyfile(self.call_output, test_file)
 
  85         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
  87         test = pd.read_table(test_file)
 
  88         baseline = pd.read_table(baseline_file)
 
  89         assert_frame_equal(test,baseline)
 
  92     def test_collapse_user(self):
 
  93         test_filename =  "collapse-user_" + self.wikiq_out_name
 
  94         test_file = os.path.join(self.test_output_dir, test_filename)
 
  95         if os.path.exists(test_file):
 
  98         call = self.base_call.format(self.input_file, self.test_output_dir)
 
  99         call = call + " --collapse-user"
 
 101         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 104         copyfile(self.call_output, test_file)
 
 106         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 107         test = pd.read_table(test_file)
 
 108         baseline = pd.read_table(baseline_file)
 
 109         assert_frame_equal(test,baseline)
 
 111     def test_pwr_legacy(self):
 
 112         test_filename =  "persistence_legacy_" + self.wikiq_out_name
 
 113         test_file = os.path.join(self.test_output_dir, test_filename)
 
 114         if os.path.exists(test_file):
 
 117         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 118         call = call + " --persistence-legacy"
 
 119         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 123         copyfile(self.call_output, test_file)
 
 125         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 127         test = pd.read_table(test_file)
 
 128         baseline = pd.read_table(baseline_file)
 
 129         assert_frame_equal(test,baseline)
 
 132         test_filename =  "persistence_" + self.wikiq_out_name
 
 133         test_file = os.path.join(self.test_output_dir, test_filename)
 
 134         if os.path.exists(test_file):
 
 137         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 138         call = call + " --persistence"
 
 139         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 143         copyfile(self.call_output, test_file)
 
 145         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 147         test = pd.read_table(test_file)
 
 148         baseline = pd.read_table(baseline_file)
 
 149         assert_frame_equal(test,baseline)
 
 152     def test_url_encode(self):
 
 153         test_filename =  "url-encode_" + self.wikiq_out_name
 
 155         test_file = os.path.join(self.test_output_dir, test_filename)
 
 156         if os.path.exists(test_file):
 
 159         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 160         call = call + " --url-encode"
 
 161         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
 
 164         copyfile(self.call_output, test_file)
 
 165         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
 166         test = pd.read_table(test_file)
 
 167         baseline = pd.read_table(baseline_file)
 
 168         assert_frame_equal(test,baseline)
 
 171 class Test_Malformed(unittest.TestCase):
 
 174         if not os.path.exists("test_output"):
 
 175             os.mkdir("test_output")
 
 177         self.wiki = 'twinpeaks'
 
 178         self.wikiq_out_name =  self.wiki + ".tsv"
 
 179         self.test_output_dir = os.path.join(".", "test_output")
 
 180         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
 182         self.infile = "{0}.xml.7z".format(self.wiki)
 
 183         self.base_call = "../wikiq {0} -o {1}"
 
 184         self.input_dir = "dumps"
 
 185         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 188     def test_malformed_noargs(self):
 
 190         call = self.base_call.format(self.input_file, self.test_output_dir)
 
 191         proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
 
 193         outs, errs = proc.communicate()
 
 194         errlines = str(errs).split("\\n")
 
 195         self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
 
 197 class Test_Stdout(unittest.TestCase):
 
 200         self.wiki = 'sailormoon'
 
 201         self.wikiq_out_name =  self.wiki + ".tsv"
 
 203         self.infile = "{0}.xml.7z".format(self.wiki)
 
 204         self.base_call = "../wikiq {0} --stdout"
 
 205         self.input_dir = "dumps"
 
 206         self.input_file = os.path.join(".", self.input_dir,self.infile)
 
 207         self.baseline_output_dir = "baseline_output"
 
 209     def test_noargs(self):
 
 211         call = self.base_call.format(self.input_file)
 
 212         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
 
 213         outs = proc.stdout.decode("utf8")
 
 215         test_file = "noargs_" + self.wikiq_out_name
 
 216         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
 218         test = pd.read_table(StringIO(outs))
 
 219         baseline = pd.read_table(baseline_file)
 
 220         assert_frame_equal(test,baseline)
 
 222 if __name__ == '__main__':