Coverage for src / crawler / cmds / augment / zbmath.py: 19%

116 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-19 14:59 +0000

1import logging 

2from concurrent.futures import Future, ThreadPoolExecutor 

3from typing import TYPE_CHECKING 

4 

5from history.model_data import HistoryChildDict, MatchingEventResult 

6from history.models import HistoryEventStatus 

7from history.utils import insert_history_event 

8from ptf import model_helpers 

9from ptf.cmds.base_cmds import baseCmd 

10from ptf.model_helpers import add_or_update_extid, get_extid 

11from ptf.models import Article, Collection 

12 

13from crawler.cmds.augment import update_article_with_xarticle 

14from crawler.models.source import Source 

15from crawler.zbmath import ( 

16 match_zbl_article, 

17 zbmath_request_article, 

18 zbmath_request_article_by_arxivId, 

19 zbmath_request_article_by_doi, 

20) 

21 

# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)


# Everything below exists for static type checkers only; none of it is
# imported or defined at runtime.
if TYPE_CHECKING:
    from collections.abc import Callable
    from typing import NotRequired, TypedDict

    from ptf.models import ArticleQuerySet

    class MatchZblArticlesParams(TypedDict):
        """Parameters accepted by ``AugmentZblArticles.__init__``."""

        source_id: str
        "Source 'domain' (unique identifier) for filtering articles"
        collection_pid: NotRequired[str]
        "Collection (issue/volume) identifier for filtering articles"
        update_pdf_link: NotRequired[bool]
        "Optional, False by default. Controls whether our pdf link is replaced with zbmath's one"
        issue_pids: NotRequired[list[str]]
        "Optional. List of issue PIDS to filter the matching with"

    class MatchingOperationMessage(TypedDict):
        """Outcome record appended to ``event_messages`` by ``find_zbl_article``
        and consumed by ``compile_history_events``."""

        pid: str
        zbl_id: NotRequired[str]
        score: NotRequired[int]
        status: HistoryEventStatus
        message: NotRequired[str]
        status_message: NotRequired[str]

47 

48 

# Manually curated article PID -> zbMATH identifier pairs.
# ``find_zbl_article`` consults this table for articles that have no usable
# "zbl-item-id" extid yet, before trying the DOI lookup or the matching API.
hardcoded_overrides: dict[str, str] = {
    # AM collection
    "AM_1999_150_1_a3": "0995.14018",
    "AM_1999_149_2_a6": "0937.52004",
    "AM_1999_149_3_a8": "0942.58016",
    "AM_2000_151_2_a1": "1078.12500",
    "AM_2000_151_2_a5": "1037.11049",
    "AM_2000_151_2_a6": "1037.11050",
    "AM_2000_151_2_a9": "0956.22012",
    "AM_2000_152_1_a1": "0989.11023",
    "AM_2000_152_2_a3": "1042.14018",
    # NOTE(review): unlike the other values this one is not in the
    # NNNN.NNNNN shape - confirm the identifier is the intended one.
    "AM_1999_150_3_a12": "900398017",
    # ELA collection
    "ELA_2001__8__a10": "0989.34004",
    "ELA_2006__15__a10": "1142.15303",
}

63 

64 

def compile_history_events(event_messages: "list[MatchingOperationMessage]"):
    """Convert matching outcome messages into history-event children.

    Each message's ``pid`` is resolved through ``model_helpers.get_resource``
    and turned into a child entry of type ``"zbl-id"``. A ``url`` pointing to
    zbmath.org is added only when the message carries a non-empty ``zbl_id``;
    ``score``, ``message`` and ``status_message`` are copied only when the
    message contains them.

    Args:
        event_messages: outcome messages, in the order they were recorded.

    Returns:
        A list with one child dict per message, in the same order.

    Raises:
        ValueError: when no resource is found for a message's ``pid``.
    """
    entries = []
    for message in event_messages:
        target = model_helpers.get_resource(message["pid"])
        if not target:
            raise ValueError("Resource not found")

        entry: "HistoryChildDict" = {
            "resource": target,
            "type": "zbl-id",
            "status": message.get("status"),
        }

        # A missing key and an empty string both mean "no link to build".
        zbl_id = message.get("zbl_id", "")
        if zbl_id != "":
            entry["url"] = f"https://zbmath.org/{zbl_id}"

        # Optional fields are forwarded verbatim, and only when provided.
        if "score" in message:
            entry["score"] = message["score"]
        if "message" in message:
            entry["message"] = message["message"]
        if "status_message" in message:
            entry["status_message"] = message["status_message"]

        entries.append(entry)
    return entries

