]> code.communitydata.science - mediawiki_dump_tools.git/blobdiff - test/Wikiq_Unit_Test.py
migrate to mwpersistence. this fixes many issues. We preserve legacy persistence...
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
index 3a1e24beb65795d8b904f3c56fcb36284379a3a1..ab0a53255ae553034a22c354ec69f86c6466ca45 100644 (file)
@@ -1,17 +1,58 @@
 import unittest
 import os
 import sys
+import subprocess
 from shutil import copyfile
+import pdb
 
 # with / without pwr DONE
 # with / without url encode DONE
 # with / without collapse user DONE
-# with output to sdtout
+# with output to sdtout DONE
 # note that the persistence radius is 7 by default
 # reading various file formats including
-#        7z, gz, bz2, xml 
-# wikia and wikipedia data
-# malformed xmls
+#        7z, gz, bz2, xml  DONE
+# wikia and wikipedia data DONE
+# malformed xmls DONE
+
+class Test_Wikipedia(unittest.TestCase):
+    def setUp(self):
+        if not os.path.exists("test_output"):
+            os.mkdir("test_output")
+
+        self.wiki = 'ikwiki-20180301-pages-meta-history'
+        self.wikiq_out_name =  self.wiki + ".tsv"
+        self.test_output_dir = os.path.join(".", "test_output")
+        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+
+        self.infile = "{0}.xml.bz2".format(self.wiki)    
+        self.base_call = "../wikiq {0} -o {1}"
+        self.input_dir = "dumps"
+        self.input_file = os.path.join(".", self.input_dir,self.infile)
+        self.baseline_output_dir = "baseline_output"
+
+    def test_WP_url_encode(self):
+        test_filename =  "url-encode_" + self.wikiq_out_name
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
+        call = self.base_call.format(self.input_file, self.test_output_dir)
+        call = call + " --url-encode"
+        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+        proc.wait()
+
+        copyfile(self.call_output, test_file)
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
+
+        test_lines = open(test_file)
+        baseline_lines = open(baseline_file)
+        for test, baseline in zip(test_lines, baseline_lines):
+            self.assertEqual(test,baseline)
+
+        test_lines.close()
+        baseline_lines.close()
+
 
 class Test_Basic(unittest.TestCase):
 
@@ -32,35 +73,44 @@ class Test_Basic(unittest.TestCase):
 
     def test_noargs(self):
 
+        test_filename =  "noargs_" + self.wikiq_out_name
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
         call = self.base_call.format(self.input_file, self.test_output_dir)
-        os.system(call)
+        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+        proc.wait()
 
-        test_file =  "noargs_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
+        copyfile(self.call_output, test_file)
 
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
+        test_lines = open(test_file)
         baseline_lines = open(baseline_file)
         for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
+            self.assertEqual(test, baseline)
 
         test_lines.close()
         baseline_lines.close()
 
     def test_collapse_user(self):
+        test_filename =  "collapse-user_" + self.wikiq_out_name
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
         call = self.base_call.format(self.input_file, self.test_output_dir)
-        os.system(call)
         call = call + " --collapse-user"
 
-        os.system(call)
+        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+        proc.wait()
 
-        test_file =  "collapse-user_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
+        copyfile(self.call_output, test_file)
 
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
+        test_lines = open(test_file)
         baseline_lines = open(baseline_file)
         for test, baseline in zip(test_lines, baseline_lines):
             self.assertEqual(test,baseline)
@@ -68,17 +118,23 @@ class Test_Basic(unittest.TestCase):
         test_lines.close()
         baseline_lines.close()
 
-    def test_pwr(self):
+    def test_pwr_legacy(self):
+        test_filename =  "persistence_legacy_" + self.wikiq_out_name
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
         call = self.base_call.format(self.input_file, self.test_output_dir)
-        call = call + " --persistence"
-        os.system(call)
+        call = call + " --persistence-legacy"
+        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+        proc.wait()
 
-        test_file =  "persistence_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+        copyfile(self.call_output, test_file)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
+
+        test_lines = open(test_file)
         baseline_lines = open(baseline_file)
         for test, baseline in zip(test_lines, baseline_lines):
             self.assertEqual(test,baseline)
@@ -87,14 +143,21 @@ class Test_Basic(unittest.TestCase):
         baseline_lines.close()
 
     def test_url_encode(self):
+        test_filename =  "url-encode_" + self.wikiq_out_name
+
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
         call = self.base_call.format(self.input_file, self.test_output_dir)
         call = call + " --url-encode"
-        os.system(call)
-        test_file =  "url-encode_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+        proc.wait()
+
+        copyfile(self.call_output, test_file)
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
+        test_lines = open(test_file)
         baseline_lines = open(baseline_file)
         for test, baseline in zip(test_lines, baseline_lines):
             self.assertEqual(test,baseline)
@@ -102,6 +165,83 @@ class Test_Basic(unittest.TestCase):
         test_lines.close()
         baseline_lines.close()
 
+class Test_Malformed(unittest.TestCase):
+
+    def setUp(self):
+        if not os.path.exists("test_output"):
+            os.mkdir("test_output")
+
+        self.wiki = 'twinpeaks'
+        self.wikiq_out_name =  self.wiki + ".tsv"
+        self.test_output_dir = os.path.join(".", "test_output")
+        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+
+        self.infile = "{0}.xml.7z".format(self.wiki)
+        self.base_call = "../wikiq {0} -o {1}"
+        self.input_dir = "dumps"
+        self.input_file = os.path.join(".", self.input_dir,self.infile)
+
+
+    def test_malformed_noargs(self):
+
+        call = self.base_call.format(self.input_file, self.test_output_dir)
+        proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
+        proc.wait()
+        outs, errs = proc.communicate()
+        errlines = str(errs).split("\\n")
+        self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
+
+class Test_Stdout(unittest.TestCase):
+
+    def setUp(self):
+        self.wiki = 'sailormoon'
+        self.wikiq_out_name =  self.wiki + ".tsv"
+
+        self.infile = "{0}.xml.7z".format(self.wiki)
+        self.base_call = "../wikiq {0} --stdout"
+        self.input_dir = "dumps"
+        self.input_file = os.path.join(".", self.input_dir,self.infile)
+        self.baseline_output_dir = "baseline_output"
+
+    def test_noargs(self):
+
+        call = self.base_call.format(self.input_file)
+        proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
+        outs = proc.stdout.decode('utf-8')
+        
+        test_file = "noargs_" + self.wikiq_out_name
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+
+        test_lines = outs.splitlines(True)
+        baseline_lines = open(baseline_file)
+        for test, baseline in zip(test_lines, baseline_lines):
+            self.assertEqual(test,baseline)
+
+    def test_persistence(self):
+
+        call = self.base_call.format(self.input_file) + " --persistence"
+        proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
+        outs = proc.stdout.decode('utf-8')
+        
+        test_file = "persistence_" + self.wikiq_out_name
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+
+        test_lines = outs.splitlines(True)
+        baseline_lines = open(baseline_file)
+        for test, baseline in zip(test_lines, baseline_lines):
+            self.assertEqual(test,baseline)
+
+        # test_file =  "noargs_" + self.wikiq_out_name
+        # copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
+
+        # baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+
+        # test_lines = open(os.path.join(self.test_output_dir,test_file))
+        # baseline_lines = open(baseline_file)
+        # for test, baseline in zip(test_lines, baseline_lines):
+        #     self.assertEqual(test,baseline)
+        # test_lines.close()
+        # baseline_lines.close()
 
         
 if __name__ == '__main__':

Community Data Science Collective || Want to submit a patch?