3 # Scrape the Wikia userroles api
4 # Copyright (C) 2018 Nathan TeBlunthuis
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program. If not, see <https://www.gnu.org/licenses/>.
26 from importlib import reload
27 from json.decoder import JSONDecodeError
29 from scraper_utils import prepare_output, read_wikilist, add_parser_arguments
# MediaWiki / Wikia user groups requested from the allusers API
# ('util' is deliberately excluded).  Fix: the original list contained
# 'checkuser' twice; the duplicate was redundant in the joined
# `augroup` parameter and is removed here.
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback',  # 'util',
         'helper', 'vstf', 'checkuser-global', 'bot-global',
         'council', 'authenticated', 'checkuser', 'chatmoderator',
         'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker',
         'researcher']
def __init__(self, url_root, wikitype):
    """Remember the wiki flavor and build the list-users endpoint URL.

    url_root -- base URL of the wiki (trailing slash expected by callers)
    wikitype -- "wikia" or "wikipedia"; selects which API endpoint is used
    """
    self.wikitype = wikitype
    if wikitype == "wikia":
        # Wikia exposes user lists through its ListusersAjax handler.
        endpoint = 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
    else:  # wikitype == "wikipedia"
        # Stock MediaWiki installations use the standard api.php entry point.
        endpoint = 'api.php'
    self._api_url = url_root + endpoint
def _fetch_http(self, url, params):
    """GET `url` with `params`; return the body text (Wikia) or the
    Response object (Wikipedia).

    Fix: as written the function fell off the end and returned None,
    but call() consumes the result — json.loads(...) for Wikia and
    .json() for Wikipedia — so each branch must return accordingly.
    """
    if self.wikitype == "wikia":
        # Ask for gzip to cut transfer size on Wikia's ajax endpoint.
        response = requests.get(url=url, params=params, headers={
            'Accept-encoding': 'gzip'})
        # call() runs json.loads() on this, so hand back the raw text.
        return response.text
    else:  # wikitype == "wikipedia"
        response = requests.get(url=url, params=params)
        # call() invokes .json() on this, so hand back the Response.
        return response
def call(self, params):
    """Issue one API request and return the decoded JSON payload."""
    raw = self._fetch_http(self._api_url, params)
    # Wikipedia requests come back as a Response object; decode via .json().
    if self.wikitype != "wikia":
        return raw.json()
    # Wikia requests come back as body text; decode it ourselves.
    return json.loads(raw)
def write_user_csvfile(output_file, user_list):
    """Write `user_list` to `output_file` as tab-separated values.

    Non-numeric fields are double-quoted; a header row is emitted first.
    """
    writer = csv.writer(output_file, delimiter='\t', quotechar='"',
                        quoting=csv.QUOTE_NONNUMERIC)
    # Header row, then one row per user.
    writer.writerow(['username', 'groups',
                     'edits', 'last.logged', 'last.edited'])
    writer.writerows(user_list)
