Coverage for src/crawler/utils.py: 48% (183 statements)
coverage.py v7.13.1, created at 2026-04-30 12:41 +0000
import base64
import hashlib
import json
import logging
import os
import pickle
import re
import time
import unicodedata
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from functools import lru_cache, wraps
from threading import Lock
from typing import Literal, TypeVar, cast
from urllib.parse import urlparse

import regex
from bs4 import BeautifulSoup
from django.conf import settings
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEventStatus

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.cmds.xml.xml_utils import escape
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection
from pymongo import MongoClient
from pymongo.errors import CollectionInvalid, DocumentTooLarge
from pymongo.synchronous.database import Database
from requests import Session
from requests_cache import CachedSession, MongoCache

from crawler.types import JSONCol

logger = logging.getLogger(__name__)


def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event_data)


def col_has_source(col: JSONCol, filter: str):
    return filter in col["sources"]


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all cols by source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]


def get_all_cols_by_source():
    """
    Group all cols by source
    @return: dict mapping each source to the list of cols that declare it
    """
    data = get_all_cols()

    sources: dict[str, list] = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
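

# Usage sketch (the "EuDML" source name is hypothetical, for illustration only):
#
#     by_source = get_all_cols_by_source()
#     by_source["EuDML"]  # -> every col whose "sources" list contains "EuDML"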


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team (CSV then transformed into JSON)
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out for now: Taban has not yet validated the ISSNs
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

    # Commented out for now: Taban has not yet validated the ISSNs
    # if col_data["ISSN_électronique"] != "":
    #     e_issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "e_issn",
    #         "id_value": col_data["ISSN_électronique"],
    #     }
    #     ResourceId.objects.create(**e_issn)
    #
    # if col_data["ISSN_papier"] != "":
    #     issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "issn",
    #         "id_value": col_data["ISSN_papier"],
    #     }
    #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
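

# Usage sketch (the "AMBP" pid is hypothetical and assumed to appear in
# data/all_cols.json):
#
#     collection = get_or_create_collection("AMBP")
#     # returns the existing Collection, or creates one from the JSON metadata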


def cleanup_str(input: str, unsafe=False):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # remove DEL control characters
    input = re.sub(r"[\x7f]+", "", input)
    # remove useless continuous \n and spaces from the string
    input = re.sub(r"[\n\t\r ]+", " ", input).strip()
    if unsafe:
        return input
    return escape(input)
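

# Illustrative behavior (a sketch, assuming escape() performs standard XML
# escaping):
#
#     cleanup_str("Vol.\xa01   <test>")
#     # NFKC turns \xa0 into a plain space, runs of whitespace collapse to a
#     # single space, and escaping yields "Vol. 1 &lt;test&gt;"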


def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    if mimetype == "application/pdf":
        rel = "article-pdf"
    elif mimetype == "text/html":
        rel = "article-html"
    else:
        rel = "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
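

# Usage sketch (hypothetical article and URL): after the call below, xarticle
# carries both a "full-text" stream and an "article-pdf" Extlink pointing at
# the same URL.
#
#     add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")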


def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()
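

# Usage sketch with named groups (hypothetical pattern and value):
#
#     regex_to_dict(r"Vol\. (?P<volume>\d+), No\. (?P<number>\d+)", "Vol. 12, No. 3")
#     # -> {"volume": "12", "number": "3"}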


def get_base(soup: BeautifulSoup, default: str):
    base_tag = soup.select_one("head base")
    if not base_tag:
        return default
    base = base_tag.get("href")
    if not isinstance(base, str):
        raise ValueError("Cannot parse base href")
    return base


_session = None


# Singleton: the shared session is created on the first call; later calls
# return it as-is, so their headers/match_headers arguments are ignored.
def get_session(headers=None, match_headers=("Range",)) -> "CachedSession":
    global _session
    if not _session:
        _session = session_with_delay_factory(CachedSession)(
            allowable_methods=("GET", "POST", "HEAD"),
            match_headers=list(match_headers),
            headers=headers or {},
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
            allowable_codes=(200, 206, 404),
            delay=getattr(settings, "REQUESTS_INTERVAL", 90),
        )
    return _session
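

# Usage sketch (hypothetical URL):
#
#     session = get_session()
#     response = session.get("https://example.org/issue/1")
#     response.from_cache  # True on repeated calls within the 30-day window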


try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    # no-op fallback when the test decorators are unavailable
    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper


S = TypeVar("S", bound=Session)


def session_with_delay_factory(base: type[S]) -> type[S]:
    """
    Create a `requests.Session` subclass that enforces a per-hostname delay
    across threads.

    WARNING: probably not async-safe.
    """

    class DelayedSession(base):
        global_lock: Lock
        domain_lock: dict[str, Lock]
        delay: int | None
        "Optional override delay (in seconds). Falls back to settings.REQUESTS_INTERVAL, then to a default of 90."

        per_hostname_delay = {"zbmath.org": 10}

        def __init__(self, delay=None, *args, **kwargs) -> None:
            """
            delay: number of seconds to wait after performing the request (if the response was not cached)
            """
            super().__init__(*args, **kwargs)
            self.global_lock = Lock()
            self.domain_lock = {}
            self.delay = delay

        def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs):
            with self.global_lock:
                parsed = urlparse(url).netloc
                if parsed not in self.domain_lock:
                    self.domain_lock[parsed] = Lock()

            with self.domain_lock[parsed]:
                try:
                    response = super().request(method, url, *args, **kwargs)
                except DocumentTooLarge as e:
                    logger.error(e)
                    if not isinstance(self, CachedSession):
                        raise
                    # the response is too large to store: retry with the cache disabled
                    with self.cache_disabled():
                        response = super().request(method, url, *args, **kwargs)
                if isinstance(self, CachedSession):
                    if not getattr(response, "from_cache", False):
                        seconds_to_wait = max(
                            delay or self.delay or getattr(settings, "REQUESTS_INTERVAL", 90),
                            self.per_hostname_delay.get(parsed, 0),
                        )
                        if seconds_to_wait:
                            logger.info(
                                f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})"
                            )
                            time.sleep(seconds_to_wait)

            return response

    return cast(type[S], DelayedSession)
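

# Usage sketch, mirroring how get_session() uses this factory (the 5s delay is
# illustrative):
#
#     DelayedCachedSession = session_with_delay_factory(CachedSession)
#     session = DelayedCachedSession(delay=5)
#     session.get("https://example.org")  # sleeps after the request on cache misses only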


class PickleSerializer:
    """Serialize arbitrary Python objects to base64-encoded pickle bytes (and back)."""

    def serialize(self, obj):
        return base64.b64encode(pickle.dumps(obj, protocol=-1))

    def deserialize(self, serialized):
        return pickle.loads(base64.b64decode(serialized))


RT = TypeVar("RT")


def mongo_cache(
    db_conn: Database = MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
        "crawler_func_cache"
    ],
    prefix="cache_",
    capped=True,
    capped_size=1000000000,
    hash_keys=True,
    serializer=PickleSerializer,
) -> Callable[[Callable[..., RT]], Callable[..., RT]]:
    """Helper decorator to speed up local development

    ```py
    @mongo_cache(
        db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
            "crawler_func_cache"
        ]
    )
    def parse_collection_content(self, content):
        ...
    ```
    """

    def decorator(func: Callable[..., RT]) -> Callable[..., RT]:
        serializer_ins = serializer()
        col_name = f"{prefix}{func.__name__}"
        if capped:
            try:
                db_conn.create_collection(col_name, capped=capped, size=capped_size)
            except CollectionInvalid:
                # the capped collection already exists (e.g. from a previous run)
                pass

        cache_col = db_conn[col_name]
        cache_col.create_index("key", unique=True)
        cache_col.create_index("date", expireAfterSeconds=86400)

        @wraps(func)
        def wrapped_func(*args, **kwargs) -> RT:
            # args[0] is assumed to be `self`: it is excluded from the cache key
            cache_key = pickle.dumps((args[1:], kwargs), protocol=-1)
            if hash_keys:
                cache_key = hashlib.md5(cache_key).hexdigest()
            else:
                cache_key = base64.b64encode(cache_key)

            cached_obj = cache_col.find_one(dict(key=cache_key))
            if cached_obj:
                return serializer_ins.deserialize(cached_obj["result"])

            ret = func(*args, **kwargs)
            cache_col.update_one(
                {"key": cache_key},
                {
                    "$set": {
                        "result": serializer_ins.serialize(ret),
                        "date": datetime.now(timezone.utc),
                    }
                },
                upsert=True,
            )

            return ret

        return wrapped_func

    return decorator