# Coverage for src/crawler/utils.py: 54% (82 statements)
import json
import os
import re
import unicodedata
from functools import lru_cache
from typing import Literal

import regex
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEvent

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection

from crawler.types import JSONCol


def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEvent.EventStatusEnum,
    tasks_count: int,  # NOTE: currently unused in the event payload
    message: str,
    event_type: HistoryEventType = "import",
    title: str | None = None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "data": {
            "message": message,
        },
    }

    insert_history_event(event_data)
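
# Usage sketch (hypothetical values: assumes a Django user named "crawler_bot"
# exists and that EventStatusEnum has an OK member):
#   insert_crawl_event_in_history(
#       "AIF", "numdam", "crawler_bot",
#       HistoryEvent.EventStatusEnum.OK, tasks_count=0, message="crawl finished",
#   )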


def col_has_source(col: JSONCol, filter: str):
    return filter in col["sources"]
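
# Example (hypothetical JSONCol): with col["sources"] == ["numdam", "eudml"],
# col_has_source(col, "numdam") is True and col_has_source(col, "arxiv") is False.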


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections that declare the given source.
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]


def get_all_cols_by_source():
    """
    Group all collections by source.
    @return: dict mapping each source to its list of collections
    """
    data = get_all_cols()

    sources: dict[str, list[JSONCol]] = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
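
# Returned shape (hypothetical data):
#   {"numdam": [col_a, col_b], "eudml": [col_b]}
# where each value lists the JSONCol dicts declaring that source; a collection
# appears under every source it declares.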


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json"),
        encoding="utf8",
    ) as data_collections:
        return json.load(data_collections)
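
# get_all_cols is cached by lru_cache: the JSON file is read once per process
# and every caller shares the same dict. If all_cols.json changes at runtime,
# call get_all_cols.cache_clear() (provided by functools.lru_cache) to reload.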


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV file, converted to JSON).
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.json")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out for now: Taban has not yet validated the ISSNs
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

    # Commented out for now: Taban has not yet validated the ISSNs
    # if col_data["ISSN_électronique"] != "":
    #     e_issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "e_issn",
    #         "id_value": col_data["ISSN_électronique"],
    #     }
    #     ResourceId.objects.create(**e_issn)
    #
    # if col_data["ISSN_papier"] != "":
    #     issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "issn",
    #         "id_value": col_data["ISSN_papier"],
    #     }
    #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
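
# Usage sketch ("AIF" is a hypothetical pid; it must appear in all_cols.json):
#   collection = get_or_create_collection("AIF")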


def cleanup_str(value: str):
    # some white spaces aren't actual space characters, like \xa0
    value = unicodedata.normalize("NFKC", value)
    # remove DEL (\x7f) control characters
    value = re.sub(r"[\x7f]+", "", value)
    # collapse useless runs of \n, \t, \r and spaces into a single space
    return re.sub(r"[\n\t\r ]+", " ", value).strip()
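
# Example:
#   >>> cleanup_str("Hello\xa0\n\tworld ")
#   'Hello world'
# NFKC turns the non-breaking space into a regular space, and the final
# substitution collapses the remaining whitespace run.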


def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel = "article-pdf" if mimetype == "application/pdf" else "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
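
# Effect sketch (hypothetical URL):
#   add_pdf_link_to_xarticle(xarticle, "https://example.org/a1.pdf")
# appends a "full-text" stream and an "article-pdf" Extlink, both pointing at
# the same remote PDF.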


def regex_to_dict(pattern: str, value: str, *args, error_msg="Regex failed to parse"):
    # NOTE: *args is accepted but unused
    issue_search = regex.search(pattern, value)

    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()
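
# Example with named capture groups:
#   >>> regex_to_dict(r"Vol\.\s*(?P<volume>\d+)", "Vol. 12")
#   {'volume': '12'}
# A non-match raises ValueError(error_msg).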


try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:
    # Fallback no-op decorator, used when the test data generation helpers
    # are not installed.
    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper