Coverage for src / crawler / tasks.py: 0%
145 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-21 12:58 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-05-21 12:58 +0000
1import logging
2import time
3import traceback
4from typing import TYPE_CHECKING
6from celery import shared_task, states
7from celery.exceptions import TimeoutError
8from django.contrib.auth.models import User
9from history.model_data import HistoryEventStatus
10from history.utils import insert_history_event
11from opentelemetry import trace
12from ptf.models import Collection, Container
13from task.custom_task import TaskWithProgress
14from task.runner import TaskAborted
15from task.tasks import increment_progress
17from crawler.factory import crawler_factory
18from crawler.models import Source
19from crawler.utils import get_all_cols
21if TYPE_CHECKING:
22 from history.model_data import HistoryEventDict
23 from ptf.model_data import IssueData
25tracer = trace.get_tracer(__name__)
26logger = logging.getLogger(__name__)
def crawl_sources(
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Queue one ``crawl_source`` task for every ``Source`` in the database.

    For each source, the pids of the collections linked to it are collected
    (distinct, ordered by pid) and passed to ``crawl_source.delay`` together
    with the selection options. An ``import-source`` history event is
    inserted in every case, successful or not.

    Args:
        user_name: Forwarded unchanged to each ``crawl_source`` task.
        only_new: Forwarded unchanged to each ``crawl_source`` task.
        period: Forwarded unchanged to each ``crawl_source`` task.
        number: Forwarded unchanged to each ``crawl_source`` task.

    Raises:
        BaseException: Whatever is raised while queuing the tasks; it is
            logged and stored in the history event before being re-raised.
    """
    history: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": None,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling all sources")
    all_sources = Source.objects.all()
    try:
        # One task is queued per source (the original author notes that the
        # source crawls are meant to run concurrently).
        for src in all_sources:
            pid_rows = Collection.objects.filter(content__origin__source=src)
            pid_rows = pid_rows.distinct().order_by("pid").values("pid")
            pids = [row["pid"] for row in pid_rows]
            crawl_source.delay(pids, src.domain, user_name, only_new, period, number)
    except BaseException:
        failure = traceback.format_exc()
        history["status"] = HistoryEventStatus.ERROR
        history["message"] = failure
        logger.error(failure)
        raise
    else:
        history["status"] = HistoryEventStatus.OK
    finally:
        # The history event is written whether queuing succeeded or failed.
        insert_history_event(history)
        logger.info("history event inserted %s", history["pid"])
@shared_task(
    name="crawler.tasks.crawl_source",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
def crawl_source(
    self: "TaskWithProgress",
    colids: list,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl the given collections of one source, one collection at a time.

    A ``crawl_collection`` subtask is queued for each collection id and this
    task waits for its result before moving to the next one, updating its own
    progress after each completed collection. An ``import-source`` history
    event is inserted in every case.

    Args:
        colids: Collection pids to crawl for this source.
        source_domain: Domain of the source; also stored as the event's "col".
        user_name: Forwarded unchanged to ``crawl_collection``.
        only_new: Forwarded unchanged to ``crawl_collection``.
        period: Forwarded unchanged to ``crawl_collection``.
        number: Forwarded unchanged to ``crawl_collection``.

    Raises:
        TaskAborted: If this task is flagged as aborted while waiting for a
            subtask; the pending subtask is revoked and aborted first.
        ExceptionGroup: If one or more subtasks returned an exception.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling the source: %s", source_domain)
    try:
        self.update_state(
            meta={"current": 0, "total": len(colids), "progress": 0, "col": source_domain},
            state=states.STARTED,
        )
        results = []
        for col in colids:
            logger.info("Start crawling the collection: %s", col)
            promise = crawl_collection.delay(
                col, source_domain, user_name, only_new, period, number
            )
            # As the collection crawlings are sequential, we can manage a
            # progress bar for tasks from the coordinator queue.
            #
            # Loop unconditionally instead of testing ``promise.ready()``:
            # if the subtask finishes before the check (or between a timeout
            # and the next check), a ready()-guarded loop would exit without
            # ever collecting the result, hiding subtask failures and
            # skipping the progress increment.
            while True:
                try:
                    # propagate=False: a failed subtask yields its exception
                    # as the return value instead of raising it here.
                    result = promise.get(
                        disable_sync_subtasks=False, propagate=False, timeout=2
                    )
                except TimeoutError:
                    logger.debug("Timeout Error for the collection: %s", col)
                    if not self.is_aborted():
                        continue
                    self.app.control.revoke(promise.id)
                    promise.abort()
                    raise TaskAborted("Task aborted by user")
                results.append(result)
                increment_progress.delay(self.request.id)
                break

        event_dict["status"] = HistoryEventStatus.OK
        exceptions = [result for result in results if isinstance(result, Exception)]
        if exceptions:
            # Raising here routes through the handler below, which turns the
            # status back to ERROR and records the traceback.
            raise ExceptionGroup("Encountered errors while processing subtasks", exceptions)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection of one source, issue by issue.

    The issue list is fetched with ``crawl_issue_list`` (called directly, not
    queued), narrowed with ``filter_issues``, and a ``crawl_issue`` subtask is
    then queued for each remaining issue, waiting for each to be ready before
    queuing the next. An ``import-collection`` history event is inserted in
    every case.

    Args:
        colid: Pid of the collection; also the key looked up in ``get_all_cols()``.
        source_domain: Key of the source URL inside the collection's "sources".
        user_name: Username resolved to a ``User``; its pk is stored in the event.
        only_new: Forwarded unchanged to ``filter_issues``.
        period: Forwarded unchanged to ``filter_issues``.
        number: Forwarded unchanged to ``filter_issues``.

    Raises:
        BaseException: Whatever is raised inside the crawl; it is logged and
            stored in the history event before being re-raised.
    """
    logger.debug("craw_collection task")
    requester = User.objects.get(username=user_name)
    history: "HistoryEventDict" = {
        "type": "import-collection",
        "pid": f"{source_domain}",
        "col": Collection.objects.filter(pid=colid).first(),
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        history["userid"] = requester.pk
        url = get_all_cols()[colid]["sources"][source_domain]

        found = crawl_issue_list(source_domain, colid, url, user_name)
        # The filter only runs when the crawl returned something.
        selected = (
            filter_issues(colid, found, period, number, history, only_new) if found else None
        )

        if not found:
            history["status"] = HistoryEventStatus.WARNING
            history["message"] = f"No issue to import for the collection: {colid}"
        elif not selected:
            history["message"] = (
                f"No issue to import with the selection for the collection: {colid}"
            )
            history["status"] = HistoryEventStatus.OK
            logger.debug(history["message"])
        else:
            issue_count = len(selected)
            self.update_state(
                state=states.STARTED,
                meta={"current": 0, "total": issue_count, "progress": 0, "col": colid},
            )

            logger.info(
                "%s issues to process for the source: %s and the collection: %s",
                issue_count,
                source_domain,
                colid,
            )
            for issue in selected.values():
                pending = crawl_issue.delay(issue, source_domain, colid, url, user_name)
                # Progress is incremented when the issue is queued, then we
                # poll until the subtask is ready before queuing the next one.
                increment_progress.delay(self.request.id)
                while not pending.ready():
                    time.sleep(2)
            history["status"] = HistoryEventStatus.OK

    except BaseException:
        failure = traceback.format_exc()
        history["status"] = HistoryEventStatus.ERROR
        history["message"] = failure
        logger.error(failure)
        raise
    finally:
        insert_history_event(history)
        logger.info("history event inserted %s", history["pid"])
@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return what the source's crawler yields for ``crawl_collection()``.

    The crawler is obtained from ``crawler_factory(source_domain, colid,
    username)``. ``url`` is part of the task signature but is not used here.
    """
    return crawler_factory(source_domain, colid, username).crawl_collection()
@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue with the crawler built for this source and collection.

    The crawler is obtained from ``crawler_factory(source_domain, colid,
    username)``. ``url`` is part of the task signature but is not used here.
    Nothing is returned.
    """
    issue_crawler = crawler_factory(source_domain, colid, username)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    issue_crawler.crawl_issue(issue)
def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Narrow a ``{pid: issue}`` mapping by year, issue number and novelty.

    Each criterion is applied independently:

    * ``number`` (when not the default ``(0, 99999)``): keep issues whose
      ``number`` is ``""`` or lies inside the inclusive range. A number such
      as ``"1-2"`` (hyphen or en dash) is kept only if both ends are in range.
    * ``period`` (when not the default ``(0, 9999)``): keep issues whose
      ``year`` lies inside the inclusive range.
    * ``only_new``: drop issues whose pid already belongs to a ``Container``
      of the collection ``colid``.

    Args:
        colid: Pid of the collection, used for the ``Container`` lookup and
            in log messages.
        issues: Mapping of pid to issue; issues are read through their
            ``number`` and ``year`` attributes.
        period: Inclusive ``(first_year, last_year)`` bounds.
        number: Inclusive ``(first_number, last_number)`` bounds.
        event_dict: History event updated (status ERROR) and inserted when a
            year or number cannot be parsed. A fresh dict is used if omitted.
        only_new: Whether to drop issues already present as containers.

    Returns:
        A dict holding the issues that passed every active filter (the input
        mapping itself when no filter is active).
    """
    if event_dict is None:
        event_dict = {}

    # Bounds may arrive as lists (e.g. when task arguments went through a
    # JSON serializer); normalise them so the comparisons with the tuple
    # defaults below behave the same for both forms.
    period = tuple(period)
    number = tuple(number)

    def is_year_in_range(year):
        try:
            return period[0] <= int(year) <= period[1]
        except ValueError:
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error("Missing the year property for issues in the collection %s", colid)
            return False

    def is_number_in_range(n):
        try:
            # n can be "1-2"
            if "-" in n or "–" in n:
                n = n.split("-" if "-" in n else "–")
                return number[0] <= int(n[0]) <= int(n[1]) <= number[1]
            # n can be "3"
            return number[0] <= int(n) <= number[1]
        except ValueError:
            # issue.number is not an integer, nor an integer range
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.warning(
                "The number property for issues in the collection %s is not defined or is not a number",
                colid,
            )
            return False

    if number != (0, 99999):
        # An issue with an empty number (not always defined) is kept. The
        # emptiness test comes first so such issues are not reported as
        # errors by is_number_in_range.
        issues = {
            pid: issue
            for pid, issue in issues.items()
            if issue.number == "" or is_number_in_range(issue.number)
        }

    if period != (0, 9999):
        issues = {pid: issue for pid, issue in issues.items() if is_year_in_range(issue.year)}

    if only_new:
        # Collect the existing pids once so each membership test is O(1).
        existing_pids = {
            container.pid for container in Container.objects.filter(my_collection__pid=colid)
        }
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}
    logger.info("issues filtered: %s", issues)
    return issues