]> code.communitydata.science - mediawiki_dump_tools.git/blob - test/Wikiq_Unit_Test.py
add namespace filter parameter
[mediawiki_dump_tools.git] / test / Wikiq_Unit_Test.py
1 import unittest
2 import os
3 import subprocess
4 from shutil import copyfile
5 import pandas as pd
6 from pandas.util.testing import assert_frame_equal
7 from io import StringIO
8
9 # with / without pwr DONE
10 # with / without url encode DONE
11 # with / without collapse user DONE
12 # with output to stdout DONE
13 # note that the persistence radius is 7 by default
14 # reading various file formats including
15 #        7z, gz, bz2, xml  DONE
16 # wikia and wikipedia data DONE
17 # malformed xmls DONE
18
class Test_Wikipedia(unittest.TestCase):
    """Regression test running wikiq on a bz2-compressed dump.

    The TSV written by wikiq is compared, as a pandas data frame, with a
    stored baseline file of the same name under ``baseline_output``.
    Paths are relative, so the tests must be run from the directory that
    contains ``dumps`` and ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'ikwiki-20180301-pages-meta-history'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.bz2".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _assert_matches_baseline(self, prefix, extra_args=""):
        """Run wikiq and compare its output with the stored baseline.

        prefix     -- string prepended to the wikiq output name; it names
                      both the saved copy in the output directory and the
                      baseline file.
        extra_args -- text appended verbatim to the wikiq command line
                      (include the leading space).
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)

        # Remove output left over from earlier runs; otherwise a failed
        # wikiq run would be masked by a stale file and the test would
        # compare old data.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + extra_args

        # subprocess.run drains the pipe while waiting. Popen.wait() with
        # stdout=PIPE can deadlock once the child fills the pipe buffer.
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        # as a test let's make sure that we get equal data frames
        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        pd.testing.assert_frame_equal(test, baseline)

    def test_WP_url_encode(self):
        """wikiq --url-encode output matches the url-encode baseline."""
        self._assert_matches_baseline("url-encode_", " --url-encode")
53
54
class Test_Basic(unittest.TestCase):
    """Regression tests running wikiq on the 7z-compressed sailormoon dump.

    Each test runs wikiq with one set of command-line options and compares
    the resulting TSV, as a pandas data frame, with a stored baseline file
    under ``baseline_output``. Paths are relative, so the tests must be run
    from the directory that contains ``dumps`` and ``baseline_output``.
    """

    def setUp(self):
        """Create the output directory and record the paths the tests use."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def _assert_matches_baseline(self, prefix, extra_args=""):
        """Run wikiq and compare its output with the stored baseline.

        prefix     -- string prepended to the wikiq output name; it names
                      both the saved copy in the output directory and the
                      baseline file.
        extra_args -- text appended verbatim to the wikiq command line
                      (include the leading space).
        """
        test_filename = prefix + self.wikiq_out_name
        test_file = os.path.join(self.test_output_dir, test_filename)

        # Every test writes to the same wikiq output path. Remove it (and
        # the previous copy) first so a failed wikiq run cannot be masked
        # by a file produced by an earlier test.
        for stale in (test_file, self.call_output):
            if os.path.exists(stale):
                os.remove(stale)

        call = self.base_call.format(self.input_file, self.test_output_dir)
        call = call + extra_args

        # subprocess.run drains the pipe while waiting. Popen.wait() with
        # stdout=PIPE can deadlock once the child fills the pipe buffer.
        subprocess.run(call, stdout=subprocess.PIPE, shell=True)

        copyfile(self.call_output, test_file)
        baseline_file = os.path.join(".", self.baseline_output_dir, test_filename)

        test = pd.read_table(test_file)
        baseline = pd.read_table(baseline_file)
        pd.testing.assert_frame_equal(test, baseline)

    def test_noargs(self):
        """wikiq with no extra options matches the noargs baseline."""
        self._assert_matches_baseline("noargs_")

    def test_collapse_user(self):
        """wikiq --collapse-user matches the collapse-user baseline."""
        self._assert_matches_baseline("collapse-user_", " --collapse-user")

    def test_pwr_segment(self):
        """wikiq --persistence segment matches its baseline."""
        self._assert_matches_baseline("persistence_segment_", " --persistence segment")

    def test_pwr_legacy(self):
        """wikiq --persistence legacy matches its baseline."""
        self._assert_matches_baseline("persistence_legacy_", " --persistence legacy")

    def test_pwr(self):
        """wikiq --persistence (no method given) matches its baseline."""
        self._assert_matches_baseline("persistence_", " --persistence")

    def test_url_encode(self):
        """wikiq --url-encode matches the url-encode baseline."""
        self._assert_matches_baseline("url-encode_", " --url-encode")
189
190
class Test_Malformed(unittest.TestCase):
    """Checks how wikiq reports a truncated (malformed) XML dump.

    Paths are relative, so the test must be run from the directory that
    contains ``dumps``.
    """

    def setUp(self):
        """Create the output directory and record the paths the test uses."""
        os.makedirs("test_output", exist_ok=True)

        self.wiki = 'twinpeaks'
        self.wikiq_out_name = self.wiki + ".tsv"
        self.test_output_dir = os.path.join(".", "test_output")
        self.call_output = os.path.join(self.test_output_dir, self.wikiq_out_name)

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} -o {1}"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)

    def test_malformed_noargs(self):
        """The last line wikiq writes to stderr is the expected ParseError."""
        call = self.base_call.format(self.input_file, self.test_output_dir)

        # subprocess.run reads both pipes while waiting. Calling
        # Popen.wait() before communicate() with stdout and stderr piped
        # can deadlock once the child fills either pipe buffer.
        proc = subprocess.run(call,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              shell=True)

        # Decode the captured bytes instead of splitting their repr().
        errlines = proc.stderr.decode("utf8", errors="replace").splitlines()
        self.assertTrue(errlines, "wikiq wrote nothing to stderr")
        self.assertEqual(errlines[-1],
                         'xml.etree.ElementTree.ParseError: no element found: line 1369, column 0')
216
class Test_Stdout(unittest.TestCase):
    """Checks that ``wikiq --stdout`` emits the same table as the baseline.

    Paths are relative, so the test must be run from the directory that
    contains ``dumps`` and ``baseline_output``.
    """

    def setUp(self):
        """Record the command template and paths the test uses."""
        self.wiki = 'sailormoon'
        self.wikiq_out_name = self.wiki + ".tsv"

        self.infile = "{0}.xml.7z".format(self.wiki)
        self.base_call = "../wikiq {0} --stdout"
        self.input_dir = "dumps"
        self.input_file = os.path.join(".", self.input_dir, self.infile)
        self.baseline_output_dir = "baseline_output"

    def test_noargs(self):
        """Table written to stdout equals the stored noargs baseline."""
        call = self.base_call.format(self.input_file)
        proc = subprocess.run(call, stdout=subprocess.PIPE, shell=True)
        outs = proc.stdout.decode("utf8")

        # The stdout run is compared with the same baseline file that the
        # file-output "noargs" run uses.
        baseline_name = "noargs_" + self.wikiq_out_name
        baseline_file = os.path.join(".", self.baseline_output_dir, baseline_name)

        test = pd.read_table(StringIO(outs))
        baseline = pd.read_table(baseline_file)
        pd.testing.assert_frame_equal(test, baseline)
241         
# Run every test case in this module when the file is executed directly.
if __name__ == "__main__":
    unittest.main()

Community Data Science Collective || Want to submit a patch?