Coverage for src/crawler/utils.py: 48%

183 statements  

coverage.py v7.13.1, created at 2026-04-30 12:41 +0000

import base64
import hashlib
import json
import logging
import os
import pickle
import re
import time
import unicodedata
from collections.abc import Callable
from datetime import datetime, timedelta, timezone
from functools import lru_cache, wraps
from threading import Lock
from typing import Literal, TypeVar, cast
from urllib.parse import urlparse

import regex
from bs4 import BeautifulSoup
from django.conf import settings
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEventStatus

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.cmds.xml.xml_utils import escape
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection
from pymongo import MongoClient
from pymongo.errors import DocumentTooLarge
from pymongo.synchronous.database import Database
from requests import Session
from requests_cache import CachedSession, MongoCache

from crawler.types import JSONCol

logger = logging.getLogger(__name__)

def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event_data)

def col_has_source(col: JSONCol, filter: str):
    return any(source for source in col["sources"] if source == filter)


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all cols by source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]


def get_all_cols_by_source():
    """
    Get all cols, grouped by source
    @return: dict of collections by source
    """
    data = get_all_cols()

    sources: dict[str, list] = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)

def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (CSV then transformed into JSON).
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: Taban has not yet validated the ISSNs
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it is too early: Taban has not yet validated the ISSNs
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
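# Illustrative usage (sketch, not part of the module): look up or create the
# Collection for a pid listed in data/all_cols.json; the pid below is a placeholder.
#
#   collection = get_or_create_collection("EXAMPLE-PID")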

def cleanup_str(input: str, unsafe=False):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # remove DEL control characters (\x7f) left over from some sources
    input = re.sub(r"[\x7f]+", "", input)
    # collapse consecutive \n, \t, \r and spaces into a single space
    input = re.sub(r"[\n\t\r ]+", " ", input).strip()
    if unsafe:
        return input
    return escape(input)
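# For example (illustrative sketch): cleanup_str("Vol.\xa01,\n\n2024") turns the
# non-breaking space into a regular space and collapses the newlines, giving
# "Vol. 1, 2024"; with the default unsafe=False the result is additionally passed
# through ptf's `escape` before being returned.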

def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    if mimetype == "application/pdf":
        rel = "article-pdf"
    elif mimetype == "text/html":
        rel = "article-html"
    else:
        rel = "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
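# Illustrative usage (sketch): parsers call this with the remote PDF URL so that the
# article gets both a "full-text" stream and an "article-pdf" Extlink; the URL below
# is a placeholder.
#
#   add_pdf_link_to_xarticle(xarticle, "https://example.org/articles/123.pdf")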

def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()


def get_base(soup: BeautifulSoup, default: str):
    base_tag = soup.select_one("head base")
    if not base_tag:
        return default
    base = base_tag.get("href")
    if not isinstance(base, str):
        raise ValueError("Cannot parse base href")
    return base

_session = None


# Singleton: create the shared CachedSession on first call, then reuse it
def get_session(headers={}, match_headers=["Range"]) -> "CachedSession":
    global _session
    if not _session:
        _session = session_with_delay_factory(CachedSession)(
            allowable_methods=("GET", "POST", "HEAD"),
            match_headers=match_headers,
            headers=headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
            allowable_codes=(200, 206, 404),
            delay=getattr(settings, "REQUESTS_INTERVAL", 90),
        )
    return _session
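# Illustrative usage (sketch, not part of the module): crawler code shares this
# cached, rate-limited session instead of creating its own requests.Session; the
# URL and User-Agent below are placeholders.
#
#   session = get_session(headers={"User-Agent": "mathdoc-crawler"})
#   response = session.get("https://example.org/journal/volume-1/")
#   response.raise_for_status()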

try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

S = TypeVar("S", bound=Session)


def session_with_delay_factory(base: type[S]) -> type[S]:
    """
    Create a `requests.Session` subclass that enforces a per-hostname delay across threads.

    ```py
    session = session_with_delay_factory(CachedSession)(delay=5)
    ```

    WARN: Probably not async-safe
    """

    class DelayedSession(base):
        global_lock: Lock
        domain_lock: dict[str, Lock]
        delay: int | None
        "Optional override delay (in seconds). Uses settings.REQUESTS_INTERVAL, or a default of 90 otherwise"

        per_hostname_delay = {"zbmath.org": 10}

        def __init__(self, delay=None, *args, **kwargs) -> None:
            """
            delay: waits a number of seconds after making the query (if it is not cached)
            """
            super().__init__(*args, **kwargs)
            self.global_lock = Lock()
            self.domain_lock = {}
            self.delay = delay

        def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs):
            with self.global_lock:
                parsed = urlparse(url).netloc
                if parsed not in self.domain_lock:
                    self.domain_lock[parsed] = Lock()

            with self.domain_lock[parsed]:
                try:
                    response = super().request(method, url, *args, **kwargs)
                except DocumentTooLarge as e:
                    logger.error(e)
                    if isinstance(self, CachedSession):
                        with self.cache_disabled():
                            response = super().request(method, url, *args, **kwargs)
                    else:
                        # re-raise for plain sessions so `response` is never left unbound
                        raise
                if isinstance(self, CachedSession):
                    if not getattr(response, "from_cache", False):
                        seconds_to_wait = max(
                            delay or self.delay or getattr(settings, "REQUESTS_INTERVAL", 90),
                            self.per_hostname_delay.get(parsed, 0),
                        )
                        if seconds_to_wait:
                            logger.info(
                                f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})"
                            )
                            time.sleep(seconds_to_wait)

            return response

    return cast(type[S], DelayedSession)
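# Illustrative behaviour (sketch): each hostname gets its own lock, so concurrent
# threads serialize per host and pause after every uncached response; the `delay`
# keyword accepted by request() (and therefore by get()/post()) overrides the pause
# for a single call. The URL below is a placeholder.
#
#   DelayedCachedSession = session_with_delay_factory(CachedSession)
#   session = DelayedCachedSession(delay=5)
#   session.get("https://example.org/page", delay=1)  # waits ~1s instead of 5 if uncached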

class PickleSerializer:
    def serialize(self, obj):
        return base64.b64encode(pickle.dumps(obj, protocol=-1))

    def deserialize(self, serialized):
        return pickle.loads(base64.b64decode(serialized))


RT = TypeVar("RT")

def mongo_cache(
    db_conn: Database = MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
        "crawler_func_cache"
    ],
    prefix="cache_",
    capped=True,
    capped_size=1000000000,
    hash_keys=True,
    serializer=PickleSerializer,
) -> Callable[[Callable[..., RT]], Callable[..., RT]]:
    """Helper decorator to speed up local development

    ```py
    @mongo_cache(
        db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
            "crawler_func_cache"
        ]
    )
    def parse_collection_content(self, content):
        ...
    ```
    """

    def decorator(func: Callable[..., RT]) -> Callable[..., RT]:
        serializer_ins = serializer()
        col_name = f"{prefix}{func.__name__}"
        if capped:
            db_conn.create_collection(col_name, capped=capped, size=capped_size)

        cache_col = db_conn[col_name]
        cache_col.create_index("key", unique=True)
        cache_col.create_index("date", expireAfterSeconds=86400)

        @wraps(func)
        def wrapped_func(*args, **kwargs) -> RT:
            # args[1:]: the first positional argument (typically `self`, as in the
            # docstring example) is excluded so the cache key ignores the instance
            cache_key = pickle.dumps((args[1:], kwargs), protocol=-1)
            if hash_keys:
                cache_key = hashlib.md5(cache_key).hexdigest()
            else:
                cache_key = base64.b64encode(cache_key)

            cached_obj = cache_col.find_one(dict(key=cache_key))
            if cached_obj:
                return serializer_ins.deserialize(cached_obj["result"])

            ret = func(*args, **kwargs)
            cache_col.update_one(
                {"key": cache_key},
                {
                    "$set": {
                        "result": serializer_ins.serialize(ret),
                        "date": datetime.now(timezone.utc),
                    }
                },
                upsert=True,
            )

            return ret

        return wrapped_func

    return decorator