Coverage for src / crawler / utils.py: 48%

182 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-19 14:59 +0000

1import base64 

2import hashlib 

3import json 

4import logging 

5import os 

6import pickle 

7import re 

8import time 

9import unicodedata 

10from datetime import datetime, timedelta, timezone 

11from functools import lru_cache, wraps 

12from threading import Lock 

13from typing import Callable, Literal, TypeVar, cast 

14from urllib.parse import urlparse 

15 

16import regex 

17from bs4 import BeautifulSoup 

18from django.conf import settings 

19from django.contrib.auth.models import User 

20from history.model_data import HistoryEventDict, HistoryEventType 

21from history.models import HistoryEventStatus 

22 

23# from ptf.models import ResourceId 

24from history.utils import insert_history_event 

25from ptf import model_helpers 

26from ptf.cmds import ptf_cmds 

27from ptf.cmds.xml.xml_utils import escape 

28from ptf.exceptions import ResourceDoesNotExist 

29from ptf.model_data import ResourceData, create_extlink, create_publicationdata 

30from ptf.models import Collection 

31from pymongo import MongoClient 

32from pymongo.errors import DocumentTooLarge 

33from pymongo.synchronous.database import Database 

34from requests import Session 

35from requests_cache import CachedSession, MongoCache 

36 

37from crawler.types import JSONCol 

38 

# Module-level logger; handlers/levels are configured by the host application.
logger = logging.getLogger(__name__)

40 

41 

def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    """Record a crawl-related event in the application's history log.

    The event pid is ``{colid}-{source_domain}`` and the event is attributed
    to the Django user matching *username*.

    Note: ``tasks_count`` is currently unused but kept for call compatibility.
    """
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    # Prefer the collection's HTML title; otherwise fall back to the
    # caller-supplied title, then to the raw collection id.
    if collection:
        event_title = collection.title_html
    else:
        event_title = title or colid

    event: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": event_title,
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }
    insert_history_event(event)

68 

69 

def col_has_source(col: JSONCol, filter: str):
    """Return True when *filter* appears in the collection's "sources" list.

    Fixes a truthiness bug: the previous ``any(source for source in ... if
    source == filter)`` form returned False for a falsy match (e.g. the empty
    string) even when it was present in the list.

    Note: the parameter name `filter` shadows the builtin but is kept for
    backward compatibility with keyword callers.
    """
    return filter in col["sources"]

72 

73 

