from sys import stdout
from wikidata_api_calls import search_wikidata, get_wikidata_api
import csv
+from itertools import chain
class Wikidata_ResultSet:
    """Accumulate Wikidata search hits and write them out as CSV.

    ``results`` holds one entry per ``extend`` call. Each entry is a lazy
    generator of ``Wikidata_Result`` objects, so the result objects are only
    built when ``to_csv`` iterates them, and they can be written only once.
    """

    def __init__(self):
        self.results = []

    def extend(self, term, results):
        """Queue the hits returned for one search term.

        Args:
            term: the search term that produced ``results``.
            results: iterable of raw search hits for ``term``.
        """
        # The index of each hit within ``results`` is passed to
        # Wikidata_Result as its third argument.
        self.results.append(
            Wikidata_Result(term, result, i)
            for i, result in enumerate(results)
        )

    def to_csv(self, outfile=None):
        """Write a header row followed by one row per queued result.

        Args:
            outfile: path of the CSV file to create. When None the rows are
                written to stdout instead.
        """
        def _write(handle):
            writer = csv.writer(handle)
            writer.writerow(Wikidata_Result.__slots__)
            writer.writerows(
                map(Wikidata_Result.to_list, chain.from_iterable(self.results))
            )

        if outfile is None:
            _write(stdout)
        else:
            # Close (and flush) the file we opened ourselves; stdout stays open.
            with open(outfile, 'w', newline='') as of:
                _write(of)
class Wikidata_Result:
self.search_position,
self.timestamp]
def run_wikidata_searches(terms):
    """Search Wikidata for every term and collect the hits.

    Args:
        terms: iterable of search terms.

    Returns:
        A Wikidata_ResultSet that has been extended once per term.
    """
    # One API handle is created up front and reused for every search.
    api = get_wikidata_api()
    collected = Wikidata_ResultSet()
    for term in terms:
        collected.extend(term, search_wikidata(api, term))
    return collected
+
def read_google_trends_files(terms_files):
    """Yield the ``query`` column of every row in the given CSV files.

    Files are read one after another, in the order given. Each file is
    opened only when its turn comes and is closed as soon as its rows have
    been consumed.

    Args:
        terms_files: iterable of paths to CSV files with a ``query`` column.

    Yields:
        The ``query`` value of each row.

    Raises:
        KeyError: if a file has no ``query`` column.
    """
    for terms_file in terms_files:
        with open(terms_file, 'r', newline='') as infile:
            for row in csv.DictReader(infile):
                yield row['query']
+
+
def trawl_google_trends(terms_files, outfile = None):
    """Search Wikidata for the queries in Google Trends files and write CSV.

    Args:
        terms_files: paths handed to read_google_trends_files.
        outfile: destination handed to the result set's to_csv (None by
            default).
    """
    queries = read_google_trends_files(terms_files)
    run_wikidata_searches(queries).to_csv(outfile)
def read_base_terms_files(infiles):
    """Yield the search terms stored one per line in each input file.

    Surrounding whitespace (including the line ending) is stripped so the
    term does not carry a trailing newline, and blank lines are skipped.
    Each file is closed once its lines have been read.

    Args:
        infiles: iterable of paths to plain-text files, one term per line.

    Yields:
        Each non-empty term, in file order.
    """
    for infile in infiles:
        with open(infile, 'r') as handle:
            for line in handle:
                term = line.strip()
                if term:
                    yield term


def trawl_base_terms(infiles, outfile = None):
    """Search Wikidata for the terms listed in plain-text files and write CSV.

    Args:
        infiles: paths of text files holding one search term per line.
        outfile: destination handed to the result set's to_csv (None by
            default).
    """
    terms = read_base_terms_files(infiles)
    resultset = run_wikidata_searches(terms)
    resultset.to_csv(outfile)
## search each of the base terms in wikidata
# store unique entities found in the search results, the position in the search result, and the date
if __name__ == "__main__":
    import argparse

    # Command-line entry point: read terms from the given files, search
    # Wikidata for them and write the hits to --output.
    cli = argparse.ArgumentParser("Search wikidata for entities related to a set of terms.")
    cli.add_argument('inputs', type=str, nargs='+', help='one or more files to read')
    cli.add_argument('--use-gtrends', action='store_true', help='toggle whether the input is the output from google trends')
    cli.add_argument('--output', type=str, help='an output file. defaults to stdout')
    options = cli.parse_args()

    # --use-gtrends selects the reader for Google Trends output; otherwise the
    # inputs are treated as base-term files.
    trawl = trawl_google_trends if options.use_gtrends else trawl_base_terms
    trawl(options.inputs, options.output)
class LabelData:
    """One label of a Wikidata entity, taken from a SPARQL result binding."""

    __slots__ = ['entityid','label','langcode','is_alt']

    def __init__(self, wd_res, is_alt):
        """Extract the label fields from one result binding.

        Args:
            wd_res: mapping with a ``label`` entry (read for ``value`` and
                ``xml:lang``) and an ``entity`` entry (read for ``value``).
            is_alt: stored as-is; callers pass True for alternative labels.

        A missing ``label`` or ``entity`` entry leaves the corresponding
        attributes as None instead of raising AttributeError.
        """
        label_obj = wd_res.get('label') or {}
        entity_obj = wd_res.get('entity') or {}
        self.label = label_obj.get('value', None)
        self.langcode = label_obj.get('xml:lang', None)
        # NOTE(review): the ?entity binding is presumably the full entity URI
        # rather than a bare Q-id - confirm what downstream consumers expect.
        self.entityid = entity_obj.get('value', None)
        self.is_alt = is_alt

    def to_list(self):
        """Return the fields as a list, in ``__slots__`` order."""
        return [self.entityid,
                self.label,
                self.langcode,
                self.is_alt]
