Coverage for src / crawler / tasks.py: 0%
124 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-06-19 13:33 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-06-19 13:33 +0000
1import logging
2import traceback
3from typing import TYPE_CHECKING
5from celery import shared_task, states
6from django.contrib.auth.models import User
7from history.model_data import HistoryEventStatus
8from history.utils import insert_history_event
9from opentelemetry import trace
10from ptf.models import Collection, Container
11from task.custom_task import TaskWithProgress
12from task.tasks import increment_progress
14from crawler.factory import crawler_factory
15from crawler.models import Source
16from crawler.utils import get_all_cols
18if TYPE_CHECKING:
19 from history.model_data import HistoryEventDict
20 from ptf.model_data import IssueData
# OpenTelemetry tracer for this module; crawl_collection below is wrapped in a
# span created from it.
tracer = trace.get_tracer(__name__)
# Module-level logger shared by every function/task in this module.
logger = logging.getLogger(__name__)
def crawl_sources(
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Enqueue one ``crawl_source`` task per ``Source``.

    Each task receives the sorted, de-duplicated pids of the collections
    reached through ``content__origin__source`` for that source. Tasks are
    dispatched with ``.delay`` and not awaited, so sources are crawled
    concurrently by the workers.

    Args:
        user_name: username forwarded to every ``crawl_source`` task.
        only_new: forwarded down to ``filter_issues`` (skip already-stored issues).
        period: ``(first_year, last_year)`` forwarded down to ``filter_issues``.
        number: ``(first, last)`` issue-number bounds forwarded down to ``filter_issues``.
    """
    logger.info("Start crawling all sources")

    for source in Source.objects.all():
        collection_pids = (
            Collection.objects.filter(content__origin__source=source)
            .distinct()
            .order_by("pid")
            .values_list("pid", flat=True)
        )
        crawl_source.delay(
            list(collection_pids), source.domain, user_name, only_new, period, number
        )
@shared_task(
    name="crawler.tasks.crawl_source",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
def crawl_source(
    self: "TaskWithProgress",
    colids: list,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl the given collections of one source, one child task at a time.

    For every collection pid a ``crawl_collection`` task is dispatched and
    waited on with ``self.wait_child`` before the next one starts; the task
    progress is incremented after each collection.

    An ``import-source`` history event is always inserted at the end: ``OK``
    when every child succeeded, otherwise ``ERROR`` with the formatted
    traceback as message.

    Raises:
        ExceptionGroup: when one or more child results are exceptions.
        Exception: any other error is logged, recorded and re-raised.
    """
    history_event: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": None,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling the source: %s", source_domain)

    try:
        self.update_state(
            state=states.STARTED,
            meta={"current": 0, "total": len(colids), "progress": 0, "col": source_domain},
        )

        failures = []
        for colid in colids:
            logger.info("Start crawling the collection: %s", colid)
            child = crawl_collection.delay(
                colid, source_domain, user_name, only_new, period, number
            )
            outcome = self.wait_child(child)
            # wait_child may hand back an exception object instead of raising it
            if isinstance(outcome, Exception):
                failures.append(outcome)
            increment_progress.delay(self.request.id)

        history_event["status"] = HistoryEventStatus.OK
        if failures:
            raise ExceptionGroup("Encountered errors while processing subtasks", failures)
    except Exception:
        history_event["status"] = HistoryEventStatus.ERROR
        history_event["message"] = traceback.format_exc()
        logger.error(history_event["message"])
        raise
    finally:
        insert_history_event(history_event)
