Coverage for src / crawler / cmds / augment / zbmath.py: 19%
114 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-02 15:55 +0000
1import logging
2from concurrent.futures import Future, ThreadPoolExecutor
3from typing import TYPE_CHECKING
5from history.model_data import HistoryChildDict, MatchingEventResult
6from history.models import HistoryEventStatus
7from history.utils import insert_history_event
8from ptf import model_helpers
9from ptf.cmds.base_cmds import baseCmd
10from ptf.model_helpers import add_or_update_extid, get_extid
11from ptf.models import Article, Collection
13from crawler.cmds.augment import update_article_with_xarticle
14from crawler.models.source import Source
15from crawler.zbmath import match_zbl_article, zbmath_request_article, zbmath_request_article_by_doi
17_logger = logging.getLogger(__name__)
if TYPE_CHECKING:
    # Everything in this branch exists for static type checkers only; none of
    # these names are defined at runtime, which is why the rest of the module
    # refers to them through quoted annotations.
    from collections.abc import Callable
    from typing import NotRequired, TypedDict

    from ptf.models import ArticleQuerySet

    class MatchZblArticlesParams(TypedDict):
        """Parameters accepted by the zbMATH matching command."""

        source_id: str
        "Source 'domain' (unique identifier) for filtering articles"

        collection_pid: NotRequired[str]
        "Collection (issue/volume) identifier for filtering articles"

        update_pdf_link: NotRequired[bool]
        "Optional, False by default. Controls whether our pdf link is replaced with zbmath's one"

        issue_pids: NotRequired[list[str]]
        "Optional. List of issue PIDS to filter the matching with"

    class MatchingOperationMessage(TypedDict):
        """Outcome of one article lookup, later compiled into a history child event."""

        pid: str
        zbl_id: NotRequired[str]
        score: NotRequired[int]
        status: HistoryEventStatus
        message: NotRequired[str]
        status_message: NotRequired[str]
# Manual article PID -> zbMATH identifier overrides.
# An article whose PID is listed here is given this identifier directly,
# before the DOI lookup and the fuzzy matching are attempted.
hardcoded_overrides: dict[str, str] = {
    # AM
    "AM_1999_150_1_a3": "0995.14018",
    "AM_1999_149_2_a6": "0937.52004",
    "AM_1999_149_3_a8": "0942.58016",
    "AM_2000_151_2_a1": "1078.12500",
    "AM_2000_151_2_a5": "1037.11049",
    "AM_2000_151_2_a6": "1037.11050",
    "AM_2000_151_2_a9": "0956.22012",
    "AM_2000_152_1_a1": "0989.11023",
    "AM_2000_152_2_a3": "1042.14018",
    "AM_1999_150_3_a12": "900398017",
    # ELA
    "ELA_2001__8__a10": "0989.34004",
    "ELA_2006__15__a10": "1142.15303",
}
def compile_history_events(
    event_messages: "list[MatchingOperationMessage]",
) -> "list[HistoryChildDict]":
    """
    Convert matching operation messages into history child events.

    Each message becomes one child dict of type ``"zbl-id"`` pointing at the
    resource looked up from the message's ``pid``. The optional ``score``,
    ``message`` and ``status_message`` fields are copied when present, and a
    zbMATH URL is added when the message carries a non-empty ``zbl_id``.

    :param event_messages: messages collected while matching articles
    :return: one child dict per message, in the same order
    :raises ValueError: when no resource exists for a message's ``pid``
    """
    children = []
    for msg in event_messages:
        pid = msg["pid"]
        resource = model_helpers.get_resource(pid)
        if not resource:
            # Name the offending pid: a batch can hold thousands of messages.
            raise ValueError(f"Resource not found: {pid}")

        data: "HistoryChildDict" = {
            "resource": resource,
            "type": "zbl-id",
            "status": msg.get("status"),
        }

        # Truthiness skips both "" and None, so no ".../None" URL is ever built.
        zbl_id = msg.get("zbl_id")
        if zbl_id:
            data["url"] = f"https://zbmath.org/{zbl_id}"

        if "score" in msg:
            data["score"] = msg["score"]

        if "message" in msg:
            data["message"] = msg["message"]

        if "status_message" in msg:
            data["status_message"] = msg["status_message"]

        children.append(data)
    return children
