#!/usr/bin/env python3
"""Scrape privileged-user lists from the Wikia / MediaWiki "list users" APIs.

For each wiki in the input wikilist, fetch the users holding the roles in
`roles` and write them to "<output_path>/<wikiname>.tsv".  Typical invocation:

    %run userroles_from_listusers.py --sep=\\t --nuke_old \\
        ../identifyWikis/wikiteamWikilist.tsv \\
        /com/projects/messagewalls/userroles/listusers

Copyright (C) 2018 Nathan TeBlunthuis

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
"""

import argparse
import csv
import json
import os
import re
import time
from json.decoder import JSONDecodeError
from os import path

import requests

from scraper_utils import add_parser_arguments, prepare_output, read_wikilist

# NOTE: the original file did `from importlib import reload; reload(sys)` — a
# Python-2 setdefaultencoding relic that is a no-op (or harmful) on Python 3,
# so it has been removed.

# User groups whose members we collect (wikipedia 'augroup' query).
# Deduplicated: the original list contained 'checkuser' twice.
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback',  # 'util',
         'helper', 'vstf', 'checkuser-global', 'bot-global',
         'council', 'authenticated', 'checkuser', 'chatmoderator',
         'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker',
         'researcher']


class ListUserAPI():
    """Thin client for the two "list users" endpoints we scrape.

    wikitype == "wikia"    -> the ListusersAjax index.php endpoint
                              (returns JSON as text/html, parsed manually).
    wikitype == "wikipedia" -> the standard MediaWiki api.php endpoint.
    """

    def __init__(self, url_root, wikitype):
        self.wikitype = wikitype
        if self.wikitype == "wikia":
            self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
        else:  # wikitype == "wikipedia"
            self._api_url = url_root + 'api.php'

    def _fetch_http(self, url, params):
        """GET the endpoint; return text for wikia, the Response for wikipedia."""
        if self.wikitype == "wikia":
            response = requests.get(url=url, params=params,
                                    headers={'Accept-encoding': 'gzip'})
            return response.text
        else:  # wikitype == "wikipedia"
            response = requests.get(url=url, params=params)
            return response

    def call(self, params):
        """Issue one API request and return the decoded JSON payload."""
        response = self._fetch_http(self._api_url, params)
        if self.wikitype == "wikia":
            # Wikia serves JSON with a non-JSON content type; decode by hand.
            return json.loads(response)
        else:
            return response.json()


def write_user_csvfile(output_file, user_list):
    """Write `user_list` rows as a tab-separated file with a header row."""
    csvfile = csv.writer(output_file, delimiter='\t',
                         quotechar='"', quoting=csv.QUOTE_NONNUMERIC)

    # construct and output the header
    csvfile.writerow(['username', 'groups', 'edits', 'last.logged', 'last.edited'])
    for user in user_list:
        csvfile.writerow(user)


