from itertools import chain, islice
import csv
from json import JSONDecodeError
+from os import path
class LabelData:
__slots__ = ['entityid','label','langcode','is_alt']
- def __init__(self, wd_res, entityid, is_alt):
+ def __init__(self, wd_res, is_alt):
obj = wd_res.get('label',None)
self.label = obj.get('value',None)
self.langcode = obj.get('xml:lang',None)
- self.entityid = entityid
+ self.entityid = wd_res.get('entity',None).get('value',None)
self.is_alt = is_alt
def to_list(self):
self.langcode,
self.is_alt]
-
-def GetAllLabels(in_csv, outfile, topN):
+def GetAllLabels(in_csvs, outfile, topNs):
def load_entity_ids(in_csv, topN=5):
with open(in_csv,'r',newline='') as infile:
- reader = csv.DictReader(infile)
+ reader = list(csv.DictReader(infile))
for row in reader:
if int(row['search_position']) < topN:
yield row["entityid"]
- ids = set(load_entity_ids(in_csv, topN))
+ ids = set(chain(* map(lambda in_csv, topN: load_entity_ids(in_csv, topN), in_csvs, topNs)))
- labeldata = chain(* map(GetEntityLabels, ids))
+ labeldata = GetEntityLabels(ids)
with open(outfile, 'w', newline='') as of:
writer = csv.writer(of)
writer.writerows(map(LabelData.to_list,labeldata))
-def GetEntityLabels(entityid):
+def GetEntityLabels(entityids):
- def run_query_and_parse(query, entityid, is_alt):
- results = run_sparql_query(query % entityid)
+ def run_query_and_parse(query, is_alt):
+ results = run_sparql_query(query)
try:
jobj = results.json()
+
res = jobj.get('results',None)
if res is not None:
res = res.get('bindings',None)
if res is None:
raise requests.APIError(f"got invalid response from wikidata for {query % entityid}")
+
for info in res:
- yield LabelData(info, entityid, is_alt)
+ yield LabelData(info, is_alt)
except JSONDecodeError as e:
print(e)
- print(query % entityid)
+ print(query)
+ def prep_query(query, prop, entityids):
+ values = ' '.join(('wd:{0}'.format(id) for id in entityids))
+ return query.format(prop, values)
+
+ base_query = """
+ SELECT DISTINCT ?entity ?label WHERE {{
+ ?entity {0} ?label;
+ VALUES ?entity {{ {1} }}
+ }}"""
+
+ # we can't get all the entities at once. how about 100 at a time?
+ chunksize = 100
+ entityids = (id for id in entityids)
+ chunk = list(islice(entityids, chunksize))
+ calls = []
+ while len(chunk) > 0:
+ label_query = prep_query(base_query, "rdfs:label", chunk)
+ altLabel_query = prep_query(base_query, "skos:altLabel", chunk)
+ label_results = run_query_and_parse(label_query, is_alt=False)
+ altLabel_results = run_query_and_parse(altLabel_query, is_alt=True)
+ calls.extend([label_results, altLabel_results])
+ chunk = list(islice(entityids, chunksize))
+
+ return chain(*calls)
+
- label_base_query = """
- SELECT DISTINCT ?label WHERE {
- wd:%s rdfs:label ?label;
- }"""
+def find_new_output_file(output, i = 1):
+ if path.exists(output):
+ name, ext = path.splitext(output)
- altLabel_base_query = """
- SELECT DISTINCT ?label WHERE {
- wd:%s skos:altLabel ?label;
- }"""
+ return find_new_output_file(f"{name}_{i}.{ext}", i+1)
+ else:
+ return output
- label_results = run_query_and_parse(label_base_query, entityid, is_alt=False)
+if __name__ == "__main__":
+ import argparse
+ parser = argparse.ArgumentParser("Use wikidata to find transliterations of terms")
+ parser.add_argument('inputs', type=str, nargs='+', help='one or more files to read. the inputs are generated by wikidata_search.py')
+ parser.add_argument('--topN', type=int, nargs='+', help='limit number of wikidata search results to use, can pass one arg for each source.')
+ parser.add_argument('--output', type=str, help='an output file. defaults to stdout',default=20)
- altLabel_results = run_query_and_parse(altLabel_base_query, entityid, is_alt=True)
+ args = parser.parse_args()
- return chain(label_results, altLabel_results)
-
+ output = find_new_output_file(args.output)
-if __name__ == "__main__":
- GetAllLabels("../data/output/wikidata_search_results.csv","../data/output/wikidata_entity_labels.csv", topN=20)
+ GetAllLabels(args.inputs, output, topNs=args.topN)