Coverage for src / crawler / utils.py: 49%

184 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-06-19 13:33 +0000

1import base64 

2import hashlib 

3import json 

4import logging 

5import os 

6import pickle 

7import re 

8import time 

9import unicodedata 

10from collections.abc import Callable 

11from datetime import datetime, timedelta, timezone 

12from functools import lru_cache, wraps 

13from threading import Lock 

14from typing import Literal, TypeVar, cast 

15from urllib.parse import urlparse 

16 

17import regex 

18from bs4 import BeautifulSoup 

19from django.conf import settings 

20from django.contrib.auth.models import User 

21from history.model_data import HistoryEventDict, HistoryEventType 

22from history.models import HistoryEventStatus 

23 

24# from ptf.models import ResourceId 

25from history.utils import insert_history_event 

26from ptf import model_helpers 

27from ptf.cmds import ptf_cmds 

28from ptf.cmds.xml.xml_utils import escape 

29from ptf.exceptions import ResourceDoesNotExist 

30from ptf.model_data import ResourceData, create_extlink, create_publicationdata 

31from ptf.models import Collection 

32from pymongo import MongoClient 

33from pymongo.errors import DocumentTooLarge 

34from pymongo.synchronous.database import Database 

35from requests import Session 

36from requests_cache import CachedSession, MongoCache 

37 

38from crawler.types import JSONCol 

39 

# Module-level logger, named after this module (used by the session and cache helpers below).
logger = logging.getLogger(__name__)

41 

42 

def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import-collection",
    title=None,
):
    """
    Record a crawl/import event for collection ``colid`` in the history app.

    @param colid: pid of the collection the event refers to
    @param source_domain: crawled domain; combined with ``colid`` into the event pid
        ``"<colid>-<source_domain>"``
    @param username: username of the user the event is attributed to
        (looked up with ``User.objects.get``, which raises if it does not exist)
    @param status: status stored on the event
    @param tasks_count: accepted for interface compatibility; not used here
    @param message: free-text message stored on the event
    @param event_type: history event type, "import-collection" by default
    @param title: title used when the collection is not in the database;
        falls back to ``colid`` when omitted
    """
    col = model_helpers.get_collection(colid, sites=False)
    author = User.objects.get(username=username)

    # Prefer the stored collection title; otherwise use the caller-provided one.
    if col:
        display_title = col.title_html
    else:
        display_title = title or colid

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": col,
        "source": source_domain,
        "status": status,
        "title": display_title,
        "userid": author.pk,
        "type_error": "",
        "message": message,
    }
    insert_history_event(event_data)

69 

70 

def col_has_source(col: "JSONCol", filter: str) -> bool:
    """
    Tell whether ``filter`` is one of the sources listed in ``col["sources"]``.

    @param col: collection entry (as loaded from all_cols.json); its "sources" value is
        iterated as a sequence of source names elsewhere in this module
    @param filter: source name to look for (name kept for keyword callers, although it
        shadows the builtin)
    @return: True if ``filter`` is listed, False otherwise

    A plain membership test is used: the previous ``any(...)`` over the matching values
    returned False whenever the matching source was itself falsy (e.g. "").
    """
    return filter in col["sources"]

73 

74 

