#!/usr/bin/env python3

# Copyright (C) 2018 Nathan TeBlunthuis

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

import time
import re
import os
import json
import csv
from itertools import islice

import requests

# user groups we treat as administrative roles
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback',  # 'util',
         'helper', 'vstf', 'checkuser-global', 'bot-global',
         'council', 'authenticated', 'checkuser', 'chatmoderator',
         'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker',
         'researcher']

output_path = "userlist-2017/"


class ListUserAPI():
    """Small wrapper around the two user-list endpoints we need:
    the Wikia Listusers ajax endpoint and the MediaWiki api.php API."""

    def __init__(self, url_root, wikitype):
        self.wikitype = wikitype
        if self.wikitype == "wikia":
            self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
        else:  # wikitype == "wikipedia"
            self._api_url = url_root + 'api.php'

    def _fetch_http(self, url, params):
        if self.wikitype == "wikia":
            response = requests.get(url=url, params=params,
                                    headers={'Accept-encoding': 'gzip'})
            return response.text
        else:  # wikitype == "wikipedia"
            response = requests.get(url=url, params=params)
            return response

    def call(self, params):
        response = self._fetch_http(self._api_url, params)
        if self.wikitype == "wikia":
            return json.loads(response)
        else:
            return response.json()


def write_user_csvfile(output_file, user_list):
    csvfile = csv.writer(output_file, delimiter='\t',
                         quotechar='"', quoting=csv.QUOTE_NONNUMERIC)

    # construct and output the header
    csvfile.writerow(['username', 'groups',
                      'edits', 'last.logged', 'last.edited'])

    for user in user_list:
        csvfile.writerow(user)


def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
    increment_size = 500
    offset = 0

    if wikitype == "wikia":
        query = {'groups': 'bot,sysop,bureaucrat,',
                 'edits': 0,
                 'limit': increment_size,
                 'offset': offset,
                 'numOrder': 1,
                 'order': 'username:asc'}

    else:  # wikitype == "wikipedia"
        query = {'action': 'query',
                 'list': 'allusers',
                 'augroup': "|".join(roles),
                 'auprop': 'groups',
                 'aulimit': 500,
                 'format': 'json'}

    ## FIND THE CORRECT URL (there may be redirects)
    if wikitype == "wikia":
        url_root = requests.get(url_root).url
        re_str = r"^http://(community|www)\.wikia\.com/"
        if re.match(re_str, url_root):
            # deleted wikis redirect to
            # 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia'
            print("ERROR: %s no longer exists" % wikiname)
            return "deleted"

    try:
        wiki = ListUserAPI(url_root, wikitype=wikitype)
        rv = wiki.call(query)
    except requests.ConnectionError:
        print("ERROR: cannot read the user list: %s" % wikiname)
        return "notauthorized"

    if wikitype == "wikia":
        raw_userlist = rv['aaData']

        # page through the results until we have every matching record
        while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
            # increment the offset and make a new query
            offset = offset + increment_size
            query['offset'] = offset
            rv = wiki.call(query)
            raw_userlist.extend(rv['aaData'])
            print("Another one: offset is %s" % offset)

        # go through and edit the html output of the json
        processed_userlist = []
        for row in raw_userlist:
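            # Each Wikia row is a list of raw HTML table cells: row[0]
            # wraps the username in an anchor tag and row[4] links to the
            # user's latest edit via an "oldid=" revision URL (or holds
            # "-" when the user never edited). The substitutions below
            # strip that markup down to the bare values.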
            row[0] = re.sub(r'^.*?<a href=.*?>(.*?)<.*$', r'\1', row[0])
            row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
            row[4] = re.sub(r'^\-$', r'', row[4])
            processed_userlist.append(row)

        output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
        write_user_csvfile(output_file, processed_userlist)
        output_file.close()

    else:
        output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
        raw_userlist = rv['query']['allusers']

        outlines = ['\t'.join(["username", "groups"])]
        outlines.extend(['\t'.join([q['name'], ','.join(q['groups'])])
                         for q in raw_userlist])
        output_file.write('\n'.join(outlines) + '\n')
        outlines = []

        while 'continue' in rv:
            # merge the continuation parameters into the next query
            query.update(rv['continue'])
            rv = wiki.call(query)
            raw_userlist = rv['query']['allusers']
            outlines.extend(['\t'.join([q['name'], ','.join(q['groups'])])
                             for q in raw_userlist])
            output_file.write('\n'.join(outlines) + '\n')
            output_file.flush()
            outlines = []

        output_file.close()


# read in a list of files so we can skip wikis we have already downloaded
files = [os.path.join(output_path, i) for i in os.listdir(output_path)]

# iterate through the list of wikis
# for line in open("list_of_wikis.csv", "r").readlines():
# the next line is useful for working with a reduced list:
d = [(line.split(",")[0], line.split(",")[1])
     for line in islice(open("../wikis.needing.userroles.csv"), 1, None)]

deleted = []
notauthorized = []

for wiki, url in d:
    wiki = wiki.strip()
    url = url.strip()
    print(url)

    if os.path.join(output_path, wiki + ".tsv") in files:
        print("SKIPPING: file \"%s\" already exists" % wiki)
        continue

    print("Processing wiki: %s" % wiki)

    if "wikipedia.org" in url:
        wikitype = "wikipedia"
        url = url + '/w/'

    if "wikia.com" in url:
        wikitype = "wikia"

    result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
    if result == "deleted":
        deleted.append(wiki)
    elif result == "notauthorized":
        notauthorized.append(wiki)

    time.sleep(1)

df = open("allusers_WP_error_deleted.txt", 'w')
df.write('\n'.join(deleted))
df.close()

na = open("allusers_WP_error_notauthorized.txt", 'w')
na.write('\n'.join(notauthorized))
na.close()
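# A minimal sketch of the expected input, for reference (the wiki names and
# URLs below are made-up placeholders, not entries from the real file):
# "../wikis.needing.userroles.csv" should contain a header row, which the
# islice() call skips, followed by comma-separated name,url pairs, e.g.
#
#   name,url
#   muppet,http://muppet.wikia.com/
#   enwiki,https://en.wikipedia.org
#
# Each wiki then gets one tab-separated file under userlist-2017/, and any
# failures are collected in the two allusers_WP_error_*.txt files.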