class AugmentZblArticles(baseCmd[None]):
    """
    Matches all articles with ZBL in a source/collection

    Adds ZBLIDs

    The outcome of every lookup is appended to ``event_messages`` so that it
    can be stored afterwards with :meth:`insert_history_event`.
    """

    # class settings
    required_params = ["queryset"]

    # Input params
    collection_pid: str | None = None
    source_id: str
    # Controls whether our pdf link is replaced with zbmath's one
    update_pdf_link = False
    # Shared class-level default: only ever read here, never mutated in place
    issue_pids: list[str] = []

    # Init properties
    event_messages: "list[MatchingOperationMessage]"
    collection: "Collection | None" = None
    queryset: "ArticleQuerySet"

    # Progress hook. It is called without argument when an article is skipped
    # and with the finished Future when a database update completes, so the
    # default has to accept both forms. It is wrapped in staticmethod because a
    # plain function stored on the class would be bound to the instance and
    # then receive (self, future), raising TypeError in the done-callback.
    callback: "Callable" = staticmethod(lambda *_args: None)

    def __init__(self, params: "MatchZblArticlesParams"):
        """Build the article queryset from the collection/source/issue filters."""
        super().__init__(params)
        self.event_messages = []
        self.queryset = Article.objects.all()
        if self.collection_pid:
            self.queryset = self.queryset.filter(
                my_container__my_collection__pid=self.collection_pid
            )
            self.collection = Collection.objects.get(pid=self.collection_pid)
        if self.source_id:
            source = Source.objects.get(domain=self.source_id)
            self.queryset = self.queryset.filter(my_container__origin__source=source)
        if self.issue_pids:
            self.queryset = self.queryset.filter(my_container__pid__in=self.issue_pids)

    @staticmethod
    def _raise_if_failed(future: "Future", executor: ThreadPoolExecutor) -> None:
        """Re-raise the exception stored in ``future`` (if any) after cancelling queued work."""
        exception = future.exception()
        if exception:
            executor.shutdown(wait=False, cancel_futures=True)
            raise exception

    def internal_do(self) -> None:
        """
        Look up every article of the queryset and update the ones that were found.

        The lookup runs in the calling thread, while the database update of an
        article runs on a single worker thread so that it overlaps with the
        lookup of the next article. An exception raised by an update is
        re-raised from this method.
        """
        pending: "set[Future]" = set()
        with ThreadPoolExecutor(max_workers=1) as executor:
            for article in self.queryset.iterator(chunk_size=2000):
                xarticle = self.find_zbl_article(article)
                if not xarticle:
                    self.callback()
                    continue

                # Handle asynchronous results and exceptions as soon as possible
                finished = {future for future in pending if future.done()}
                for future in finished:
                    self._raise_if_failed(future, executor)
                pending -= finished

                # Runs the database insertion in parallel
                promise = executor.submit(
                    update_article_with_xarticle,
                    article,
                    xarticle,
                    merge_titles=False,
                    update_pdf_link=self.update_pdf_link,
                )
                promise.add_done_callback(self.callback)
                pending.add(promise)

            # Updates still running once the loop is over must be checked as
            # well, otherwise their exceptions are silently dropped.
            # Future.exception() blocks until the future has completed.
            for future in pending:
                self._raise_if_failed(future, executor)

    def find_zbl_article(self, article: "Article"):
        """
        Finds zbl article using extids, fallback using matching if no relevant extid is found

        Strategies are tried in this order: existing ``zbl-item-id`` extid,
        hardcoded override, DOI lookup, then matching. A message describing the
        outcome is appended to ``event_messages``.

        :return: the zbMATH article data, or ``None`` when nothing usable was found
        """
        # already present
        zbl_item_id = get_extid(article, "zbl-item-id")
        if zbl_item_id:
            xarticle = zbmath_request_article(zbl_item_id.id_value)
            if xarticle:
                _logger.debug(f"Article {article.pid} already has a zblid")
                self.event_messages.append(
                    {
                        "pid": article.pid,
                        "zbl_id": zbl_item_id.id_value,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ALREADY_PRESENT,
                    }
                )
                return xarticle

        # hardcoded
        if article.pid in hardcoded_overrides:
            zbl_id = hardcoded_overrides[article.pid]
            add_or_update_extid(article, "zbl-item-id", zbl_id, False, False)
            xarticle = zbmath_request_article(zbl_id)
            if xarticle:
                _logger.debug(f"Adding hardcoded zblid to {article.pid}")
                self.event_messages.append(
                    {
                        # NOTE(review): the other branches record article.pid;
                        # confirm that xarticle.pid can be resolved by
                        # compile_history_events.
                        "pid": xarticle.pid or article.pid,
                        "zbl_id": zbl_id,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ADDED,
                        "message": "hardcoded",
                    }
                )
                return xarticle

        # use extids
        if article.doi:
            xarticle = zbmath_request_article_by_doi(article.doi)
            if xarticle:
                _logger.debug(f"Found matching doi in zbmath for article {article.pid}")
                # Default to "" so that a record without a zbl-item-id extid
                # does not leak a bare StopIteration out of this method.
                doi_zbl_id: str = next(
                    (i[1] for i in xarticle.extids if i[0] == "zbl-item-id"), ""
                )
                self.event_messages.append(
                    {
                        "pid": article.pid,
                        "zbl_id": doi_zbl_id,
                        "status": HistoryEventStatus.OK,
                        "status_message": MatchingEventResult.ADDED,
                        "message": "doi_found",
                    }
                )
                return xarticle

        # fallback to matching
        item: dict | None = match_zbl_article(article)
        if not item:
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "status": HistoryEventStatus.WARNING,
                    "status_message": MatchingEventResult.NOT_FOUND,
                }
            )
            return None

        if item["score"] < 8:
            # Score too low: the candidate is reported but not applied
            _logger.info(f"Got score {item['score']} for resource {article.pid}")
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "zbl_id": item["zbl_id"],
                    "status": HistoryEventStatus.WARNING,
                    "status_message": MatchingEventResult.LOW_SCORE,
                    "score": item["score"],
                    "message": "matching",
                }
            )
            return None

        add_or_update_extid(article, "zbl-item-id", item["zbl_id"], False, False)
        xarticle = zbmath_request_article(item["zbl_id"])
        if xarticle:
            _logger.debug(f"Got score {item['score']} for resource {article.pid}. Adding zbl_id")
            self.event_messages.append(
                {
                    "pid": article.pid,
                    "zbl_id": item["zbl_id"],
                    "status": HistoryEventStatus.OK,
                    "status_message": MatchingEventResult.ADDED,
                    "score": item["score"],
                    "message": "matching",
                }
            )
            return xarticle

        # The extid was stored but the record could not be fetched: no event
        # is recorded for this article.
        return None

    def insert_history_event(self):
        """Store one "matching" history event whose children are the collected messages."""
        # Inside the method body this name resolves to the module-level
        # function imported from history.utils, not to this method.
        insert_history_event(
            {
                "pid": f"{self.source_id}_{self.collection_pid or ''}_matching",
                "col": self.collection,
                "source": self.source_id,
                "status": HistoryEventStatus.OK,
                "type": "matching",
                "children": compile_history_events(self.event_messages),
            }
        )