-
def GetAllLabels(in_csvs, outfile, topNs):
    """Collect the labels of the top-ranked entities found in search-result CSVs.

    Entity ids are gathered from every input file, de-duplicated, looked up
    with GetEntityLabels and written to ``outfile`` as CSV rows.

    Args:
        in_csvs: a path or an iterable of paths to CSV files that have
            ``search_position`` and ``entityid`` columns.
        outfile: path of the CSV file to write; None writes to stdout.
        topNs: per-file limit; rows with ``search_position`` below the limit
            are used. May be a single int or a one-element sequence (applied
            to every file), a sequence with one limit per file, or None
            (20 for every file).

    Raises:
        ValueError: if the number of limits is neither 1 nor the number of
            input files.
    """
    import sys

    def load_entity_ids(in_csv, topN=5):
        with open(in_csv, 'r', newline='') as infile:
            for row in csv.DictReader(infile):
                if int(row['search_position']) < topN:
                    yield row["entityid"]

    in_csvs = [in_csvs] if isinstance(in_csvs, str) else list(in_csvs)

    if topNs is None:
        topNs = [20]
    elif isinstance(topNs, int):
        topNs = [topNs]
    else:
        topNs = list(topNs)

    # map() over two sequences stops at the shorter one, which would silently
    # drop input files; broadcast a single limit and reject any other mismatch.
    if len(topNs) == 1:
        topNs = topNs * len(in_csvs)
    elif len(topNs) != len(in_csvs):
        raise ValueError(
            f"expected 1 or {len(in_csvs)} topN values, got {len(topNs)}"
        )

    ids = set(chain.from_iterable(map(load_entity_ids, in_csvs, topNs)))

    labeldata = GetEntityLabels(ids)

    def write_rows(handle):
        writer = csv.writer(handle)
        writer.writerows(map(LabelData.to_list, labeldata))

    if outfile is None:
        write_rows(sys.stdout)
    else:
        with open(outfile, 'w', newline='') as of:
            write_rows(of)
def GetEntityLabels(entityids, chunksize=100):
    """Fetch the labels and alternative labels of a set of Wikidata entities.

    The ids are split into chunks and two SPARQL queries are prepared per
    chunk, one for ``rdfs:label`` and one for ``skos:altLabel``. The queries
    are only sent when the returned iterator is consumed.

    Args:
        entityids: iterable of entity ids; each is prefixed with ``wd:`` in
            the query.
        chunksize: maximum number of entities per query (all entities cannot
            be requested at once).

    Returns:
        An iterator of LabelData objects over all chunks.

    Raises:
        ValueError: if ``chunksize`` is smaller than 1, or (while iterating)
            if a response has no ``results``/``bindings`` section.
    """
    from itertools import chain, islice

    if chunksize < 1:
        raise ValueError("chunksize must be at least 1")

    def run_query_and_parse(query, is_alt):
        results = run_sparql_query(query)
        try:
            jobj = results.json()
        except JSONDecodeError as e:
            # Best effort: report the undecodable response and yield nothing
            # for this query.
            print(e)
            print(query)
            return

        res = jobj.get('results', None)
        if res is not None:
            res = res.get('bindings', None)
        if res is None:
            raise ValueError(f"got invalid response from wikidata for {query}")

        for info in res:
            yield LabelData(info, is_alt)

    def prep_query(query, prop, entityids):
        values = ' '.join('wd:{0}'.format(entityid) for entityid in entityids)
        return query.format(prop, values)

    base_query = """
    SELECT DISTINCT ?entity ?label WHERE {{
    ?entity {0} ?label;
    VALUES ?entity {{ {1} }}
    }}"""

    # A single iterator lets islice() resume where the previous chunk ended.
    remaining = iter(entityids)
    calls = []
    while True:
        chunk = list(islice(remaining, chunksize))
        if not chunk:
            break
        label_query = prep_query(base_query, "rdfs:label", chunk)
        altLabel_query = prep_query(base_query, "skos:altLabel", chunk)
        calls.append(run_query_and_parse(label_query, is_alt=False))
        calls.append(run_query_and_parse(altLabel_query, is_alt=True))

    return chain.from_iterable(calls)
if __name__ == "__main__":
    import argparse

    # Command-line entry point: read search-result CSVs and write the labels
    # of the top-ranked entities.
    parser = argparse.ArgumentParser("Use wikidata to find transliterations of terms")
    parser.add_argument('inputs', type=str, nargs='+', help='one or more files to read. the inputs are generated by wikidata_search.py')
    # The default of 20 belongs to the result limit; it used to be attached to
    # --output, where an integer is not a usable path.
    parser.add_argument('--topN', type=int, nargs='+', default=[20], help='limit number of wikidata search results to use, can pass one arg for each source.')
    parser.add_argument('--output', type=str, help='an output file. defaults to stdout')

    args = parser.parse_args()

    # A single limit applies to every input; otherwise one limit per input is
    # required so that no input is silently ignored.
    topNs = args.topN
    if len(topNs) == 1:
        topNs = topNs * len(args.inputs)
    elif len(topNs) != len(args.inputs):
        parser.error('--topN takes either one value or one value per input file')

    GetAllLabels(args.inputs, args.output, topNs=topNs)