Coverage for src / crawler / cmds / augment / augment_cmd.py: 20%
108 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-21 12:58 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-21 12:58 +0000
1import logging
2from concurrent.futures import Future, ThreadPoolExecutor
3from typing import TYPE_CHECKING, NotRequired, TypedDict
5from history.model_data import HistoryChildDict, MatchingEventResult
6from history.models import HistoryEventStatus
7from history.utils import insert_history_event
8from matching_back.backends import get_backends
9from matching_back.backends.matching_backend import MatchingBackend
10from ptf import model_helpers
11from ptf.cmds.base_cmds import baseCmd
12from ptf.models import Article, Collection
14from crawler.cmds.augment import update_article_with_xarticle
15from crawler.models.source import Source
if TYPE_CHECKING:
    # Typing-only names: this branch is skipped at runtime because
    # typing.TYPE_CHECKING is False outside of static analysis.
    from collections.abc import Callable

    from ptf.models import ArticleQuerySet

    class AugmentArticlesParams(TypedDict):
        """
        Shape of the params dict handed to AugmentArticlesCmd.
        """

        # Forwarded to get_backends() to resolve the backend(s) to run.
        backend: MatchingBackend
        # Looked up as Source.domain to restrict the articles to one source.
        source_id: str
        # Optional: restrict the articles to the collection with this pid.
        collection_pid: NotRequired[str]
        # Optional: restrict the articles to the containers with these pids.
        issue_pids: NotRequired[list[str]]
        # Optional: forwarded as-is to update_article_with_xarticle().
        update_pdf_link: NotRequired[bool]

    from typing import NotRequired, TypedDict

    class MatchingOperationMessage(TypedDict):
        """
        One per-article matching result, as consumed by compile_history_events().
        """

        # pid of the resource, resolved through model_helpers.get_resource().
        pid: str
        # Used to build the https://zbmath.org/<zbl_id> URL when non-empty.
        zbl_id: "NotRequired[str]"
        # The remaining keys are copied verbatim into the history child entry.
        score: "NotRequired[int]"
        status: HistoryEventStatus
        message: "NotRequired[str]"
        status_message: "NotRequired[str]"
# Module-level logger, named after this module's import path.
_logger = logging.getLogger(__name__)
def compile_history_events(event_messages: "list[MatchingOperationMessage]"):
    """
    Build the list of history children for a batch of matching messages.

    Each message is resolved to its resource with
    ``model_helpers.get_resource`` and turned into a ``HistoryChildDict`` of
    type ``"zbl-id"``. A zbMATH URL is added when the message carries a
    non-empty ``zbl_id``; ``score``, ``message`` and ``status_message`` are
    copied over when present.

    Args:
        event_messages: the matching messages to convert.

    Returns:
        A list with one ``HistoryChildDict`` per message, in input order.

    Raises:
        ValueError: if no resource can be found for a message's ``pid``.
    """
    children = []
    for msg in event_messages:
        pid = msg["pid"]
        resource = model_helpers.get_resource(pid)
        if not resource:
            # Name the offending pid so the failure can be traced in a batch.
            raise ValueError(f"Resource not found: {pid}")

        data: "HistoryChildDict" = {
            "resource": resource,
            "type": "zbl-id",
            "status": msg.get("status"),
        }

        # Skip missing, empty and None ids alike: a None id would otherwise
        # produce the bogus URL "https://zbmath.org/None".
        zbl_id = msg.get("zbl_id")
        if zbl_id:
            data["url"] = f"https://zbmath.org/{zbl_id}"

        if "score" in msg:
            data["score"] = msg["score"]

        if "message" in msg:
            data["message"] = msg["message"]

        if "status_message" in msg:
            data["status_message"] = msg["status_message"]

        children.append(data)
    return children
