Coverage for src/crawler/utils.py: 54%
82 statements
coverage.py v7.9.0, created at 2025-11-21 14:41 +0000
import json
import os
import re
import unicodedata
from functools import lru_cache
from typing import Literal

import regex
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEventStatus

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection

from crawler.types import JSONCol

def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event_data)
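
# Illustrative usage sketch (not part of the crawler): the pid "ARNF", the source
# domain and the username are hypothetical, and `status` stands for whichever
# HistoryEventStatus value the history app defines for the situation.
def _example_insert_crawl_event(status: HistoryEventStatus) -> None:
    insert_crawl_event_in_history(
        colid="ARNF",
        source_domain="numdam.org",
        username="admin",
        status=status,
        tasks_count=0,
        message="Crawl finished",
    )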

def col_has_source(col: JSONCol, filter: str):
    return filter in col["sources"]

def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections that list the given source.
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]

def get_all_cols_by_source():
    """
    Group all collections by source.
    @return: dict mapping each source to its list of collections
    """
    data = get_all_cols()

    sources = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
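
# Illustrative sketch: count how many collections each source provides. The output
# depends entirely on the contents of data/all_cols.json.
def _example_count_cols_per_source() -> None:
    for source, cols in get_all_cols_by_source().items():
        print(f"{source}: {len(cols)} collection(s)")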

@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json"),
        encoding="utf8",
    ) as data_collections:
        return json.load(data_collections)
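
# get_all_cols() is memoized with lru_cache, so data/all_cols.json is read once per
# process. A sketch of forcing a re-read after the JSON file has changed
# (cache_clear() is provided by functools.lru_cache):
def _example_reload_all_cols() -> dict[str, JSONCol]:
    get_all_cols.cache_clear()
    return get_all_cols()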

def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV that is then transformed into JSON).
    """

    all_collections = get_all_cols()

    # coverage: the branch below was never taken (condition never true in the covered runs)
    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    # coverage: the branch below was never taken (condition never true in the covered runs)
    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

    # Commented out because it is too early: Taban has not validated the ISSNs yet
    # if col_data["ISSN_électronique"] != "":
    #     e_issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "e_issn",
    #         "id_value": col_data["ISSN_électronique"],
    #     }
    #     ResourceId.objects.create(**e_issn)
    #
    # if col_data["ISSN_papier"] != "":
    #     issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "issn",
    #         "id_value": col_data["ISSN_papier"],
    #     }
    #     ResourceId.objects.create(**issn)

    # coverage: the branch below was never taken (condition never true in the covered runs)
    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
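
# Illustrative sketch ("ARNF" is a hypothetical pid): the pid must appear in
# data/all_cols.json, and the call needs a configured database since it may create
# the Collection through ptf_cmds.
def _example_get_or_create_collection() -> Collection:
    collection = get_or_create_collection("ARNF")
    print(collection.pid, collection.title_html)
    return collection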

def cleanup_str(input: str):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # remove DEL (0x7f) control characters
    input = re.sub(r"[\x7f]+", "", input)
    # collapse consecutive \n, \t, \r and spaces into a single space
    return re.sub(r"[\n\t\r ]+", " ", input).strip()
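
# Example: non-breaking spaces are normalized and runs of whitespace collapse to a
# single space (the input string is made up for illustration).
def _example_cleanup_str() -> None:
    assert cleanup_str("Annales\xa0de\n   l'Institut ") == "Annales de l'Institut"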

def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to offer both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel = "article-pdf" if mimetype == "application/pdf" else "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
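
# Illustrative sketch: given an already-built ResourceData instance (building the
# article itself is out of scope here), attach a remote PDF. The URL is hypothetical.
def _example_add_pdf_link(xarticle: ResourceData) -> None:
    add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")
    # The same URL now appears both as a "full-text" stream and as an
    # "article-pdf" ext_link on the article.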

def regex_to_dict(pattern: str, value: str, *args, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)

    # coverage: the branch below was never taken (condition never true in the covered runs)
    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()
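
# Illustrative sketch: named groups in the pattern become the keys of the returned
# dict (pattern and value are made up for the example).
def _example_regex_to_dict() -> None:
    parsed = regex_to_dict(
        r"Vol\. (?P<volume>\d+), No\. (?P<number>\d+)",
        "Vol. 12, No. 3",
        error_msg="Could not parse issue reference",
    )
    assert parsed == {"volume": "12", "number": "3"}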

try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:
    # Fallback: if the test data generation module is unavailable, skip_generation
    # becomes a no-op decorator that simply calls the wrapped function.
    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper