Coverage for src/crawler/tasks.py: 0%

66 statements (coverage.py v7.9.0, created at 2025-11-21 14:41 +0000)
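# Celery tasks for the crawler app: "coordinator" tasks orchestrate the crawl
# of a source or collection, while "executor" tasks fetch issue lists and
# individual issues through crawler_factory().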

import logging
import traceback
from typing import TYPE_CHECKING

from celery import shared_task, states
from django.contrib.auth.models import User
from history.model_data import HistoryEventStatus
from history.utils import insert_history_event
from opentelemetry import trace
from ptf.models import Container
from task.tasks import increment_progress

from crawler.factory import crawler_factory
from crawler.utils import get_all_cols

if TYPE_CHECKING:
    from celery import Task
    from history.model_data import HistoryEventDict
    from ptf.model_data import IssueData

tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)

@shared_task(name="crawler.tasks.crawl_source", bind=True, queue="coordinator")
def crawl_source(self: "Task", source_domain: str, username: str):
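    """Crawl every collection available from a source (not implemented).

    The commented-out sketch below shows the intended fan-out: one crawl per
    collection pid, followed by an end-of-crawl task.
    """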

    raise NotImplementedError()
    # cols = get_cols_by_source(source_domain)
    # pids = [col["pid"] for col in cols]

    # for pid in pids:
    #     run_task(
    #         CrawlCollectionTask,
    #         pid,
    #         source_domain,
    #         username,
    #         subcall=True,
    #         name=f"{pid}-{source_domain}",
    #     )
    # run_task(EndCrawlSourceTask, source_domain)

@shared_task(name="crawler.tasks.crawl_collection", bind=True, queue="coordinator")
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "Task",
    colid: str,
    source_domain: str,
    username: str,
    only_new: bool = False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
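    """Crawl a single collection from a source and import its issues.

    Fetches the issue list, filters it by period/number (optionally keeping
    only issues not yet imported), then crawls each issue while reporting
    progress through the task state. A history event is recorded in all
    cases, with the traceback attached on error.
    """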

    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": source_domain,
        "col": None,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        user = User.objects.get(username=username)
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        self.update_state(
            meta={"progress": 0, "col": colid},
            state=states.STARTED,
        )
        issue_list = crawl_issue_list(source_domain, colid, url, username)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, only_new)
        self.update_state(
            meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
            state=states.STARTED,
        )
        for issue in issue_list.values():
            crawl_issue(issue, source_domain, colid, url, username)
            increment_progress(self, self.request.id)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
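# Usage sketch (pids and domain below are hypothetical): the coordinator task
# is normally queued rather than called inline, e.g.
#     crawl_collection.delay("AIF", "example.org", "admin", only_new=True)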

@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_name: str, colid: str, url: str, username: str):
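    """Fetch the issue list of a collection, as a pid -> IssueData mapping."""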

    crawler = crawler_factory(source_name, colid, username)
    return crawler.crawl_collection()

@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_name: str, colid: str, url: str, username: str):
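    """Crawl a single issue through the source's crawler."""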

    crawler = crawler_factory(source_name, colid, username)
    crawler.crawl_issue(issue)

def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    only_new: bool = False,
):
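    """Restrict a pid -> IssueData mapping to the requested years and numbers.

    `period` and `number` are inclusive (min, max) bounds; with `only_new`,
    issues whose pid already exists as a Container of the collection are
    dropped.
    """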

    def is_year_in_range(year):
        return period[0] <= int(year) <= period[1]

    def is_number_in_range(n):
        # the issue number can be a single value ("1") or a range ("1-2", "1–2")
        if "-" in n or "–" in n:
            parts = n.split("-" if "-" in n else "–")
            return number[0] <= int(parts[0]) <= int(parts[1]) <= number[1]
        return number[0] <= int(n) <= number[1]

    issues = {
        pid: issue
        for pid, issue in issues.items()
        if is_year_in_range(issue.year) and is_number_in_range(issue.number)
    }
    # TODO: only crawl newer issues?
    if only_new:
        existing_pids = {
            container.pid
            for container in Container.objects.filter(my_collection__pid=colid)
        }
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    return issues
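# Example (hypothetical collection pid and data): keep the 2000-2010 issues
# numbered 1 to 4 that have not been imported yet:
#     filter_issues("AIF", issues, period=(2000, 2010), number=(1, 4), only_new=True)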