Coverage for src/crawler/abstract_crawlers/threaded_crawler.py: 56%
28 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-04-30 12:41 +0000
1from abc import ABC
2from concurrent.futures import (
3 Executor,
4 ThreadPoolExecutor,
5)
6from typing import TYPE_CHECKING
8from celery import current_app
9from ptf.model_data import (
10 IssueData,
11 TitleDict,
12)
14from crawler.abstract_crawlers.base_crawler import BaseCollectionCrawler
16if TYPE_CHECKING:
17 from concurrent.futures import Future
class CrawlerTitleDict(TitleDict):
    """
    ``TitleDict`` (from ``ptf.model_data``) extended with a ``title_tex`` entry.

    ``title_tex`` may be ``None``. Presumably it holds the TeX source of the
    title, and ``TitleDict`` is a ``TypedDict`` — TODO confirm both.
    """

    title_tex: str | None
class ThreadedCrawler(BaseCollectionCrawler, ABC):
    """
    Collection crawler that inserts crawled issues into the database on a
    dedicated background thread.

    Main thread                                  Database thread                                    Matching thread
    --------------------------------------------------------------------------------------------------------------------------------
    BaseCollectionCrawler.crawl_issue
        |
        v
    ThreadedCrawler.add_xissue_into_database --> BaseCollectionCrawler.add_xissue_into_database
                                                     |
                                                     v
                                                 ThreadedCrawler._handle_future_exceptions
                                                 MatchingCrawler._issue_added_callback       --> MatchingCrawler.match_zbmath_issue

    Wait database threads
    Insert crawl history event

    Wait matching threads
    Insert matching history event

    Notes:

    * ``database_executor`` has a single worker, so issues are inserted one at a
      time, in submission order.
    * An exception raised by a background insertion is stored in
      ``self.exception`` by ``_handle_future_exceptions`` and re-raised in the
      main thread by the next call to ``add_xissue_into_database``. It is never
      cleared, so every later call raises it too.
    * Inside a Celery worker the insertion runs synchronously in the calling
      thread, and ``_issue_added_callback`` is NOT invoked.
    * ``MatchingCrawler`` and the "Wait ..." / "Insert ... history event" steps
      are not implemented in this class. Presumably a subclass or caller waits
      on ``database_futures`` — TODO confirm.
    * ``database_executor`` is not shut down anywhere in this class.
    """

    # Single-worker pool that runs BaseCollectionCrawler.add_xissue_into_database
    # off the main thread (created in __init__).
    database_executor: Executor
    # Futures of every submitted insertion, in submission order.
    database_futures: "list[Future]"
    # First exception raised by a background insertion, or None while none has
    # failed. Re-raised by add_xissue_into_database.
    exception: BaseException | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # max_workers=1 serializes database writes in submission order.
        self.database_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="crawler_database_thread"
        )
        self.database_futures = []

    def add_xissue_into_database(self, xissue):
        """
        Schedule ``xissue`` for insertion into the database and return it.

        Runs in the main thread. Outside a Celery worker, the insertion is
        submitted to ``database_executor`` and the resulting future is appended
        to ``database_futures``; this method does not wait for it. Inside a
        Celery worker, the insertion runs synchronously instead.

        :param xissue: issue passed unchanged to
            ``BaseCollectionCrawler.add_xissue_into_database``.
        :returns: ``xissue`` itself (not the result of the insertion).
        :raises BaseException: the first exception raised by an earlier
            background insertion, if any.
        """
        if self.exception is not None:
            raise self.exception
        # Do not parallelize if inside a celery worker
        if current_app.current_worker_task:
            super().add_xissue_into_database(xissue)
            return xissue
        future = self.database_executor.submit(super().add_xissue_into_database, xissue)
        # Done-callbacks run in the order they were added: record failures
        # first, then notify subclasses.
        future.add_done_callback(self._handle_future_exceptions)
        future.add_done_callback(self._issue_added_callback)
        self.database_futures.append(future)
        return xissue

    def _handle_future_exceptions(self, future: "Future[IssueData]"):
        """
        Record the exception of a failed background insertion in
        ``self.exception``.

        Usually runs in the database thread. It runs in the calling thread
        instead if the future had already finished when the callback was
        registered. Only the first failure is kept, so the earliest error is
        the one re-raised in the main thread.
        """
        if future.cancelled():
            # future.exception() would raise CancelledError on a cancelled future.
            return
        exc = future.exception()
        if exc is not None and self.exception is None:
            self.exception = exc

    def _issue_added_callback(self, future: "Future[IssueData]"):
        """
        Hook called when a background insertion finishes. Does nothing by
        default.

        Usually runs in the database thread, after ``_handle_future_exceptions``.
        It is also called for failed or cancelled futures, so overrides should
        check ``future.cancelled()`` / ``future.exception()`` before calling
        ``future.result()``. Not called when running inside a Celery worker.
        """