Coverage for src/crawler/abstract_crawlers/threaded_crawler.py: 56%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-04-30 12:41 +0000

1from abc import ABC 

2from concurrent.futures import ( 

3 Executor, 

4 ThreadPoolExecutor, 

5) 

6from typing import TYPE_CHECKING 

7 

8from celery import current_app 

9from ptf.model_data import ( 

10 IssueData, 

11 TitleDict, 

12) 

13 

14from crawler.abstract_crawlers.base_crawler import BaseCollectionCrawler 

15 

16if TYPE_CHECKING: 

17 from concurrent.futures import Future 

18 

19 

class CrawlerTitleDict(TitleDict):
    """``TitleDict`` extended with one extra ``title_tex`` entry."""

    # NOTE(review): presumably the title in TeX form, or None when absent — confirm.
    title_tex: str | None

22 

23 

class ThreadedCrawler(BaseCollectionCrawler, ABC):
    """
    Collection crawler that hands database inserts to a background thread.

    ``add_xissue_into_database`` is called from the main thread. Outside a
    celery worker it submits the parent class's insert to a single-worker
    ``ThreadPoolExecutor`` and returns immediately; inside a celery worker it
    performs the insert synchronously. An exception raised by a background
    insert is recorded by ``_handle_future_exceptions`` and re-raised in the
    main thread on the next ``add_xissue_into_database`` call. Subclasses can
    react to each finished insert by overriding ``_issue_added_callback``.

    Main thread                                 Database thread                                 Matching thread
    ---------------------------------------------------------------------------------------------------------------------
    BaseCollectionCrawler.crawl_issue
        |
        v
    ThreadedCrawler.add_xissue_into_database --> BaseCollectionCrawler.add_xissue_into_database
                                                     |
                                                     v
                                                 ThreadedCrawler._handle_future_exceptions
                                                 MatchingCrawler._issue_added_callback      --> MatchingCrawler.match_zbmath_issue

    Wait database threads
    Insert crawl history event

    Wait matching threads
    Insert matching history event
    """

    # Single-worker pool that runs the parent class's database inserts.
    database_executor: Executor
    # One future per submitted insert, in submission order.
    database_futures: "list[Future]"
    # First exception raised by a background insert; re-raised in the main
    # thread by add_xissue_into_database.
    exception: BaseException | None = None

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # One worker, so background inserts run one at a time.
        self.database_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="crawler_database_thread"
        )
        self.database_futures = []

    def add_xissue_into_database(self, xissue):
        """Insert ``xissue`` (in the background when possible) and return it.

        Runs in the main thread. If an earlier background insert failed, its
        exception is re-raised here before anything new is queued. Inside a
        celery worker the insert runs synchronously; otherwise it is submitted
        to ``database_executor`` and may not have happened yet on return.

        Returns:
            The same ``xissue`` object that was passed in.

        Raises:
            BaseException: whatever an earlier background insert raised.
        """
        if self.exception is not None:
            raise self.exception
        # Do not parallelize if inside a celery worker
        if current_app.current_worker_task:
            # NOTE(review): no done callbacks (including subclasses'
            # _issue_added_callback) run on this path — confirm intended.
            super().add_xissue_into_database(xissue)
            return xissue
        future = self.database_executor.submit(super().add_xissue_into_database, xissue)
        # Done callbacks run in the order they were added: record failures
        # first so self.exception is already set when the issue hook runs.
        future.add_done_callback(self._handle_future_exceptions)
        future.add_done_callback(self._issue_added_callback)
        self.database_futures.append(future)
        return xissue

    def _handle_future_exceptions(self, future: "Future[IssueData]"):
        """Remember the first exception raised by a background insert.

        The exception is stored in ``self.exception`` so the main thread
        re-raises it on its next ``add_xissue_into_database`` call; otherwise
        the executor only keeps it inside the future, which nothing in this
        class inspects.

        As a done callback this normally runs in the database thread, but it
        runs immediately in the calling (main) thread if the future had
        already finished when the callback was added.
        """
        if future.cancelled():
            # future.exception() would raise CancelledError; treat
            # cancellation as a non-failure.
            return
        error = future.exception()
        if error is not None and self.exception is None:
            self.exception = error

    def _issue_added_callback(self, future: "Future[IssueData]"):
        """Hook invoked when a background insert finishes; no-op by default.

        Called whether the insert succeeded or failed (after
        ``_handle_future_exceptions``). Overrides should use
        ``future.result()``, which re-raises the insert's exception, rather
        than assume success.
        """
        # Usually runs in the database thread (see _handle_future_exceptions).