Coverage for src / crawler / tasks.py: 0%

68 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-19 14:59 +0000

1import logging 

2import traceback 

3from typing import TYPE_CHECKING 

4 

5from celery import shared_task, states 

6from django.contrib.auth.models import User 

7from history.model_data import HistoryEventStatus 

8from history.utils import insert_history_event 

9from opentelemetry import trace 

10from ptf.models import Container 

11from task.tasks import increment_progress 

12 

13from crawler.factory import crawler_factory 

14from crawler.utils import get_all_cols 

15 

16if TYPE_CHECKING: 

17 from celery import Task 

18 from history.model_data import HistoryEventDict 

19 from ptf.model_data import IssueData 

20 

# Module-level OpenTelemetry tracer used to instrument task spans.
tracer = trace.get_tracer(__name__)
# Module-level logger, standard per-module logging convention.
logger = logging.getLogger(__name__)

23 

24 

@shared_task(name="crawler.tasks.crawl_source", bind=True, queue="coordinator")
def crawl_source(self, source_domain, username):
    """Crawl every collection published by a source (not implemented yet).

    BUGFIX: the task is declared with ``bind=True``, so Celery passes the
    task instance as the first positional argument.  The original signature
    omitted ``self``, which would have bound the task instance to
    ``source_domain`` and shifted every real argument by one position.

    :param self: bound Celery task instance.
    :param source_domain: identifier of the source whose collections to crawl.
    :param username: user on whose behalf the crawl runs.
    :raises NotImplementedError: always — the body is only a planned sketch.
    """
    raise NotImplementedError()
    # Planned implementation sketch (unreachable, kept for reference):
    # cols = get_cols_by_source(source_domain)
    # pids = [col["pid"] for col in cols]

    # for pid in pids:
    #     run_task(
    #         CrawlCollectionTask,
    #         pid,
    #         source_domain,
    #         username,
    #         subcall=True,
    #         name=f"{pid}-{source_domain}",
    #     )
    # run_task(EndCrawlSourceTask, source_domain)

41 

42 

@shared_task(name="crawler.tasks.test_task", bind=True, queue="coordinator")
# @tracer.start_as_current_span("CrawlCollectionTask.do")
def test_task(self):
    """Smoke-test task: writes a marker to stdout and to the module logger."""
    marker = "TEST TASK"
    print(marker)
    logger.warning(marker)

48 

49 

@shared_task(name="crawler.tasks.crawl_collection", bind=True, queue="coordinator")
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "Task",
    colid: str,
    source_domain: str,
    username: str,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl every issue of one collection from one source.

    Resolves the collection URL for ``source_domain``, fetches the issue
    list, filters it by ``period`` (years), ``number`` (issue numbers) and
    ``only_new``, then crawls each remaining issue, reporting progress via
    the Celery task state.

    A history event is always inserted (``finally`` clause).  On failure the
    event status is set to ERROR with the full traceback as message, and the
    exception is re-raised so Celery marks the task failed.
    NOTE(review): on success the event keeps its initial PENDING status —
    confirm whether a success status should be set before insertion.

    :param self: bound Celery task instance (``bind=True``).
    :param colid: pid of the collection to crawl.
    :param source_domain: source identifier; also recorded as the event pid.
    :param username: Django username recorded on the history event.
    :param only_new: skip issues whose pid already exists as a Container.
    :param period: inclusive (min_year, max_year) filter on issue years.
    :param number: inclusive (min, max) filter on issue numbers.
    """
    logger.info("crawl_collection task")  # fixed typo: was "craw_collection task"
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": f"{source_domain}",
        "col": None,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        user = User.objects.get(username=username)
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        # Signal start before the (potentially long) issue-list crawl.
        self.update_state(
            task_id=colid,
            meta={"progress": 0, "col": colid},
            state=states.STARTED,
        )
        # Direct call: runs synchronously in this worker, not dispatched as a
        # separate Celery task.
        issue_list = crawl_issue_list(source_domain, colid, url, username)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, only_new)
        self.update_state(
            task_id=colid,
            meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
            state=states.STARTED,
        )
        for issue in issue_list.values():
            crawl_issue(issue, source_domain, colid, url, username)
            increment_progress(self, self.request.id)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])

101 

102 

@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Build a source-specific crawler and return its crawled issue list."""
    return crawler_factory(source_domain, colid, username).crawl_collection()

107 

108 

@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Build a source-specific crawler and crawl a single issue with it."""
    source_crawler = crawler_factory(source_domain, colid, username)
    source_crawler.crawl_issue(issue)

113 

114 

def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    only_new=False,
):
    """Filter a crawled issue dict by year range, number range and novelty.

    :param colid: collection pid, used to look up already-imported containers.
    :param issues: mapping of issue pid -> issue data (reads .year and .number).
    :param period: inclusive (min_year, max_year) bounds on ``issue.year``.
    :param number: inclusive bounds on ``issue.number``; a range such as
        "1-2" (hyphen or en dash) must lie entirely inside the bounds.
    :param only_new: if True, drop issues whose pid already exists as a
        Container in the collection.
    :return: a new dict containing only the issues that pass every filter.
    """

    def is_year_in_range(year) -> bool:
        return period[0] <= int(year) <= period[1]

    def is_number_in_range(n: str) -> bool:
        # number can be a single value ("1", "12") or a range ("1-2", "1–2").
        if "-" in n or "–" in n:
            parts = n.split("-" if "-" in n else "–")
            return number[0] <= int(parts[0]) <= int(parts[1]) <= number[1]
        # BUGFIX: the single-number branch previously indexed the *string*
        # (int(n[0]) <= int(n[1])), which raised IndexError on one-digit
        # numbers and compared individual digits for multi-digit ones.
        return number[0] <= int(n) <= number[1]

    issues = {
        pid: issue
        for pid, issue in issues.items()
        if is_year_in_range(issue.year) and is_number_in_range(issue.number)
    }
    # TODO : only crawl newer issues ?

    if only_new:
        # Run the container query only when needed, and collect pids into a
        # set for O(1) membership tests instead of scanning the queryset per
        # issue (the Django queryset is lazy, so nothing executes otherwise).
        existing_pids = {
            container.pid
            for container in Container.objects.filter(my_collection__pid=colid).all()
        }
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    return issues