add tests for wikipedia, malformed xml, bzip2, correct bz2 bug in wikiq.
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
1 import unittest
2 import os
3 import sys
4 import subprocess
5 from shutil import copyfile
6 import pdb
7
8 # with / without pwr DONE
9 # with / without url encode DONE
10 # with / without collapse user DONE
11 # with output to stdout DONE
12 # note that the persistence radius is 7 by default
13 # reading various file formats including
14 #        7z, gz, bz2, xml  DONE
15 # wikia and wikipedia data DONE
16 # malformed xmls DONE
17
class Test_Wikipedia(unittest.TestCase):
    """Regression test for wikiq on the bzip2-compressed ikwiki dump.

    The test runs ``../wikiq`` on
    ``./dumps/ikwiki-20180301-pages-meta-history.xml.bz2`` and compares the
    generated TSV, line by line, with the copy kept in ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the test uses."""
        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self.test_output_dir, exist_ok=True)
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _check_against_baseline(self, prefix, extra_args=""):
        """Run wikiq and compare its output file with the stored baseline.

        prefix     -- label prepended (with "_") to the TSV name; it selects
                      both the copy kept in the output directory and the
                      baseline file to compare against.
        extra_args -- command-line options appended to the base wikiq call.

        Fails the test if wikiq exits with a non-zero status, if any line
        differs, or if the two files do not have the same number of lines.
        """
        from itertools import zip_longest

        call = self.base_call.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        # run() drains the stdout pipe while waiting, so the child can never
        # block on a full pipe the way Popen(stdout=PIPE).wait() can.
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        # Without this check a failed run could leave a stale output file
        # from an earlier test to be copied and compared below.
        self.assertEqual(proc.returncode, 0,
                         "wikiq exited with an error: {0}".format(call))

        test_file = prefix + "_" + self.wikiq_out_name
        test_path = os.path.join(self.test_output_dir, test_file)
        copyfile(self.call_output, test_path)
        baseline_path = os.path.join(".", self.baseline_output_dir, test_file)

        # The with-block closes both files even when an assertion fails.
        with open(test_path) as test_lines, open(baseline_path) as baseline_lines:
            # zip_longest pads the shorter file with None, so missing or
            # extra lines fail the comparison instead of being ignored.
            pairs = zip_longest(test_lines, baseline_lines)
            for lineno, (test, baseline) in enumerate(pairs, start=1):
                self.assertEqual(
                    test, baseline,
                    "mismatch at line {0} of {1}".format(lineno, test_file))

    def test_WP_url_encode(self):
        """wikiq --url-encode output must match the stored baseline."""
        self._check_against_baseline("url-encode", "--url-encode")
