Coverage for src/crawler/tasks.py: 0%
124 statements
« prev ^ index » next — coverage.py v7.13.1, created at 2026-04-30 12:41 +0000
1import logging
2import time
3import traceback
4from typing import TYPE_CHECKING
6from celery import shared_task, states
7from celery.exceptions import TimeoutError
8from django.contrib.auth.models import User
9from history.model_data import HistoryEventStatus
10from history.utils import insert_history_event
11from opentelemetry import trace
12from ptf.models import Collection, Container
13from task.custom_task import TaskWithProgress
14from task.runner import TaskAborted
15from task.tasks import increment_progress
17from crawler.factory import crawler_factory
18from crawler.utils import get_all_cols
20if TYPE_CHECKING:
21 from history.model_data import HistoryEventDict
22 from ptf.model_data import IssueData
# Module-scoped OpenTelemetry tracer and logger, both namespaced to this module.
tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)
@shared_task(
    name="crawler.tasks.crawl_source",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
def crawl_source(
    self: "TaskWithProgress",
    colids: list,
    source_domain: str,
    user_name: str,
    only_new: bool = False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl every given collection of a source, one collection at a time.

    Dispatches one ``crawl_collection`` subtask per collection id and waits
    for each to finish before starting the next, so a single progress bar can
    be maintained from the coordinator queue. A history event summarising the
    whole run is always recorded (OK, ERROR, or after an abort).

    Args:
        colids: collection pids to crawl.
        source_domain: identifier of the source to crawl from.
        user_name: username forwarded to the subtasks.
        only_new: when True, subtasks only import issues not already present.
        period: inclusive (min_year, max_year) filter forwarded to subtasks.
        number: inclusive (min_number, max_number) filter forwarded to subtasks.

    Raises:
        TaskAborted: when the user aborts the task; the in-flight subtask is revoked.
        ExceptionGroup: when one or more subtasks returned an exception.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": "import all",
        "col": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    logger.info("Start crawling the source: %s", source_domain)
    try:
        self.update_state(
            meta={"current": 0, "total": len(colids), "progress": 0, "col": source_domain},
            state=states.STARTED,
        )
        results = []
        for col in colids:
            logger.info("Start crawling the collection: %s", col)
            promise = crawl_collection.delay(
                col, source_domain, user_name, only_new, period, number
            )
            # as the collection crawlings are sequential, we can manage a progress bar for tasks from the coordinator queue
            # NOTE(review): if the subtask completes before this first ready() check,
            # the loop body never runs, so its result is never collected and the
            # progress counter is not incremented — confirm this is acceptable.
            while not promise.ready():
                try:
                    # propagate=False: a subtask exception is returned as a value
                    # and aggregated into the ExceptionGroup below, not raised here.
                    results.append(
                        promise.get(disable_sync_subtasks=False, propagate=False, timeout=2)
                    )
                    increment_progress.delay(self.request.id)
                    break
                except TimeoutError:
                    # Subtask still running: poll the abort flag every 2 seconds.
                    logger.debug("Timeout Error for the collection: %s", col)
                    if not self.is_aborted():
                        continue
                    # User requested an abort: cancel the in-flight subtask too.
                    self.app.control.revoke(promise.id)
                    promise.abort()
                    raise TaskAborted("Task aborted by user")

        event_dict["status"] = HistoryEventStatus.OK
        # Surface all collected subtask failures at once (see propagate=False above).
        exceptions = [result for result in results if isinstance(result, Exception)]
        if len(exceptions) > 0:
            raise ExceptionGroup("Encountered errors while processing subtasks", exceptions)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        # Always record the run's outcome in the history log.
        insert_history_event(event_dict)
@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name: str,
    only_new: bool = False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection from a source and import its issues sequentially.

    Resolves the collection's crawl URL for the source, fetches the issue
    list, filters it (period / number / only_new), then dispatches one
    ``crawl_issue`` subtask per remaining issue, waiting for each to finish
    before the next. A history event is always recorded with the outcome.

    Args:
        colid: pid of the collection to crawl.
        source_domain: identifier of the source to crawl from.
        user_name: username of the requesting user (recorded in the event).
        only_new: when True, skip issues already present in the database.
        period: inclusive (min_year, max_year) filter.
        number: inclusive (min_number, max_number) filter.

    Raises:
        ValueError: when the source returns no issue at all.
        Exceptions from lookups or crawling are logged then re-raised.
    """
    logger.debug("craw_collection task")
    user = User.objects.get(username=user_name)
    collection = Collection.objects.get(pid=colid)
    event_dict: "HistoryEventDict" = {
        "type": "import-collection",
        "pid": f"{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        event_dict["userid"] = user.pk
        # Resolve this collection's crawl URL for the requested source.
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        # Direct call (not .delay), so the issue list is fetched in-process.
        issue_list = crawl_issue_list(source_domain, colid, url, user_name)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, event_dict, only_new)

        if issue_list:
            self.update_state(
                meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
                state=states.STARTED,
            )
        else:
            event_dict["message"] = "No issue to update with the selection"
        logger.info(
            "%s issues to process for the source: %s and the collection: %s",
            len(issue_list),
            source_domain,
            collection.pid,
        )
        for issue in issue_list.values():
            promise = crawl_issue.delay(issue, source_domain, colid, url, user_name)
            # NOTE(review): progress is incremented at dispatch time, before the
            # issue subtask completes — confirm this is the intended semantics.
            increment_progress.delay(self.request.id)
            # Issues are crawled strictly one at a time: busy-wait for completion.
            while not promise.ready():
                time.sleep(2)
                pass
            # NOTE(review): the subtask's result/exception is never inspected, so
            # a failed issue crawl still ends with status OK — confirm intended.
        event_dict["status"] = HistoryEventStatus.OK

    except BaseException:
        # BaseException so that even aborts/interrupts are recorded as ERROR
        # before being re-raised.
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        # Always record the outcome, success or failure.
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])
@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return the list of issues of a collection as seen by a source.

    Builds the source-specific crawler and returns whatever its
    ``crawl_collection()`` yields. ``url`` is accepted for interface
    compatibility but is not used here.
    """
    return crawler_factory(source_domain, colid, username).crawl_collection()
@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue with the source-specific crawler.

    ``url`` is accepted for interface compatibility but is not used here.
    """
    source_crawler = crawler_factory(source_domain, colid, username)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    source_crawler.crawl_issue(issue)
def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Filter a collection's issues by publication year, volume number and novelty.

    Args:
        colid: pid of the collection the issues belong to.
        issues: mapping of issue pid -> issue data, as returned by the crawler.
        period: inclusive (min_year, max_year); (0, 9999) means "no year filter".
        number: inclusive (min_number, max_number); (0, 99999) means "no number filter".
        event_dict: history event to flag (status=ERROR) when an issue has an
            unusable year/number property; a throwaway dict is used when None.
        only_new: when True, drop issues whose pid already exists as a Container.

    Returns:
        The filtered mapping of issue pid -> issue data.
    """
    if event_dict is None:
        event_dict = {}

    def is_year_in_range(year):
        try:
            # TypeError is caught too: a missing (None) year must not crash the task.
            return period[0] <= int(year) <= period[1]
        except (TypeError, ValueError):
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error("Missing the year property for issues in the collection %s", colid)
            return False

    def is_number_in_range(n):
        try:
            # n can be a range such as "1-2" (hyphen or en dash)
            if "-" in n or "–" in n:
                parts = n.split("-" if "-" in n else "–")
                return number[0] <= int(parts[0]) <= int(parts[1]) <= number[1]
            # or a single value such as "3"
            return number[0] <= int(n) <= number[1]
        except ValueError:
            # issue.number is not an integer, nor an integer range
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.warning(
                "The number property for issues in the collection %s is not defined or is not a number",
                colid,
            )
            return False

    # Bugfix: the two range filters used to be mutually exclusive — each was
    # skipped whenever the other was non-default, so selecting both a period
    # and a number range filtered nothing. Apply each filter independently.
    if number != (0, 99999):
        # Keep issues with an empty number (the property is not always defined).
        # The == "" test comes first so empty numbers do not spuriously trigger
        # the ERROR/history side effects inside is_number_in_range.
        issues = {
            pid: issue
            for pid, issue in issues.items()
            if issue.number == "" or is_number_in_range(issue.number)
        }

    if period != (0, 9999):
        issues = {pid: issue for pid, issue in issues.items() if is_year_in_range(issue.year)}

    if only_new:
        # Query the DB only when the novelty filter is requested, and build the
        # pid set once: O(n + m) instead of an any() scan per issue.
        existing_pids = {
            container.pid for container in Container.objects.filter(my_collection__pid=colid)
        }
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    logger.info("issues filtered: %s", issues)
    return issues