Coverage for src/crawler/threaded_crawler.py: 56%
25 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-02-02 15:55 +0000
1from abc import ABC, abstractmethod
2from concurrent.futures import (
3 Executor,
4 ThreadPoolExecutor,
5)
6from typing import TYPE_CHECKING
8from ptf.model_data import (
9 IssueData,
10 TitleDict,
11)
13from crawler.base_crawler import BaseCollectionCrawler
15if TYPE_CHECKING:
16 from concurrent.futures import Future
19class CrawlerTitleDict(TitleDict):
    """TitleDict extended with an optional TeX form of the title."""

    # Presumably the raw TeX markup of the title, None when unavailable —
    # TODO confirm against producers of TitleDict in ptf.model_data.
 20 title_tex: str | None
class ThreadedCrawler(BaseCollectionCrawler, ABC):
    """Crawler variant that offloads database inserts to a dedicated worker thread.

    Thread interaction (as orchestrated by the base class lifecycle):

    Main thread                              Database Thread                              Matching Thread
    -----------------------------------------------------------------------------------------------------------------------------------------
    BaseCollectionCrawler.crawl_issue
        |
        v
    ThreadedCrawler.add_xissue_into_database --> BaseCollectionCrawler.add_xissue_into_database
                                                     |
                                                     v
                                                 ThreadedCrawler._handle_future_exceptions
                                                 MatchingCrawler._issue_added_callback    --> MatchingCrawler.match_zbmath_issue
    Wait database threads
    Insert crawl history event
    Wait matching threads
    Insert matching history event
    """

    # Single-worker executor: serializes all database writes onto one thread.
    database_executor: Executor
    # Futures for every submitted insert; the base lifecycle waits on these.
    database_futures: "list[Future]"
    # First exception raised by a worker; surfaced on the next submit attempt.
    exception: BaseException | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.database_futures = []
        # max_workers=1 guarantees inserts execute in submission order.
        self.database_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="crawler_database_thread"
        )

    def add_xissue_into_database(self, xissue):
        """Queue *xissue* for insertion on the database thread and return it.

        Runs in the main thread. Re-raises any exception previously recorded
        by a worker thread instead of submitting further work.
        """
        if self.exception:
            raise self.exception
        db_future = self.database_executor.submit(
            super().add_xissue_into_database, xissue
        )
        db_future.add_done_callback(self._issue_added_callback)
        self.database_futures.append(db_future)
        return xissue

    @abstractmethod
    def _issue_added_callback(self, future: "Future[IssueData]"):
        """Hook invoked on the database thread once an issue insert completes."""
        ...