Coverage for src/crawler/tasks.py: 0%
66 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-11-21 14:41 +0000
« prev ^ index » next coverage.py v7.9.0, created at 2025-11-21 14:41 +0000
1import logging
2import traceback
3from typing import TYPE_CHECKING
5from celery import shared_task, states
6from django.contrib.auth.models import User
7from history.model_data import HistoryEventStatus
8from history.utils import insert_history_event
9from opentelemetry import trace
10from ptf.models import Container
11from task.tasks import increment_progress
13from crawler.factory import crawler_factory
14from crawler.utils import get_all_cols
16if TYPE_CHECKING:
17 from celery import Task
18 from history.model_data import HistoryEventDict
19 from ptf.model_data import IssueData
21tracer = trace.get_tracer(__name__)
22logger = logging.getLogger(__name__)
@shared_task(name="crawler.tasks.crawl_source", bind=True, queue="coordinator")
def crawl_source(self: "Task", source_domain: str, username: str):
    """Crawl every collection of a source (not implemented yet).

    The task is registered with ``bind=True``, so Celery passes the task
    instance as the first positional argument. ``self`` has to be declared,
    otherwise the task instance lands in ``source_domain`` and a call with
    two arguments cannot be bound to the function.

    Args:
        self: The bound Celery task instance.
        source_domain: Identifier of the source to crawl.
        username: Name of the user on whose behalf the crawl runs.

    Raises:
        NotImplementedError: Always; the implementation is still pending.
    """
    raise NotImplementedError()
    # Planned implementation, kept as a reference for whoever picks this up:
    # cols = get_cols_by_source(source_domain)
    # pids = [col["pid"] for col in cols]
    #
    # for pid in pids:
    #     run_task(
    #         CrawlCollectionTask,
    #         pid,
    #         source_domain,
    #         username,
    #         subcall=True,
    #         name=f"{pid}-{source_domain}",
    #     )
    # run_task(EndCrawlSourceTask, source_domain)
@shared_task(name="crawler.tasks.crawl_collection", bind=True, queue="coordinator")
@tracer.start_as_current_span("CrawlCollectionTask.do")
def crawl_collection(
    self: "Task",
    colid: str,
    source_domain: str,
    username: str,
    only_new=False,
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
):
    """Crawl the issues of one collection from one source.

    Looks up the collection's URL for ``source_domain``, fetches the issue
    list, narrows it with ``filter_issues`` and crawls each remaining issue
    in turn, reporting progress through the task state.

    A history event is inserted in every case (``finally``). On failure its
    status is set to ERROR and its message to the formatted traceback, and
    the exception is re-raised.

    NOTE(review): nothing on the success path changes the event status, so
    a successful run is recorded as PENDING - confirm this is intended.

    Args:
        self: The bound Celery task instance.
        colid: Key of the collection in the mapping returned by
            ``get_all_cols``.
        source_domain: Key of the source inside the collection's "sources".
        username: Username of the Django user running the crawl.
        only_new: Forwarded to ``filter_issues``.
        period: Inclusive (first, last) year bounds, forwarded to
            ``filter_issues``.
        number: Inclusive (lowest, highest) issue number bounds, forwarded
            to ``filter_issues``.

    Raises:
        ValueError: If the crawler returns no issue at all.
        Exception: Any other error raised while crawling is logged and
            re-raised unchanged.
    """
    event_dict: "HistoryEventDict" = {
        "type": "import-source",
        "pid": f"{source_domain}",
        "col": None,
        "source": source_domain,
        "status": HistoryEventStatus.PENDING,
    }
    try:
        event_dict["userid"] = User.objects.get(username=username).pk
        url = get_all_cols()[colid]["sources"][source_domain]

        # First state update: the number of issues is not known yet.
        self.update_state(
            state=states.STARTED,
            meta={"progress": 0, "col": colid},
        )

        found_issues = crawl_issue_list(source_domain, colid, url, username)
        if not found_issues:
            raise ValueError("No issue found")

        selected_issues = filter_issues(colid, found_issues, period, number, only_new)
        progress_meta = {
            "current": 0,
            "total": len(selected_issues),
            "progress": 0,
            "col": colid,
        }
        self.update_state(state=states.STARTED, meta=progress_meta)

        for issue in selected_issues.values():
            crawl_issue(issue, source_domain, colid, url, username)
            increment_progress(self, self.request.id)
    except Exception:
        event_dict["status"] = HistoryEventStatus.ERROR
        trace_text = traceback.format_exc()
        event_dict["message"] = trace_text
        logger.error(trace_text)
        raise
    finally:
        # Record the outcome whether the crawl succeeded or failed.
        insert_history_event(event_dict)
