Coverage for src/crawler/matching_crawler.py: 48%
25 statements
coverage.py v7.12.0, created at 2026-02-02 15:55 +0000
from concurrent.futures import (
    Executor,
    ThreadPoolExecutor,
)
from typing import TYPE_CHECKING

from crawler.by_source.slc_crawler import IssueData
from crawler.cmds.augment.zbmath import AugmentZblArticles
from crawler.threaded_crawler import ThreadedCrawler

if TYPE_CHECKING:
    from concurrent.futures import Future

    from crawler.cmds.augment.zbmath import MatchingOperationMessage


class MatchingCrawler(ThreadedCrawler):
    """Crawler that queues a zbMATH matching job for every ingested issue.

    Matching runs on a dedicated single-worker executor, so jobs are
    serialized and never block the database thread.
    """

    zbmath_executor: Executor
    zbmath_futures: "list[Future]"
    exception: BaseException | None = None

    matching_event_messages: "list[MatchingOperationMessage]"

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        # Single worker: zbMATH matching jobs run one at a time, in order.
        self.zbmath_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="crawler_zbmath_thread"
        )
        self.zbmath_futures = []
        self.matching_event_messages = []

    def _issue_added_callback(self, future: "Future[IssueData]"):
        # Runs in database thread
        container = future.result()
        if not container.pid:
            raise ValueError("Issue doesn't have any pid")
        matching_future = self.zbmath_executor.submit(
            self.match_zbmath_issue, container.pid
        )
        self.zbmath_futures.append(matching_future)

    def match_zbmath_issue(self, issue_pid: str):
        # Runs in matching thread
        cmd = AugmentZblArticles(
            {
                "collection_pid": self.collection_id,
                "source_id": self.source_domain,
                "issue_pids": [issue_pid],
            }
        )
        cmd.do()
        self.matching_event_messages.extend(cmd.event_messages)
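The single-worker executor serializes matching jobs, but nothing in this file drains zbmath_futures, so an exception raised in match_zbmath_issue surfaces only when a future is inspected. A minimal sketch of how a caller might wait for the queued jobs and re-raise failures; run_matching and run_crawl are hypothetical names (ThreadedCrawler's real entry point is not shown in this file):

from concurrent.futures import wait

def run_matching(crawler: MatchingCrawler) -> None:
    # run_crawl is an assumed name for the inherited crawl entry point,
    # which is expected to fire _issue_added_callback per ingested issue.
    crawler.run_crawl()
    # Block until every queued zbMATH matching job has finished.
    wait(crawler.zbmath_futures)
    for future in crawler.zbmath_futures:
        # result() re-raises any exception raised in match_zbmath_issue.
        future.result()
    crawler.zbmath_executor.shutdown()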