#!/usr/bin/env python3

# Scrape the Wikia userroles API
# Copyright (C) 2018  Nathan TeBlunthuis

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

import argparse
import csv
import json
import time
import re
import os
from json.decoder import JSONDecodeError
from os import path
from scraper_utils import prepare_output, read_wikilist, add_parser_arguments

import requests

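# user groups to request; for MediaWiki ("wikipedia") wikis these are joined
# into the allusers API's augroup parameter below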
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback',  # 'util',
         'helper', 'vstf', 'checkuser-global', 'bot-global',
         'council', 'authenticated', 'checkuser', 'chatmoderator',
         'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker', 'researcher']


class ListUserAPI():
    """Wrapper around the Wikia listusers AJAX endpoint or a standard MediaWiki api.php endpoint."""

    def __init__(self, url_root, wikitype):
        self.wikitype = wikitype
        if self.wikitype == "wikia":
            self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
        else:  # wikitype == "wikipedia"
            self._api_url = url_root + 'api.php'

    def _fetch_http(self, url, params):
        if self.wikitype == "wikia":
            response = requests.get(url=url, params=params, headers={
                                    'Accept-encoding': 'gzip'})
            return response.text
        else:  # wikitype == "wikipedia"
            response = requests.get(url=url, params=params)
            return response

    def call(self, params):
        response = self._fetch_http(self._api_url, params)
        if self.wikitype == "wikia":
            return json.loads(response)
        else:  # wikitype == "wikipedia"
            return response.json()

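# A minimal usage sketch (hypothetical URL; any wiki exposing a standard
# MediaWiki api.php endpoint would work the same way):
#
#     wiki = ListUserAPI("https://en.wikipedia.org/w/", wikitype="wikipedia")
#     rv = wiki.call({'action': 'query', 'list': 'allusers',
#                     'augroup': 'sysop', 'auprop': 'groups',
#                     'aulimit': 5, 'format': 'json'})
#     # rv['query']['allusers'] is a list of {'name': ..., 'groups': [...]} dicts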
def write_user_csvfile(output_file, user_list):
    csvfile = csv.writer(output_file, delimiter='\t',
                         quotechar='"', quoting=csv.QUOTE_NONNUMERIC)

    # construct and output the header
    csvfile.writerow(['username', 'groups',
                      'edits', 'last.logged', 'last.edited'])

    for user in user_list:
        csvfile.writerow(user)

def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
    increment_size = 500
    offset = 0

    if wikitype == "wikia":
        query = {'groups': 'bot,sysop,bureaucrat,',
                 'edits': 0,
                 'limit': increment_size,
                 'offset': offset,
                 'numOrder': 1,
                 'order': 'username:asc'}

    else:  # wikitype == "wikipedia"
        query = {'action': 'query',
                 'list': 'allusers',
                 'augroup': "|".join(roles),
                 'auprop': 'groups',
                 'aulimit': 500,
                 'format': 'json'}

    # FIND THE CORRECT URL (there may be redirects)
    if wikitype == "wikia":
        url_root = requests.get(url_root).url
        # deleted wikis redirect to a "Not a valid Wikia" page on community.wikia.com or www.wikia.com
        re_str = r"^http://(community|www)\.wikia\.com/"
        if re.match(re_str, url_root):
            print("ERROR: %s no longer exists" % wikiname)
            return "deleted"

    try:
        wiki = ListUserAPI(url_root, wikitype=wikitype)
        rv = wiki.call(query)

    except requests.ConnectionError:
        print("ERROR: cannot fetch the user list for %s" % wikiname)
        return "notauthorized"

    except JSONDecodeError:
        print("ERROR: cannot parse the user list for %s" % wikiname)
        return "notauthorized"

    output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
    if wikitype == "wikia":
        raw_userlist = rv['aaData']

        while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
            # increment the offset and make a new query
            offset = offset + increment_size
            query['offset'] = offset
            rv = wiki.call(query)
            raw_userlist.extend(rv['aaData'])
            print("Another one: offset is %s" % offset)

        # go through and edit the html output of the json
        processed_userlist = []
        for row in raw_userlist:
            # strip the <a href=...> markup around the username
            row[0] = re.sub(r'^.*?<a href=.*?>(.*?)<.*$', r'\1', row[0])

            # work around change in wikia api that removed last.logged
            if len(row) < 5:
                row.append(row[3])
                row[3] = None

            # reduce the last-edited cell to the bare oldid (empty if "-")
            row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
            row[4] = re.sub(r'^\-$', r'', row[4])
            processed_userlist.append(row)

        write_user_csvfile(output_file, processed_userlist)
        output_file.close()

    else:  # wikitype == "wikipedia"
        raw_userlist = rv['query']['allusers']
        outlines = ['\t'.join(["username", "groups"])]
        while True:
            # format and write out the current batch of users
            outlines.extend(
                ['\t'.join([q['name'], ','.join(q['groups'])]) for q in raw_userlist])
            output_file.write('\n'.join(outlines) + '\n')
            output_file.flush()
            outlines = []

            # follow the API continuation parameters until the list is exhausted
            if 'continue' not in rv:
                break
            query.update(rv['continue'])
            rv = wiki.call(query)
            raw_userlist = rv['query']['allusers']

        output_file.close()

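# Note: get_administrators_for_wiki() writes its TSV into the module-level
# output_path set in the __main__ block below, so the script is normally driven
# from the command line rather than imported. A direct call would look like
# (hypothetical wiki name and URL):
#
#     result = get_administrators_for_wiki("examplewiki", "http://example.wikia.com/",
#                                          wikitype="wikia")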
# Example invocation (from IPython):
# %run userroles_from_listusers.py --sep=\\t --nuke_old ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers

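# An equivalent shell invocation (a sketch: assumes --sep accepts the same literal
# "\t" value as the %run example above; substitute your own wiki list and output directory):
#
#     python3 userroles_from_listusers.py --sep='\t' --nuke_old \
#         ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers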
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Get user roles for wikis from the MediaWiki list users API")

    parser = add_parser_arguments(parser)
    args = parser.parse_args()
    output_path = args.output
    header = not args.no_header

    prepare_output(output_path, args.nuke_old)

    wikilist = read_wikilist(args)
    deleted = []
    notauthorized = []

    files = [os.path.join(output_path, i) for i in os.listdir(output_path)]

    for wiki, url, wikitype in wikilist:
        if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
            print("SKIPPING: file \"%s\" already exists" % wiki)
            continue
        print("Processing wiki: %s" % wiki)

        result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
        if result == "deleted":
            deleted.append(wiki)
        elif result == "notauthorized":
            notauthorized.append(wiki)
        time.sleep(1)

    # record wikis that could not be scraped
    df = open("allusers_error_deleted.txt", 'w')
    df.write('\n'.join(deleted))
    df.close()

    na = open("allusers_error_notauthorized.txt", 'w')
    na.write('\n'.join(notauthorized))
    na.close()
