Coverage for src / crawler / tasks.py: 0%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-04-30 12:41 +0000

1import logging 

2import time 

3import traceback 

4from typing import TYPE_CHECKING 

5 

6from celery import shared_task, states 

7from celery.exceptions import TimeoutError 

8from django.contrib.auth.models import User 

9from history.model_data import HistoryEventStatus 

10from history.utils import insert_history_event 

11from opentelemetry import trace 

12from ptf.models import Collection, Container 

13from task.custom_task import TaskWithProgress 

14from task.runner import TaskAborted 

15from task.tasks import increment_progress 

16 

17from crawler.factory import crawler_factory 

18from crawler.utils import get_all_cols 

19 

# These imports are only needed for type annotations; guarding them avoids
# runtime import cost and potential circular imports.
if TYPE_CHECKING:
    from history.model_data import HistoryEventDict
    from ptf.model_data import IssueData

# Module-level OpenTelemetry tracer and logger, both named after this module.
tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)

26 

27 

@shared_task(
    name="crawler.tasks.crawl_source",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
def crawl_source(
    self: "TaskWithProgress",
    colids: list,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl every collection of a source, one collection at a time.

    Dispatches one ``crawl_collection`` sub-task per collection id and polls
    each promise so a progress bar can be maintained from the coordinator
    queue.  A history event is recorded whatever the outcome.

    Args:
        colids: collection pids to crawl.
        source_domain: identifier of the source being crawled.
        user_name: username forwarded to the sub-tasks.
        only_new: if True, only crawl issues not already registered.
        period: inclusive (min_year, max_year) filter forwarded to sub-tasks.
        number: inclusive (min_number, max_number) filter forwarded to sub-tasks.

    Raises:
        TaskAborted: if the user aborted the task while waiting.
        ExceptionGroup: if one or more sub-tasks returned an exception.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling the source: %s", source_domain)
    try:
        self.update_state(
            meta={"current": 0, "total": len(colids), "progress": 0, "col": source_domain},
            state=states.STARTED,
        )
        results = []
        for col in colids:
            logger.info("Start crawling the collection: %s", col)
            promise = crawl_collection.delay(
                col, source_domain, user_name, only_new, period, number
            )
            # As the collection crawlings are sequential, we can manage a
            # progress bar for tasks from the coordinator queue.
            # BUGFIX: loop unconditionally instead of `while not promise.ready()`.
            # With the readiness pre-check, a sub-task finishing before the
            # first check skipped the body entirely, so its result was never
            # collected and the progress never incremented.  `promise.get`
            # bounds each wait with its own 2 s timeout.
            while True:
                try:
                    results.append(
                        promise.get(disable_sync_subtasks=False, propagate=False, timeout=2)
                    )
                    increment_progress.delay(self.request.id)
                    break
                except TimeoutError:
                    logger.debug("Timeout Error for the collection: %s", col)
                    if not self.is_aborted():
                        continue
                    # User abort: revoke the pending sub-task before bailing out.
                    self.app.control.revoke(promise.id)
                    promise.abort()
                    raise TaskAborted("Task aborted by user")

        event_dict["status"] = HistoryEventStatus.OK
        # propagate=False means sub-task failures come back as return values;
        # surface them all at once.
        exceptions = [result for result in results if isinstance(result, Exception)]
        if len(exceptions) > 0:
            raise ExceptionGroup("Encountered errors while processing subtasks", exceptions)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        # Always record the outcome in the history, success or failure.
        insert_history_event(event_dict)

88 

89 

@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection: list its issues, filter them, crawl each issue.

    Issues are fetched via ``crawl_issue_list``, narrowed down by
    ``filter_issues`` and then crawled one by one through ``crawl_issue``
    sub-tasks on the executor queue.  A history event records the outcome.

    Args:
        colid: pid of the collection to crawl.
        source_domain: identifier of the source being crawled.
        user_name: username used for attribution and crawler creation.
        only_new: if True, skip issues already present as Containers.
        period: inclusive (min_year, max_year) issue filter.
        number: inclusive (min_number, max_number) issue filter.

    Raises:
        ValueError: if the source returned no issue at all.
    """
    logger.debug("crawl_collection task")
    user = User.objects.get(username=user_name)
    collection = Collection.objects.get(pid=colid)
    event_dict: "HistoryEventDict" = {
        "type": "import-collection",
        "pid": source_domain,
        "col": collection,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        issue_list = crawl_issue_list(source_domain, colid, url, user_name)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, event_dict, only_new)

        if issue_list:
            self.update_state(
                meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
                state=states.STARTED,
            )
        else:
            event_dict["message"] = "No issue to update with the selection"
        logger.info(
            "%s issues to process for the source: %s and the collection: %s",
            len(issue_list),
            source_domain,
            collection.pid,
        )
        for issue in issue_list.values():
            promise = crawl_issue.delay(issue, source_domain, colid, url, user_name)
            # Wait for the executor to finish this issue before moving on.
            while not promise.ready():
                time.sleep(2)
            # Progress counts *completed* issues, consistent with crawl_source
            # (it used to be incremented at dispatch time, before the wait).
            increment_progress.delay(self.request.id)
        event_dict["status"] = HistoryEventStatus.OK

    except BaseException:
        # BaseException so even aborts/interrupts end up recorded in history
        # before being re-raised.
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])

156 

157 

@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return the collection's issue list as seen by the source's crawler."""
    source_crawler = crawler_factory(source_domain, colid, username)
    return source_crawler.crawl_collection()

162 

163 

@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue using the crawler matching the source."""
    logger.info("crawl issue: %s, %s", source_domain, colid)
    source_crawler = crawler_factory(source_domain, colid, username)
    source_crawler.crawl_issue(issue)

169 

170 

def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Filter crawled issues of a collection by year, volume number and novelty.

    Args:
        colid: pid of the collection the issues belong to.
        issues: mapping of issue pid -> issue data.
        period: inclusive (min_year, max_year); the default means "no filter".
        number: inclusive (min_number, max_number); the default means "no filter".
        event_dict: history event updated (and persisted) on malformed data.
        only_new: if True, drop issues whose pid already exists as a Container.

    Returns:
        The filtered mapping of pid -> issue data.
    """
    if event_dict is None:
        event_dict = {}

    def is_year_in_range(year):
        # An issue with no parseable year is rejected and the error recorded.
        try:
            return period[0] <= int(year) <= period[1]
        except ValueError:
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error("Missing the year property for issues in the collection %s", colid)
            return False

    def is_number_in_range(n):
        try:
            # n can be a range like "1-2" (hyphen or en dash)
            if "-" in n or "–" in n:
                n = n.split("-" if "-" in n else "–")
                return number[0] <= int(n[0]) <= int(n[1]) <= number[1]
            # n can be a single value like "3"
            return number[0] <= int(n) <= number[1]
        except ValueError:
            # issue.number is not an integer, nor an integer range
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.warning(
                "The number property for issues in the collection %s is not defined or is not a number",
                colid,
            )
            return False

    # BUGFIX: the two filters used to be mutually exclusive (each was applied
    # only when the OTHER was left at its default), so requesting both a
    # period AND a number filter applied neither.  Apply each independently.
    if number != (0, 99999):
        # Keep issues whose number matches; also keep issues with an empty
        # number, since the property is not always defined.
        issues = {
            pid: issue
            for pid, issue in issues.items()
            if is_number_in_range(issue.number) or issue.number == ""
        }

    if period != (0, 9999):
        issues = {pid: issue for pid, issue in issues.items() if is_year_in_range(issue.year)}

    if only_new:
        # Query the database only when needed, and use a pid set so the
        # membership test is O(1) instead of scanning all containers per issue.
        existing_pids = set(
            Container.objects.filter(my_collection__pid=colid).values_list("pid", flat=True)
        )
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}
    logger.info("issues filtered: %s", issues)
    return issues