def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
    # Fetch every privileged user of one wiki and write them to
    # "<output_path>/<wikiname>.tsv".  Returns "notauthorized" on fetch
    # failure; otherwise completes with the file written.
    #
    # NOTE(review): this chunk elides several original lines — the
    # initialization of `increment_size` and `offset`, parts of both
    # query dicts, the `try:` that pairs with the `except` clauses below,
    # and the `else:` that introduces the Wikipedia output branch.
    # Confirm against the full file before editing.
    if wikitype == "wikia":
        # Wikia ListusersAjax query.  The trailing comma in 'groups' is
        # present in the original request string.
        query = {'groups': 'bot,sysop,bureaucrat,',
                 'limit': increment_size,
                 'order': 'username:asc'}
    else:  # wikitype == "wikipedia"
        # MediaWiki list=allusers query (remaining keys elided in this view).
        query = {'action': 'query',
                 'augroup': "|".join(roles),

    # FIND THE CORRECT URL (there may be redirects)
    if wikitype == "wikia":
        url_root = requests.get(url_root).url
        # A redirect to Community Central means the wiki no longer exists.
        re_str = "^http://(community|www).wikia.com/"
        if re.match(re_str, url_root):
            # 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia':
            print("ERROR: %s no longer exists" % wikiname)

    # NOTE(review): the `try:` guarding these calls is elided from this view.
    wiki = ListUserAPI(url_root, wikitype=wikitype)
    rv = wiki.call(query)

    except requests.ConnectionError as e:
        print("ERROR: cannot read the event log: %s" % wikiname)
        notauthorized.append(wikiname)  # module-level list, defined elsewhere
        return "notauthorized"

    except JSONDecodeError as e:
        # Wikia sometimes answers with HTML instead of JSON (auth walls).
        print("ERROR: cannot read the event log: %s" % wikiname)
        notauthorized.append(wikiname)
        return "notauthorized"

    output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
    if wikitype == "wikia":
        raw_userlist = rv['aaData']

        # Page through the result set until all records are fetched.
        while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
            # increment the offset and make a new query
            offset = offset + increment_size
            query['offset'] = offset
            rv = wiki.call(query)
            raw_userlist.extend(rv['aaData'])
            print("Another one: offset is %s" % offset)

        # go through and edit the html output of the json
        processed_userlist = []
        for row in raw_userlist:
            # Strip the <a href="...">username</a> wrapper from column 0.
            row[0] = re.sub(r'^.*?<a href=.*?>(.*?)<.*$', r'\1', row[0])

            # work around change in wikia api that removed last.logged
            row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
            row[4] = re.sub(r'^\-$', r'', row[4])
            processed_userlist.append(row)

        write_user_csvfile(output_file, processed_userlist)

        # NOTE(review): the `else:` (Wikipedia branch) header and the
        # `outlines.extend(` opening of the call below are elided here.
        raw_userlist = rv['query']['allusers']
        outlines = ['\t'.join(["username", "groups"])]
        # Follow MediaWiki API continuation tokens until exhausted.
        while 'continue' in rv:
            query['continue'] = str(rv['continue'])
            query['aufrom'] = str(rv['continue']['aufrom'])
            rv = wiki.call(query)
            raw_userlist = rv['query']['allusers']
                ['\t'.join([q['name'], ','.join(q['groups'])]) for q in raw_userlist])
            output_file.write('\n'.join(outlines))
168 # open and then send data to the output data file
171 # %run userroles_from_listusers.py --sep=\\t --nuke_old ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers
if __name__ == '__main__':
    # Command-line entry point: read a wiki list, scrape roles for each
    # wiki not already done, and record failures to two error files.
    #
    # NOTE(review): this chunk elides some original lines — the
    # initialization of the module-level `deleted`/`notauthorized` lists,
    # a `continue` after the SKIPPING message, the `deleted.append(wiki)`
    # branch, and the close() calls for the error files.  Confirm against
    # the full file before editing.
    parser = argparse.ArgumentParser(
        description="Get user roles for Wikis from the Mediawiki list users API")

    parser = add_parser_arguments(parser)
    args = parser.parse_args()
    output_path = args.output
    header = not args.no_header

    prepare_output(output_path, args.nuke_old)

    wikilist = read_wikilist(args)

    # Output files from previous runs; used to skip already-scraped wikis.
    files = [os.path.join(output_path, i) for i in os.listdir(output_path)]

    for wiki, url, wikitype in wikilist:
        if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
            # NOTE(review): the stray ')' inside this message looks like a
            # typo in the original string; left unchanged here.
            print("SKIPPING: file \"%s\" already exists)" % wiki)

        print("Processing wiki: %s" % wiki)

        result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
        if result == "deleted":
        elif result == "notauthorized":
            notauthorized.append(wiki)

    # Record which wikis failed and why, one wiki name per line.
    df = open("allusers_error_deleted.txt", 'w')
    df.write('\n'.join(deleted))

    na = open("allusers_error_notauthorized.txt", 'w')
    na.write('\n'.join(notauthorized))