def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Return every collection whose "sources" list contains ``source``.
    @param source: str, the source to look for
    @return: list of collections, in the iteration order of ``get_all_cols()``
    """
    matching: list[JSONCol] = []
    for col in get_all_cols().values():
        if col_has_source(col, source):
            matching.append(col)
    return matching

84 

85 

def get_all_cols_by_source():
    """
    Group every known collection by each of the sources it lists.
    @return: dict mapping a source to the list of collections that list it
        (a collection listing a source twice appears twice in that list)
    """
    grouped: dict[str, list] = {}
    for col in get_all_cols().values():
        for src in col["sources"]:
            grouped.setdefault(src, []).append(col)
    return grouped

101 

102 

@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    """
    Load ``data/all_cols.json`` (located next to this module) and memoize the result.

    Because of ``lru_cache``, every call returns the same dict object: mutating it
    affects all later callers.
    """
    module_dir = os.path.dirname(os.path.abspath(__file__))
    json_path = module_dir + "/data/all_cols.json"
    with open(json_path, encoding="utf8") as handle:
        cols = json.load(handle)
    return cols

109 

110 

def get_or_create_collection(pid: str):
    """
    Return the Collection ``pid``, creating it first if it does not exist yet.

    The pid has to be in the list of collections given by the Documentation team
    (a CSV, transformed into ``data/all_cols.json`` and read through ``get_all_cols``).

    @param pid: collection pid
    @return: the existing or newly created Collection
    @raises ValueError: if ``pid`` is not listed in all_cols
    @raises ResourceDoesNotExist: if no collection could be obtained or created
    """
    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    # Direct key lookup (previously a linear scan over all items filtering on the key).
    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: Taban has not validated the ISSNs yet.
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        # NOTE(review): the title is interpolated into XML without escaping; a title
        # containing "&" or "<" would yield malformed XML — confirm the source data
        # is already XML-safe.
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it is too early: Taban has not validated the ISSNs yet.
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection

165 

166 

def cleanup_str(input: str, unsafe=False):
    """
    Normalize a scraped string and, unless ``unsafe`` is set, escape it.

    Processing steps:
    1. Unicode NFKC normalization, which turns compatibility characters such as the
       non-breaking space (U+00A0) into their plain equivalents.
    2. Removal of every DEL (U+007F) character.
    3. Each run of newlines, tabs, carriage returns and spaces becomes a single
       space, and the result is stripped.

    @param input: raw text (name kept for keyword callers, although it shadows the builtin)
    @param unsafe: when true, return the cleaned text without passing it to ``escape``
    @return: the cleaned (and, by default, escaped) text
    """
    text = unicodedata.normalize("NFKC", input)
    text = re.sub(r"[\x7f]+", "", text)
    text = re.sub(r"[\n\t\r ]+", " ", text).strip()
    return text if unsafe else escape(text)

177 

178 

def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf",
):
    """
    Attach ``pdf_url`` to ``xarticle`` both as a full-text stream and as an ext link.

    @param xarticle: article data; its ``streams`` and ``ext_links`` lists are appended to
    @param pdf_url: URL of the full text
    @param mimetype: full-text mimetype; it selects the ext-link rel
        ("article-pdf", "article-html", or "article-tex" for anything else)
    """
    full_text_stream = {
        "rel": "full-text",
        "mimetype": mimetype,
        "location": pdf_url,
        "base": "",
        "text": "Full Text",
    }
    xarticle.streams.append(full_text_stream)

    # The stream above might later be replaced by a local file. Keeping the remote URL
    # as an ExtLink lets us offer both a direct download of the local PDF and a link to
    # the remote one.
    link_rel = {
        "application/pdf": "article-pdf",
        "text/html": "article-html",
    }.get(mimetype, "article-tex")
    xarticle.ext_links.append(create_extlink(rel=link_rel, location=pdf_url))

206 

207 

def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    """
    Search ``value`` for ``pattern`` with the third-party ``regex`` module and return the
    match's named groups (``groupdict()``).

    @raises ValueError: with ``error_msg`` when the pattern is not found in ``value``
    """
    found = regex.search(pattern, value)
    if not found:
        raise ValueError(error_msg)
    return found.groupdict()

214 

215 

def get_base(soup: BeautifulSoup, default: str):
    """
    Return the base URL declared by ``<head><base href="...">``, or ``default``.

    ``default`` is returned when there is no ``<base>`` element in ``<head>``, or when that
    element has no ``href`` or an empty one. A ``<base>`` element may carry only a
    ``target`` attribute, so a missing href means "no base URL override" rather than a
    parse error.

    @raises ValueError: if ``href`` is present but not a plain string
    """
    base_tag = soup.select_one("head base")
    if not base_tag:
        return default
    base = base_tag.get("href")
    if base is None or base == "":
        return default
    if not isinstance(base, str):
        raise ValueError("Cannot parse base href")
    return base

224 

225 

# Process-wide session created lazily by get_session().
_session = None


# Singleton to create a session
def get_session(headers=None, match_headers=None) -> "CachedSession":
    """
    Return the shared cached, rate-limited HTTP session, creating it on the first call.

    The session is a ``CachedSession`` subclass built by ``session_with_delay_factory``.
    It is backed by a ``MongoCache`` on ``settings.MONGO_HOSTNAME`` (default "localhost").
    Responses expire after 30 days. GET, POST and HEAD are cacheable, with status
    codes 200, 206 and 404. The post-request delay comes from
    ``settings.REQUESTS_INTERVAL`` (default 90).

    @param headers: default headers of the session; defaults to an empty dict
    @param match_headers: passed to requests_cache as ``match_headers``; defaults to ["Range"]

    NOTE: arguments are only used when the session is created; later calls return the
    existing session and ignore them. NOTE(review): creation is not guarded by a lock,
    so two threads calling this concurrently for the first time could each build a session.
    """
    global _session
    if not _session:
        # Fresh objects per call instead of shared mutable default arguments.
        if headers is None:
            headers = {}
        if match_headers is None:
            match_headers = ["Range"]
        _session = session_with_delay_factory(CachedSession)(
            allowable_methods=("GET", "POST", "HEAD"),
            match_headers=match_headers,
            headers=headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
            allowable_codes=(200, 206, 404),
            delay=getattr(settings, "REQUESTS_INTERVAL", 90),
        )
    return _session

245 

246 

try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:
    # The test-data generation helpers are not importable: fall back to a no-op decorator.

    def skip_generation(func):
        """No-op stand-in for the test-data generation ``skip_generation`` decorator."""

        # functools.wraps keeps __name__/__doc__ of the decorated function; mongo_cache
        # in this module names its Mongo collection after func.__name__.
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

256 

257 

S = TypeVar("S", bound=Session)


def session_with_delay_factory(base: type[S]) -> type[S]:
    """
    Create a subclass of ``base`` (a ``requests.Session`` subclass) that pauses after
    each non-cached request, with one lock per hostname shared across threads.

    Usage::

        SessionCls = session_with_delay_factory(CachedSession)
        session = SessionCls(delay=30)
        session.get(url)              # pauses 30s afterwards unless served from cache
        session.get(url, delay=5)     # per-call override

    Each netloc (``urlparse(url).netloc``) gets its own lock, held for the request and
    the pause that follows it, so threads hitting the same host are spaced out.
    Pauses only happen when ``base`` is a ``CachedSession`` and the response was not
    served from the cache.

    WARN: Probably not async-safe (blocking ``threading.Lock`` and ``time.sleep``).
    """

    class DelayedSession(base):
        # Guards creation of the per-netloc locks in ``domain_lock``.
        global_lock: Lock
        # One lock per netloc.
        domain_lock: dict[str, Lock]
        # Optional override delay (in seconds). When None, settings.REQUESTS_INTERVAL
        # is used, or a default of 90 otherwise.
        delay: int | None
        # Function used to pause; a class attribute, presumably so it can be swapped out
        # (e.g. in tests).
        pause_function = staticmethod(time.sleep)

        # Per-netloc minimum pause (seconds): the effective pause is the max of this
        # value and the resolved global delay.
        per_hostname_delay = {"zbmath.org": 10}

        def __init__(self, delay=None, *args, **kwargs) -> None:
            """
            delay: waits a number of seconds after making the query (if it is not cached)"""
            super().__init__(*args, **kwargs)
            self.global_lock = Lock()
            self.domain_lock = {}
            self.delay = delay

        def _seconds_to_wait(self, delay: int | None, netloc: str):
            """
            Resolve the pause following a non-cached request to ``netloc``.

            The base delay is the first value that is not None among the per-call
            ``delay``, the session's ``self.delay`` and ``settings.REQUESTS_INTERVAL``
            (default 90). An explicit 0 therefore disables the base delay. The result
            is at least ``per_hostname_delay[netloc]``.
            """
            if delay is not None:
                base_delay = delay
            elif self.delay is not None:
                base_delay = self.delay
            else:
                base_delay = getattr(settings, "REQUESTS_INTERVAL", 90)
            # Look up the actual netloc (was the literal string "parsed", so the
            # per-hostname delays were never applied).
            return max(base_delay, self.per_hostname_delay.get(netloc, 0))

        def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs):
            netloc = urlparse(url).netloc
            with self.global_lock:
                if netloc not in self.domain_lock:
                    self.domain_lock[netloc] = Lock()
                host_lock = self.domain_lock[netloc]

            with host_lock:
                try:
                    response = super().request(method, url, *args, **kwargs)
                except DocumentTooLarge as e:
                    # The response could not be stored in the Mongo cache backend.
                    logger.error(e)
                    if not isinstance(self, CachedSession):
                        # No cache to bypass: without this, `response` would be unbound below.
                        raise
                    with self.cache_disabled():
                        response = super().request(method, url, *args, **kwargs)

                if isinstance(self, CachedSession) and not getattr(
                    response, "from_cache", False
                ):
                    seconds_to_wait = self._seconds_to_wait(delay, netloc)
                    if seconds_to_wait:
                        logger.info(
                            f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})"
                        )
                        self.pause_function(seconds_to_wait)

            return response

    return cast(type[S], DelayedSession)

316 

317 

class PickleSerializer:
    """
    Serialize arbitrary Python objects as base64-encoded pickles (``bytes``).

    SECURITY: ``deserialize`` runs ``pickle.loads``. Only feed it data produced by a
    trusted writer, such as entries written by ``mongo_cache``, which uses this class as
    its default serializer.
    """

    def serialize(self, obj):
        """Pickle ``obj`` with the highest protocol and return the base64-encoded bytes."""
        pickled = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        return base64.b64encode(pickled)

    def deserialize(self, serialized):
        """Inverse of ``serialize``: base64-decode ``serialized``, then unpickle it."""
        raw = base64.b64decode(serialized)
        return pickle.loads(raw)

324 

325 

326RT = TypeVar("RT") 

327 

328 

329def mongo_cache( 

330 db_conn: Database = MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[ 

331 "crawler_func_cache" 

332 ], 

333 prefix="cache_", 

334 capped=True, 

335 capped_size=1000000000, 

336 hash_keys=True, 

337 serializer=PickleSerializer, 

338) -> Callable[[Callable[..., RT]], Callable[..., RT]]: 

339 """Helper decorator to speedup local development 

