#!/usr/bin/env python3

# Obtain user roles data from the Wikia logevents api
# Copyright (C) 2018  Nathan TeBlunthuis
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

from os import path
import argparse
import time
import re
import os
from json.decoder import JSONDecodeError

import requests

from scraper_utils import prepare_output, read_wikilist, add_parser_arguments

# column order shared by the rights-events and block-events output files
HEADER_FIELDS = ['comment', 'logid', 'ns', 'pageid', 'timestamp', 'title',
                 'type', 'user', 'ancient', 'rights-new', 'rights-old']


def write_logevents(logevents, out):
    # write one tab-separated row per log event
    for logevent in logevents:
        # if there is hidden information, we skip this one because there
        # is nothing to report
        if any(['userhidden' in logevent,
                'actionhidden' in logevent,
                'commenthidden' in logevent]):
            continue

        le_output = [logevent['comment'],
                     str(logevent['logid']),
                     str(logevent['ns']),
                     str(logevent['pageid']),
                     logevent['timestamp'],
                     logevent['title'],
                     logevent['type'],
                     str(logevent['user'])]

        # 'ancient' rights events lack structured old/new group lists
        if "rights" in logevent:
            le_output.extend(['false',
                              logevent['rights']['new'],
                              logevent['rights']['old']])
        else:
            le_output.extend(['true', '', ''])

        out.write("\t".join(le_output) + "\n")
    out.flush()


# block and unblock events are written in exactly the same format, only to
# a different file, so the two writers share one implementation
write_blockevents = write_logevents


def split_events(logevents):
    # separate block/unblock events from rights events
    blockevents = [e for e in logevents
                   if e.get('action') in ('block', 'unblock')
                   or e.get('type') in ('block', 'unblock')]
    logevents = [e for e in logevents if e not in blockevents]
    return logevents, blockevents


def get_events_for_wiki(wikiname, url, output_dir, blocks_output=None,
                        wikitype="wikia"):
    out = open("{0}/{1}.tsv".format(output_dir, wikiname), "w")
    out.write("\t".join(HEADER_FIELDS) + "\n")

    if wikitype == "wikia":
        api_url = url + '/api.php'
    else:  # wikitype == "wikipedia"
        api_url = url + "/w/api.php"

    letype = 'rights'
    blockout = None

    if blocks_output is not None:
        # also request block and unblock events, written to a second file
        letype = 'rights|block|unblock'
        blockout = open("{0}/{1}.tsv".format(blocks_output, wikiname), "w")
        blockout.write("\t".join(HEADER_FIELDS) + "\n")

    query = {'action': 'query',
             'list': 'logevents',
             'letype': letype,
             'lelimit': '500',
             'format': 'json',
             'ledir': 'newer'}

    try:
        response = requests.get(api_url, params=query)
        rv = response.json()

    except JSONDecodeError:
        # a non-JSON reply usually means we were redirected; dead Wikia
        # wikis redirect to a "Not_a_valid_Wikia" page on community.wikia.com
        api_url = response.url
        if wikitype == "wikia":
            re_str = r"^http://(community|www)\.wikia\.com/"
        else:  # wikitype == "wikipedia"
            re_str = r"^https?://.*wikipedia\.org/"

        if re.match(re_str, api_url):
            print("ERROR: %s no longer exists" % wikiname)
            return
        else:
            # otherwise retry the request at the redirected location
            response = requests.get(api_url, params=query)
            rv = response.json()

    try:
        logevents = rv['query']['logevents']
    except KeyError:
        print("ERROR: %s contains no logevent data" % wikiname)
        return

    while True:
        logevents, blockevents = split_events(logevents)
        write_logevents(logevents, out)
        if blockout is not None:
            write_blockevents(blockevents, blockout)

        # follow both the pre-1.26 ('query-continue') and the current
        # ('continue') MediaWiki continuation protocols
        if 'query-continue' in rv:
            query['lestart'] = rv['query-continue']['logevents']['lestart']
        elif 'continue' in rv:
            query['continue'] = rv['continue']['continue']
            query['lecontinue'] = rv['continue']['lecontinue']
        else:
            break

        response = requests.get(api_url, params=query)
        rv = response.json()
        logevents = rv['query']['logevents']

    out.close()
    if blockout is not None:
        blockout.close()


# the call is
# %run userroles_from_logevents.py --sep=\\t --nuke-old --blocks-output=/com/projects/messagewalls/userroles/blockevents ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Get user roles for wikis from the MediaWiki logevents API")

    parser = add_parser_arguments(parser)
    parser.add_argument('--blocks-output', type=str,
                        help='Path to output block event logs. If empty, blocks are ignored.')

    args = parser.parse_args()
    output_path = args.output
    blocks_output = args.blocks_output
    header = not args.no_header

    prepare_output(output_path, args.nuke_old)
    if blocks_output is not None:
        prepare_output(blocks_output, args.nuke_old)

    wikilist = read_wikilist(args)
    deleted = []
    notauthorized = []

    files = [os.path.join(output_path, i) for i in os.listdir(output_path)]

    # iterate through the list of wikis, skipping any whose output file
    # already exists
    for wiki, url, wikitype in wikilist:
        if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
            print("SKIPPING: file \"%s\" already exists" % wiki)
            continue

        print("Processing wiki: %s" % wiki)

        get_events_for_wiki(wiki, url, output_dir=output_path,
                            blocks_output=blocks_output, wikitype=wikitype)
        time.sleep(1)
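
# For reference, a minimal sketch (illustrative values, not taken from a real
# wiki) of the JSON shape this script expects back from list=logevents.
# Rights events carry a 'rights' object with the old and new group strings,
# and a top-level 'continue' block (or 'query-continue' on pre-1.26
# MediaWiki) signals that further pages of results remain:
#
# {"query": {"logevents": [
#     {"logid": 123, "ns": 2, "title": "User:Example", "pageid": 456,
#      "type": "rights", "action": "rights", "user": "SomeBureaucrat",
#      "timestamp": "2018-01-01T00:00:00Z", "comment": "promoting",
#      "rights": {"old": "", "new": "sysop"}}]},
#  "continue": {"lecontinue": "20180101000000|123", "continue": "-||"}}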