#!/usr/bin/env python3
"""Download user-role listings for a list of wikis.

For every (wiki, url, wikitype) entry returned by ``read_wikilist`` the script
queries either the Wikia ``ListusersAjax`` endpoint or the MediaWiki
``api.php`` ``list=allusers`` endpoint and writes one ``<wiki>.tsv`` file into
the output directory. Wikis that redirect to Wikia's community/landing site
or that cannot be queried are recorded in two error files in the current
working directory.
"""
import argparse
import csv
import json
import os
import re
import sys
import time
from importlib import reload
from json.decoder import JSONDecodeError
from os import path

import requests

from scraper_utils import prepare_output, read_wikilist, add_parser_arguments

# NOTE(review): looks like a leftover of the Python 2 ``reload(sys)`` idiom;
# kept so that module-level behaviour is unchanged.
reload(sys)

# Groups requested through ``augroup`` on the MediaWiki ``allusers`` API.
# NOTE(review): 'checkuser' is listed twice; left as-is.
# NOTE(review): the extent of the commented-out entries below was
# reconstructed from a whitespace-mangled source - confirm.
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback',
         # 'util', 'helper', 'vstf', 'checkuser-global', 'bot-global',
         'council', 'authenticated', 'checkuser', 'chatmoderator',
         'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker',
         'checkuser', 'researcher']


class ListUserAPI():
    """Thin HTTP client for the user-listing endpoint of a single wiki.

    Args:
        url_root: Base URL of the wiki; the endpoint path is appended to it
            verbatim, so it is expected to end with a slash.
        wikitype: ``"wikia"`` selects the ``ListusersAjax`` endpoint; any
            other value selects the MediaWiki ``api.php`` endpoint.
    """

    def __init__(self, url_root, wikitype):
        self.wikitype = wikitype
        if self.wikitype == "wikia":
            self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
        else:  # wikitype == "wikipedia"
            self._api_url = url_root + 'api.php'

    def _fetch_http(self, url, params):
        """GET ``url`` with ``params``.

        Returns the response body text for Wikia and the ``requests``
        response object otherwise.
        """
        if self.wikitype == "wikia":
            response = requests.get(url=url, params=params,
                                    headers={'Accept-encoding': 'gzip'})
            return response.text
        # wikitype == "wikipedia"
        return requests.get(url=url, params=params)

    def call(self, params):
        """Query the endpoint with ``params`` and return the decoded JSON.

        Raises:
            requests.ConnectionError: propagated from ``requests.get``.
            JSONDecodeError: if the body is not valid JSON.
        """
        response = self._fetch_http(self._api_url, params)
        if self.wikitype == "wikia":
            return json.loads(response)
        return response.json()


def write_user_csvfile(output_file, user_list):
    """Write ``user_list`` to ``output_file`` as tab-separated rows.

    A fixed five-column header is written first; every non-numeric field is
    quoted (``csv.QUOTE_NONNUMERIC``).
    """
    csvfile = csv.writer(output_file, delimiter='\t',
                         quotechar='"', quoting=csv.QUOTE_NONNUMERIC)

    # construct and output the header
    csvfile.writerow(['username', 'groups', 'edits',
                      'last.logged', 'last.edited'])

    for user in user_list:
        csvfile.writerow(user)


def _clean_wikia_row(row):
    """Strip the HTML from one ``aaData`` row in place and return it."""
    # NOTE(review): this pattern keeps the text before the first '<'; it may
    # have lost an anchor-tag prefix at some point - confirm against the
    # markup the endpoint actually returns.
    row[0] = re.sub(r'^.*?(.*?)<.*$', r'\1', row[0])

    # work around change in wikia api that removed last.logged
    if len(row) < 5:
        row.append(row[3])
        row[3] = None

    # keep only the revision id of the last edit; a bare "-" becomes empty
    row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
    row[4] = re.sub(r'^\-$', r'', row[4])
    return row


def _format_allusers(users):
    """Turn ``allusers`` entries into ``name<TAB>group,group,...`` lines."""
    return ['\t'.join([q['name'], ','.join(q['groups'])]) for q in users]


