Coverage for src / crawler / tasks.py: 0%

90 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-04-08 09:35 +0000

1import logging 

2import time 

3import traceback 

4from typing import TYPE_CHECKING 

5 

6from celery import shared_task, states 

7from django.contrib.auth.models import User 

8from history.model_data import HistoryEventStatus 

9from history.utils import insert_history_event 

10from opentelemetry import trace 

11from ptf.models import Collection, Container 

12from task.custom_task import TaskWithProgress 

13from task.tasks import increment_progress 

14 

15from crawler.factory import crawler_factory 

16from crawler.utils import get_all_cols 

17 

18if TYPE_CHECKING: 

19 from history.model_data import HistoryEventDict 

20 from ptf.model_data import IssueData 

21 

22tracer = trace.get_tracer(__name__) 

23logger = logging.getLogger(__name__) 

24 

25 

@shared_task(name="crawler.tasks.crawl_source", bind=True, queue="coordinator")
def crawl_source(source_domain, username):
    """Crawl every collection of a source. Not implemented yet.

    NOTE(review): the commented-out sketch below suggests the intent is to
    launch one crawl-collection task per collection pid of ``source_domain``,
    followed by an end-of-crawl task — confirm before implementing.
    """
    raise NotImplementedError()
    # cols = get_cols_by_source(source_domain)
    # pids = [col["pid"] for col in cols]

    # for pid in pids:
    #     run_task(
    #         CrawlCollectionTask,
    #         pid,
    #         source_domain,
    #         username,
    #         subcall=True,
    #         name=f"{pid}-{source_domain}",
    #     )
    # run_task(EndCrawlSourceTask, source_domain)

42 

43 

@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection from a source and import its issues.

    The issue list is fetched synchronously, filtered by ``period`` /
    ``number`` (and optionally restricted to not-yet-imported issues), then
    each remaining issue is crawled by a dedicated ``crawl_issue`` task, one
    at a time.  Whatever happens, a history event describing the outcome is
    recorded.

    Args:
        colid: pid of the collection to crawl.
        source_domain: source the collection is hosted on.
        user_name: username the import is attributed to.
        only_new: when True, skip issues already imported as containers.
        period: inclusive (min_year, max_year) filter.
        number: inclusive (min_number, max_number) filter.

    Raises:
        Re-raises any exception after recording an ERROR history event.
    """
    logger.debug("crawl_collection task")
    user = User.objects.get(username=user_name)
    collection = Collection.objects.get(pid=colid)
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": f"{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        # Calling the task function directly runs it synchronously (no queue).
        issue_list = crawl_issue_list(source_domain, colid, url, user_name)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, event_dict, only_new)

        if issue_list:
            self.update_state(
                meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
                state=states.STARTED,
            )
        else:
            event_dict["message"] = "No issue to update with the selection"

        # Log the loop-invariant information once, not once per issue.
        logger.info("%s issues to process", len(issue_list))
        logger.info("task_id! %s", self.request.id)
        for issue in issue_list.values():
            promise = crawl_issue.delay(issue, source_domain, colid, url, user_name)
            increment_progress.delay(self.request.id)
            # Wait for the crawl_issue task to finish before launching the next one.
            while not promise.ready():
                time.sleep(2)
        event_dict["status"] = HistoryEventStatus.OK

    except BaseException:
        # Deliberately broad (includes task revocation) so that the failure is
        # always recorded in the history before the exception propagates.
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])

107 

108 

@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return the list of issues of collection ``colid`` found on its source.

    Builds the crawler matching ``source_domain`` and delegates to its
    ``crawl_collection`` method.
    """
    return crawler_factory(source_domain, colid, username).crawl_collection()

113 

114 

@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue of collection ``colid``.

    Instantiates the crawler for ``source_domain`` and hands the issue over
    to its ``crawl_issue`` method.
    """
    source_crawler = crawler_factory(source_domain, colid, username)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    source_crawler.crawl_issue(issue)

120 

121 

def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Keep only the issues whose year and number fall within the given ranges.

    Args:
        colid: pid of the collection the issues belong to (used for logging
            and for looking up already-imported containers).
        issues: mapping of issue pid -> issue data, as returned by the crawler.
        period: inclusive (min_year, max_year) range.
        number: inclusive (min_number, max_number) range.
        event_dict: history event flagged (and persisted) when an issue's
            year/number cannot be parsed.
        only_new: when True, also drop issues whose pid already exists as a
            Container of the collection.

    Returns:
        The filtered pid -> issue mapping.  Issues with an unparsable year or
        number are dropped and an ERROR history event is recorded.
    """
    if event_dict is None:
        event_dict = {}

    def _record_parse_error():
        # Shared by both range checks: persist the problem in the history so
        # the skipped issue can be investigated, then keep filtering.
        event_dict["status"] = HistoryEventStatus.ERROR
        insert_history_event(event_dict)
        logger.error(
            "Missing the year or the number property for issues in the collection %s", colid
        )

    def is_year_in_range(year):
        try:
            return period[0] <= int(year) <= period[1]
        except ValueError:
            _record_parse_error()
            return False

    def is_number_in_range(n):
        try:
            # n can be a range such as "1-2" (or with an en dash, "1–2")
            if "-" in n or "–" in n:
                bounds = n.split("-" if "-" in n else "–")
                return number[0] <= int(bounds[0]) <= int(bounds[1]) <= number[1]
            # n can be a single number such as "3"
            return number[0] <= int(n) <= number[1]
        except ValueError:
            _record_parse_error()
            return False

    issues = {
        pid: issue
        for pid, issue in issues.items()
        if is_year_in_range(issue.year) and is_number_in_range(issue.number)
    }

    if only_new:
        # Query the containers only when needed, and build the pid set once:
        # O(1) membership tests instead of scanning the list for every issue.
        existing_pids = {
            container.pid
            for container in Container.objects.filter(my_collection__pid=colid).all()
        }
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    return issues