]> code.communitydata.science - mediawiki_dump_tools.git/blobdiff - test/Wikiq_Unit_Test.py
checking in work to deepen migration to new mediawikiutils
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
index b32d6a9f9b391779fbd78296df4d24dbcd2e8565..e8937184593fe2e2b872c0a25fbdca31fb14f410 100644 (file)
@@ -1,9 +1,10 @@
 import unittest
 import os
-import sys
 import subprocess
 from shutil import copyfile
-import pdb
+import pandas as pd
+from pandas.util.testing import assert_frame_equal
+from io import StringIO
 
 # with / without pwr DONE
 # with / without url encode DONE
@@ -15,195 +16,290 @@ import pdb
 # wikia and wikipedia data DONE
 # malformed xmls DONE
 
-class Test_Wikipedia(unittest.TestCase):
-    def setUp(self):
-        if not os.path.exists("test_output"):
-            os.mkdir("test_output")
+# class Test_Persistence_Bug(unittest.TestCase):
 
-        self.wiki = 'ikwiki-20180301-pages-meta-history'
-        self.wikiq_out_name =  self.wiki + ".tsv"
-        self.test_output_dir = os.path.join(".", "test_output")
-        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+#     def setUp(self):
+#         if not os.path.exists("test_output"):
+#             os.mkdir("test_output")
 
-        self.infile = "{0}.xml.bz2".format(self.wiki)    
-        self.base_call = "../wikiq {0} -o {1}"
-        self.input_dir = "dumps"
-        self.input_file = os.path.join(".", self.input_dir,self.infile)
-        self.baseline_output_dir = "baseline_output"
+#         self.wiki = 'enwiki-test'
+#         self.wikiq_out_name =  self.wiki + ".tsv"
+#         self.test_output_dir = os.path.join(".", "test_output")
+#         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
-    def test_WP_url_encode(self):
-        call = self.base_call.format(self.input_file, self.test_output_dir)
-        call = call + " --url-encode"
-        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
-        proc.wait()
-        test_file =  "url-encode_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+#         self.infile = "{0}.xml".format(self.wiki)    
+#         self.base_call = "../wikiq {0} -o {1}"
+#         self.input_dir = "dumps"
+#         self.input_file = os.path.join(".", self.input_dir,self.infile)
+#         self.baseline_output_dir = "baseline_output"
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
-        baseline_lines = open(baseline_file)
-        for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
+#     def test_segment_persistence(self):
+#         test_filename =  "sequence-" + self.wikiq_out_name
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file):
+#             os.remove(test_file)
+        
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         call = call + " --url-encode --persistence sequence --collapse-user"
+#         print(os.path.abspath('.'))
+#         print(call)
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
 
-        test_lines.close()
-        baseline_lines.close()
+#         copyfile(self.call_output, test_file)
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
+#         # as a test let's make sure that we get equal data frames
+#         test = pd.read_table(test_file)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
 
-class Test_Basic(unittest.TestCase):
 
+class Test_Wikipedia(unittest.TestCase):
     def setUp(self):
         if not os.path.exists("test_output"):
             os.mkdir("test_output")
 
-        self.wiki = 'sailormoon'
+        self.wiki = 'ikwiki-20180301-pages-meta-history'
         self.wikiq_out_name =  self.wiki + ".tsv"
         self.test_output_dir = os.path.join(".", "test_output")
         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
-        self.infile = "{0}.xml.7z".format(self.wiki)
+        self.infile = "{0}.xml.bz2".format(self.wiki)    
         self.base_call = "../wikiq {0} -o {1}"
         self.input_dir = "dumps"
         self.input_file = os.path.join(".", self.input_dir,self.infile)
         self.baseline_output_dir = "baseline_output"
 
-    def test_noargs(self):
-
+    def test_WP_url_encode(self):
+        test_filename =  "url-encode_" + self.wikiq_out_name
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
         call = self.base_call.format(self.input_file, self.test_output_dir)
+        call = call + " --url-encode"
         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
         proc.wait()
-        test_file =  "noargs_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
 
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+        copyfile(self.call_output, test_file)
+        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
-        baseline_lines = open(baseline_file)
-        for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
+        # as a test let's make sure that we get equal data frames
 
-        test_lines.close()
-        baseline_lines.close()
+        test = pd.read_table(test_file)
+        baseline = pd.read_table(baseline_file)
+        assert_frame_equal(test,baseline)
 
