Coverage for src/crawler/utils.py: 48%

180 statements  

coverage.py v7.12.0, created at 2026-02-03 09:36 +0000

1import base64 

2import hashlib 

3import json 

4import logging 

5import os 

6import pickle 

7import re 

8import time 

9import unicodedata 

10from datetime import datetime, timedelta, timezone 

11from functools import lru_cache, wraps 

12from threading import Lock 

13from typing import Callable, Literal, TypeVar, cast 

14from urllib.parse import urlparse 

15 

16import regex 

17from bs4 import BeautifulSoup 

18from django.conf import settings 

19from django.contrib.auth.models import User 

20from history.model_data import HistoryEventDict, HistoryEventType 

21from history.models import HistoryEventStatus 

22 

23# from ptf.models import ResourceId 

24from history.utils import insert_history_event 

25from ptf import model_helpers 

26from ptf.cmds import ptf_cmds 

27from ptf.cmds.xml.xml_utils import escape 

28from ptf.exceptions import ResourceDoesNotExist 

29from ptf.model_data import ResourceData, create_extlink, create_publicationdata 

30from ptf.models import Collection 

31from pymongo import MongoClient 

32from pymongo.errors import DocumentTooLarge 

33from pymongo.synchronous.database import Database 

34from requests import Session 

35from requests_cache import CachedSession, MongoCache 

36 

37from crawler.types import JSONCol 

38 

39logger = logging.getLogger(__name__) 

40 

41 

42def insert_crawl_event_in_history( 

43 colid: str, 

44 source_domain: str, 

45 username: str, 

46 status: HistoryEventStatus, 

47 tasks_count, 

48 message: str, 

49 event_type: HistoryEventType = "import", 

50 title=None, 

51): 

52 collection = model_helpers.get_collection(colid, sites=False) 

53 user = User.objects.get(username=username) 

54 

55 event_data: HistoryEventDict = { 

56 "type": event_type, 

57 "pid": f"{colid}-{source_domain}", 

58 "col": collection, 

59 "source": source_domain, 

60 "status": status, 

61 "title": collection.title_html if collection else (title or colid), 

62 "userid": user.pk, 

63 "type_error": "", 

64 "message": message, 

65 } 

66 

67 insert_history_event(event_data) 
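
A minimal call sketch for the helper above; the collection pid, username and status value are hypothetical and must match existing records and the HistoryEventStatus choices:

```py
# Hypothetical values: "AMP" should be listed in data/all_cols.json and
# "crawler_bot" must be an existing Django user.
insert_crawl_event_in_history(
    colid="AMP",
    source_domain="example.org",
    username="crawler_bot",
    status="OK",  # assumption: one of the HistoryEventStatus values
    tasks_count=42,
    message="Crawl finished: 42 tasks processed",
)
```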

68 

69 

70def col_has_source(col: JSONCol, filter: str): 

71 return filter in col["sources"] 

72 

73 

74def get_cols_by_source(source: str) -> list[JSONCol]: 

75 """ 

76 Get all collections that include the given source. 

77 @param source: source name 

78 @return: list of matching collections 

79 """ 

80 data = get_all_cols() 

81 

82 return [col for col in data.values() if col_has_source(col, source)] 

83 

84 

85def get_all_cols_by_source(): 

86 """ 

87 Group all collections by their sources. 

88 @return: dict of collections by source 

89 """ 

90 data = get_all_cols() 

91 

92 sources: dict[str, list] = {} 

93 for col in data.values(): 

94 for source in col["sources"]: 

95 if source not in sources: 

96 sources[source] = [] 

97 sources[source].append(col) 

98 

99 return sources 
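
A usage sketch of the two lookups above; the source name is hypothetical and depends on what data/all_cols.json actually contains:

```py
# Group every collection by the sources it declares.
cols_by_source = get_all_cols_by_source()
for source, cols in cols_by_source.items():
    print(source, f"{len(cols)} collection(s)")

# Or fetch the collections for a single (hypothetical) source directly.
matching_cols = get_cols_by_source("example.org")
```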

100 

101 

102@lru_cache(maxsize=None) 

103def get_all_cols() -> dict[str, JSONCol]: 

104 with open( 

105 os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8" 

106 ) as data_collections: 

107 return json.load(data_collections) 

108 

109 

110def get_or_create_collection(pid: str): 

111 """ 

112 Gets or creates a Collection based on its pid. 

113 The pid has to be in the list of collections provided by the Documentation team (a CSV converted to JSON). 

114 """ 

115 

116 all_collections = get_all_cols() 

117 

118 if pid not in all_collections:    [118 ↛ 119: the condition on line 118 was never true]

119 raise ValueError(f"{pid} is not listed in all_cols.csv") 

120 

121 col_data = all_collections[pid] 

122 

123 collection: Collection | None = model_helpers.get_collection(pid, sites=False) 

124 

125 if not collection:    [125 ↛ 126: the condition on line 125 was never true]

126 p = model_helpers.get_provider("mathdoc-id") 

127 

128 xcol = create_publicationdata() 

129 xcol.coltype = col_data["type"] 

130 xcol.pid = pid 

131 xcol.title_tex = col_data["title"] 

132 # Commented out because it is too early: Taban has not yet validated the ISSNs 

133 # xcol.e_issn = col_data["ISSN_électronique"] 

134 # xcol.issn = col_data["ISSN_papier"] 

135 xcol.title_html = col_data["title"] 

136 xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>" 

137 xcol.lang = "en" 

138 

139 cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol}) 

140 cmd.set_provider(p) 

141 collection = cmd.do() 

142 

143 # Commented out because it is too early: Taban has not yet validated the ISSNs 

144 # if col_data["ISSN_électronique"] != "": 

145 # e_issn = { 

146 # "resource_id": collection.resource_ptr_id, 

147 # "id_type": "e_issn", 

148 # "id_value": col_data["ISSN_électronique"], 

149 # } 

150 # ResourceId.objects.create(**e_issn) 

151 # 

152 # if col_data["ISSN_papier"] != "": 

153 # issn = { 

154 # "resource_id": collection.resource_ptr_id, 

155 # "id_type": "issn", 

156 # "id_value": col_data["ISSN_papier"], 

157 # } 

158 # ResourceId.objects.create(**issn) 

159 

160 if not collection:    [160 ↛ 161: the condition on line 160 was never true]

161 raise ResourceDoesNotExist(f"Resource {pid} does not exist") 

162 

163 return collection 
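
A short sketch of the intended call; the pid is hypothetical and must be listed in data/all_cols.json for the lookup to succeed:

```py
collection = get_or_create_collection("AMP")  # hypothetical pid
print(collection.pid, collection.title_html)
```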

164 

165 

166def cleanup_str(input: str): 

167 # some whitespace characters aren't plain spaces (e.g. \xa0); NFKC normalization converts them 

168 input = unicodedata.normalize("NFKC", input) 

169 # strip DEL (0x7f) control characters 

170 input = re.sub(r"[\x7f]+", "", input) 

171 # collapse runs of newlines, tabs and spaces into a single space 

172 input = re.sub(r"[\n\t\r ]+", " ", input).strip() 

173 return escape(input) 
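
For illustration, how cleanup_str behaves on a string containing a non-breaking space and redundant whitespace, assuming escape() performs standard XML escaping:

```py
raw = "Some\xa0title\n\t with   extra & odd spacing"
print(cleanup_str(raw))
# NFKC turns \xa0 into a plain space, runs of whitespace collapse to one space,
# and escape() is assumed to XML-escape the "&":
# "Some title with extra &amp; odd spacing"
```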

174 

175 

176def add_pdf_link_to_xarticle( 

177 xarticle: ResourceData, 

178 pdf_url: str, 

179 mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf", 

180): 

181 xarticle.streams.append( 

182 { 

183 "rel": "full-text", 

184 "mimetype": mimetype, 

185 "location": pdf_url, 

186 "base": "", 

187 "text": "Full Text", 

188 } 

189 ) 

190 

191 # The pdf url is already added as a stream (just above) but might be replaced by a file later on. 

192 # Keep the pdf url as an Extlink if we want to propose both options: 

193 # - direct download of a local PDF 

194 # - URL to the remote PDF 

195 if mimetype == "application/pdf": 195 ↛ 197line 195 didn't jump to line 197 because the condition on line 195 was always true

196 rel = "article-pdf" 

197 elif mimetype == "text/html": 

198 rel = "article-html" 

199 else: 

200 rel = "article-tex" 

201 ext_link = create_extlink(rel=rel, location=pdf_url) 

202 xarticle.ext_links.append(ext_link) 
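
A sketch of what the helper appends, assuming `xarticle` is an already-built ResourceData instance and the URLs are hypothetical:

```py
# Adds a "full-text" stream plus an "article-pdf" ext_link for the same URL.
add_pdf_link_to_xarticle(xarticle, "https://example.org/articles/1.pdf")

# With an HTML full text, the stream keeps the given mimetype
# and the ext_link rel becomes "article-html".
add_pdf_link_to_xarticle(xarticle, "https://example.org/articles/1.html", mimetype="text/html")
```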

203 

204 

205def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"): 

206 issue_search = regex.search(pattern, value) 

207 if not issue_search:    [207 ↛ 208: the condition on line 207 was never true]

208 raise ValueError(error_msg) 

209 

210 return issue_search.groupdict() 
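
An illustrative call with named groups (pattern and value are made up):

```py
parsed = regex_to_dict(
    r"Vol\.\s*(?P<volume>\d+),\s*No\.\s*(?P<number>\d+)",
    "Vol. 12, No. 3",
    error_msg="Could not parse the issue reference",
)
# parsed == {"volume": "12", "number": "3"}
```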

211 

212 

213def get_base(soup: BeautifulSoup, default: str): 

214 base_tag = soup.select_one("head base") 

215 if not base_tag:    [215 ↛ 216: the condition on line 215 was never true]

216 return default 

217 base = base_tag.get("href") 

218 if not isinstance(base, str):    [218 ↛ 219: the condition on line 218 was never true]

219 raise ValueError("Cannot parse base href") 

220 return base 

221 

222 

223_session = None 

224 

225 

226def get_session(headers={}, match_headers=["Range"]) -> "Session": 

227 global _session 

228 if not _session:    [228 ↛ 240: the condition on line 228 was always true]

229 _session = session_with_delay_factory(CachedSession)( 

230 allowable_methods=("GET", "POST", "HEAD"), 

231 match_headers=match_headers, 

232 headers=headers, 

233 backend=MongoCache( 

234 host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False 

235 ), 

236 expire_after=timedelta(days=30), 

237 allowable_codes=(200, 206, 404), 

238 delay=getattr(settings, "REQUESTS_INTERVAL", 90), 

239 ) 

240 return _session 
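
A usage sketch of the shared session; the URL is hypothetical, and the per-request `delay` override is the extra keyword accepted by the DelayedSession subclass defined below:

```py
session = get_session()
# Pauses 5 seconds after the request only if the response was not served from the cache.
response = session.get("https://example.org/volumes/", delay=5)
print(response.status_code, getattr(response, "from_cache", False))
```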

241 

242 

243try: 

244 from crawler.tests.data_generation.decorators import skip_generation 

245except ImportError: 

246 

247 def skip_generation(func): 

248 def wrapper(*args, **kwargs): 

249 return func(*args, **kwargs) 

250 

251 return wrapper 

252 

253 

254S = TypeVar("S", bound=Session) 

255 

256 

257def session_with_delay_factory(base: type[S]) -> type[S]: 

258 """ 

259 Create a `requests.Session` subclass that enforces a per-hostname delay across threads. 

260 

261 Example: `session_with_delay_factory(CachedSession)` returns a throttled CachedSession subclass. 

262 

263 WARN: probably not async-safe. 

264 """ 

265 

266 class DelayedSession(base): 

267 global_lock: Lock 

268 domain_lock: dict[str, Lock] 

269 delay: int | None 

270 "Optional override delay (in seconds). Uses settings.REQUESTS_INTERVAL, or a default of 90 otherwise" 

271 

272 per_hostname_delay = {"zbmath.org": 10} 

273 

274 def __init__(self, delay=None, *args, **kwargs) -> None: 

275 """ 

276 delay: number of seconds to wait after making a request whose response was not served from the cache""" 

277 super().__init__(*args, **kwargs) 

278 self.global_lock = Lock() 

279 self.domain_lock = {} 

280 self.delay = delay 

281 

282 def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs): 

283 with self.global_lock: 

284 parsed = urlparse(url).netloc 

285 if parsed not in self.domain_lock: 

286 self.domain_lock[parsed] = Lock() 

287 

288 # prevent parallelism for the same domain 

289 with self.domain_lock[parsed]: 

290 try: 

291 response = super().request(method, url, *args, **kwargs) 

292 except DocumentTooLarge as e: 

293 logger.error(e) 

294 if isinstance(self, CachedSession): 

295 with self.cache_disabled(): 

296 response = self.request(method, url, *args, **kwargs) 

297 if isinstance(self, CachedSession): 

298 if not getattr(response, "from_cache", False): 

299 seconds_to_wait = max( 

300 delay or self.delay or getattr(settings, "REQUESTS_INTERVAL", 90), 

301 self.per_hostname_delay.get(parsed, 0), 

302 ) 

303 if seconds_to_wait: 

304 logger.info( 

305 f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})" 

306 ) 

307 time.sleep(seconds_to_wait) 

308 

309 return response 

310 

311 return cast(type[S], DelayedSession) 
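
A sketch of using the factory directly with requests_cache's in-memory backend (the backend choice is only for illustration); note that the pause is applied only on CachedSession subclasses, and only for responses that did not come from the cache:

```py
DelayedCachedSession = session_with_delay_factory(CachedSession)
session = DelayedCachedSession(
    delay=10,          # pause 10 seconds after each uncached request
    backend="memory",  # illustration only; get_session() above uses MongoCache
)
response = session.get("https://example.org/")  # hypothetical URL
```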

312 

313 

314class PickleSerializer(object): 

315 def serialize(self, obj): 

316 return base64.b64encode(pickle.dumps(obj, protocol=-1)) 

317 

318 def deserialize(self, serialized): 

319 return pickle.loads(base64.b64decode(serialized)) 
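
Round-trip behaviour of the serializer used by mongo_cache below:

```py
serializer = PickleSerializer()
blob = serializer.serialize({"volume": 12, "articles": ["a1", "a2"]})
assert serializer.deserialize(blob) == {"volume": 12, "articles": ["a1", "a2"]}
# blob is base64-encoded pickle bytes, suitable for storage in a MongoDB document field
```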

320 

321 

322RT = TypeVar("RT") 

323 

324 

325def mongo_cache( 

326 db_conn: Database = MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[ 

327 "crawler_func_cache" 

328 ], 

329 prefix="cache_", 

330 capped=True, 

331 capped_size=1000000000, 

332 hash_keys=True, 

333 serializer=PickleSerializer, 

334) -> Callable[[Callable[..., RT]], Callable[..., RT]]: 

335 """Helper decorator to speedup local development 

336 ```py 

337 

338 

339 @mongo_cache( 

340 db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[ 

341 "crawler_func_cache" 

342 ] 

343 ) 

344 def parse_collection_content(self, content): 

345 ... 

346 ``` 

347 """ 

348 

349 def decorator(func: Callable[..., RT]) -> Callable[..., RT]: 

350 serializer_ins = serializer() 

351 col_name = "%s%s" % (prefix, func.__name__) 

352 if capped and col_name not in db_conn.list_collection_names(): 

353 db_conn.create_collection(col_name, capped=capped, size=capped_size) 

354 

355 cache_col = db_conn[col_name] 

356 cache_col.create_index("key", unique=True) 

357 cache_col.create_index("date", expireAfterSeconds=86400) 

358 

359 @wraps(func) 

360 def wrapped_func(*args, **kwargs) -> RT: 

361 cache_key = pickle.dumps((args[1:], kwargs), protocol=-1)  # args[0] (usually self) is excluded from the key 

362 if hash_keys: 

363 cache_key = hashlib.md5(cache_key).hexdigest() 

364 else: 

365 cache_key = base64.b64encode(cache_key) 

366 

367 cached_obj = cache_col.find_one(dict(key=cache_key)) 

368 if cached_obj: 

369 return serializer_ins.deserialize(cached_obj["result"]) 

370 

371 ret = func(*args, **kwargs) 

372 cache_col.update_one( 

373 {"key": cache_key}, 

374 { 

375 "$set": { 

376 "result": serializer_ins.serialize(ret), 

377 "date": datetime.now(timezone.utc), 

378 } 

379 }, 

380 upsert=True, 

381 ) 

382 

383 return ret 

384 

385 return wrapped_func 

386 

387 return decorator
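
Because the cache key is built from args[1:], the decorator is intended for methods (the first positional argument, usually self, is not part of the key). A sketch assuming a reachable MongoDB instance; the class and method names are hypothetical:

```py
class IssueCrawler:
    @mongo_cache()  # defaults to the "crawler_func_cache" database on settings.MONGO_HOSTNAME
    def download_issue(self, url: str) -> dict:
        ...  # expensive network call

# A second call with the same url returns the result stored in MongoDB
# instead of executing the method body again.
```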