Coverage for src/crawler/utils.py: 48%
182 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-19 14:59 +0000
1import base64
2import hashlib
3import json
4import logging
5import os
6import pickle
7import re
8import time
9import unicodedata
10from datetime import datetime, timedelta, timezone
11from functools import lru_cache, wraps
12from threading import Lock
13from typing import Callable, Literal, TypeVar, cast
14from urllib.parse import urlparse
16import regex
17from bs4 import BeautifulSoup
18from django.conf import settings
19from django.contrib.auth.models import User
20from history.model_data import HistoryEventDict, HistoryEventType
21from history.models import HistoryEventStatus
23# from ptf.models import ResourceId
24from history.utils import insert_history_event
25from ptf import model_helpers
26from ptf.cmds import ptf_cmds
27from ptf.cmds.xml.xml_utils import escape
28from ptf.exceptions import ResourceDoesNotExist
29from ptf.model_data import ResourceData, create_extlink, create_publicationdata
30from ptf.models import Collection
31from pymongo import MongoClient
32from pymongo.errors import DocumentTooLarge
33from pymongo.synchronous.database import Database
34from requests import Session
35from requests_cache import CachedSession, MongoCache
37from crawler.types import JSONCol
39logger = logging.getLogger(__name__)
def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    """Record a crawl-related event in the history log.

    Resolves the collection (without site filtering) and the Django user,
    then builds a HistoryEventDict and hands it to insert_history_event.
    If the collection is unknown, falls back to *title* (or *colid*) for
    the event title.
    """
    col = model_helpers.get_collection(colid, sites=False)
    author = User.objects.get(username=username)

    event: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": col,
        "source": source_domain,
        "status": status,
        "title": col.title_html if col else (title or colid),
        "userid": author.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event)
def col_has_source(col: "JSONCol", filter: str):
    """Return True if *filter* is one of the collection's sources."""
    # Plain membership test: equivalent to the old any()-generator scan,
    # since `in` compares with == just like the explicit condition did.
    return filter in col["sources"]
def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all cols by source
    @param source: str
    @return: list of collections
    """
    cols = get_all_cols().values()
    return [col for col in cols if col_has_source(col, source)]
def get_all_cols_by_source():
    """
    Get all cols by source
    @return: dict of collections by source
    """
    grouped: dict[str, list] = {}
    for col in get_all_cols().values():
        for src in col["sources"]:
            # setdefault creates the bucket on first sight of a source
            grouped.setdefault(src, []).append(col)
    return grouped
@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    """Load (and memoize) the collection catalog shipped in data/all_cols.json."""
    json_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json"
    )
    with open(json_path, encoding="utf8") as data_collections:
        return json.load(data_collections)
def get_or_create_collection(pid: str):
    """
    Return the Collection identified by *pid*, creating it if necessary.

    The pid has to be in the list of collections given by the Documentation
    team (CSV then transformed in JSON).

    Raises:
        ValueError: if *pid* is not listed in the catalog.
        ResourceDoesNotExist: if the collection could not be created.
    """
    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    # Direct dict lookup — the previous code scanned every item with a list
    # comprehension just to find the matching key.
    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out: too early — Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out: too early — Taban has not validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
def cleanup_str(input: str, unsafe=False):
    """Normalize unicode/whitespace in *input*; XML-escape unless unsafe=True."""
    # NFKC folds exotic whitespace (e.g. \xa0) into plain characters
    text = unicodedata.normalize("NFKC", input)
    # drop DEL control characters
    text = re.sub(r"[\x7f]+", "", text)
    # collapse runs of newlines/tabs/CRs/spaces into single spaces
    text = re.sub(r"[\n\t\r ]+", " ", text).strip()
    return text if unsafe else escape(text)
def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf",
):
    """Attach a full-text stream (and matching ext link) for *pdf_url* to *xarticle*."""
    stream = {
        "rel": "full-text",
        "mimetype": mimetype,
        "location": pdf_url,
        "base": "",
        "text": "Full Text",
    }
    xarticle.streams.append(stream)

    # The pdf url is already added as a stream (just above) but might be
    # replaced by a file later on. Keep the pdf url as an Extlink if we want
    # to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel_for_mimetype = {
        "application/pdf": "article-pdf",
        "text/html": "article-html",
    }
    rel = rel_for_mimetype.get(mimetype, "article-tex")
    xarticle.ext_links.append(create_extlink(rel=rel, location=pdf_url))
def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    """Search *value* with *pattern*; return named groups or raise ValueError."""
    match = regex.search(pattern, value)
    if match:
        return match.groupdict()
    raise ValueError(error_msg)
def get_base(soup: BeautifulSoup, default: str):
    """Return the href of the document's <head><base> tag, or *default* if absent."""
    tag = soup.select_one("head base")
    if tag is None:
        return default
    href = tag.get("href")
    if isinstance(href, str):
        return href
    # href missing or a multi-valued attribute — cannot use it as a base URL
    raise ValueError("Cannot parse base href")
# Module-level singleton instance managed by get_session()
_session = None


# Singleton to create a session
def get_session(headers=None, match_headers=None) -> "CachedSession":
    """Return the process-wide CachedSession, creating it on the first call.

    NOTE: *headers* and *match_headers* are only honored on the FIRST call;
    later calls return the already-built session unchanged.

    The old signature used mutable defaults (headers={}, match_headers=["Range"]);
    None sentinels avoid sharing those objects across calls.
    """
    global _session
    if _session is None:
        _session = session_with_delay_factory(CachedSession)(
            allowable_methods=("GET", "POST", "HEAD"),
            match_headers=["Range"] if match_headers is None else match_headers,
            headers={} if headers is None else headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
            allowable_codes=(200, 206, 404),
            delay=getattr(settings, "REQUESTS_INTERVAL", 90),
        )
    return _session
