Coverage for src / crawler / tasks.py: 0%

145 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-05-21 12:58 +0000

1import logging 

2import time 

3import traceback 

4from typing import TYPE_CHECKING 

5 

6from celery import shared_task, states 

7from celery.exceptions import TimeoutError 

8from django.contrib.auth.models import User 

9from history.model_data import HistoryEventStatus 

10from history.utils import insert_history_event 

11from opentelemetry import trace 

12from ptf.models import Collection, Container 

13from task.custom_task import TaskWithProgress 

14from task.runner import TaskAborted 

15from task.tasks import increment_progress 

16 

17from crawler.factory import crawler_factory 

18from crawler.models import Source 

19from crawler.utils import get_all_cols 

20 

21if TYPE_CHECKING: 

22 from history.model_data import HistoryEventDict 

23 from ptf.model_data import IssueData 

24 

# OpenTelemetry tracer for this module; used to wrap crawl_collection in a span.
tracer = trace.get_tracer(__name__)

# Logger shared by every function of this module.
logger = logging.getLogger(__name__)

27 

28 

def crawl_sources(
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Queue one ``crawl_source`` task for every ``Source`` in the database.

    For each source, the distinct pids of the collections matching
    ``content__origin__source=source`` are collected (ordered by pid) and
    passed to ``crawl_source.delay`` together with the selection options.
    The tasks are only queued: this function does not wait for them.

    An "import-source" history event is always inserted, with status OK when
    every task was queued, or ERROR (traceback as message) otherwise.

    Args:
        user_name: Forwarded unchanged to each ``crawl_source`` task.
        only_new: Forwarded unchanged to each ``crawl_source`` task.
        period: Forwarded unchanged to each ``crawl_source`` task.
        number: Forwarded unchanged to each ``crawl_source`` task.

    Raises:
        BaseException: Anything raised while queuing is logged, recorded in
            the history event and re-raised.
    """
    history: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": None,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling all sources")
    all_sources = Source.objects.all()
    try:
        # The per-source crawls are dispatched without waiting, so they run
        # concurrently.
        for src in all_sources:
            pid_query = (
                Collection.objects.filter(content__origin__source=src)
                .distinct()
                .order_by("pid")
                .values_list("pid", flat=True)
            )
            crawl_source.delay(
                list(pid_query), src.domain, user_name, only_new, period, number
            )
    except BaseException:
        failure = traceback.format_exc()
        history["status"] = HistoryEventStatus.ERROR
        history["message"] = failure
        logger.error(failure)
        raise
    else:
        history["status"] = HistoryEventStatus.OK
    finally:
        insert_history_event(history)
        logger.info("history event inserted %s", history["pid"])

64 

65 

@shared_task(
    name="crawler.tasks.crawl_source",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
def crawl_source(
    self: "TaskWithProgress",
    colids: list,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl the given collections of one source, one after the other.

    A ``crawl_collection`` task is queued for each entry of ``colids`` and
    awaited before the next one is queued. After each completed collection
    an ``increment_progress`` task is queued for this task's id.

    An "import-source" history event is always inserted, with status OK when
    every subtask finished without returning an exception, ERROR otherwise.

    Args:
        colids: Pids of the collections to crawl.
        source_domain: Domain of the source; also stored as "col" in the
            history event and in the progress metadata.
        user_name: Forwarded unchanged to ``crawl_collection``.
        only_new: Forwarded unchanged to ``crawl_collection``.
        period: Forwarded unchanged to ``crawl_collection``.
        number: Forwarded unchanged to ``crawl_collection``.

    Raises:
        TaskAborted: When ``self.is_aborted()`` is true while waiting for a
            subtask; the pending subtask is revoked and aborted first.
        ExceptionGroup: When at least one subtask returned an exception.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling the source: %s", source_domain)
    try:
        self.update_state(
            meta={"current": 0, "total": len(colids), "progress": 0, "col": source_domain},
            state=states.STARTED,
        )
        results = []
        for col in colids:
            logger.info("Start crawling the collection: %s", col)
            promise = crawl_collection.delay(
                col, source_domain, user_name, only_new, period, number
            )
            # The collection crawls are sequential, so a progress bar can be
            # maintained for tasks from the coordinator queue.
            #
            # Poll unconditionally instead of guarding on promise.ready():
            # get() returns at once for a subtask that has already finished,
            # so its result (or failure) is always collected and counted.
            while True:
                try:
                    # propagate=False: a failed subtask hands back its
                    # exception instead of raising it here.
                    outcome = promise.get(
                        disable_sync_subtasks=False, propagate=False, timeout=2
                    )
                except TimeoutError:
                    logger.debug("Timeout Error for the collection: %s", col)
                    if self.is_aborted():
                        self.app.control.revoke(promise.id)
                        promise.abort()
                        raise TaskAborted("Task aborted by user")
                    continue
                results.append(outcome)
                increment_progress.delay(self.request.id)
                break

        event_dict["status"] = HistoryEventStatus.OK
        exceptions = [result for result in results if isinstance(result, Exception)]
        if exceptions:
            raise ExceptionGroup("Encountered errors while processing subtasks", exceptions)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)

126 

127 

@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection of one source, issue by issue.

    The issue list is fetched with ``crawl_issue_list`` (called inline),
    narrowed with ``filter_issues``, then a ``crawl_issue`` task is queued
    for each remaining issue and awaited before the next one is queued.

    An "import-collection" history event is always inserted:
    - OK when the issues were processed, or when the selection left nothing
      to import;
    - WARNING when the crawler returned no issue at all;
    - ERROR (traceback as message) when anything raised.

    Args:
        colid: Pid of the collection; key into ``get_all_cols()``.
        source_domain: Key into the collection's "sources" mapping.
        user_name: Username resolved to a ``User``; its pk is stored as
            "userid" in the history event.
        only_new: Forwarded unchanged to ``filter_issues``.
        period: Forwarded unchanged to ``filter_issues``.
        number: Forwarded unchanged to ``filter_issues``.

    Raises:
        BaseException: Anything raised is logged, recorded in the history
            event and re-raised.
    """
    logger.debug("craw_collection task")
    collection = Collection.objects.filter(pid=colid).first()
    event_dict: "HistoryEventDict" = {
        "type": "import-collection",
        "pid": f"{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        # Resolved inside the try block so that an unknown user is recorded
        # as an ERROR history event like every other failure.
        user = User.objects.get(username=user_name)
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        issue_list = crawl_issue_list(source_domain, colid, url, user_name)
        if not issue_list:
            event_dict["status"] = HistoryEventStatus.WARNING
            event_dict["message"] = f"No issue to import for the collection: {colid}"
        else:
            issue_list = filter_issues(colid, issue_list, period, number, event_dict, only_new)
            if not issue_list:
                event_dict["message"] = (
                    f"No issue to import with the selection for the collection: {colid}"
                )
                event_dict["status"] = HistoryEventStatus.OK
                logger.debug(event_dict["message"])
            else:
                self.update_state(
                    meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
                    state=states.STARTED,
                )

                logger.info(
                    "%s issues to process for the source: %s and the collection: %s",
                    len(issue_list),
                    source_domain,
                    colid,
                )
                for issue in issue_list.values():
                    promise = crawl_issue.delay(issue, source_domain, colid, url, user_name)
                    # Issues are handled one at a time: wait for this one
                    # before queuing the next.
                    while not promise.ready():
                        time.sleep(2)
                    # Count the issue only once it has finished, so the
                    # progress reflects completed work (as in crawl_source).
                    increment_progress.delay(self.request.id)
                event_dict["status"] = HistoryEventStatus.OK

    except BaseException:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])