91 

92 

class AugmentZblArticles(baseCmd[None]):
    """
    Matches all articles with ZBL in a source/collection

    Adds ZBLIDs

    ``internal_do`` walks the filtered article queryset, looks every article
    up with ``find_zbl_article`` and, when a zbMATH record is returned,
    submits ``update_article_with_xarticle`` to a single-worker thread pool.
    Lookup outcomes are accumulated in ``event_messages`` and can be stored
    with ``insert_history_event``.
    """

    # class settings
    required_params = ["queryset"]

    # Input params
    collection_pid: str | None = None
    source_id: str
    update_pdf_link = False
    # NOTE(review): class-level list shared by all instances; it is only read
    # in this class.
    issue_pids: list[str] = []

    # Init properties
    event_messages: "list[MatchingOperationMessage]"
    collection: "Collection | None" = None
    queryset: "ArticleQuerySet"

    # Progress hook. It is called without argument for each article that
    # yields no zbMATH record, and as a ``Future`` done-callback (one
    # positional argument: the future) for each submitted update.
    # The default is a no-op wrapped in ``staticmethod`` so that it is not
    # bound to the instance and accepts both call shapes; a plain
    # one-argument function stored on the class would receive
    # ``(self, future)`` from ``add_done_callback`` and raise ``TypeError``.
    callback: "Callable" = staticmethod(lambda *_args: None)

    def __init__(self, params: "MatchZblArticlesParams"):
        """Build the article queryset from the optional filters.

        Filters are applied in this order, each only when its value is
        truthy: ``collection_pid`` (also loads ``self.collection``),
        ``source_id`` (resolved to a ``Source`` by its ``domain``) and
        ``issue_pids``.
        """
        super().__init__(params)
        self.event_messages = []
        self.queryset = Article.objects.all()
        if self.collection_pid:
            self.queryset = self.queryset.filter(
                my_container__my_collection__pid=self.collection_pid
            )
            self.collection = Collection.objects.get(pid=self.collection_pid)
        if self.source_id:
            source = Source.objects.get(domain=self.source_id)
            self.queryset = self.queryset.filter(my_container__origin__source=source)
        if self.issue_pids:
            self.queryset = self.queryset.filter(my_container__pid__in=self.issue_pids)

    def internal_do(self) -> None:
        """Look up every article and apply the zbMATH data in a worker thread.

        Raises:
            Exception: the exception raised by any submitted
                ``update_article_with_xarticle`` call. Pending updates are
                cancelled before it is re-raised.
        """
        promises: "set[Future]" = set()
        with ThreadPoolExecutor(max_workers=1) as executor:
            for article in self.queryset.iterator(chunk_size=2000):
                xarticle = self.find_zbl_article(article)
                if not xarticle:
                    self.callback()
                    continue

                # Handle asynchronous results and exceptions as soon as possible
                completed: "set[Future]" = {f for f in promises if f.done()}
                for f in completed:
                    exception = f.exception()
                    if exception:
                        executor.shutdown(wait=False, cancel_futures=True)
                        raise exception
                promises -= completed

                # Runs the database insertion in parallel
                promise = executor.submit(
                    update_article_with_xarticle,
                    article,
                    xarticle,
                    merge_titles=False,
                    update_pdf_link=self.update_pdf_link,
                )
                promise.add_done_callback(self.callback)
                promises.add(promise)

            # Updates still pending when the loop ends have not been checked
            # yet: wait for each of them so that a failure in the last
            # submitted updates is raised instead of being silently dropped.
            for f in promises:
                exception = f.exception()  # blocks until the future is done
                if exception:
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise exception

    def find_zbl_article(self, article: "Article"):
        """Finds zbl article using extids, fallback using matching if no relevant extid is found

        Strategies are tried in order: existing "zbl-item-id" extid,
        ``hardcoded_overrides``, DOI lookup, then the matching API (results
        scoring below 8 are rejected). The outcome is appended to
        ``self.event_messages`` for every return path except a matching-API
        result for which the final zbMATH request returns nothing.

        Returns:
            The object returned by the zbMATH request helper, or ``None``
            when no article could be retrieved.

        Raises:
            StopIteration: when the article found by DOI carries no
                "zbl-item-id" extid.
        """

        # already present
        zbl_item_id = get_extid(article, "zbl-item-id")
        if zbl_item_id:
            xarticle = zbmath_request_article(zbl_item_id.id_value)
            if xarticle:
                _logger.debug(f"Article {article.pid} already has a zblid")
                self.event_messages.append(
                    {
                        "pid": article.pid,
                        "zbl_id": zbl_item_id.id_value,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ALREADY_PRESENT,
                    }
                )
                return xarticle

        # hardcoded
        if article.pid in hardcoded_overrides:
            zbl_id = hardcoded_overrides[article.pid]
            # The extid is stored before the zbMATH request is attempted.
            add_or_update_extid(article, "zbl-item-id", zbl_id, False, False)
            xarticle = zbmath_request_article(zbl_id)
            if xarticle:
                _logger.debug(f"Adding hardcoded zblid to {article.pid}")
                self.event_messages.append(
                    {
                        # NOTE(review): the only branch that prefers the pid of
                        # the zbMATH record over the article's own pid.
                        "pid": xarticle.pid or article.pid,
                        "zbl_id": zbl_id,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ADDED,
                        "message": "hardcoded",
                    }
                )
                return xarticle

        # use extids
        if article.doi:
            xarticle = zbmath_request_article_by_doi(article.doi)
            if xarticle:
                _logger.debug(f"Found matching doi in zbmath for article {article.pid}")
                zbl_id: str = next(i[1] for i in xarticle.extids if i[0] == "zbl-item-id")
                self.event_messages.append(
                    {
                        "pid": article.pid,
                        "zbl_id": zbl_id,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ADDED,
                        "message": "doi_found",
                    }
                )
                return xarticle

        # fallback to matching
        item: dict | None = match_zbl_article(article)
        if not item:
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "status": HistoryEventStatus.WARNING,
                    "status_message": MatchingEventResult.NOT_FOUND,
                }
            )
            return

        if item["score"] < 8:
            _logger.info(f"Got score {item['score']} for resource {article.pid}")
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "zbl_id": item["zbl_id"],
                    "status": HistoryEventStatus.WARNING,
                    "status_message": MatchingEventResult.LOW_SCORE,
                    "score": item["score"],
                    "message": "matching",
                }
            )
            return

        if item["zbl_id"].startswith("arXiv:") and "arxiv_id" in item:
            # Sometimes the zbl_id returned by the matching API is not the REAL zbl_id.
            xarticle = zbmath_request_article_by_arxivId(item["arxiv_id"])
        else:
            add_or_update_extid(article, "zbl-item-id", item["zbl_id"], False, False)
            xarticle = zbmath_request_article(item["zbl_id"])

        if xarticle:
            _logger.debug(f"Got score {item['score']} for resource {article.pid}. Adding zbl_id")
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "zbl_id": item["zbl_id"],
                    "status": HistoryEventStatus.OK,
                    "status_message": MatchingEventResult.ADDED,
                    "score": item["score"],
                    "message": "matching",
                }
            )
            return xarticle

    def insert_history_event(self):
        """Store one "matching" history event built from ``event_messages``.

        Delegates to the module-level ``insert_history_event`` imported from
        ``history.utils``; the children are produced by
        ``compile_history_events``.
        """
        insert_history_event(
            {
                "pid": f"{self.source_id}_{self.collection_pid or ''}_matching",
                "col": self.collection,
                "source": self.source_id,
                "status": HistoryEventStatus.OK,
                "type": "matching",
                "children": compile_history_events(self.event_messages),
            }
        )