try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    def skip_generation(func):
        """Fallback no-op decorator used when the test-data tooling is absent."""

        # wraps preserves func.__name__/__doc__ so decorated crawlers keep
        # their identity (the old fallback reported "wrapper" instead).
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper
S = TypeVar("S", bound=Session)


def session_with_delay_factory(base: type[S]) -> type[S]:
    """
    Create a `requests.Session` subclass with a per-hostname delay across threads.

    ```py
    session_cls = session_with_delay_factory(CachedSession)
    session = session_cls(delay=5)
    ```

    WARN : Probably not async-safe
    """

    class DelayedSession(base):
        global_lock: Lock
        # One lock per hostname so different domains can be queried in parallel.
        domain_lock: dict[str, Lock]
        # Optional override delay (in seconds). Uses settings.REQUESTS_INTERVAL,
        # or a default of 90 otherwise.
        delay: int | None

        # Minimum pause (seconds) enforced for specific hostnames.
        per_hostname_delay = {"zbmath.org": 10}

        def __init__(self, delay=None, *args, **kwargs) -> None:
            """
            delay: waits a number of seconds after making the query (if it is not cached)"""
            super().__init__(*args, **kwargs)
            self.global_lock = Lock()
            self.domain_lock = {}
            self.delay = delay

        def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs):
            # global_lock only guards mutation of the per-domain lock table.
            with self.global_lock:
                parsed = urlparse(url).netloc
                if parsed not in self.domain_lock:
                    self.domain_lock[parsed] = Lock()

            # prevent parallelism for the same domain
            with self.domain_lock[parsed]:
                try:
                    response = super().request(method, url, *args, **kwargs)
                except DocumentTooLarge as e:
                    logger.error(e)
                    if isinstance(self, CachedSession):
                        # Response too large for the Mongo cache backend:
                        # retry once with caching disabled.
                        with self.cache_disabled():
                            response = self.request(method, url, *args, **kwargs)
                    else:
                        # No cache to bypass: re-raise instead of falling
                        # through with `response` unbound.
                        raise
                if isinstance(self, CachedSession):
                    if not getattr(response, "from_cache", False):
                        seconds_to_wait = max(
                            delay or self.delay or getattr(settings, "REQUESTS_INTERVAL", 90),
                            # BUG FIX: was per_hostname_delay.get("parsed", 0) —
                            # a string literal, so the hostname override never
                            # applied. Look up the actual parsed hostname.
                            self.per_hostname_delay.get(parsed, 0),
                        )
                        if seconds_to_wait:
                            logger.info(
                                f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})"
                            )
                            time.sleep(seconds_to_wait)

            return response

    return cast(type[S], DelayedSession)
class PickleSerializer(object):
    """(De)serialize arbitrary picklable objects to/from base64 bytes."""

    def serialize(self, obj):
        # protocol=-1 selects the highest pickle protocol available
        pickled = pickle.dumps(obj, protocol=-1)
        return base64.b64encode(pickled)

    def deserialize(self, serialized):
        raw = base64.b64decode(serialized)
        return pickle.loads(raw)
RT = TypeVar("RT")


def mongo_cache(
    db_conn: Database | None = None,
    prefix="cache_",
    capped=True,
    capped_size=1000000000,
    hash_keys=True,
    serializer=PickleSerializer,
) -> Callable[[Callable[..., RT]], Callable[..., RT]]:
    """Helper decorator to speedup local development
    ```py
    @mongo_cache(
        db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
            "crawler_func_cache"
        ]
    )
    def parse_collection_content(self, content):
        ...
    ```

    db_conn defaults to None and the "crawler_func_cache" database is opened
    lazily at decoration time — the old default argument opened a Mongo
    connection as a side effect of importing this module.
    """

    def decorator(func: Callable[..., RT]) -> Callable[..., RT]:
        conn = (
            db_conn
            if db_conn is not None
            else MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
                "crawler_func_cache"
            ]
        )
        serializer_ins = serializer()
        col_name = "%s%s" % (prefix, func.__name__)
        # create_collection raises CollectionInvalid if the collection already
        # exists (e.g. second process, or re-decoration), so guard the call.
        if capped and col_name not in conn.list_collection_names():
            conn.create_collection(col_name, capped=capped, size=capped_size)

        cache_col = conn[col_name]
        cache_col.create_index("key", unique=True)
        # entries expire after 24h
        cache_col.create_index("date", expireAfterSeconds=86400)

        @wraps(func)
        def wrapped_func(*args, **kwargs) -> RT:
            # args[0] is assumed to be `self` and is excluded from the key
            cache_key = pickle.dumps((args[1:], kwargs), protocol=-1)
            if hash_keys:
                cache_key = hashlib.md5(cache_key).hexdigest()
            else:
                cache_key = base64.b64encode(cache_key)

            cached_obj = cache_col.find_one(dict(key=cache_key))
            if cached_obj:
                return serializer_ins.deserialize(cached_obj["result"])

            ret = func(*args, **kwargs)
            cache_col.update_one(
                {"key": cache_key},
                {
                    "$set": {
                        "result": serializer_ins.serialize(ret),
                        "date": datetime.now(timezone.utc),
                    }
                },
                upsert=True,
            )

            return ret

        return wrapped_func

    return decorator