Coverage for src / crawler / tasks.py: 0%

124 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-06-19 13:33 +0000

1import logging 

2import traceback 

3from typing import TYPE_CHECKING 

4 

5from celery import shared_task, states 

6from django.contrib.auth.models import User 

7from history.model_data import HistoryEventStatus 

8from history.utils import insert_history_event 

9from opentelemetry import trace 

10from ptf.models import Collection, Container 

11from task.custom_task import TaskWithProgress 

12from task.tasks import increment_progress 

13 

14from crawler.factory import crawler_factory 

15from crawler.models import Source 

16from crawler.utils import get_all_cols 

17 

18if TYPE_CHECKING: 

19 from history.model_data import HistoryEventDict 

20 from ptf.model_data import IssueData 

21 

# Module-wide logger and OpenTelemetry tracer; the tracer is used by the
# span decorator applied to crawl_collection below.
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)

24 

25 

def crawl_sources(
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Queue one ``crawl_source`` task per Source in the database.

    For each Source, the pids of the collections whose content originates from
    that source are collected (distinct, ordered by pid) and handed to
    ``crawl_source.delay`` together with the crawl options.

    Args:
        user_name: username forwarded to every crawl task.
        only_new: forwarded to the tasks (skip issues already stored).
        period: inclusive year range forwarded to the tasks.
        number: inclusive issue-number range forwarded to the tasks.
    """
    logger.info("Start crawling all sources")

    # Every source gets its own asynchronous task, so the source crawlings
    # run concurrently instead of one after the other.
    for src in Source.objects.all():
        pid_rows = (
            Collection.objects.filter(content__origin__source=src)
            .distinct()
            .order_by("pid")
            .values("pid")
        )
        crawl_source.delay(
            [row["pid"] for row in pid_rows],
            src.domain,
            user_name,
            only_new,
            period,
            number,
        )

45 

46 

@shared_task(
    name="crawler.tasks.crawl_source",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
def crawl_source(
    self: "TaskWithProgress",
    colids: list,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl every collection of one source, one child task at a time.

    For each pid in ``colids`` a ``crawl_collection`` task is queued and
    awaited through ``self.wait_child`` before the next one starts (presumably
    a blocking wait; TODO confirm in TaskWithProgress). After each child,
    progress is incremented. Any child result that is an ``Exception``
    instance is gathered and re-raised as a single ``ExceptionGroup``.

    A history event of type "import-source" is always inserted (``finally``).
    Its status is ERROR with the formatted traceback when anything raised,
    and OK otherwise.

    Raises:
        ExceptionGroup: when one or more child results are exceptions.
        Exception: any other error is logged, recorded and re-raised.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": None,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling the source: %s", source_domain)
    try:
        self.update_state(
            state=states.STARTED,
            meta={"current": 0, "total": len(colids), "progress": 0, "col": source_domain},
        )

        child_results = []
        for colid in colids:
            logger.info("Start crawling the collection: %s", colid)
            child = crawl_collection.delay(
                colid, source_domain, user_name, only_new, period, number
            )
            child_results.append(self.wait_child(child))
            increment_progress.delay(self.request.id)

        event_dict["status"] = HistoryEventStatus.OK
        failures = [outcome for outcome in child_results if isinstance(outcome, Exception)]
        if failures:
            raise ExceptionGroup("Encountered errors while processing subtasks", failures)
    except Exception:
        # ExceptionGroup is an Exception subclass, so the raise above lands here
        # too and flips the status back to ERROR.
        details = traceback.format_exc()
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = details
        logger.error(details)
        raise
    finally:
        insert_history_event(event_dict)

96 

97 

@shared_task(
    name="crawler.tasks.crawl_collection",
    queue="coordinator",
    bind=True,
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    task: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection from one source and queue a task per selected issue.

    Steps:
      1. resolve the user and the Collection and record them on the history event;
      2. read the collection's URL for ``source_domain`` from ``get_all_cols()``;
      3. list the issues via ``crawl_issue_list`` (run synchronously here);
      4. narrow them with ``filter_issues``;
      5. queue ``crawl_issue`` for each remaining issue and wait for it.

    The "import-collection" history event is always inserted (``finally``).
    Its status is WARNING when no issue was listed, OK when nothing matched
    the selection or everything completed, and ERROR (with the traceback)
    when anything raised. The original exception is re-raised.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-collection",
        "pid": f"{colid}-{source_domain}",
        "col": None,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }

    try:
        logger.debug("craw_collection task")
        user = User.objects.get(username=user_name)
        event_dict["col"] = Collection.objects.filter(pid=colid).first()
        event_dict["userid"] = user.pk

        # KeyError here (unknown collection or source) is recorded as ERROR below.
        url = get_all_cols()[colid]["sources"][source_domain]

        crawled = crawl_issue_list(source_domain, colid, url, user_name)
        if not crawled:
            event_dict["status"] = HistoryEventStatus.WARNING
            event_dict["message"] = f"No issue to import for the collection: {colid}"
            return

        selected = filter_issues(colid, crawled, period, number, event_dict, only_new)
        if not selected:
            event_dict["message"] = (
                f"No issue to import with the selection for the collection: {colid}"
            )
            event_dict["status"] = HistoryEventStatus.OK
            logger.debug(event_dict["message"])
            return

        task.update_state(
            state=states.STARTED,
            meta={"current": 0, "total": len(selected), "progress": 0, "col": colid},
        )
        logger.info(
            "%s issues to process for the source: %s and the collection: %s",
            len(selected),
            source_domain,
            colid,
        )

        # Issues are processed sequentially: each child is awaited before the next.
        for issue in selected.values():
            task.wait_child(crawl_issue.delay(issue, source_domain, colid, url, user_name))
            increment_progress.delay(task.request.id)

        event_dict["status"] = HistoryEventStatus.OK

    except BaseException:
        details = traceback.format_exc()
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = details
        logger.error(details)
        raise
    finally:
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])

174 

175 

@shared_task(
    name="crawler.tasks.crawl_issue_list",
    queue="executor",
    bind=True,
    base=TaskWithProgress,
)
def crawl_issue_list(
    self: "TaskWithProgress", source_domain: str, colid: str, url: str, username: str
):
    """Build the crawler for ``source_domain``/``colid`` and return its issue listing.

    ``url`` is part of the task signature but is not used here; the crawler is
    created from the source domain, collection pid and user name only. The
    task's ``wait`` method is passed to the crawler as its pause function.
    """
    return crawler_factory(
        source_domain, colid, username, pause_function=self.wait
    ).crawl_collection()

185 

186 

@shared_task(
    name="crawler.tasks.crawl_issue",
    base=TaskWithProgress,
    bind=True,
    queue="executor",
)
def crawl_issue(
    self: "TaskWithProgress",
    issue: "IssueData",
    source_domain: str,
    colid: str,
    url: str,
    username: str,
):
    """Crawl a single issue of ``colid`` on ``source_domain``.

    ``url`` is accepted for signature compatibility but not used here. The
    crawler's result is discarded, so the task returns None.
    """
    issue_crawler = crawler_factory(source_domain, colid, username, pause_function=self.wait)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    issue_crawler.crawl_issue(issue)

199 

200 

def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Select the crawled issues matching the requested year / number ranges.

    Args:
        colid: pid of the collection the issues belong to.
        issues: crawled issues keyed by issue pid.
        period: inclusive (first, last) year range; ``(0, 9999)`` means "no
            year filter". Lists are accepted too (Celery's JSON serializer
            turns tuples into lists).
        number: inclusive (first, last) issue-number range; ``(0, 99999)``
            means "no number filter". Lists are accepted too.
        event_dict: history event whose status is set to ERROR and which is
            re-inserted each time an issue has an unparsable year or number.
            A throwaway dict is used when omitted.
        only_new: when true, drop issues whose pid is already a Container of
            the collection.

    Returns:
        The selected issues keyed by pid. When no filter applies, this is the
        input dict itself.
    """
    if event_dict is None:
        event_dict = {}

    # Celery's JSON serializer delivers tuples as lists, and a list never
    # compares equal to a tuple; normalize so the "no filter" checks work.
    period = tuple(period)
    number = tuple(number)

    def is_year_in_range(year):
        try:
            return period[0] <= int(year) <= period[1]
        except (TypeError, ValueError):
            # year is missing (None) or not an integer
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error("Missing the year property for issues in the collection %s", colid)
            return False

    def is_number_in_range(n):
        try:
            # n can be a range such as "1-2" (hyphen or en dash)
            if "-" in n or "–" in n:
                bounds = n.split("-" if "-" in n else "–")
                return number[0] <= int(bounds[0]) <= int(bounds[1]) <= number[1]
            # n can be "3"
            return number[0] <= int(n) <= number[1]
        except (TypeError, ValueError):
            # issue.number is missing, not an integer, nor an integer range
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.warning(
                "The number property for issues in the collection %s is not defined or is not a number",
                colid,
            )
            return False

    # Each filter applies on its own whenever its range differs from the
    # default, so that both apply when both ranges are given.
    if number != (0, 99999):
        # Issues whose number is "" (not always defined) are kept; test for ""
        # first so such issues are not reported as invalid numbers.
        issues = {
            pid: issue
            for pid, issue in issues.items()
            if issue.number == "" or is_number_in_range(issue.number)
        }

    if period != (0, 9999):
        issues = {pid: issue for pid, issue in issues.items() if is_year_in_range(issue.year)}

    if only_new:
        # One query, then O(1) membership per issue instead of scanning every
        # container for every issue.
        existing_pids = set(
            Container.objects.filter(my_collection__pid=colid).values_list("pid", flat=True)
        )
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    logger.info("issues filtered: %s", issues)
    return issues