1 from wikidata_api_calls import run_sparql_query
2 from itertools import chain, islice
4 from json import JSONDecodeError
# Fixed attribute set for one label record. GetAllLabels also writes this
# list out as the header row of its CSV output.
8 __slots__ = ['itemid','label','langcode','is_alt']
# Build a record from one result binding `wd_res`, which is read here as a
# mapping whose 'label' and 'item' entries are themselves mappings.
# NOTE(review): `is_alt` is listed in __slots__ but no assignment to it
# appears in the lines shown here -- confirm it is stored.
10 def __init__(self, wd_res, is_alt):
11 obj = wd_res.get('label',None)
# NOTE(review): `obj` is None when 'label' is absent, so the two lookups
# below raise AttributeError instead of storing None.
12 self.label = obj.get('value',None)
13 self.langcode = obj.get('xml:lang',None)
# Same pattern for 'item': a missing entry fails on the chained .get().
14 self.itemid = wd_res.get('item',None).get('value',None)
# Gather item ids from the input CSVs plus a fixed list file, fetch their
# labels with GetItemLabels, and write the result as CSV to `outfile`.
#   in_csvs -- paths handed to open() one by one
#   outfile -- handed straight to open(..., 'w'); receives the CSV output
#   topNs   -- limits paired positionally with in_csvs
23 def GetAllLabels(in_csvs, outfile, topNs):
# Read one CSV and compare each row's 'search_position' (as int) to topN.
25 def load_item_ids(in_csv, topN=5):
26 with open(in_csv,'r',newline='') as infile:
# The whole file is materialised as a list of dict rows.
27 reader = list(csv.DictReader(infile))
29 if int(row['search_position']) < topN:
# map() walks in_csvs and topNs in lockstep and stops at the shorter one.
# NOTE(review): topNs=None (what argparse supplies when --topN is omitted)
# makes this raise TypeError.
32 ids = set(chain(* map(lambda in_csv, topN: load_item_ids(in_csv, topN), in_csvs, topNs)))
# NOTE(review): iterating the file object yields lines that keep their
# trailing newline, and the handle is never closed explicitly -- confirm
# that both are intended.
33 ids = ids.union(open("../resources/main_items.txt"))
35 labeldata = GetItemLabels(ids)
# The header row is the LabelData attribute list; each record is serialised
# through LabelData.to_list (defined outside the lines shown here).
37 with open(outfile, 'w', newline='') as of:
38 writer = csv.writer(of)
39 writer.writerow(LabelData.__slots__)
40 writer.writerows(map(LabelData.to_list,labeldata))
# Fetch label data for the given item ids through SPARQL queries.
43 def GetItemLabels(itemids):
# Run one SPARQL query and yield a LabelData per result entry.  Because the
# body contains `yield`, calling this only creates a generator; the query is
# sent when that generator is first iterated.
45 def run_query_and_parse(query, is_alt):
46 results = run_sparql_query(query)
# Drill into the decoded response: 'results' -> 'bindings'.
50 res = jobj.get('results',None)
52 res = res.get('bindings',None)
# NOTE(review): `itemid` is not bound in any line visible here, and callers
# pass an already-formatted query, so `query % itemid` looks wrong.  Also
# confirm that `requests` is imported and actually provides `APIError`.
54 raise requests.APIError(f"got invalid response from wikidata for {query % itemid}")
57 yield LabelData(info, is_alt)
# Handles a response that could not be decoded as JSON.
59 except JSONDecodeError as e:
def prep_query(query, prop, itemids):
    """Fill a SPARQL template with a property and a list of item ids.

    Args:
        query: ``str.format`` template; ``{0}`` receives *prop* and ``{1}``
            receives the id list.
        prop: Property text substituted for ``{0}`` (for example
            ``rdfs:label``).
        itemids: Iterable of item ids.  Each id is stripped of surrounding
            whitespace and prefixed with ``wd:``; blank ids are skipped so
            that ids read straight from a text file (trailing newlines,
            empty lines) never produce a bare ``wd:`` token.

    Returns:
        The formatted query string, ids separated by single spaces.
    """
    cleaned = (str(item_id).strip() for item_id in itemids)
    values = ' '.join(f'wd:{item_id}' for item_id in cleaned if item_id)
    return query.format(prop, values)