340 ```py 

341 

342 

343 @mongo_cache( 

344 db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[ 

345 "crawler_func_cache" 

346 ] 

347 ) 

348 def parse_collection_content(self, content): 

349 ... 

350 ``` 

351 """ 

352 

353 def decorator(func: Callable[..., RT]) -> Callable[..., RT]: 

354 serializer_ins = serializer() 

355 col_name = f"{prefix}{func.__name__}" 

356 if capped: 

357 db_conn.create_collection(col_name, capped=capped, size=capped_size) 

358 

359 cache_col = db_conn[col_name] 

360 cache_col.create_index("key", unique=True) 

361 cache_col.create_index("date", expireAfterSeconds=86400) 

362 

363 @wraps(func) 

364 def wrapped_func(*args, **kwargs) -> RT: 

365 cache_key = pickle.dumps((args[1:], kwargs), protocol=-1) 

366 if hash_keys: 

367 cache_key = hashlib.md5(cache_key).hexdigest() 

368 else: 

369 cache_key = base64.b64encode(cache_key) 

370 

371 cached_obj = cache_col.find_one(dict(key=cache_key)) 

372 if cached_obj: 

373 return serializer_ins.deserialize(cached_obj["result"]) 

374 

375 ret = func(*args, **kwargs) 

376 cache_col.update_one( 

377 {"key": cache_key}, 

378 { 

379 "$set": { 

380 "result": serializer_ins.serialize(ret), 

381 "date": datetime.now(timezone.utc), 

382 } 

383 }, 

384 upsert=True, 

385 ) 

386 

387 return ret 

388 

389 return wrapped_func 

390 

391 return decorator