-    def test_collapse_user(self):
+    def test_WP_namespaces(self):
+        print(os.path.abspath('.'))
+        test_filename =  "namespaces_" + self.wikiq_out_name
+        test_file = os.path.join(self.test_output_dir, test_filename)
+        if os.path.exists(test_file):
+            os.remove(test_file)
+        
         call = self.base_call.format(self.input_file, self.test_output_dir)
-        call = call + " --collapse-user"
-
+        call = call + " -n 0 -n 1"
+        print(call)
         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
         proc.wait()
+#        copyfile(self.call_output, test_file)
+        baseline_file = os.path.join(os.path.abspath("."), self.baseline_output_dir, test_filename)
 
-        test_file =  "collapse-user_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
+        # as a test let's make sure that we get equal data frames
+        test = pd.read_table(test_file)
+        num_wrong_ns = sum(~ test.namespace.isin({0,1}))
+        self.assertEqual(num_wrong_ns, 0)
+        baseline = pd.read_table(baseline_file)
+        assert_frame_equal(test,baseline)
 
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
-        baseline_lines = open(baseline_file)
-        for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
+# class Test_Basic(unittest.TestCase):
 
-        test_lines.close()
-        baseline_lines.close()
+#     def setUp(self):
+#         if not os.path.exists("test_output"):
+#             os.mkdir("test_output")
 
-    def test_pwr(self):
-        call = self.base_call.format(self.input_file, self.test_output_dir)
-        call = call + " --persistence"
-        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
-        proc.wait()
+#         self.wiki = 'sailormoon'
+#         self.wikiq_out_name =  self.wiki + ".tsv"
+#         self.test_output_dir = os.path.join(".", "test_output")
+#         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
 
-        test_file =  "persistence_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
+#         self.infile = "{0}.xml.7z".format(self.wiki)
+#         self.base_call = "../wikiq {0} -o {1}"
+#         self.input_dir = "dumps"
+#         self.input_file = os.path.join(".", self.input_dir,self.infile)
+#         self.baseline_output_dir = "baseline_output"
 
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+#     def test_noargs(self):
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
-        baseline_lines = open(baseline_file)
-        for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
+#         test_filename =  "noargs_" + self.wikiq_out_name
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file):
+#             os.remove(test_file)
+        
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
 
-        test_lines.close()
-        baseline_lines.close()
+#         copyfile(self.call_output, test_file)
 
-    def test_url_encode(self):
-        call = self.base_call.format(self.input_file, self.test_output_dir)
-        call = call + " --url-encode"
-        proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
-        proc.wait()
-        test_file =  "url-encode_" + self.wikiq_out_name
-        copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-        test_lines = open(os.path.join(self.test_output_dir,test_file))
-        baseline_lines = open(baseline_file)
-        for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
+#         test = pd.read_table(test_file)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
 
-        test_lines.close()
-        baseline_lines.close()
 
-class Test_Malformed(unittest.TestCase):
+#     def test_collapse_user(self):
+#         test_filename =  "collapse-user_" + self.wikiq_out_name
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file):
+#             os.remove(test_file)
+        
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         call = call + " --collapse-user"
 
-    def setUp(self):
-        if not os.path.exists("test_output"):
-            os.mkdir("test_output")
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
 
-        self.wiki = 'twinpeaks'
-        self.wikiq_out_name =  self.wiki + ".tsv"
-        self.test_output_dir = os.path.join(".", "test_output")
-        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+#         copyfile(self.call_output, test_file)
 
-        self.infile = "{0}.xml.7z".format(self.wiki)
-        self.base_call = "../wikiq {0} -o {1}"
-        self.input_dir = "dumps"
-        self.input_file = os.path.join(".", self.input_dir,self.infile)
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
+#         test = pd.read_table(test_file)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
 
+#     def test_pwr_segment(self):
+#         test_filename =  "persistence_segment_" + self.wikiq_out_name
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file):
+#             os.remove(test_file)
+        
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         call = call + " --persistence segment"
+#         print(call)
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
 
-    def test_malformed_noargs(self):
 
-        call = self.base_call.format(self.input_file, self.test_output_dir)
-        proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
-        proc.wait()
-        outs, errs = proc.communicate()
-        errlines = str(errs).split("\\n")
-        self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
+#         copyfile(self.call_output, test_file)
 
-class Test_Stdout(unittest.TestCase):
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-    def setUp(self):
-        self.wiki = 'sailormoon'
-        self.wikiq_out_name =  self.wiki + ".tsv"
+#         test = pd.read_table(test_file)
+#         print(test)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
 