50
51
class Test_Basic(unittest.TestCase):
    """Regression tests for wikiq's command-line options on the sailormoon dump.

    Each test runs ``../wikiq`` on ``./dumps/sailormoon.xml.7z`` with one
    option set and compares the generated TSV, line by line, with the
    matching file in ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self.test_output_dir, exist_ok=True)
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _check_against_baseline(self, prefix, extra_args=""):
        """Run wikiq and compare its output file with the stored baseline.

        prefix     -- label prepended (with "_") to the TSV name; it selects
                      both the copy kept in the output directory and the
                      baseline file to compare against.
        extra_args -- command-line options appended to the base wikiq call.

        Fails the test if wikiq exits with a non-zero status, if any line
        differs, or if the two files do not have the same number of lines.
        """
        from itertools import zip_longest

        call = self.base_call.format(self.input_file, self.test_output_dir)
        if extra_args:
            call = call + " " + extra_args

        # run() drains the stdout pipe while waiting, so the child can never
        # block on a full pipe the way Popen(stdout=PIPE).wait() can.
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        # All tests share self.call_output; without this check a failed run
        # would silently reuse the file written by a previous test.
        self.assertEqual(proc.returncode, 0,
                         "wikiq exited with an error: {0}".format(call))

        test_file = prefix + "_" + self.wikiq_out_name
        test_path = os.path.join(self.test_output_dir, test_file)
        copyfile(self.call_output, test_path)
        baseline_path = os.path.join(".", self.baseline_output_dir, test_file)

        # The with-block closes both files even when an assertion fails.
        with open(test_path) as test_lines, open(baseline_path) as baseline_lines:
            # zip_longest pads the shorter file with None, so missing or
            # extra lines fail the comparison instead of being ignored.
            pairs = zip_longest(test_lines, baseline_lines)
            for lineno, (test, baseline) in enumerate(pairs, start=1):
                self.assertEqual(
                    test, baseline,
                    "mismatch at line {0} of {1}".format(lineno, test_file))

    def test_noargs(self):
        """Default invocation (no extra options) must match the baseline."""
        self._check_against_baseline("noargs")

    def test_collapse_user(self):
        """--collapse-user output must match the baseline."""
        self._check_against_baseline("collapse-user", "--collapse-user")

    def test_pwr(self):
        """--persistence output must match the baseline."""
        self._check_against_baseline("persistence", "--persistence")

    def test_url_encode(self):
        """--url-encode output must match the baseline."""
        self._check_against_baseline("url-encode", "--url-encode")
142
class Test_Malformed(unittest.TestCase):
    """Check how wikiq reports a dump whose XML cannot be parsed.

    Runs ``../wikiq`` on ``./dumps/twinpeaks.xml.7z`` and expects the last
    line written to stderr to be an ``xml.etree.ElementTree.ParseError``.
    """

    def setUp(self):
        """Create the output directory and record the paths the test uses."""
        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(self.test_output_dir, exist_ok=True)
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        """The final stderr line must be the expected ParseError message."""
        call = self.base_call.format(self.input_file, self.test_output_dir)
        # run() reads both pipes while waiting. Calling wait() before
        # communicate() can deadlock once the child fills a pipe buffer.
        proc = subprocess.run(call, stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE, shell=True)

        # Decode the bytes instead of parsing their repr(); splitting on
        # "\n" leaves an empty string after the trailing newline, so the
        # last line of the traceback sits at index -2.
        errlines = proc.stderr.decode('utf-8', errors='replace').split("\n")
        self.assertGreaterEqual(
            len(errlines), 2,
            "wikiq wrote no complete line to stderr: {0!r}".format(proc.stderr))
        self.assertEqual(errlines[-2],'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
168
class Test_Stdout(unittest.TestCase):
    """Check that ``wikiq --stdout`` prints the same TSV the baseline holds.

    Runs ``../wikiq`` on ``./dumps/sailormoon.xml.7z`` with ``--stdout`` and
    compares the captured output with ``baseline_output/noargs_sailormoon.tsv``.
    """

    def setUp(self):
        """Record the command template and paths the test uses."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        """Captured stdout must match the noargs baseline line for line."""
        import io
        from itertools import zip_longest

        call = self.base_call.format(self.input_file)
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        self.assertEqual(proc.returncode, 0,
                         "wikiq exited with an error: {0}".format(call))

        test_file = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, test_file)

        # Wrap the captured bytes in a text stream so they are decoded as
        # UTF-8 and split into lines by the same newline rules open() applies
        # to the baseline. str.splitlines() also breaks on characters such as
        # form feed or U+2028, which file iteration does not.
        test_lines = io.TextIOWrapper(io.BytesIO(proc.stdout), encoding='utf-8')

        # The with-block closes the baseline file even on assertion failure.
        with test_lines, open(baseline_file) as baseline_lines:
            # zip_longest pads the shorter side with None, so missing or
            # extra lines fail the comparison instead of being ignored.
            pairs = zip_longest(test_lines, baseline_lines)
            for lineno, (test, baseline) in enumerate(pairs, start=1):
                self.assertEqual(
                    test, baseline,
                    "mismatch at line {0} of {1}".format(lineno, test_file))
206
207         
# Script entry point: discover and run every TestCase defined in this module.
if "__main__" == __name__:
    unittest.main()

Community Data Science Collective || Want to submit a patch?