def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia",
                                output_dir=None):
    """Fetch the privileged users of one wiki and write ``<wikiname>.tsv``.

    Args:
        wikiname: Name of the wiki; used for messages and the file name.
        url_root: Base URL of the wiki.
        wikitype: ``"wikia"`` or anything else for a plain MediaWiki API.
        output_dir: Directory for the output file. When omitted, a
            module-level ``output_path`` is used if one exists (the behaviour
            older callers relied on), otherwise the current directory.

    Returns:
        ``"deleted"`` if a Wikia URL redirects to the community/www site,
        ``"notauthorized"`` if the first API request fails or does not return
        JSON, and ``None`` after the file has been written. Recording these
        outcomes is left to the caller; the function no longer appends to a
        global list, which used to record every failure twice.
    """
    if output_dir is None:
        output_dir = globals().get("output_path", os.curdir)

    increment_size = 500
    offset = 0

    if wikitype == "wikia":
        query = {'groups': 'bot,sysop,bureaucrat,',
                 'edits': 0,
                 'limit': increment_size,
                 'offset': offset,
                 'numOrder': 1,
                 'order': 'username:asc'}
    else:  # wikitype == "wikipedia"
        query = {'action': 'query',
                 'list': 'allusers',
                 'augroup': "|".join(roles),
                 'auprop': 'groups',
                 'aulimit': 500,
                 'format': 'json'}

    # FIND THE CORRECT URL (there may be redirects)
    if wikitype == "wikia":
        url_root = requests.get(url_root).url
        re_str = "^http://(community|www).wikia.com/"
        if re.match(re_str, url_root):
            # api_url
            # 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia':
            print("ERROR: %s no longer exists" % wikiname)
            return "deleted"

    try:
        wiki = ListUserAPI(url_root, wikitype=wikitype)
        rv = wiki.call(query)
    except (requests.ConnectionError, JSONDecodeError):
        print("ERROR: cannot read the event log: %s" % wikiname)
        return "notauthorized"

    output_file_name = "{0}/{1}.tsv".format(output_dir, wikiname)

    if wikitype == "wikia":
        raw_userlist = rv['aaData']

        while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
            # increment the offset and make a new query
            offset = offset + increment_size
            query['offset'] = offset
            rv = wiki.call(query)
            raw_userlist.extend(rv['aaData'])
            print("Another one: offset is %s" % offset)

        # go through and edit the html output of the json
        processed_userlist = [_clean_wikia_row(row) for row in raw_userlist]

        # The file is opened only after every page has been fetched, so a
        # failed request does not leave an empty file behind.
        with open(output_file_name, 'w', encoding='utf-8',
                  newline='') as output_file:
            write_user_csvfile(output_file, processed_userlist)
    else:
        # The first page is part of the result and must be kept.
        outlines = ['\t'.join(["username", "groups"])]
        outlines.extend(_format_allusers(rv['query']['allusers']))

        while 'continue' in rv:
            # Documented continuation protocol: merge the returned
            # ``continue`` object into the next request's parameters.
            query.update(rv['continue'])
            rv = wiki.call(query)
            outlines.extend(_format_allusers(rv['query']['allusers']))

        with open(output_file_name, 'w', encoding='utf-8') as output_file:
            # Terminate every line so rows are never fused together.
            output_file.writelines(line + '\n' for line in outlines)

    return None


def main():
    """Parse the command line and scrape every wiki in the wiki list."""
    parser = argparse.ArgumentParser(
        description="Get user roles for Wikis from the Mediawiki list users API")
    parser = add_parser_arguments(parser)
    args = parser.parse_args()
    output_path = args.output

    prepare_output(output_path, args.nuke_old)

    wikilist = read_wikilist(args)
    deleted = []
    notauthorized = []

    # Snapshot of the files present at start-up; wikis that already have an
    # output file are skipped.
    files = {os.path.join(output_path, i) for i in os.listdir(output_path)}

    for wiki, url, wikitype in wikilist:
        if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
            print("SKIPPING: file \"%s\" already exists)" % wiki)
            continue

        print("Processing wiki: %s" % wiki)

        result = get_administrators_for_wiki(
            wiki, url, wikitype=wikitype, output_dir=output_path)
        if result == "deleted":
            deleted.append(wiki)
        elif result == "notauthorized":
            notauthorized.append(wiki)

        # pause between wikis
        time.sleep(1)

    with open("allusers_error_deleted.txt", 'w') as df:
        df.write('\n'.join(deleted))

    with open("allusers_error_notauthorized.txt", 'w') as na:
        na.write('\n'.join(notauthorized))


# open and then send data to the output data file

# the call is
# %run userroles_from_listusers.py --sep=\\t --nuke_old ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers
if __name__ == '__main__':
    main()