-        self.infile = "{0}.xml.7z".format(self.wiki)
-        self.base_call = "../wikiq {0} --stdout"
-        self.input_dir = "dumps"
-        self.input_file = os.path.join(".", self.input_dir,self.infile)
-        self.baseline_output_dir = "baseline_output"
+#     def test_pwr_legacy(self):
+#         test_filename =  "persistence_legacy_" + self.wikiq_out_name
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file):
+#             os.remove(test_file)
+        
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         call = call + " --persistence legacy"
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
+
+#         copyfile(self.call_output, test_file)
+
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-    def test_noargs(self):
+#         test = pd.read_table(test_file)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
 
-        call = self.base_call.format(self.input_file)
-        proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
-        outs = proc.stdout.decode('utf-8')
+#     def test_pwr(self):
+#         test_filename =  "persistence_" + self.wikiq_out_name
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file): 
+#            os.remove(test_file)
         
-        test_file = "noargs_" + self.wikiq_out_name
-        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         call = call + " --persistence"
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
 
-        test_lines = outs.splitlines(True)
-        baseline_lines = open(baseline_file)
-        for test, baseline in zip(test_lines, baseline_lines):
-            self.assertEqual(test,baseline)
 
-        # test_file =  "noargs_" + self.wikiq_out_name
-        # copyfile(self.call_output, os.path.join(self.test_output_dir, test_file))
+#         copyfile(self.call_output, test_file)
 
-        # baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
 
-        # test_lines = open(os.path.join(self.test_output_dir,test_file))
-        # baseline_lines = open(baseline_file)
-        # for test, baseline in zip(test_lines, baseline_lines):
-        #     self.assertEqual(test,baseline)
-        # test_lines.close()
-        # baseline_lines.close()
+#         test = pd.read_table(test_file)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
 
+
+#     def test_url_encode(self):
+#         test_filename =  "url-encode_" + self.wikiq_out_name
+
+#         test_file = os.path.join(self.test_output_dir, test_filename)
+#         if os.path.exists(test_file):
+#             os.remove(test_file)
+        
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         call = call + " --url-encode"
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,shell=True)
+#         proc.wait()
+
+#         copyfile(self.call_output, test_file)
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)
+#         test = pd.read_table(test_file)
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
+
+
+# class Test_Malformed(unittest.TestCase):
+#     def setUp(self):
+#         if not os.path.exists("test_output"):
+#             os.mkdir("test_output")
+
+#         self.wiki = 'twinpeaks'
+#         self.wikiq_out_name =  self.wiki + ".tsv"
+#         self.test_output_dir = os.path.join(".", "test_output")
+#         self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)
+
+#         self.infile = "{0}.xml.7z".format(self.wiki)
+#         self.base_call = "../wikiq {0} -o {1}"
+#         self.input_dir = "dumps"
+#         self.input_file = os.path.join(".", self.input_dir,self.infile)
+
+
+#     def test_malformed_noargs(self):
+
+#         call = self.base_call.format(self.input_file, self.test_output_dir)
+#         proc = subprocess.Popen(call,stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
+#         proc.wait()
+#         outs, errs = proc.communicate()
+#         errlines = str(errs).split("\\n")
+#         self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
+
+# class Test_Stdout(unittest.TestCase):
+
+#     def setUp(self):
+#         self.wiki = 'sailormoon'
+#         self.wikiq_out_name =  self.wiki + ".tsv"
+
+#         self.infile = "{0}.xml.7z".format(self.wiki)
+#         self.base_call = "../wikiq {0} --stdout"
+#         self.input_dir = "dumps"
+#         self.input_file = os.path.join(".", self.input_dir,self.infile)
+#         self.baseline_output_dir = "baseline_output"
+
+#     def test_noargs(self):
+
+#         call = self.base_call.format(self.input_file)
+#         proc = subprocess.run(call,stdout=subprocess.PIPE,shell=True)
+#         outs = proc.stdout.decode("utf8")
+
+#         test_file = "noargs_" + self.wikiq_out_name
+#         baseline_file = os.path.join(".", self.baseline_output_dir, test_file)
+#         print(baseline_file)
+#         test = pd.read_table(StringIO(outs))
+#         baseline = pd.read_table(baseline_file)
+#         assert_frame_equal(test,baseline)
         
 if __name__ == '__main__':
     unittest.main()

Community Data Science Collective || Want to submit a patch?