class AugmentArticlesCmd(baseCmd[None]):
    """
    Metadata augment command.
    Calls the desired Matching Backend defined in ptf-back.

    For every article of the selected source (optionally narrowed to one
    collection and/or a list of issues), each backend is asked for matching
    article data, first by external ids and then by fuzzy matching. When data
    is found, the article is updated through ``update_article_with_xarticle``
    on a single worker thread. One event message is recorded per article and
    per backend; ``insert_history_event`` turns them into a history event.
    """

    required_params = ["backend", "source_id"]

    # Selector handed to get_backends(). internal_do() rebinds this attribute
    # to each resolved backend object in turn.
    backend: str
    source_id: str
    collection_pid: str | None = None
    # Class-level default shared by all instances: never mutate it in place.
    issue_pids: list[str] = []
    update_pdf_link: bool = False
    collection: "Collection | None" = None
    queryset: "ArticleQuerySet"
    event_messages: list[dict]
    # Progress hook, called once per processed article: with no argument when
    # nothing was found, and with the finished Future when an update was
    # submitted. The default is a no-op wrapped in staticmethod so that
    # `self.callback` is not turned into a bound method (which would receive
    # `self` as an extra positional argument and fail when called with the
    # Future).
    callback: "Callable" = staticmethod(lambda *_args: None)

    def __init__(self, params: "AugmentArticlesParams"):
        """
        Resolve the backends and build the queryset of articles to augment.

        NOTE(review): the attributes read here (backend, source_id, ...) are
        presumably populated from `params` by baseCmd.__init__ - confirm.
        """
        super().__init__(params)
        self.backends = get_backends(self.backend)
        self.event_messages = []
        self.queryset = Article.objects.all()

        if self.collection_pid:
            self.queryset = self.queryset.filter(
                my_container__my_collection__pid=self.collection_pid
            )
            self.collection = Collection.objects.get(pid=self.collection_pid)

        if self.source_id:
            source = Source.objects.get(domain=self.source_id)
            self.queryset = self.queryset.filter(my_container__origin__source=source)

        if self.issue_pids:
            self.queryset = self.queryset.filter(my_container__pid__in=self.issue_pids)

    def internal_do(self) -> None:
        """
        Run every backend over the queryset and update the matched articles.

        Raises:
            Exception: the first exception raised by a background update is
                re-raised here; updates still queued are cancelled.
        """
        for backend in self.backends:
            # _find_article() and _make_event() read the active backend from
            # self.backend.
            self.backend = backend
            _logger.info(
                f"Start augment [{self.backend.name}] "
                f"source={self.source_id} collection={self.collection_pid} "
                f"({self.queryset.count()} articles)"
            )
            promises: "set[Future]" = set()
            with ThreadPoolExecutor(max_workers=1) as executor:
                for article in self.queryset.iterator(chunk_size=2000):
                    xarticle, event = self._find_article(article)

                    if event:
                        self.event_messages.append(event)

                    if not xarticle:
                        self.callback()
                        continue

                    # Handle asynchronous results and exceptions as soon as possible
                    self._check_promises(promises, executor)
                    # Runs the database insertion in parallel
                    promise = executor.submit(
                        update_article_with_xarticle,
                        article,
                        xarticle,
                        merge_titles=False,
                        update_pdf_link=self.update_pdf_link,
                    )
                    promise.add_done_callback(self.callback)
                    promises.add(promise)

                # Updates still in flight once the loop is over must be
                # checked too, otherwise their failures would go unnoticed.
                self._check_promises(promises, executor, wait=True)
            _logger.info(f"Augment [{self.backend.name}] over")

    @staticmethod
    def _check_promises(promises: "set[Future]", executor, *, wait: bool = False) -> None:
        """
        Drop finished futures from `promises`, re-raising the first failure.

        With wait=False only futures that are already done are inspected.
        With wait=True every future is waited for. On failure the executor is
        shut down and its queued futures are cancelled before re-raising.
        """
        completed: "set[Future]" = set()
        for future in promises:
            if not wait and not future.done():
                continue
            completed.add(future)
            # Future.exception() blocks until the future has finished.
            exc = future.exception()
            if exc:
                executor.shutdown(wait=False, cancel_futures=True)
                raise exc
        promises.difference_update(completed)

    def _find_article(self, article: Article) -> tuple:
        """
        Calls ArticleData from backend.

        Returns a `(xarticle, event)` tuple: `xarticle` is what the backend
        returned (None when nothing was found) and `event` is the message
        dict describing the outcome.
        """
        # With external ID
        xarticle = self.backend.find_by_extids(article=article)
        if xarticle:
            _logger.debug(f"[{self.backend.name}] {article.pid}: found via External ID")
            return xarticle, self._make_event(article, MatchingEventResult.ALREADY_PRESENT)

        # Matching fuzzy
        xarticle = self.backend.find_by_matching(article)
        if xarticle:
            _logger.debug(f"[{self.backend.name}] {article.pid}: found via Matching")
            return xarticle, self._make_event(article, MatchingEventResult.ADDED)

        _logger.info(
            f"[{self.backend.name}] {article.pid}: not found with External ID nor Matching"
        )
        return None, self._make_event(
            article, MatchingEventResult.NOT_FOUND, status=HistoryEventStatus.WARNING
        )

    def _make_event(
        self,
        article: Article,
        status_message: str,
        status: HistoryEventStatus = HistoryEventStatus.OK,
    ) -> dict:
        """
        Build the event message recorded for `article` with the active backend.
        """
        return {
            "pid": article.pid,
            "status": status,
            "status_message": status_message,
            "backend": self.backend.name,
        }

    def _backend_name(self) -> str:
        """
        Name of the active backend, or the raw selector if none is active yet.
        """
        # self.backend is still the selector string until internal_do() has
        # bound a backend object to it.
        return getattr(self.backend, "name", str(self.backend))

    def insert_history_event(self):
        """
        Insert one "matching" history event holding all recorded messages.

        Raises:
            ValueError: if the resource of a recorded message cannot be found.
        """
        # NOTE(review): with several backends only the last active one is
        # reflected in the event pid below - confirm this is intended.
        backend_name = self._backend_name()

        children = []
        for msg in self.event_messages:
            resource = model_helpers.get_resource(msg["pid"])
            if not resource:
                raise ValueError(f"Resource not found: {msg['pid']}")
            data: "HistoryChildDict" = {
                "resource": resource,
                "type": msg.get("backend", backend_name),
                "status": msg.get("status"),
            }
            if "score" in msg:
                data["score"] = msg["score"]
            if "message" in msg:
                data["message"] = msg["message"]
            if "status_message" in msg:
                data["status_message"] = msg["status_message"]
            children.append(data)

        insert_history_event(
            {
                "pid": f"{self.source_id}_{self.collection_pid or ''}_{backend_name}_matching",
                "col": self.collection,
                "source": self.source_id,
                "status": HistoryEventStatus.OK,
                "type": "matching",
                "children": children,
            }
        )