]> code.communitydata.science - covid19.git/blob - keywords/src/wikidata_search.py
a3abbc0d800732298cc6481261dc967945586a18
[covid19.git] / keywords / src / wikidata_search.py
1 # generate a list of wikidata entities related to keywords
2 from os import path
3 from sys import stdout
4 from wikidata_api_calls import search_wikidata, get_wikidata_api
5 import csv
6 from itertools import chain
7
8 class Wikidata_ResultSet:
9     def __init__(self):
10         self.results = []
11
12     def extend(self, term, results):
13         self.results.append(
14             (Wikidata_Result(term, result, i)
15              for i, result in enumerate(results))
16         )
17
18     def to_csv(self, outfile=None, mode='w'):
19         if outfile is None:
20             of = stdout
21
22         else:
23             if path.exists(outfile) and mode != 'w':
24                 of = open(outfile,'a',newline='')
25             else:
26                 of = open(outfile,'w',newline='')
27         writer = csv.writer(of)
28         writer.writerow(Wikidata_Result.__slots__)
29         writer.writerows(map(Wikidata_Result.to_list, chain(* self.results)))
30
31
32 class Wikidata_Result:
33     # store unique entities found in the search results, the position in the search result, and the date
34     __slots__=['search_term','entityid','pageid','search_position','timestamp']
35
36     def __init__(self,
37                  term,
38                  search_result,
39                  position):
40
41         self.search_term = term.strip()
42         self.entityid = search_result['title']
43         self.pageid = int(search_result['pageid'])
44         self.search_position = int(position)
45         self.timestamp = search_result['timestamp']
46
47     def to_list(self):
48         return [self.search_term,
49                 self.entityid,
50                 self.pageid,
51                 self.search_position,
52                 self.timestamp]
53     
54 def run_wikidata_searches(terms):
55     api = get_wikidata_api()
56     resultset = Wikidata_ResultSet()
57     for term in terms:
58         search_results = search_wikidata(api, term)
59         resultset.extend(term, search_results)
60     return resultset
61
62 def read_google_trends_files(terms_files):
63     def _read_file(infile):
64         return csv.DictReader(open(infile,'r',newline=''))
65
66     for row in chain(* [_read_file(terms_file) for terms_file in terms_files]):
67         yield row['query']
68
69
70 def trawl_google_trends(terms_files, outfile = None, mode='w'):
71     terms = list(read_google_trends_files(terms_files))
72     resultset = run_wikidata_searches(terms)
73     resultset.to_csv(outfile, mode)
74
75 def trawl_base_terms(infiles, outfile = None, mode='w'):
76     terms = list(chain(* (open(infile,'r') for infile in infiles)))
77     resultset = run_wikidata_searches(terms)
78     resultset.to_csv(outfile, mode)
79
80     ## search each of the base terms in wikidata
81
82     # store unique entities found in the search results, the position in the search result, and the date
83
84 if __name__ == "__main__":
85     import argparse
86     parser = argparse.ArgumentParser("Search wikidata for entities related to a set of terms.")
87     parser.add_argument('inputs', type=str, nargs='+', help='one or more files to read')
88     parser.add_argument('--use-gtrends', action='store_true', help = 'toggle whether the input is the output from google trends')
89     parser.add_argument('--output', type=str, help='an output file. defaults to stdout')
90     parser.add_argument('--overwrite', action='store_true', help = 'overwrite existing output files instead of appending')
91     args = parser.parse_args()
92     if args.use_gtrends:
93         trawl_google_trends(args.inputs, args.output)
94     else:
95         trawl_base_terms(args.inputs, args.output)

Community Data Science Collective || Want to submit a patch?