Coverage for src / crawler / threaded_crawler.py: 56%

25 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-02-02 15:55 +0000

1from abc import ABC, abstractmethod 

2from concurrent.futures import ( 

3 Executor, 

4 ThreadPoolExecutor, 

5) 

6from typing import TYPE_CHECKING 

7 

8from ptf.model_data import ( 

9 IssueData, 

10 TitleDict, 

11) 

12 

13from crawler.base_crawler import BaseCollectionCrawler 

14 

15if TYPE_CHECKING: 

16 from concurrent.futures import Future 

17 

18 

class CrawlerTitleDict(TitleDict):
    """``TitleDict`` extended with one extra entry, ``title_tex``, holding a string or ``None``."""

    title_tex: str | None

21 

22 

class ThreadedCrawler(BaseCollectionCrawler, ABC):
    """
    Collection crawler that hands database insertions over to a worker thread.

    ``add_xissue_into_database`` does not insert anything itself: it submits
    ``BaseCollectionCrawler.add_xissue_into_database`` to ``database_executor``
    (a single-worker thread pool) and attaches ``_issue_added_callback`` to the
    resulting future.

    Intended flow. ``_handle_future_exceptions`` and ``MatchingCrawler`` are
    mentioned here but are not defined in this class.
    NOTE(review): the column placement of the diagram was reconstructed -
    confirm it against the intended design.

    Main thread                                  Database Thread                                      Matching Thread
    -----------------------------------------------------------------------------------------------------------------------------------------
    BaseCollectionCrawler.crawl_issue
                |
                v
    ThreadedCrawler.add_xissue_into_database --> BaseCollectionCrawler.add_xissue_into_database
                                                             |
                                                             v
                                                 ThreadedCrawler._handle_future_exceptions
                                                 MatchingCrawler._issue_added_callback            --> MatchingCrawler.match_zbmath_issue

    Wait database threads
    Insert crawl history event

    Wait matching threads
    Insert matching history event

    """

    # Executor that runs the database insertions (built in __init__ with one worker).
    database_executor: Executor
    # One future per submission made through add_xissue_into_database.
    database_futures: "list[Future]"
    # When not None, re-raised by the next add_xissue_into_database call.
    # NOTE(review): nothing in this class assigns it - presumably set by a
    # subclass or by a callback; confirm against the callers.
    exception: BaseException | None = None

    def __init__(self, *args, **kwargs):
        """Initialise the base crawler, then create the database executor and the futures list.

        All positional and keyword arguments are forwarded unchanged to the
        parent ``__init__``.
        """
        super().__init__(*args, **kwargs)
        self.database_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="crawler_database_thread"
        )
        self.database_futures = []

    def add_xissue_into_database(self, xissue):
        """Queue the insertion of ``xissue`` on the database executor.

        Runs in the main thread. The parent implementation is submitted to
        ``database_executor``; the returned future gets
        ``_issue_added_callback`` as done-callback and is recorded in
        ``database_futures``.

        Returns:
            The ``xissue`` argument itself, returned immediately - not the
            result of the queued insertion.

        Raises:
            BaseException: whatever is stored in ``self.exception``, if set.
        """
        # Compare against None explicitly: an exception object may evaluate as
        # falsy (its class can define __bool__ or __len__), and a plain
        # truthiness test would then silently ignore the recorded failure.
        if self.exception is not None:
            raise self.exception
        future = self.database_executor.submit(super().add_xissue_into_database, xissue)
        future.add_done_callback(self._issue_added_callback)
        self.database_futures.append(future)
        return xissue

    @abstractmethod
    def _issue_added_callback(self, future: "Future[IssueData]"):
        """Hook called once a queued insertion future is done; subclasses must implement it.

        Registered through ``Future.add_done_callback``, so it normally runs in
        the database worker thread. Per the ``concurrent.futures`` semantics it
        runs immediately in the submitting thread instead when the future has
        already finished at registration time, and ``Exception`` subclasses
        raised by the callback are logged and ignored by the library.
        """