Coverage for src / crawler / matching_crawler.py: 48%

25 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-02-02 15:55 +0000

1from concurrent.futures import ( 

2 Executor, 

3 ThreadPoolExecutor, 

4) 

5from typing import TYPE_CHECKING 

6 

7from crawler.by_source.slc_crawler import IssueData 

8from crawler.cmds.augment.zbmath import AugmentZblArticles 

9from crawler.threaded_crawler import ThreadedCrawler 

10 

11if TYPE_CHECKING: 

12 from concurrent.futures import Future 

13 

14 from crawler.cmds.augment.zbmath import MatchingOperationMessage 

15 

16 

class MatchingCrawler(ThreadedCrawler):
    """Crawler that augments crawled issues with zbMATH article matches.

    On top of the threaded crawl, every issue that gets added is handed to a
    dedicated single-worker executor which runs the zbMATH matching command.
    A single worker guarantees that matching jobs run one at a time, so the
    shared bookkeeping lists below are only appended from one thread at a time.
    """

    # Single-worker pool dedicated to zbMATH matching jobs.
    zbmath_executor: Executor
    # Futures of all matching jobs submitted so far.
    zbmath_futures: "list[Future]"
    # Last exception observed, if any (set by collaborators; None until then).
    exception: BaseException | None = None

    # Event messages accumulated from every completed matching command.
    matching_event_messages: "list[MatchingOperationMessage]"

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ThreadedCrawler and set up matching state."""
        super().__init__(*args, **kwargs)
        # One worker only: serializes matching and keeps list appends safe.
        self.zbmath_executor = ThreadPoolExecutor(
            max_workers=1, thread_name_prefix="crawler_zbmath_thread"
        )
        self.zbmath_futures = []
        self.matching_event_messages = []

    def _issue_added_callback(self, future: "Future[IssueData]"):
        """Schedule zbMATH matching for a freshly added issue.

        Runs in the database thread. Raises ValueError when the issue has no
        pid, since matching cannot be addressed without one.
        """
        issue_data = future.result()
        if not issue_data.pid:
            raise ValueError("Issue doesn't have any pid")
        fut = self.zbmath_executor.submit(self.match_zbmath_issue, issue_data.pid)
        self.zbmath_futures.append(fut)

    def match_zbmath_issue(self, issue_pid: str):
        """Run the zbMATH augmentation command for one issue.

        Runs in the matching thread; appends the command's event messages to
        ``matching_event_messages`` once it completes.
        """
        params = {
            "collection_pid": self.collection_id,
            "source_id": self.source_domain,
            "issue_pids": [issue_pid],
        }
        cmd = AugmentZblArticles(params)
        cmd.do()
        self.matching_event_messages.extend(cmd.event_messages)