Coverage for src/crawler/utils.py: 48%
180 statements
coverage.py v7.12.0, created at 2026-02-03 09:36 +0000
import base64
import hashlib
import json
import logging
import os
import pickle
import re
import time
import unicodedata
from datetime import datetime, timedelta, timezone
from functools import lru_cache, wraps
from threading import Lock
from typing import Callable, Literal, TypeVar, cast
from urllib.parse import urlparse

import regex
from bs4 import BeautifulSoup
from django.conf import settings
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEventStatus

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.cmds.xml.xml_utils import escape
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection
from pymongo import MongoClient
from pymongo.errors import DocumentTooLarge
from pymongo.synchronous.database import Database
from requests import Session
from requests_cache import CachedSession, MongoCache

from crawler.types import JSONCol

logger = logging.getLogger(__name__)


def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event_data)
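
# Usage sketch (illustrative, not from this module): recording the outcome of a crawl
# in the history log. "AMS", "example.org" and "crawler_bot" are placeholder values,
# and HistoryEventStatus.ERROR is assumed to be a valid member of that enum.
#
#     insert_crawl_event_in_history(
#         colid="AMS",
#         source_domain="example.org",
#         username="crawler_bot",
#         status=HistoryEventStatus.ERROR,
#         tasks_count=0,
#         message="Crawl aborted: source unreachable",
#     )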


def col_has_source(col: JSONCol, filter: str):
    return filter in col["sources"]


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Return all collections that list the given source.
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]


def get_all_cols_by_source():
    """
    Group all collections by source.
    @return: dict of collections, keyed by source
    """
    data = get_all_cols()

    sources: dict[str, list] = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
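
# Usage sketch (illustrative): counting collections per source, based on the "sources"
# field of data/all_cols.json as read by get_all_cols() below. "EUDML" is a placeholder
# source name.
#
#     for source, cols in get_all_cols_by_source().items():
#         print(source, len(cols))
#
#     eudml_cols = get_cols_by_source("EUDML")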


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV later converted to JSON).
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out for now: Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out for now: Taban has not validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
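
# Usage sketch (illustrative): fetching or creating a collection declared in
# data/all_cols.json. "AMS" is a placeholder pid; a ValueError is raised for pids
# missing from the JSON file.
#
#     collection = get_or_create_collection("AMS")
#     print(collection.title_html)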


def cleanup_str(input: str):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # strip DEL (0x7f) control characters
    input = re.sub(r"[\x7f]+", "", input)
    # remove useless continuous \n and spaces from the string
    input = re.sub(r"[\n\t\r ]+", " ", input).strip()
    return escape(input)
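
# Example: non-breaking spaces are NFKC-normalised and whitespace runs collapse to a
# single space before the result goes through ptf's XML escape().
#
#     cleanup_str("Some\xa0title\n   with   gaps")
#     # -> "Some title with gaps" (assuming escape() leaves plain ASCII untouched)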


def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    if mimetype == "application/pdf":
        rel = "article-pdf"
    elif mimetype == "text/html":
        rel = "article-html"
    else:
        rel = "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
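
# Usage sketch (illustrative): attaching a remote PDF to an article under construction.
# create_articledata() is assumed to exist in ptf.model_data and to return a
# ResourceData with empty `streams` and `ext_links` lists.
#
#     from ptf.model_data import create_articledata
#
#     xarticle = create_articledata()
#     add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")
#     # a "full-text" stream and an "article-pdf" extlink now reference the PDF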


def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()
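
# Example: extracting named groups, with a ValueError raised when nothing matches.
#
#     regex_to_dict(
#         r"Vol\. (?P<volume>\d+), No\. (?P<number>\d+)",
#         "Vol. 12, No. 3",
#         error_msg="Could not parse issue reference",
#     )
#     # -> {"volume": "12", "number": "3"}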


def get_base(soup: BeautifulSoup, default: str):
    base_tag = soup.select_one("head base")
    if not base_tag:
        return default
    base = base_tag.get("href")
    if not isinstance(base, str):
        raise ValueError("Cannot parse base href")
    return base
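
# Example: resolving the page's <base href> with a fallback to the page URL.
#
#     soup = BeautifulSoup(
#         "<html><head><base href='https://example.org/journal/'></head></html>",
#         "html.parser",
#     )
#     get_base(soup, "https://example.org/")  # -> "https://example.org/journal/"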


_session = None


def get_session(headers={}, match_headers=["Range"]) -> "Session":
    global _session
    if not _session:
        _session = session_with_delay_factory(CachedSession)(
            allowable_methods=("GET", "POST", "HEAD"),
            match_headers=match_headers,
            headers=headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
            allowable_codes=(200, 206, 404),
            delay=getattr(settings, "REQUESTS_INTERVAL", 90),
        )
    return _session
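
# Usage sketch: the shared session caches responses in MongoDB for 30 days and
# throttles requests per hostname. Requires a reachable MongoDB instance (see
# settings.MONGO_HOSTNAME) and configured Django settings.
#
#     session = get_session()
#     response = session.get("https://example.org/robots.txt")
#     print(response.status_code, getattr(response, "from_cache", False))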


try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    def skip_generation(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper


S = TypeVar("S", bound=Session)


def session_with_delay_factory(base: type[S]) -> type[S]:
    """
    Create a `requests.Session` subclass with a per-hostname delay shared across threads.

    ```py
    DelayedCachedSession = session_with_delay_factory(CachedSession)
    ```

    WARN: Probably not async-safe
    """

    class DelayedSession(base):
        global_lock: Lock
        domain_lock: dict[str, Lock]
        delay: int | None
        "Optional override delay (in seconds). Uses settings.REQUESTS_INTERVAL, or a default of 90 otherwise"

        per_hostname_delay = {"zbmath.org": 10}

        def __init__(self, delay=None, *args, **kwargs) -> None:
            """
            delay: waits a number of seconds after making the query (if it is not cached)
            """
            super().__init__(*args, **kwargs)
            self.global_lock = Lock()
            self.domain_lock = {}
            self.delay = delay

        def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs):
            with self.global_lock:
                parsed = urlparse(url).netloc
                if parsed not in self.domain_lock:
                    self.domain_lock[parsed] = Lock()

            # prevent parallelism for the same domain
            with self.domain_lock[parsed]:
                try:
                    response = super().request(method, url, *args, **kwargs)
                except DocumentTooLarge as e:
                    logger.error(e)
                    if isinstance(self, CachedSession):
                        # retry without writing to the cache; call super().request so the
                        # per-domain lock (already held here) is not re-acquired
                        with self.cache_disabled():
                            response = super().request(method, url, *args, **kwargs)
                    else:
                        raise
                if isinstance(self, CachedSession):
                    if not getattr(response, "from_cache", False):
                        seconds_to_wait = max(
                            delay or self.delay or getattr(settings, "REQUESTS_INTERVAL", 90),
                            self.per_hostname_delay.get(parsed, 0),
                        )
                        if seconds_to_wait:
                            logger.info(
                                f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})"
                            )
                            time.sleep(seconds_to_wait)

            return response

    return cast(type[S], DelayedSession)
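
# Usage sketch (illustrative): get_session() above builds the cached variant; a delayed
# session can also be created directly. The pause logic only applies to CachedSession
# subclasses, so wrapping a plain requests.Session yields per-domain locking without
# throttling.
#
#     DelayedCachedSession = session_with_delay_factory(CachedSession)
#     session = DelayedCachedSession(
#         delay=5, backend=MongoCache(host="localhost"), expire_after=timedelta(days=30)
#     )
#     session.get("https://example.org/", delay=1)  # per-request delay override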


class PickleSerializer:
    def serialize(self, obj):
        return base64.b64encode(pickle.dumps(obj, protocol=-1))

    def deserialize(self, serialized):
        return pickle.loads(base64.b64decode(serialized))
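
# Example: values survive a pickle + base64 round trip, which lets arbitrary Python
# objects be stored as binary-safe strings in MongoDB documents.
#
#     serializer = PickleSerializer()
#     blob = serializer.serialize({"pid": "AMS", "count": 3})
#     serializer.deserialize(blob)  # -> {"pid": "AMS", "count": 3}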


RT = TypeVar("RT")


def mongo_cache(
    db_conn: Database = MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
        "crawler_func_cache"
    ],
    prefix="cache_",
    capped=True,
    capped_size=1000000000,
    hash_keys=True,
    serializer=PickleSerializer,
) -> Callable[[Callable[..., RT]], Callable[..., RT]]:
    """Helper decorator to speed up local development

    ```py
    @mongo_cache(
        db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
            "crawler_func_cache"
        ]
    )
    def parse_collection_content(self, content):
        ...
    ```
    """

    def decorator(func: Callable[..., RT]) -> Callable[..., RT]:
        serializer_ins = serializer()
        col_name = "%s%s" % (prefix, func.__name__)
        if capped:
            db_conn.create_collection(col_name, capped=capped, size=capped_size)

        cache_col = db_conn[col_name]
        cache_col.create_index("key", unique=True)
        cache_col.create_index("date", expireAfterSeconds=86400)

        @wraps(func)
        def wrapped_func(*args, **kwargs) -> RT:
            # skip args[0] (self) so the cache key does not depend on the instance
            cache_key = pickle.dumps((args[1:], kwargs), protocol=-1)
            if hash_keys:
                cache_key = hashlib.md5(cache_key).hexdigest()
            else:
                cache_key = base64.b64encode(cache_key)

            cached_obj = cache_col.find_one(dict(key=cache_key))
            if cached_obj:
                return serializer_ins.deserialize(cached_obj["result"])

            ret = func(*args, **kwargs)
            cache_col.update_one(
                {"key": cache_key},
                {
                    "$set": {
                        "result": serializer_ins.serialize(ret),
                        "date": datetime.now(timezone.utc),
                    }
                },
                upsert=True,
            )

            return ret

        return wrapped_func

    return decorator