200 

201 

@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return the issues found for a collection on a given source.

    Builds the crawler with ``crawler_factory(source_domain, colid, username)``
    and returns the result of its ``crawl_collection()`` method.

    ``url`` is accepted but not used by this function.
    """
    return crawler_factory(source_domain, colid, username).crawl_collection()

206 

207 

@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue with the crawler matching the source and collection.

    The crawler is built with ``crawler_factory(source_domain, colid, username)``
    and its ``crawl_issue`` method is called with ``issue``. Nothing is returned.

    ``url`` is accepted but not used by this function.
    """
    issue_crawler = crawler_factory(source_domain, colid, username)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    issue_crawler.crawl_issue(issue)

213 

214 

def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Return the subset of ``issues`` matching the requested selection.

    Three independent filters are applied, each only when requested:
    - ``number`` different from ``(0, 99999)``: keep issues whose ``number``
      (a single integer such as "3", or a range such as "1-2") lies within
      the bounds; issues without a number ("" or None) are kept.
    - ``period`` different from ``(0, 9999)``: keep issues whose ``year``
      lies within the bounds.
    - ``only_new``: drop issues whose pid is already the pid of a
      ``Container`` of the collection.

    An issue whose year or number cannot be parsed is dropped; in that case
    ``event_dict`` is set to ERROR and inserted as a history event.

    Args:
        colid: Pid of the collection the issues belong to.
        issues: Mapping from issue pid to issue data.
        period: Inclusive (first, last) year bounds; a list is accepted too.
        number: Inclusive (first, last) number bounds; a list is accepted too.
        event_dict: History event updated when a value cannot be parsed;
            a fresh dict is used when omitted.
        only_new: Whether to drop issues that already exist as containers.

    Returns:
        A dict with the retained ``pid -> issue`` entries.
    """
    if event_dict is None:
        event_dict = {}

    # Bounds that travelled through a JSON-serialized Celery message arrive
    # as lists, and a list never compares equal to a tuple: normalise them so
    # the "default range" checks below work for both.
    period = tuple(period)
    number = tuple(number)

    def is_year_in_range(year):
        try:
            return period[0] <= int(year) <= period[1]
        except (TypeError, ValueError):
            # TypeError covers a year that is None.
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error("Missing the year property for issues in the collection %s", colid)
            return False

    def is_number_in_range(n):
        try:
            # n can be "1-2" (hyphen or en dash)
            if "-" in n or "–" in n:
                bounds = n.split("-" if "-" in n else "–")
                return number[0] <= int(bounds[0]) <= int(bounds[1]) <= number[1]
            # n can be "3"
            return number[0] <= int(n) <= number[1]
        except (TypeError, ValueError):
            # issue.number is not an integer, nor an integer range
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.warning(
                "The number property for issues in the collection %s is not defined or is not a number",
                colid,
            )
            return False

    if number != (0, 99999):
        # The number is not always defined: such issues are kept, and tested
        # first so that they do not get reported as parsing errors.
        issues = {
            pid: issue
            for pid, issue in issues.items()
            if issue.number in ("", None) or is_number_in_range(issue.number)
        }

    if period != (0, 9999):
        issues = {pid: issue for pid, issue in issues.items() if is_year_in_range(issue.year)}

    if only_new:
        # One query and a set lookup instead of scanning every container for
        # every issue.
        known_pids = set(
            Container.objects.filter(my_collection__pid=colid).values_list("pid", flat=True)
        )
        issues = {pid: issue for pid, issue in issues.items() if pid not in known_pids}
    logger.info("issues filtered: %s", issues)
    return issues