Coverage for src / crawler / cmds / augment / zbmath.py: 19%

114 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-02-02 15:55 +0000

1import logging 

2from concurrent.futures import Future, ThreadPoolExecutor 

3from typing import TYPE_CHECKING 

4 

5from history.model_data import HistoryChildDict, MatchingEventResult 

6from history.models import HistoryEventStatus 

7from history.utils import insert_history_event 

8from ptf import model_helpers 

9from ptf.cmds.base_cmds import baseCmd 

10from ptf.model_helpers import add_or_update_extid, get_extid 

11from ptf.models import Article, Collection 

12 

13from crawler.cmds.augment import update_article_with_xarticle 

14from crawler.models.source import Source 

15from crawler.zbmath import match_zbl_article, zbmath_request_article, zbmath_request_article_by_doi 

16 

17_logger = logging.getLogger(__name__) 

18 

if TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only and is
    # never executed at runtime, so annotations here need no string quoting.
    from collections.abc import Callable
    from typing import NotRequired, TypedDict

    from ptf.models import ArticleQuerySet

    class MatchZblArticlesParams(TypedDict):
        """Parameters accepted by the ``AugmentZblArticles`` command."""

        source_id: str
        "Source 'domain' (unique identifier) for filtering articles"
        collection_pid: NotRequired[str]
        "Optional. pid of the Collection used for filtering articles"
        update_pdf_link: NotRequired[bool]
        "Optional, False by default. Controls whether our pdf link is replaced with zbmath's one"
        issue_pids: NotRequired[list[str]]
        "Optional. List of issue PIDs to filter the matching with"

    class MatchingOperationMessage(TypedDict):
        """Outcome of one zbMATH lookup, as consumed by ``compile_history_events``."""

        # pid used to resolve the resource the history entry is attached to
        pid: str
        # zbMATH identifier; a non-empty value is turned into a zbmath.org URL
        zbl_id: NotRequired[str]
        # score returned by the fallback matching, when that path was used
        score: NotRequired[int]
        status: HistoryEventStatus
        # how the identifier was obtained ("hardcoded", "doi_found", "matching")
        message: NotRequired[str]
        # result of the operation (a MatchingEventResult value in this module)
        status_message: NotRequired[str]

42 

43 

# Manually curated mapping: article pid -> zbMATH identifier.
# AugmentZblArticles.find_zbl_article consults this table for articles that do
# not already resolve through an existing "zbl-item-id" extid, before trying
# the DOI lookup and the fallback matching.
# NOTE(review): the value for "AM_1999_150_3_a12" does not follow the
# "NNNN.NNNNN" shape of the other entries - confirm it is intentional.
hardcoded_overrides: dict[str, str] = {
    "AM_1999_150_1_a3": "0995.14018",
    "AM_1999_149_2_a6": "0937.52004",
    "AM_1999_149_3_a8": "0942.58016",
    "AM_2000_151_2_a1": "1078.12500",
    "AM_2000_151_2_a5": "1037.11049",
    "AM_2000_151_2_a6": "1037.11050",
    "AM_2000_151_2_a9": "0956.22012",
    "AM_2000_152_1_a1": "0989.11023",
    "AM_2000_152_2_a3": "1042.14018",
    "AM_1999_150_3_a12": "900398017",
    "ELA_2001__8__a10": "0989.34004",
    "ELA_2006__15__a10": "1142.15303",
}

58 

59 

def compile_history_events(event_messages: "list[MatchingOperationMessage]"):
    """
    Turn per-article matching messages into history-event children.

    Each message's ``pid`` is resolved with ``model_helpers.get_resource``. The
    resulting child always carries ``resource``, ``type`` ("zbl-id") and
    ``status``. ``url`` is added when the message holds a non-empty ``zbl_id``;
    ``score``, ``message`` and ``status_message`` are copied over when present.

    Raises ``ValueError`` when a ``pid`` cannot be resolved to a resource.
    """
    passthrough_keys = ("score", "message", "status_message")
    compiled = []
    for entry in event_messages:
        target = model_helpers.get_resource(entry["pid"])
        if not target:
            raise ValueError("Resource not found")

        child: "HistoryChildDict" = {
            "resource": target,
            "type": "zbl-id",
            "status": entry.get("status"),
        }

        # An absent or empty zbl_id means there is nothing to link to.
        if entry.get("zbl_id", "") != "":
            child["url"] = f"https://zbmath.org/{entry['zbl_id']}"

        for key in passthrough_keys:
            if key in entry:
                child[key] = entry[key]

        compiled.append(child)
    return compiled

