Coverage for src / crawler / tasks.py: 0%
68 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-19 14:59 +0000
1import logging
2import traceback
3from typing import TYPE_CHECKING
5from celery import shared_task, states
6from django.contrib.auth.models import User
7from history.model_data import HistoryEventStatus
8from history.utils import insert_history_event
9from opentelemetry import trace
10from ptf.models import Container
11from task.tasks import increment_progress
13from crawler.factory import crawler_factory
14from crawler.utils import get_all_cols
16if TYPE_CHECKING:
17 from celery import Task
18 from history.model_data import HistoryEventDict
19 from ptf.model_data import IssueData
# Module-level OpenTelemetry tracer and logger, both named after this module.
tracer = trace.get_tracer(__name__)
logger = logging.getLogger(__name__)
@shared_task(name="crawler.tasks.crawl_source", bind=True, queue="coordinator")
def crawl_source(self, source_domain, username):
    """Crawl every collection hosted on ``source_domain``.

    Not implemented yet; the commented-out sketch below outlines the
    intended fan-out over the source's collections.

    Args:
        self: bound Celery task instance (``bind=True``).
        source_domain: domain identifier of the source to crawl.
        username: name of the user triggering the crawl.

    Raises:
        NotImplementedError: always, until the task is implemented.
    """
    # BUG FIX: ``bind=True`` makes Celery pass the task instance as the first
    # positional argument; the original signature was missing ``self``, so
    # ``source_domain`` would have received the Task object instead.
    raise NotImplementedError()
    # cols = get_cols_by_source(source_domain)
    # pids = [col["pid"] for col in cols]

    # for pid in pids:
    #     run_task(
    #         CrawlCollectionTask,
    #         pid,
    #         source_domain,
    #         username,
    #         subcall=True,
    #         name=f"{pid}-{source_domain}",
    #     )
    # run_task(EndCrawlSourceTask, source_domain)
@shared_task(name="crawler.tasks.test_task", bind=True, queue="coordinator")
# @tracer.start_as_current_span("CrawlCollectionTask.do")
def test_task(self):
    """Smoke-test task: emit a marker on stdout and on the module logger."""
    marker = "TEST TASK"
    print(marker)
    logger.warning(marker)
@shared_task(name="crawler.tasks.crawl_collection", bind=True, queue="coordinator")
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "Task",
    colid: str,
    source_domain: str,
    username: str,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl one collection of a source and import its issues.

    Fetches the collection's issue list, filters it by ``period`` /
    ``number`` (and, optionally, down to issues not yet imported), then
    crawls each remaining issue while reporting progress through the
    Celery task state. A history event is always recorded, whether the
    crawl succeeds or fails.

    Args:
        self: bound Celery task instance (``bind=True``).
        colid: pid of the collection to crawl.
        source_domain: domain identifier of the source hosting the collection.
        username: user triggering the crawl; must match an existing ``User``.
        only_new: if True, skip issues already present in the database.
        period: inclusive ``(min_year, max_year)`` filter.
        number: inclusive ``(min_number, max_number)`` filter.

    Raises:
        User.DoesNotExist: if ``username`` is unknown.
        KeyError: if ``colid`` or ``source_domain`` is not configured.
        ValueError: if the source returns no issues.
    """
    logger.info("crawl_collection task")  # BUG FIX: message was misspelled "craw_collection"
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": f"{source_domain}",
        # NOTE(review): "col" stays None even though colid is known — confirm intended.
        "col": None,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        user = User.objects.get(username=username)
        event_dict["userid"] = user.pk
        all_cols = get_all_cols()
        col_data = all_cols[colid]
        url = col_data["sources"][source_domain]

        self.update_state(
            task_id=colid,
            meta={"progress": 0, "col": colid},
            state=states.STARTED,
        )
        # Direct call (not .delay): the issue list is needed synchronously here.
        issue_list = crawl_issue_list(source_domain, colid, url, username)
        if not issue_list:
            raise ValueError("No issue found")
        issue_list = filter_issues(colid, issue_list, period, number, only_new)
        self.update_state(
            task_id=colid,
            meta={"current": 0, "total": len(issue_list), "progress": 0, "col": colid},
            state=states.STARTED,
        )
        for issue in issue_list.values():
            crawl_issue(issue, source_domain, colid, url, username)
            increment_progress(self, self.request.id)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        event_dict["message"] = traceback.format_exc()
        logger.error(event_dict["message"])
        raise
    finally:
        # NOTE(review): on success the status stays PENDING — confirm whether a
        # success status should be set before recording the event.
        insert_history_event(event_dict)
        logger.info("history event inserted %s", event_dict["pid"])
@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_domain: str, colid: str, url: str, username: str):
    """Return the issues of collection `colid` found at the given source.

    `url` is currently unused; the crawler resolves the location itself.
    """
    return crawler_factory(source_domain, colid, username).crawl_collection()
@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_domain: str, colid: str, url: str, username: str):
    """Crawl a single issue of collection `colid`.

    `url` is currently unused; the crawler resolves the location itself.
    """
    crawler_factory(source_domain, colid, username).crawl_issue(issue)
def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    only_new=False,
):
    """Filter crawled issues by year range, number range and novelty.

    Args:
        colid: pid of the collection (used to look up existing containers).
        issues: mapping of issue pid -> issue data; each issue exposes
            ``year`` and ``number`` as strings.
        period: inclusive ``(min_year, max_year)``.
        number: inclusive ``(min_number, max_number)``; an issue number may
            be a single value ("1") or a range ("1-2", hyphen or en dash).
        only_new: if True, drop issues whose pid already exists as a
            ``Container`` of the collection.

    Returns:
        The filtered pid -> issue mapping.
    """

    def is_year_in_range(year):
        return period[0] <= int(year) <= period[1]

    def is_number_in_range(n):
        # An issue number is either a single value ("1") or a range
        # ("1-2" / "1–2"); a range is kept only if fully inside `number`.
        for dash in ("-", "–"):
            if dash in n:
                parts = n.split(dash)
                return number[0] <= int(parts[0]) <= int(parts[1]) <= number[1]
        # BUG FIX: the original compared the first and last *characters* of
        # the string (n[0], n[1]), wrongly accepting e.g. "12" for
        # number=(1, 5); compare the whole value instead.
        return number[0] <= int(n) <= number[1]

    issues = {
        pid: issue
        for pid, issue in issues.items()
        if is_year_in_range(issue.year) and is_number_in_range(issue.number)
    }

    if only_new:
        # Hit the database only when needed, and test membership against a
        # set of pids instead of scanning the container list per issue.
        existing_pids = set(
            Container.objects.filter(my_collection__pid=colid).values_list("pid", flat=True)
        )
        issues = {pid: issue for pid, issue in issues.items() if pid not in existing_pids}

    return issues