68 SELECT DISTINCT ?item ?label WHERE {{
70 VALUES ?item {{ {1} }}
73 # we can't get all the items at once. how about 100 at a time?
# The template lines above double their braces so that str.format leaves
# literal `{` / `}` in the SPARQL text; `{1}` receives the space-separated
# `wd:` ids built by prep_query.
# Wrap the ids in a single generator so that each islice() call below
# continues where the previous one stopped.
75 itemids = (id for id in itemids)
76 chunk = list(islice(itemids, chunksize))
# Each chunk is queried twice: once for rdfs:label, once for skos:altLabel.
79 label_query = prep_query(base_query, "rdfs:label", chunk)
80 altLabel_query = prep_query(base_query, "skos:altLabel", chunk)
# run_query_and_parse is a generator function, so these two calls only
# create generators; nothing is fetched until `calls` is consumed.
81 label_results = run_query_and_parse(label_query, is_alt=False)
82 altLabel_results = run_query_and_parse(altLabel_query, is_alt=True)
83 calls.extend([label_results, altLabel_results])
# Pull the next chunk; an exhausted iterator gives an empty list.
84 chunk = list(islice(itemids, chunksize))
def find_new_output_file(output, i=1):
    """Return a path based on *output* that does not exist yet.

    If *output* itself is free it is returned unchanged.  Otherwise the
    candidates ``<name>_<i><ext>``, ``<name>_<i+1><ext>``, ... are tried in
    order and the first one that does not exist is returned.

    Args:
        output: Desired output path.
        i: First numeric suffix to try when *output* already exists.

    Returns:
        A path for which ``os.path.exists`` is currently false.
    """
    from os import path

    if not path.exists(output):
        return output

    # Split once, from the original path, so the suffix is replaced on each
    # attempt (out_1.csv, out_2.csv, ...) rather than stacked up
    # (out_1_2.csv).  A loop also avoids one stack frame per existing file.
    name, ext = path.splitext(output)
    candidate = f"{name}_{i}{ext}"
    while path.exists(candidate):
        i += 1
        candidate = f"{name}_{i}{ext}"
    return candidate
if __name__ == "__main__":
    import argparse
    import os
    import sys

    # Pass the text as `description`; the first positional parameter of
    # ArgumentParser is `prog`, which would put it in the usage line.
    parser = argparse.ArgumentParser(description="Use wikidata to find translations of terms")
    parser.add_argument('inputs', type=str, nargs='+', help='one or more files to read. the inputs are generated by wikidata_search.py')
    parser.add_argument('--topN', type=int, nargs='+', help='limit number of wikidata search results to use, can pass one arg for each source.')
    parser.add_argument('--output', type=str, help='an output file. defaults to stdout', default=None)

    args = parser.parse_args()

    # GetAllLabels pairs inputs and limits positionally, so give it exactly
    # one limit per input: fall back to load_item_ids' own default of 5 when
    # --topN is omitted, and repeat a single value for every input.
    top_ns = list(args.topN) if args.topN else [5]
    if len(top_ns) == 1:
        top_ns = top_ns * len(args.inputs)
    elif len(top_ns) != len(args.inputs):
        parser.error(
            f"--topN takes 1 or {len(args.inputs)} values (one per input), got {len(top_ns)}"
        )

    if args.output is None:
        # Honour the documented stdout default.  GetAllLabels hands `outfile`
        # to open(), which accepts a file descriptor; a duplicate is used so
        # that closing it afterwards leaves the real stdout open.
        output = os.dup(sys.stdout.fileno())
    else:
        output = find_new_output_file(args.output)

    GetAllLabels(args.inputs, output, topNs=top_ns)