86 

87 

class AugmentZblArticles(baseCmd[None]):
    """
    Matches all articles with ZBL in a source/collection

    Adds ZBLIDs

    ``find_zbl_article`` looks up the zbMATH record of each article in the
    queryset and ``internal_do`` hands every hit to
    ``update_article_with_xarticle`` on a worker thread. Lookup outcomes are
    accumulated in ``event_messages`` and can be stored afterwards with
    ``insert_history_event``.
    """

    # class settings
    required_params = ["queryset"]

    # Input params (presumably populated from `params` by baseCmd - confirm)
    collection_pid: str | None = None
    source_id: str
    update_pdf_link = False
    # Class-level default shared by all instances: never mutate it in place.
    issue_pids: list[str] = []
    # Invoked without argument when an article is skipped and with the finished
    # Future after each update. A plain function stored on the class is bound
    # like a method, so the default must tolerate `self` plus an optional
    # Future; a one-argument lambda raises TypeError in the done-callback.
    callback: "Callable" = lambda *_: None

    # Init properties
    event_messages: "list[MatchingOperationMessage]"
    collection: "Collection | None" = None
    queryset: "ArticleQuerySet"

    def __init__(self, params: "MatchZblArticlesParams"):
        """Build the article queryset from the collection/source/issue filters."""
        super().__init__(params)
        self.event_messages = []
        self.queryset = Article.objects.all()
        if self.collection_pid:
            self.queryset = self.queryset.filter(
                my_container__my_collection__pid=self.collection_pid
            )
            self.collection = Collection.objects.get(pid=self.collection_pid)
        if self.source_id:
            source = Source.objects.get(domain=self.source_id)
            self.queryset = self.queryset.filter(my_container__origin__source=source)
        if self.issue_pids:
            self.queryset = self.queryset.filter(my_container__pid__in=self.issue_pids)

    def internal_do(self) -> None:
        """
        Look up every article and apply the zbMATH data on a worker thread.

        ``callback`` is called with no argument for articles without a usable
        zbMATH record, and with the finished Future after each update.

        Raises the first exception raised by an update, including updates
        that were still pending when the last article had been looked up.
        """
        promises: "set[Future]" = set()
        with ThreadPoolExecutor(max_workers=1) as executor:
            for article in self.queryset.iterator(chunk_size=2000):
                xarticle = self.find_zbl_article(article)
                if not xarticle:
                    self.callback()
                    continue

                # Handle asynchronous results and exceptions as soon as possible
                promises = self._drop_finished(promises, executor)

                # Runs the database insertion in parallel
                promise = executor.submit(
                    update_article_with_xarticle,
                    article,
                    xarticle,
                    merge_titles=False,
                    update_pdf_link=self.update_pdf_link,
                )
                promise.add_done_callback(self.callback)
                promises.add(promise)

            # Updates still pending here were never inspected by the loop
            # above. Wait for each of them so that a failure is raised instead
            # of being silently discarded when the executor shuts down.
            for pending in promises:
                error = pending.exception()
                if error:
                    executor.shutdown(wait=False, cancel_futures=True)
                    raise error

    @staticmethod
    def _drop_finished(promises: "set[Future]", executor: ThreadPoolExecutor) -> "set[Future]":
        """Return the futures still running; re-raise the error of a failed one."""
        still_running: "set[Future]" = set()
        for future in promises:
            if not future.done():
                still_running.add(future)
                continue
            error = future.exception()
            if error:
                executor.shutdown(wait=False, cancel_futures=True)
                raise error
        return still_running

    def find_zbl_article(self, article: "Article"):
        """
        Finds zbl article using extids, fallback using matching if no relevant extid is found

        Lookup order: existing "zbl-item-id" extid, ``hardcoded_overrides``,
        DOI, then ``match_zbl_article``. A matching score below 8 is rejected.

        Returns the zbMATH article data, or ``None`` when nothing usable was
        found. Outcomes are appended to ``self.event_messages``.
        """

        # already present
        zbl_item_id = get_extid(article, "zbl-item-id")
        if zbl_item_id:
            xarticle = zbmath_request_article(zbl_item_id.id_value)
            if xarticle:
                _logger.debug("Article %s already has a zblid", article.pid)
                self.event_messages.append(
                    {
                        "pid": article.pid,
                        "zbl_id": zbl_item_id.id_value,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ALREADY_PRESENT,
                    }
                )
                return xarticle

        # hardcoded
        if article.pid in hardcoded_overrides:
            zbl_id = hardcoded_overrides[article.pid]
            add_or_update_extid(article, "zbl-item-id", zbl_id, False, False)
            xarticle = zbmath_request_article(zbl_id)
            if xarticle:
                _logger.debug("Adding hardcoded zblid to %s", article.pid)
                self.event_messages.append(
                    {
                        # NOTE(review): the other branches always use
                        # article.pid, and compile_history_events raises when
                        # this pid does not resolve to a resource - confirm
                        # that xarticle.pid is a valid local pid.
                        "pid": xarticle.pid or article.pid,
                        "zbl_id": zbl_id,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ADDED,
                        "message": "hardcoded",
                    }
                )
                return xarticle

        # use extids
        if article.doi:
            xarticle = zbmath_request_article_by_doi(article.doi)
            if xarticle:
                _logger.debug("Found matching doi in zbmath for article %s", article.pid)
                # Default to "" so that a record without a zbl-item-id extid
                # does not leak StopIteration; compile_history_events skips
                # the URL for an empty zbl_id.
                zbl_id = next((i[1] for i in xarticle.extids if i[0] == "zbl-item-id"), "")
                self.event_messages.append(
                    {
                        "pid": article.pid,
                        "zbl_id": zbl_id,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ADDED,
                        "message": "doi_found",
                    }
                )
                return xarticle

        # fallback to matching
        item: dict | None = match_zbl_article(article)
        if not item:
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "status": HistoryEventStatus.WARNING,
                    "status_message": MatchingEventResult.NOT_FOUND,
                }
            )
            return

        if item["score"] < 8:
            _logger.info("Got score %s for resource %s", item["score"], article.pid)
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "zbl_id": item["zbl_id"],
                    "status": HistoryEventStatus.WARNING,
                    "status_message": MatchingEventResult.LOW_SCORE,
                    "score": item["score"],
                    "message": "matching",
                }
            )
            return

        add_or_update_extid(article, "zbl-item-id", item["zbl_id"], False, False)
        xarticle = zbmath_request_article(item["zbl_id"])
        if xarticle:
            _logger.debug(
                "Got score %s for resource %s. Adding zbl_id", item["score"], article.pid
            )
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "zbl_id": item["zbl_id"],
                    "status": HistoryEventStatus.OK,
                    "status_message": MatchingEventResult.ADDED,
                    "score": item["score"],
                    "message": "matching",
                }
            )
        # NOTE(review): when the request above returns nothing, the extid has
        # already been written but no event is recorded - confirm this is wanted.
        return xarticle

    def insert_history_event(self):
        """Store one "matching" history event holding all collected messages."""
        # Inside the method body the name resolves to the module-level
        # insert_history_event function imported from history.utils.
        insert_history_event(
            {
                "pid": f"{self.source_id}_{self.collection_pid or ''}_matching",
                "col": self.collection,
                "source": self.source_id,
                "status": HistoryEventStatus.OK,
                "type": "matching",
                "children": compile_history_events(self.event_messages),
            }
        )