def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all cols by source
    @param source: str
    @return: list of collections
    """
    return [col for col in get_all_cols().values() if col_has_source(col, source)]

83 

84 

def get_all_cols_by_source():
    """
    Group all known collections by source.
    @return: dict of collections by source (source name -> list of collections)
    """
    sources: dict[str, list] = {}
    for col in get_all_cols().values():
        for source in col["sources"]:
            # setdefault replaces the explicit "if source not in sources" check
            sources.setdefault(source, []).append(col)

    return sources

100 

101 

@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    """Load and memoize data/all_cols.json (keyed by collection pid)."""
    json_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json"
    )
    with open(json_path, encoding="utf8") as data_collections:
        return json.load(data_collections)

108 

109 

def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (CSV then transformed in JSON).

    Raises:
        ValueError: if `pid` is not listed in the JSON collection data.
        ResourceDoesNotExist: if the collection could not be created.
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    # Direct dict lookup replaces the previous O(n) scan over .items()
    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out: too early, Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out: too early, Taban has not validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection

164 

165 

def cleanup_str(input: str, unsafe=False):
    """Normalize a string's whitespace and XML-escape it.

    Unicode is NFKC-normalized (turning e.g. non-breaking spaces into regular
    spaces), DEL control characters are dropped, and runs of whitespace are
    collapsed into single spaces. Unless *unsafe* is True, the result is
    escaped via ptf's `escape` before being returned.
    """
    # Some white spaces aren't actual space characters, like \xa0.
    cleaned = unicodedata.normalize("NFKC", input)
    # Drop DEL (0x7f) control characters.
    cleaned = re.sub(r"[\x7f]+", "", cleaned)
    # Collapse consecutive newlines/tabs/spaces and trim the ends.
    cleaned = re.sub(r"[\n\t\r ]+", " ", cleaned).strip()
    return cleaned if unsafe else escape(cleaned)

176 

177 

def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex", "text/html"] = "application/pdf",
):
    """Attach *pdf_url* to *xarticle* as a full-text stream plus an extlink."""
    full_text_stream = {
        "rel": "full-text",
        "mimetype": mimetype,
        "location": pdf_url,
        "base": "",
        "text": "Full Text",
    }
    xarticle.streams.append(full_text_stream)

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both option:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel_by_mimetype = {
        "application/pdf": "article-pdf",
        "text/html": "article-html",
    }
    rel = rel_by_mimetype.get(mimetype, "article-tex")
    xarticle.ext_links.append(create_extlink(rel=rel, location=pdf_url))

205 

206 

def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    """Search *pattern* in *value* and return the named groups as a dict.

    Raises ValueError (with *error_msg*) when the pattern does not match.
    """
    match = regex.search(pattern, value)
    if match is None:
        raise ValueError(error_msg)
    return match.groupdict()

213 

214 

def get_base(soup: BeautifulSoup, default: str):
    """Return the document's <base href> value, or *default* when absent.

    Raises ValueError when a <base> tag exists but its href is not a string.
    """
    tag = soup.select_one("head base")
    if tag is None:
        return default
    href = tag.get("href")
    if isinstance(href, str):
        return href
    raise ValueError("Cannot parse base href")

223 

224 

# Process-wide shared session (lazily created by get_session).
_session = None


# Singleton accessor: builds the shared CachedSession on first use.
def get_session(headers=None, match_headers=None) -> "CachedSession":
    """Return the process-wide delayed CachedSession, creating it on first call.

    Args:
        headers: default headers for the session. Only honored on the first
            call (when the session is actually created); defaults to {}.
        match_headers: request headers included in the cache key; defaults
            to ["Range"].

    Note: the previous signature used mutable default arguments
    (``headers={}``, ``match_headers=["Range"]``); None sentinels with the
    same effective values are used instead.
    """
    global _session
    if not _session:
        _session = session_with_delay_factory(CachedSession)(
            allowable_methods=("GET", "POST", "HEAD"),
            match_headers=["Range"] if match_headers is None else match_headers,
            headers={} if headers is None else headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
            # 404s are cached too, so repeatedly-missing resources are not re-fetched.
            allowable_codes=(200, 206, 404),
            delay=getattr(settings, "REQUESTS_INTERVAL", 90),
        )
    return _session

244 

245 

try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:
    # Test-only dependency: outside the test environment, fall back to a
    # pass-through decorator so decorated crawlers keep working unchanged.
    def skip_generation(func):
        # functools.wraps preserves the wrapped function's metadata
        # (__name__, __doc__, ...), which the previous fallback lost.
        @wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper

255 

256 

# Session subtype accepted and returned by session_with_delay_factory.
S = TypeVar("S", bound=Session)

258 

259 

def session_with_delay_factory(base: type[S]) -> type[S]:
    """
    Create a `requests.Session` subclass with a per-hostname delay across threads.

    ```py
    DelayedCachedSession = session_with_delay_factory(CachedSession)
    session = DelayedCachedSession(delay=30)
    ```

    WARN : Probably not async-safe
    """

    class DelayedSession(base):
        global_lock: Lock
        domain_lock: dict[str, Lock]
        delay: int | None
        "Optional override delay (in seconds). Uses settings.REQUESTS_INTERVAL, or a default of 90 otherwise"

        # Minimum delay (seconds) enforced for specific hostnames, regardless
        # of the configured delay.
        per_hostname_delay = {"zbmath.org": 10}

        def __init__(self, delay=None, *args, **kwargs) -> None:
            """
            delay: waits a number of seconds after making the query (if it is not cached)"""
            super().__init__(*args, **kwargs)
            self.global_lock = Lock()
            self.domain_lock = {}
            self.delay = delay

        def request(self, method: str, url: str, *args, delay: int | None = None, **kwargs):
            # Lazily create one lock per hostname (creation itself is guarded
            # by the global lock).
            with self.global_lock:
                hostname = urlparse(url).netloc
                if hostname not in self.domain_lock:
                    self.domain_lock[hostname] = Lock()

            # prevent parallelism for the same domain
            with self.domain_lock[hostname]:
                try:
                    response = super().request(method, url, *args, **kwargs)
                except DocumentTooLarge as e:
                    # The response body is too large for the MongoDB cache
                    # backend: retry once with the cache disabled.
                    logger.error(e)
                    if not isinstance(self, CachedSession):
                        # BUGFIX: `response` was previously left unbound on
                        # this path (UnboundLocalError); re-raise instead.
                        raise
                    with self.cache_disabled():
                        response = self.request(method, url, *args, **kwargs)
                if isinstance(self, CachedSession):
                    if not getattr(response, "from_cache", False):
                        seconds_to_wait = max(
                            delay or self.delay or getattr(settings, "REQUESTS_INTERVAL", 90),
                            # BUGFIX: previously looked up the literal string
                            # "parsed" instead of the hostname, so per-host
                            # delays (e.g. zbmath.org) were never applied.
                            self.per_hostname_delay.get(hostname, 0),
                        )
                        if seconds_to_wait:
                            logger.info(
                                f"Pausing for {int(seconds_to_wait)}s (resuming at {(datetime.now() + timedelta(seconds=seconds_to_wait)).time()})"
                            )
                            time.sleep(seconds_to_wait)

            return response

    return cast(type[S], DelayedSession)

315 

316 

class PickleSerializer(object):
    """(De)serialize arbitrary Python objects as base64-encoded pickles."""

    def serialize(self, obj):
        """Pickle *obj* with the highest protocol and base64-encode the bytes."""
        pickled = pickle.dumps(obj, protocol=-1)
        return base64.b64encode(pickled)

    def deserialize(self, serialized):
        """Inverse of :meth:`serialize`. Only use on trusted data (pickle)."""
        raw = base64.b64decode(serialized)
        return pickle.loads(raw)

323 

324 

# Return type of the function wrapped by the mongo_cache decorator.
RT = TypeVar("RT")

326 

327 

def mongo_cache(
    # NOTE(review): this default Database is built (and a MongoClient opened)
    # at import time of this module — confirm that side effect is intended.
    db_conn: Database = MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
        "crawler_func_cache"
    ],
    prefix: str = "cache_",
    capped: bool = True,
    capped_size: int = 1000000000,
    hash_keys: bool = True,
    serializer=PickleSerializer,
) -> Callable[[Callable[..., RT]], Callable[..., RT]]:
    """Decorator caching a function's results in MongoDB to speed up local development.

    Results are serialized (pickled + base64 by default) into the collection
    ``<prefix><function name>``, keyed by the pickled call arguments.
    Entries expire after 24h via a TTL index on "date".

    ```py


    @mongo_cache(
        db_conn=MongoClient(host=getattr(settings, "MONGO_HOSTNAME", "localhost"))[
            "crawler_func_cache"
        ]
    )
    def parse_collection_content(self, content):
        ...
    ```
    """

    def decorator(func: Callable[..., RT]) -> Callable[..., RT]:
        serializer_ins = serializer()
        col_name = "%s%s" % (prefix, func.__name__)
        if capped:
            # NOTE(review): create_collection raises CollectionInvalid when the
            # collection already exists (e.g. decorating the same function in a
            # new process) — confirm this is handled/acceptable.
            db_conn.create_collection(col_name, capped=capped, size=capped_size)

        cache_col = db_conn[col_name]
        cache_col.create_index("key", unique=True)
        # TTL index: MongoDB removes cached entries ~24h after their "date".
        cache_col.create_index("date", expireAfterSeconds=86400)

        @wraps(func)
        def wrapped_func(*args, **kwargs) -> RT:
            # The first positional argument is excluded from the cache key —
            # presumably `self` when decorating methods; TODO confirm this is
            # the intent for plain functions too.
            cache_key = pickle.dumps((args[1:], kwargs), protocol=-1)
            if hash_keys:
                cache_key = hashlib.md5(cache_key).hexdigest()
            else:
                cache_key = base64.b64encode(cache_key)

            cached_obj = cache_col.find_one(dict(key=cache_key))
            if cached_obj:
                return serializer_ins.deserialize(cached_obj["result"])

            ret = func(*args, **kwargs)
            # Upsert so concurrent computations of the same key don't raise on
            # the unique "key" index.
            cache_col.update_one(
                {"key": cache_key},
                {
                    "$set": {
                        "result": serializer_ins.serialize(ret),
                        "date": datetime.now(timezone.utc),
                    }
                },
                upsert=True,
            )

            return ret

        return wrapped_func

    return decorator