Coverage for src / crawler / tasks.py: 0%
90 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-04-08 09:35 +0000
1import logging
2import time
3import traceback
4from typing import TYPE_CHECKING
6from celery import shared_task, states
7from django.contrib.auth.models import User
8from history.model_data import HistoryEventStatus
9from history.utils import insert_history_event
10from opentelemetry import trace
11from ptf.models import Collection, Container
12from task.custom_task import TaskWithProgress
13from task.tasks import increment_progress
15from crawler.factory import crawler_factory
16from crawler.utils import get_all_cols
18if TYPE_CHECKING:
19 from history.model_data import HistoryEventDict
20 from ptf.model_data import IssueData
22tracer = trace.get_tracer(__name__)
23logger = logging.getLogger(__name__)
@shared_task(name="crawler.tasks.crawl_source", bind=True, queue="coordinator")
def crawl_source(self, source_domain, username):
    """Crawl every collection hosted on the given source domain.

    Not implemented yet: raises ``NotImplementedError`` unconditionally.
    The commented-out sketch below shows the planned implementation.

    Args:
        source_domain: domain name identifying the source to crawl.
        username: name of the user triggering the crawl.

    Raises:
        NotImplementedError: always.
    """
    # `bind=True` makes Celery pass the task instance as the first positional
    # argument, so the signature must accept `self` — otherwise the task
    # instance would be bound to `source_domain` when run through Celery.
    raise NotImplementedError()
    # TODO: planned implementation, kept for reference:
    # cols = get_cols_by_source(source_domain)
    # pids = [col["pid"] for col in cols]

    # for pid in pids:
    #     run_task(
    #         CrawlCollectionTask,
    #         pid,
    #         source_domain,
    #         username,
    #         subcall=True,
    #         name=f"{pid}-{source_domain}",
    #     )
    # run_task(EndCrawlSourceTask, source_domain)
@shared_task(
    name="crawler.tasks.crawl_collection",
    bind=True,
    queue="coordinator",
    base=TaskWithProgress,
)
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "TaskWithProgress",
    colid: str,
    source_domain: str,
    user_name,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl the issues of one collection from a source, tracking progress.

    Fetches the collection's issue list from ``source_domain``, filters it by
    publication ``period``, issue ``number`` range and ``only_new``, then
    crawls each remaining issue one at a time (waiting for each ``crawl_issue``
    task to finish before launching the next). A history event describing the
    run is always inserted, on success or failure.

    Args:
        colid: pid of the collection to crawl.
        source_domain: domain name of the source hosting the collection.
        user_name: username of the user triggering the crawl.
        only_new: when True, skip issues already present in the database.
        period: inclusive (min, max) accepted publication years.
        number: inclusive (min, max) accepted issue numbers.

    Raises:
        ValueError: when the source returns no issue at all.
        BaseException: any error is recorded in the history event, logged,
            and re-raised.
    """
    logger.debug("crawl_collection task")
    user = User.objects.get(username=user_name)
    collection = Collection.objects.get(pid=colid)
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": f"{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        # Direct call (not .delay): the issue list is fetched synchronously.
        issue_list = crawl_issue_list(source_domain, colid, url, user_name)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, event_dict, only_new)

        if issue_list:
            self.update_state(
                meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
                state=states.STARTED,
            )
            # Log once before the loop: these values never change per issue.
            logger.info("%s issues to process", len(issue_list))
            logger.info("task_id! %s", self.request.id)
        else:
            event_dict["message"] = "No issue to update with the selection"
        for issue in issue_list.values():
            promise = crawl_issue.delay(issue, source_domain, colid, url, user_name)
            increment_progress.delay(self.request.id)
            # Wait for the crawl_issue task to be finished before processing
            # the following one (issues are crawled strictly sequentially).
            while not promise.ready():
                time.sleep(2)
        event_dict["status"] = HistoryEventStatus.OK

    except BaseException:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])
@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return the issues of collection ``colid`` found on ``source_domain``.

    NOTE(review): ``url`` is accepted for call-signature compatibility but is
    not used here — the crawler resolves its own URL from the source/colid.
    """
    return crawler_factory(source_domain, colid, username).crawl_collection()
@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue of collection ``colid`` from ``source_domain``.

    NOTE(review): ``url`` is accepted for call-signature compatibility but is
    not used directly here.
    """
    issue_crawler = crawler_factory(source_domain, colid, username)
    logger.info("crawl issue: %s, %s", source_domain, colid)
    issue_crawler.crawl_issue(issue)
def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    event_dict=None,
    only_new=False,
):
    """Filter a crawled issue mapping by publication year, number and novelty.

    Args:
        colid: pid of the collection the issues belong to.
        issues: mapping of issue pid -> issue data, as returned by the crawler.
            Each issue must expose ``year`` and ``number`` string attributes.
        period: inclusive (min, max) range of accepted publication years.
        number: inclusive (min, max) range of accepted issue numbers.
        event_dict: history event dict to flag ERROR on (and persist) when a
            year/number cannot be parsed; a fresh dict is used when None.
        only_new: when True, drop issues whose pid already exists as a
            Container of this collection in the database.

    Returns:
        The filtered pid -> issue mapping. Issues whose year or number cannot
        be parsed as integers are dropped (after recording an ERROR event).
    """
    if event_dict is None:
        event_dict = {}

    def is_year_in_range(year):
        try:
            return period[0] <= int(year) <= period[1]
        except ValueError:
            # NOTE(review): inserts one history event per unparsable issue;
            # with the default event_dict this may be a near-empty dict.
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error(
                "Missing the year or the number property for issues in the collection %s", colid
            )
            return False

    def is_number_in_range(n):
        try:
            # n can be a range such as "1-2" (hyphen) or "1–2" (en dash):
            # accept it only when the whole range fits inside `number`.
            if "-" in n or "–" in n:
                parts = n.split("-" if "-" in n else "–")
                return number[0] <= int(parts[0]) <= int(parts[1]) <= number[1]
            # n can be a single value such as "3"
            return number[0] <= int(n) <= number[1]
        except ValueError:
            event_dict["status"] = HistoryEventStatus.ERROR
            insert_history_event(event_dict)
            logger.error(
                "Missing the year or the number property for issues in the collection %s", colid
            )
            return False

    issues = {
        pid: issue
        for pid, issue in issues.items()
        if is_year_in_range(issue.year) and is_number_in_range(issue.number)
    }

    if only_new:
        # One DB query building a pid set: O(1) membership tests instead of
        # scanning every container for every issue (O(n*m) -> O(n+m)).
        # The queryset was lazy before, so querying only here is equivalent.
        existing_pids = set(
            Container.objects.filter(my_collection__pid=colid).values_list("pid", flat=True)
        )
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    return issues
174 return issues