# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.

import logging

import pandas as pd
import requests
from retrying import retry

logger = logging.getLogger(__name__)

API_URL_WIKIPEDIA = "https://en.wikipedia.org/w/api.php"
API_URL_WIKIDATA = "https://query.wikidata.org/sparql"
SESSION = None


def get_session(session=None):
    """Get a requests session object, reusing a cached module-level session by default.

    Args:
        session (requests.Session): Request session object.

    Returns:
        requests.Session: Request session object.
    """
if session is None:
global SESSION
if SESSION is None:
SESSION = requests.Session()
session = SESSION
return session
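

# A minimal illustration of the session reuse above (all names are from this module):
#
#   s1 = get_session()    # first call creates and caches the module-level SESSION
#   s2 = get_session()    # later calls return the same object, reusing connections
#   assert s1 is s2
#
# An explicitly passed session bypasses the cache: get_session(session=my_session)
# returns my_session unchanged.
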
@retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=5)
def find_wikidata_id(name, limit=1, session=None):
"""Find the entity ID in wikidata from a title string.
Args:
name (str): A string with search terms (eg. "Batman (1989) film")
limit (int): Number of results to return
session (requests.Session): requests session to reuse connections
Returns:
str: wikidata entityID corresponding to the title string. 'entityNotFound' will be returned if no page is found
"""
session = get_session(session=session)
params = dict(
action="query",
list="search",
srsearch=bytes(name, encoding="utf8"),
srlimit=limit,
srprop="",
format="json",
)
try:
response = session.get(API_URL_WIKIPEDIA, params=params)
page_id = response.json()["query"]["search"][0]["pageid"]
    except Exception as e:
        # TODO: distinguish between connection error and entity not found
        logger.error("ENTITY NOT FOUND: %s", e)
return "entityNotFound"
params = dict(
action="query",
prop="pageprops",
ppprop="wikibase_item",
pageids=[page_id],
format="json",
)
try:
response = session.get(API_URL_WIKIPEDIA, params=params)
entity_id = response.json()["query"]["pages"][str(page_id)]["pageprops"][
"wikibase_item"
]
    except Exception as e:
        # TODO: distinguish between connection error and entity not found
        logger.error("ENTITY NOT FOUND: %s", e)
return "entityNotFound"
return entity_id
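

# Illustrative use (the entity ID shown is an assumption, not a verified value):
#
#   find_wikidata_id("Batman (1989) film")    # -> a Wikidata ID such as "Q2695156"
#   find_wikidata_id("zzzz no such page")     # -> "entityNotFound"
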
@retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=5)
def query_entity_links(entity_id, session=None):
"""Query all linked pages from a wikidata entityID
Args:
entity_id (str): A wikidata entity ID
session (requests.Session): requests session to reuse connections
Returns:
json: Dictionary with linked pages.
"""
query = (
"""
PREFIX entity: <http://www.wikidata.org/entity/>
#partial results
SELECT ?propUrl ?propLabel ?valUrl ?valLabel
WHERE
{
hint:Query hint:optimizer 'None' .
{ BIND(entity:"""
+ entity_id
+ """ AS ?valUrl) .
BIND("N/A" AS ?propUrl ) .
BIND("identity"@en AS ?propLabel ) .
}
UNION
{ entity:"""
+ entity_id
+ """ ?propUrl ?valUrl .
?property ?ref ?propUrl .
?property rdf:type wikibase:Property .
?property rdfs:label ?propLabel
}
?valUrl rdfs:label ?valLabel
FILTER (LANG(?valLabel) = 'en') .
OPTIONAL{ ?valUrl wdt:P18 ?picture .}
FILTER (lang(?propLabel) = 'en' )
}
ORDER BY ?propUrl ?valUrl
LIMIT 500
"""
)
session = get_session(session=session)
try:
data = session.get(
API_URL_WIKIDATA, params=dict(query=query, format="json")
).json()
    except Exception as e:
        logger.error("ENTITY NOT FOUND: %s", e)
return {}
return data
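

# The returned value follows the standard SPARQL JSON results layout, roughly:
#
#   {"head": {"vars": ["propUrl", "propLabel", "valUrl", "valLabel"]},
#    "results": {"bindings": [
#        {"propUrl":  {"type": "uri", "value": "..."},
#         "propLabel": {"type": "literal", "xml:lang": "en", "value": "..."},
#         "valUrl":   {"type": "uri", "value": "http://www.wikidata.org/entity/Q..."},
#         "valLabel": {"type": "literal", "xml:lang": "en", "value": "..."}},
#        ...]}}
#
# read_linked_entities() below extracts (ID, label) pairs from these bindings.
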
def read_linked_entities(data):
"""Obtain lists of liken entities (IDs and names) from dictionary
Args:
data (json): dictionary with linked pages
Returns:
list, list:
- List of liked entityIDs.
- List of liked entity names.
"""
return [
(
c.get("valUrl").get("value").replace("http://www.wikidata.org/entity/", ""),
c.get("valLabel").get("value"),
)
for c in data.get("results", {}).get("bindings", [])
]
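

# Chaining the two helpers above (a sketch; the entity ID is illustrative):
#
#   links = query_entity_links("Q2695156")
#   pairs = read_linked_entities(links)    # e.g. [("Q...", "some label"), ...]
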
@retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=5)
def query_entity_description(entity_id, session=None):
"""Query entity wikidata description from entityID
Args:
entity_id (str): A wikidata page ID.
session (requests.Session): requests session to reuse connections
Returns:
str: Wikidata short description of the entityID
descriptionNotFound' will be returned if no description is found
"""
query = (
"""
PREFIX wd: <http://www.wikidata.org/entity/>
PREFIX schema: <http://schema.org/>
SELECT ?o
WHERE
{
wd:"""
+ entity_id
+ """ schema:description ?o.
FILTER ( lang(?o) = "en" )
}
"""
)
session = get_session(session=session)
try:
r = session.get(API_URL_WIKIDATA, params=dict(query=query, format="json"))
description = r.json()["results"]["bindings"][0]["o"]["value"]
    except Exception as e:
        logger.error("DESCRIPTION NOT FOUND: %s", e)
return "descriptionNotFound"
return description
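

# Sketch of a single lookup (the output is paraphrased, not an exact Wikidata
# response, and the entity ID is illustrative):
#
#   query_entity_description("Q2695156")   # -> e.g. "1989 film directed by Tim Burton"
#   query_entity_description("Q0")         # -> "descriptionNotFound"
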
def search_wikidata(names, extras=None, describe=True, verbose=False):
"""Create DataFrame of Wikidata search results
Args:
names (list[str]): List of names to search for
extras (dict(str: list)): Optional extra items to assign to results for corresponding name
describe (bool): Optional flag to include description of entity
verbose (bool): Optional flag to print out intermediate data
Returns:
pandas.DataFrame: Wikipedia results for all names with found entities
"""
results = []
for idx, name in enumerate(names):
entity_id = find_wikidata_id(name)
if verbose:
print("name: {name}, entity_id: {id}".format(name=name, id=entity_id))
if entity_id == "entityNotFound":
continue
json_links = query_entity_links(entity_id)
related_links = read_linked_entities(json_links)
description = query_entity_description(entity_id) if describe else ""
for related_entity, related_name in related_links:
result = dict(
name=name,
original_entity=entity_id,
linked_entities=related_entity,
name_linked_entities=related_name,
)
if describe:
result["description"] = description
if extras is not None:
for field, lst in extras.items():
result[field] = lst[idx]
results.append(result)
return pd.DataFrame(results)
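

# A minimal, illustrative smoke test of the full pipeline. It assumes network
# access to the Wikipedia and Wikidata endpoints defined above; the titles and
# the "year" extras are example inputs, not values taken from this module.
if __name__ == "__main__":
    sample_names = ["The Godfather", "Pulp Fiction"]
    sample_extras = {"year": [1972, 1994]}
    df = search_wikidata(sample_names, extras=sample_extras, describe=True, verbose=True)
    # Expected columns: name, original_entity, linked_entities,
    # name_linked_entities, description, year
    print(df.head())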