def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
    """Fetch the privileged-user list for one wiki.

    Writes "<output_path>/<wikiname>.tsv" (reads the module-level global
    `output_path` set in __main__) and returns a status string:
      "success"       -> the tsv file was written
      "deleted"       -> the wiki redirects to Community Central (gone)
      "notauthorized" -> the API could not be reached or decoded
    The caller is responsible for recording failed wikis; this function no
    longer appends to the global lists itself (the original did, causing
    each unauthorized wiki to be counted twice).
    """
    increment_size = 500
    offset = 0

    if wikitype == "wikia":
        query = {'groups': 'bot,sysop,bureaucrat,',
                 'edits': 0,
                 'limit': increment_size,
                 'offset': offset,
                 'numOrder': 1,
                 'order': 'username:asc'}
    else:  # wikitype == "wikipedia"
        query = {'action': 'query',
                 'list': 'allusers',
                 'augroup': "|".join(roles),
                 'auprop': 'groups',
                 'aulimit': 500,
                 'format': 'json'}

    # FIND THE CORRECT URL (there may be redirects)
    if wikitype == "wikia":
        url_root = requests.get(url_root).url
        # A redirect to Community Central means the wiki was deleted.
        # NOTE(review): the dots in this pattern are unescaped — kept as-is.
        re_str = "^http://(community|www).wikia.com/"
        if re.match(re_str, url_root):
            # 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia'
            print("ERROR: %s no longer exists" % wikiname)
            return "deleted"

    try:
        wiki = ListUserAPI(url_root, wikitype=wikitype)
        rv = wiki.call(query)
    except (requests.ConnectionError, JSONDecodeError):
        print("ERROR: cannot read the event log: %s" % wikiname)
        return "notauthorized"

    with open("{0}/{1}.tsv".format(output_path, wikiname), 'w') as output_file:
        if wikitype == "wikia":
            raw_userlist = rv['aaData']

            # NOTE(review): this termination test (iTotalRecords + offset vs
            # iTotalDisplayRecords) looks odd but is preserved from the
            # original — confirm against the Wikia ajax API before changing.
            while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
                # increment the offset and make a new query
                offset = offset + increment_size
                query['offset'] = offset
                rv = wiki.call(query)
                raw_userlist.extend(rv['aaData'])
                print("Another one: offset is %s" % offset)

            # go through and edit the html output of the json
            processed_userlist = []
            for row in raw_userlist:
                # strip the html anchor wrapped around the username
                row[0] = re.sub(r'^.*?(.*?)<.*$', r'\1', row[0])
                # work around change in wikia api that removed last.logged
                if len(row) < 5:
                    row.append(row[3])
                    row[3] = None
                # reduce the last-edit cell to the bare oldid ('-' -> empty)
                row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
                row[4] = re.sub(r'^\-$', r'', row[4])
                processed_userlist.append(row)

            write_user_csvfile(output_file, processed_userlist)

        else:  # wikitype == "wikipedia"
            raw_userlist = rv['query']['allusers']
            outlines = ['\t'.join(["username", "groups"])]
            while True:
                # BUGFIX: the original only formatted/wrote inside the
                # continuation loop, silently dropping the first result page
                # (and the header, when there was no continuation), and it
                # joined batches without a separating newline, fusing the
                # last user of one batch with the first of the next.
                outlines.extend('\t'.join([q['name'], ','.join(q['groups'])])
                                for q in raw_userlist)
                output_file.write('\n'.join(outlines) + '\n')
                output_file.flush()
                outlines = []
                if 'continue' not in rv:
                    break
                # NOTE(review): str() of the continue dict is almost surely
                # wrong per the MediaWiki API (query.update(rv['continue'])
                # is the documented form); kept because 'aufrom' below is
                # the parameter that actually drives the pagination.
                query['continue'] = str(rv['continue'])
                query['aufrom'] = str(rv['continue']['aufrom'])
                rv = wiki.call(query)
                raw_userlist = rv['query']['allusers']

    return "success"


# open and then send data to the output data file
if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Get user roles for Wikis from the Mediawiki list users API")
    parser = add_parser_arguments(parser)
    args = parser.parse_args()

    output_path = args.output
    prepare_output(output_path, args.nuke_old)

    wikilist = read_wikilist(args)
    deleted = []
    notauthorized = []

    # Pre-list existing outputs so already-scraped wikis can be skipped.
    files = [os.path.join(output_path, i) for i in os.listdir(output_path)]

    for wiki, url, wikitype in wikilist:
        if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
            print("SKIPPING: file \"%s\" already exists" % wiki)
            continue
        print("Processing wiki: %s" % wiki)
        result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
        if result == "deleted":
            deleted.append(wiki)
        elif result == "notauthorized":
            notauthorized.append(wiki)
        # be polite to the remote API
        time.sleep(1)

    with open("allusers_error_deleted.txt", 'w') as df:
        df.write('\n'.join(deleted))
    with open("allusers_error_notauthorized.txt", 'w') as na:
        na.write('\n'.join(notauthorized))