# The task name must be unique: it was previously registered as
# "crawler.tasks.crawl_collection", colliding with the coordinator task above.
@shared_task(name="crawler.tasks.crawl_issue_list", queue="executor")
def crawl_issue_list(source_name: str, colid: str, url: str, username: str):
    """Fetch the issue list of a collection from a source.

    Args:
        source_name: Source identifier passed to ``crawler_factory``.
        colid: Collection identifier passed to ``crawler_factory``.
        url: Unused here; kept so existing callers keep working.
        username: Username passed to ``crawler_factory``.

    Returns:
        Whatever the crawler's ``crawl_collection()`` returns; the caller in
        this module treats it as a mapping of issue pid to issue data.
    """
    crawler = crawler_factory(source_name, colid, username)
    return crawler.crawl_collection()
@shared_task(name="crawler.tasks.crawl_issue", queue="executor")
def crawl_issue(issue: "IssueData", source_name: str, colid: str, url: str, username: str):
    """Crawl a single issue with the crawler built for the given source.

    Args:
        issue: The issue to crawl.
        source_name: Source identifier passed to ``crawler_factory``.
        colid: Collection identifier passed to ``crawler_factory``.
        url: Not used by this function.
        username: Username passed to ``crawler_factory``.
    """
    crawler_factory(source_name, colid, username).crawl_issue(issue)
def filter_issues(
    colid: str,
    issues: "dict[str, IssueData]",
    period: tuple[int, int] = (0, 9999),
    number: tuple[int, int] = (0, 99999),
    only_new=False,
):
    """Keep only the issues matching the year/number bounds.

    Args:
        colid: Pid of the collection; only used to look up the containers
            already stored when ``only_new`` is set.
        issues: Mapping of issue pid to issue data. Each value must expose
            ``year`` and ``number`` attributes.
        period: Inclusive (first, last) year bounds.
        number: Inclusive (lowest, highest) issue number bounds.
        only_new: When true, also drop every issue whose pid matches the pid
            of a ``Container`` of the collection.

    Returns:
        A new dict with the issues that passed every filter.

    Raises:
        ValueError: If a year or an issue number cannot be parsed as an
            integer.
    """

    def is_year_in_range(year):
        return period[0] <= int(year) <= period[1]

    def is_number_in_range(n):
        # An issue number is either a single value ("1") or a span ("1-2"),
        # the separator being a hyphen or an en dash. A single value is
        # handled as a span of one, instead of indexing into its characters.
        parts = n.replace("–", "-").split("-")
        first = int(parts[0])
        last = int(parts[1]) if len(parts) > 1 else first
        return number[0] <= first <= last <= number[1]

    selected = {
        pid: issue
        for pid, issue in issues.items()
        if is_year_in_range(issue.year) and is_number_in_range(issue.number)
    }
    # TODO : only crawl newer issues ?
    if only_new:
        # Build the set of known pids once rather than scanning every
        # container for every issue.
        known_pids = {
            container.pid
            for container in Container.objects.filter(my_collection__pid=colid)
        }
        selected = {pid: issue for pid, issue in selected.items() if pid not in known_pids}

    return selected