]> code.communitydata.science - wikia_userroles_scraper.git/blob - userroles_from_listusers.py
put in the longer version of the copyright
[wikia_userroles_scraper.git] / userroles_from_listusers.py
1 #!/usr/bin/env python3
2
3 # Scrape the Wikia userroles api
4 # Copyright (C) 2018  Nathan TeBlunthuis
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 # GNU General Public License for more details.
15
16 # You should have received a copy of the GNU General Public License
17 # along with this program.  If not, see <https://www.gnu.org/licenses/>.
18
19 import argparse
20 import csv
21 import json
22 import sys
23 import time
24 import re
25 import os
26 from importlib import reload
27 from json.decoder import JSONDecodeError
28 from os import path
29 from scraper_utils import prepare_output, read_wikilist, add_parser_arguments
30
31 import requests
32
33 reload(sys)
34
35 roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback',  # 'util',
36          'helper', 'vstf', 'checkuser-global', 'bot-global',
37          'council', 'authenticated', 'checkuser', 'chatmoderator',
38          'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker', 'checkuser', 'researcher']
39
40
41 class ListUserAPI():
42
43     def __init__(self, url_root, wikitype):
44         self.wikitype = wikitype
45         if self.wikitype == "wikia":
46             self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
47         else:  # wikitype == "wikipedia"
48             self._api_url = url_root + 'api.php'
49
50     def _fetch_http(self, url, params):
51         if self.wikitype == "wikia":
52             response = requests.get(url=url, params=params, headers={
53                                     'Accept-encoding': 'gzip'})
54             return(response.text)
55         else:  # wikitype == "wikipedia"
56             response = requests.get(url=url, params=params)
57             return(response)
58
59     def call(self, params):
60         response = self._fetch_http(self._api_url, params)
61         if self.wikitype == "wikia":
62             return json.loads(response)
63         else:
64             return response.json()
65
66
67 def write_user_csvfile(output_file, user_list):
68     csvfile = csv.writer(output_file, delimiter='\t',
69                          quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
70
71     # construct and output the header
72     csvfile.writerow(['username', 'groups',
73                       'edits', 'last.logged', 'last.edited'])
74
75     for user in user_list:
76         csvfile.writerow(user)
77
78
79 def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
80     increment_size = 500
81     offset = 0
82
83     if wikitype == "wikia":
84
85         query = {'groups': 'bot,sysop,bureaucrat,',
86                  'edits': 0,
87                  'limit': increment_size,
88                  'offset': offset,
89                  'numOrder': 1,
90                  'order': 'username:asc'}
91
92     else:  # wikitype == "wikipedia"
93         query = {'action': 'query',
94                  'list': 'allusers',
95                  'augroup': "|".join(roles),
96                  'auprop': 'groups',
97                  'aulimit': 500,
98                  'format': 'json'}
99
100     # FIND THE CORRECT URL (there may be redirects)
101
102     if wikitype == "wikia":
103         url_root = requests.get(url_root).url
104         re_str = "^http://(community|www).wikia.com/"
105         if re.match(re_str, url_root):
106             # api_url
107             # 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia':
108             print("ERROR: %s no longer exists" % wikiname)
109
110             return "deleted"
111     try:
112         wiki = ListUserAPI(url_root, wikitype=wikitype)
113         rv = wiki.call(query)
114
115     except requests.ConnectionError as e:
116         print("ERROR: cannot read the event log: %s" % wikiname)
117         notauthorized.append(wikiname)
118         return "notauthorized"
119
120     except JSONDecodeError as e:
121         print("ERROR: cannot read the event log: %s" % wikiname)
122         notauthorized.append(wikiname)
123         return "notauthorized"
124
125     output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
126     if wikitype == "wikia":
127         raw_userlist = rv['aaData']
128
129         while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
130             # increment the offset and make a new query
131             offset = offset + increment_size
132             query['offset'] = offset
133             rv = wiki.call(query)
134             raw_userlist.extend(rv['aaData'])
135             print("Another one: offset is %s" % offset)
136
137         # go through and edit the html output of the json
138         processed_userlist = []
139         for row in raw_userlist:
140             row[0] = re.sub(r'^.*?<a href=.*?>(.*?)<.*$', r'\1', row[0])
141
142             # work around change in wikia api that removed last.logged
143             if len(row) < 5:
144                 row.append(row[3])
145                 row[3] = None
146
147             row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
148             row[4] = re.sub(r'^\-$', r'', row[4])
149             processed_userlist.append(row)
150
151         write_user_csvfile(output_file, processed_userlist)
152         output_file.close()
153
154     else:
155         raw_userlist = rv['query']['allusers']
156         outlines = ['\t'.join(["username", "groups"])]
157         while 'continue' in rv:
158             query['continue'] = str(rv['continue'])
159             query['aufrom'] = str(rv['continue']['aufrom'])
160             rv = wiki.call(query)
161             raw_userlist = rv['query']['allusers']
162             outlines.extend(
163                 ['\t'.join([q['name'], ','.join(q['groups'])]) for q in raw_userlist])
164             output_file.write('\n'.join(outlines))
165             output_file.flush()
166             outlines = []
167
168     # open and then send data to the output data file
169
170 # the call is
171 # %run userroles_from_listusers.py --sep=\\t --nuke_old ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers
172
173 if __name__ == '__main__':
174     parser = argparse.ArgumentParser(
175         description="Get user roles for Wikis from the Mediawiki list users API")
176
177     parser = add_parser_arguments(parser)
178     args = parser.parse_args()
179     output_path = args.output
180     header = not args.no_header
181
182     prepare_output(output_path, args.nuke_old)
183
184     wikilist = read_wikilist(args)
185     deleted = []
186     notauthorized = []
187
188     files = [os.path.join(output_path, i) for i in os.listdir(output_path)]
189
190     for wiki, url, wikitype in wikilist:
191         if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
192             print("SKIPPING: file \"%s\" already exists)" % wiki)
193             continue
194         print("Processing wiki: %s" % wiki)
195
196         result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
197         if result == "deleted":
198             deleted.append(wiki)
199         elif result == "notauthorized":
200             notauthorized.append(wiki)
201         else:
202             pass
203         time.sleep(1)
204
205     df = open("allusers_error_deleted.txt", 'w')
206     df.write('\n'.join(deleted))
207     df.close()
208
209     na = open("allusers_error_notauthorized.txt", 'w')
210     na.write('\n'.join(notauthorized))
211     na.close()

Community Data Science Collective || Want to submit a patch?