X-Git-Url: https://code.communitydata.science/covid19.git/blobdiff_plain/09d171608f699eafe6528325938a892937a65302..98b07b8098611287eaa775b09622d1f3514303c8:/keywords/src/wikidata_search.py

diff --git a/keywords/src/wikidata_search.py b/keywords/src/wikidata_search.py
new file mode 100644
index 0000000..a3abbc0
--- /dev/null
+++ b/keywords/src/wikidata_search.py
@@ -0,0 +1,104 @@
+# generate a list of wikidata entities related to keywords
+from os import path
+from sys import stdout
+from wikidata_api_calls import search_wikidata, get_wikidata_api
+import csv
+from itertools import chain
+
+
+class Wikidata_ResultSet:
+    def __init__(self):
+        self.results = []
+
+    def extend(self, term, results):
+        # materialize the results so the set can be written out more than once
+        self.results.append(
+            [Wikidata_Result(term, result, i)
+             for i, result in enumerate(results)]
+        )
+
+    def to_csv(self, outfile=None, mode='w'):
+        if outfile is None:
+            of = stdout
+            write_header = True
+        else:
+            if path.exists(outfile) and mode != 'w':
+                of = open(outfile, 'a', newline='')
+                write_header = False  # don't repeat the header when appending
+            else:
+                of = open(outfile, 'w', newline='')
+                write_header = True
+
+        writer = csv.writer(of)
+        if write_header:
+            writer.writerow(Wikidata_Result.__slots__)
+        writer.writerows(map(Wikidata_Result.to_list, chain(*self.results)))
+        if of is not stdout:
+            of.close()
+
+
+class Wikidata_Result:
+    # store the term searched for, the entity found, its position in the
+    # search results, and the date of the search
+    __slots__ = ['search_term', 'entityid', 'pageid', 'search_position', 'timestamp']
+
+    def __init__(self, term, search_result, position):
+        self.search_term = term.strip()
+        self.entityid = search_result['title']
+        self.pageid = int(search_result['pageid'])
+        self.search_position = int(position)
+        self.timestamp = search_result['timestamp']
+
+    def to_list(self):
+        return [self.search_term,
+                self.entityid,
+                self.pageid,
+                self.search_position,
+                self.timestamp]
+
+
+def run_wikidata_searches(terms):
+    # search each term in wikidata and collect the results
+    api = get_wikidata_api()
+    resultset = Wikidata_ResultSet()
+    for term in terms:
+        search_results = search_wikidata(api, term)
+        resultset.extend(term, search_results)
+    return resultset
+
+
+def read_google_trends_files(terms_files):
+    # yield the 'query' column from one or more google trends csv files
+    def _read_file(infile):
+        return csv.DictReader(open(infile, 'r', newline=''))
+
+    for row in chain(*[_read_file(terms_file) for terms_file in terms_files]):
+        yield row['query']
+
+
+def trawl_google_trends(terms_files, outfile=None, mode='w'):
+    terms = list(read_google_trends_files(terms_files))
+    resultset = run_wikidata_searches(terms)
+    resultset.to_csv(outfile, mode)
+
+
+def trawl_base_terms(infiles, outfile=None, mode='w'):
+    terms = list(chain(*(open(infile, 'r') for infile in infiles)))
+    resultset = run_wikidata_searches(terms)
+    resultset.to_csv(outfile, mode)
+
+
+if __name__ == "__main__":
+    import argparse
+    parser = argparse.ArgumentParser(description="Search wikidata for entities related to a set of terms.")
+    parser.add_argument('inputs', type=str, nargs='+', help='one or more files to read')
+    parser.add_argument('--use-gtrends', action='store_true', help='toggle whether the input is the output from google trends')
+    parser.add_argument('--output', type=str, help='an output file. defaults to stdout')
+    parser.add_argument('--overwrite', action='store_true', help='overwrite existing output files instead of appending')
+    args = parser.parse_args()
+    # --overwrite truncates any existing output file; otherwise results are appended
+    mode = 'w' if args.overwrite else 'a'
+    if args.use_gtrends:
+        trawl_google_trends(args.inputs, args.output, mode)
+    else:
+        trawl_base_terms(args.inputs, args.output, mode)
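
Note: the sibling module wikidata_api_calls is not part of this diff, so the shape of each search hit below is an assumption inferred from the fields Wikidata_Result.__init__ reads: a dict carrying at least 'title', 'pageid', and 'timestamp', as a MediaWiki search API hit does. A minimal sketch, run from keywords/src/ so the import resolves, exercising the result-set classes on fabricated hits without calling the live API:

    # sketch only: the hit dicts are fabricated, and their shape is an assumption
    # based on what Wikidata_Result.__init__ reads ('title', 'pageid', 'timestamp')
    from wikidata_search import Wikidata_ResultSet

    fake_hits = [
        {'title': 'Q84263196', 'pageid': 83819507, 'timestamp': '2020-03-22T00:00:00Z'},
        {'title': 'Q81068910', 'pageid': 80703418, 'timestamp': '2020-03-22T00:00:00Z'},
    ]

    resultset = Wikidata_ResultSet()
    resultset.extend('covid-19', fake_hits)   # one list of hits per search term
    resultset.to_csv()                        # no output file given, so the CSV rows go to stdout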