Coverage for src/crawler/utils.py: 55%
99 statements
coverage.py v7.12.0, created at 2025-12-11 14:57 +0000
import json
import os
import re
import unicodedata
from datetime import timedelta
from functools import lru_cache
from typing import Literal

import regex
from bs4 import BeautifulSoup
from django.conf import settings
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEventStatus

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection
from requests_cache import CachedSession, MongoCache

from crawler.types import JSONCol


def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event_data)
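
# Illustrative call (sketch): every argument value below is a placeholder, the
# username must match an existing Django User, and "OK" is only assumed to be a
# valid HistoryEventStatus value.
#
#   insert_crawl_event_in_history(
#       colid="MYCOL",
#       source_domain="example.org",
#       username="crawler",
#       status="OK",
#       tasks_count=0,
#       message="Crawl finished",
#   )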


def col_has_source(col: JSONCol, filter: str):
    return any(source for source in col["sources"] if source == filter)
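
# Example (illustrative dict literals standing in for JSONCol entries; the source
# names are placeholders):
#
#   >>> col_has_source({"sources": ["EuDML", "NUMDAM"]}, "EuDML")
#   True
#   >>> col_has_source({"sources": ["NUMDAM"]}, "EuDML")
#   False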


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all cols by source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]
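
# Usage sketch ("EuDML" is a placeholder source name that may or may not appear in
# data/all_cols.json):
#
#   eudml_cols = get_cols_by_source("EuDML")
#   print(len(eudml_cols))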


def get_all_cols_by_source():
    """
    Group all cols by source
    @return: dict mapping each source to its list of collections
    """
    data = get_all_cols()

    sources = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
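
# Example of the returned shape (sketch; keys depend on the sources listed in
# data/all_cols.json):
#
#   cols_by_source = get_all_cols_by_source()
#   # {"source_a": [col, col, ...], "source_b": [col, ...], ...}
#   for source, cols in cols_by_source.items():
#       print(source, len(cols))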


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)
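
# Because of lru_cache, the JSON file is read at most once per process. If
# data/all_cols.json is edited while the process is running, the cache can be
# reset explicitly:
#
#   get_all_cols.cache_clear()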


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV later converted to JSON).
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out for now: Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

    # Commented out for now: Taban has not validated the ISSNs yet
    # if col_data["ISSN_électronique"] != "":
    #     e_issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "e_issn",
    #         "id_value": col_data["ISSN_électronique"],
    #     }
    #     ResourceId.objects.create(**e_issn)
    #
    # if col_data["ISSN_papier"] != "":
    #     issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "issn",
    #         "id_value": col_data["ISSN_papier"],
    #     }
    #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
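
# Illustrative usage (sketch; "MYCOL" is a placeholder pid that would have to be
# listed in data/all_cols.json and requires a configured database and provider):
#
#   collection = get_or_create_collection("MYCOL")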


def cleanup_str(input: str):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # remove DEL (\x7f) control characters
    input = re.sub(r"[\x7f]+", "", input)
    # remove useless continuous \n and spaces from the string
    return re.sub(r"[\n\t\r ]+", " ", input).strip()
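
# Example: non-breaking spaces are normalized and runs of whitespace collapse to a
# single space (illustrative input).
#
#   >>> cleanup_str("Vol.\xa07,\n\t 2024 ")
#   'Vol. 7, 2024'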


def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel = "article-pdf" if mimetype == "application/pdf" else "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
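
# Sketch of a typical call while building an article. The factory name below is an
# assumed ptf.model_data helper and the URL is a placeholder:
#
#   xarticle = create_articledata()  # assumed factory returning a ResourceData
#   add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")
#   # xarticle.streams now holds the full-text stream and xarticle.ext_links the
#   # "article-pdf" Extlink pointing at the remote PDF.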


def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()
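
# Example with a named-group pattern (illustrative values):
#
#   >>> regex_to_dict(r"Vol\.\s*(?P<volume>\d+)", "Vol. 12")
#   {'volume': '12'}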


def get_base(soup: BeautifulSoup, default: str):
    base_tag = soup.select_one("head base")
    if not base_tag:
        return default
    base = base_tag.get("href")
    if not isinstance(base, str):
        raise ValueError("Cannot parse base href")
    return base
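
# Example (illustrative HTML and fallback URL):
#
#   >>> soup = BeautifulSoup(
#   ...     "<html><head><base href='https://example.org/'></head><body></body></html>",
#   ...     "html.parser",
#   ... )
#   >>> get_base(soup, "https://fallback.example/")
#   'https://example.org/'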


_session = None


def get_session(headers={}, match_headers=None):
    global _session
    if not _session:
        _session = CachedSession(
            match_headers=match_headers,
            headers=headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
        )
    return _session
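
# The session is a module-level singleton: headers and match_headers only take
# effect on the first call, and responses are cached in MongoDB for 30 days.
# Illustrative use (the header and URL are placeholders):
#
#   session = get_session(headers={"User-Agent": "crawler"})
#   response = session.get("https://example.org/issue/1")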


try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper
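
# Usage sketch: the decorator marks crawler methods so the test-data generation
# tooling can skip them; when that tooling is not installed, the fallback above is a
# transparent no-op. The method name below is hypothetical:
#
#   @skip_generation
#   def download_file(self, url):
#       ...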