@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    task: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl every selected issue of one collection from one source.

    Steps:
      1. Resolve the user (``User.objects.get``) and the local ``Collection``.
      2. Read the collection URL from ``get_all_cols()[colid]["sources"][source_domain]``.
      3. List the issues by calling ``crawl_issue_list`` directly (not via ``.delay``).
      4. Narrow them with ``filter_issues`` (period / number / only_new).
      5. Dispatch ``crawl_issue`` for each remaining issue, waiting for each
         child before the next one, and increment progress after each.

    An ``import-collection`` history event is always inserted in ``finally``.
    Its status is ``WARNING`` when the crawler lists no issue, ``OK`` when the
    selection is empty or all issues were processed, and ``ERROR`` (with the
    traceback as message) when anything, including a ``BaseException``, is
    raised. Errors are re-raised.
    """
    history_event: "HistoryEventDict" = {
        "type": "import-collection",
        "pid": f"{colid}-{source_domain}",
        "col": None,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }

    try:
        logger.debug("craw_collection task")
        owner = User.objects.get(username=user_name)
        history_event["col"] = Collection.objects.filter(pid=colid).first()
        history_event["userid"] = owner.pk

        collection_url = get_all_cols()[colid]["sources"][source_domain]

        issues = crawl_issue_list(source_domain, colid, collection_url, user_name)
        if not issues:
            history_event["status"] = HistoryEventStatus.WARNING
            history_event["message"] = f"No issue to import for the collection: {colid}"
            return

        issues = filter_issues(colid, issues, period, number, history_event, only_new)
        if not issues:
            history_event["message"] = (
                f"No issue to import with the selection for the collection: {colid}"
            )
            history_event["status"] = HistoryEventStatus.OK
            logger.debug(history_event["message"])
            return

        issue_count = len(issues)
        task.update_state(
            state=states.STARTED,
            meta={"current": 0, "total": issue_count, "progress": 0, "col": colid},
        )
        logger.info(
            "%s issues to process for the source: %s and the collection: %s",
            issue_count,
            source_domain,
            colid,
        )

        for issue_data in issues.values():
            task.wait_child(
                crawl_issue.delay(issue_data, source_domain, colid, collection_url, user_name)
            )
            increment_progress.delay(task.request.id)

        history_event["status"] = HistoryEventStatus.OK
    except BaseException:
        # BaseException on purpose: even interpreter-level interruptions get
        # recorded as ERROR before being propagated.
        history_event["status"] = HistoryEventStatus.ERROR
        history_event["message"] = traceback.format_exc()
        logger.error(history_event["message"])
        raise
    finally:
        insert_history_event(history_event)
        logger.info("history event inserted %s", history_event["pid"])
@shared_task(
    name="crawler.tasks.crawl_issue_list",
    bind=True,
    base=TaskWithProgress,
    queue="executor",
)
def crawl_issue_list(
    self: "TaskWithProgress", source_domain: str, colid: str, url: str, username: str
):
    """Return the issues the source's crawler finds for collection ``colid``.

    The crawler comes from ``crawler_factory`` and pauses through
    ``self.wait``. ``url`` is accepted for call compatibility but is not used
    here. The result is what ``crawl_collection`` hands to ``filter_issues``.
    """
    return crawler_factory(
        source_domain, colid, username, pause_function=self.wait
    ).crawl_collection()
@shared_task(
    name="crawler.tasks.crawl_issue",
    queue="executor",
    bind=True,
    base=TaskWithProgress,
)
def crawl_issue(
    self: "TaskWithProgress",
    issue: "IssueData",
    source_domain: str,
    colid: str,
    url: str,
    username: str,
):
    """Crawl a single issue with the crawler built for ``source_domain``/``colid``.

    ``url`` is accepted for call compatibility but is not used here. The
    crawler's return value is discarded; the task returns ``None``.
    """
    issue_crawler = crawler_factory(source_domain, colid, username, pause_function=self.wait)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    issue_crawler.crawl_issue(issue)
def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Select the issues of collection ``colid`` matching the requested filters.

    Args:
        colid: pid of the collection the issues belong to.
        issues: mapping of issue pid -> issue data (objects exposing
            ``.number`` and ``.year``).
        period: inclusive ``(first_year, last_year)`` bounds. The default
            ``(0, 9999)`` disables year filtering. Lists are accepted too
            (Celery's JSON serializer delivers tuples as lists).
        number: inclusive ``(first, last)`` bounds on the issue number. The
            default ``(0, 99999)`` disables number filtering. A ranged number
            such as ``"1-2"`` (hyphen or en dash) is kept only if the whole
            range fits. Issues whose number is ``""`` are always kept.
        event_dict: history event whose status is set to ERROR and which is
            inserted when an issue's year or number cannot be parsed.
        only_new: drop issues whose pid already exists as a ``Container`` of
            the collection.

    Returns:
        The selected issues, keyed by pid (the input mapping itself when no
        filter applies).
    """
    if event_dict is None:
        event_dict = {}

    # Task arguments go through Celery serialization, where tuples become
    # lists; normalize so the "is this the default?" checks below still work.
    period = tuple(period)
    number = tuple(number)

    def is_year_in_range(year):
        try:
            return period[0] <= int(year) <= period[1]
        except (TypeError, ValueError):
            # year is missing (None) or not an integer
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error("Missing the year property for issues in the collection %s", colid)
            return False

    def is_number_in_range(n):
        try:
            # n can be "1-2" (or "1–2"): the whole range must fit in the bounds
            if "-" in n or "–" in n:
                bounds = n.split("-" if "-" in n else "–")
                return number[0] <= int(bounds[0]) <= int(bounds[1]) <= number[1]
            # n can be "3"
            return number[0] <= int(n) <= number[1]
        except (TypeError, ValueError):
            # issue.number is missing (None), not an integer, nor an integer range
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.warning(
                "The number property for issues in the collection %s is not defined or is not a number",
                colid,
            )
            return False

    if number != (0, 99999):
        # An empty number (not always defined) is accepted on purpose; test
        # it first so it is not reported as a parse error.
        issues = {
            pid: issue
            for pid, issue in issues.items()
            if issue.number == "" or is_number_in_range(issue.number)
        }

    if period != (0, 9999):
        issues = {pid: issue for pid, issue in issues.items() if is_year_in_range(issue.year)}

    if only_new:
        # One query and a set lookup instead of scanning every container per issue.
        existing_pids = set(
            Container.objects.filter(my_collection__pid=colid).values_list("pid", flat=True)
        )
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    logger.info("issues filtered: %s", issues)
    return issues