Coverage for src/crawler/utils.py: 48%

95 statements  

coverage.py v7.8.2, created at 2025-06-16 07:44 +0000

import json
import os
import re
import unicodedata
from functools import lru_cache
from typing import Literal

import regex
import requests
from django.contrib.auth.models import User
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection

from crawler.types import JSONCol

# from ptf.models import ResourceId

# Make the dependency on the history app optional
try:
    from history.views import insert_history_event
except ImportError:

    def insert_history_event(_new_event):
        pass


def insert_crawl_event_in_history(
    colid, source_name, username, status, tasks_count, message, event_type="import", title=None
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data = {
        "type": event_type,
        "pid": f"{colid}-{source_name}",
        "col": colid,
        "source": source_name,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.id,
        "type_error": "",
        "data": {
            "ids_count": tasks_count,
            "message": message,
            "target": "",
        },
    }

    insert_history_event(event_data)
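
# Illustrative usage (hypothetical values; assumes a matching Django User
# and an "AIF" collection exist):
# insert_crawl_event_in_history(
#     "AIF", "numdam", "admin", "OK", tasks_count=42, message="42 issues crawled"
# )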


def col_has_source(col: JSONCol, source_name: str):
    return source_name in col["sources"]


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections available from the given source.
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]
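
# Illustrative usage: numdam_cols = get_cols_by_source("numdam")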


def get_all_cols_by_source():
    """
    Group all collections by source.
    @return: dict mapping each source name to its list of collections
    """
    data = get_all_cols()

    sources = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
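
# Example return shape (keys are source names; "numdam" shown illustratively):
#   {"numdam": [<col dict>, ...], ...}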


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)
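
# Note: the JSON file is read once per process; call get_all_cols.cache_clear()
# to force a re-read after updating data/all_cols.json.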


def get_numdam_collections():
    """
    Returns a list of Numdam collection pids
    """

    url = "https://www.numdam.org/api-all-collections/"

    response = requests.get(url)
    if response.status_code != 200:
        return []

    data = response.json()
    if "collections" not in data:
        return []

    return data["collections"]
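
# Illustrative usage: returns [] on HTTP errors or an unexpected payload.
# numdam_pids = get_numdam_collections()  # e.g. ["AIF", "AIHPC", ...]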


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV file, transformed into JSON).
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out: too early, Taban has not yet validated the ISSNs
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out: too early, Taban has not yet validated the ISSNs
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
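
# Illustrative usage (assumes "AIF" appears in data/all_cols.json):
# collection = get_or_create_collection("AIF")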


# TODO: check whether NUMDAM_COLLECTIONS is still used.
NUMDAM_COLLECTIONS = [
    "ACIRM",
    "ALCO",
    "AFST",
    "AIHPC",
    "AIHPA",
    "AIHPB",
    "AIF",
    "AIHP",
    "AUG",
    "AMPA",
    "AHL",
    "AMBP",
    "ASENS",
    "ASCFPA",
    "ASCFM",
    "ASNSP",
    "AST",
    "BSMF",
    "BSMA",
    "CTGDC",
    "BURO",
    "CSHM",
    "CG",
    "CM",
    "CRMATH",
    "CML",
    "CJPS",
    "CIF",
    "DIA",
    "COCV",
    "M2AN",
    "PS",
    "GAU",
    "GEA",
    "STS",
    "TAN",
    "JSFS",
    "JEP",
    "JMPA",
    "JTNB",
    "JEDP",
    "CAD",
    "CCIRM",
    "RCP25",
    "MSIA",
    "MRR",
    "MSH",
    "MSMF",
    "MSM",
    "NAM",
    "OJMO",
    "PHSC",
    "PSMIR",
    "PDML",
    "PMB",
    "PMIHES",
    "PMIR",
    "RO",
    "RCP",
    "ITA",
    "RSMUP",
    "RSA",
    "RHM",
    "SG",
    "SB",
    "SBCD",
    "SC",
    "SCC",
    "SAF",
    "SDPP",
    "SMJ",
    "SPHM",
    "SPS",
    "STNB",
    "STNG",
    "TSG",
    "SD",
    "SE",
    "SEDP",
    "SHC",
    "SJ",
    "SJL",
    "SLSEDP",
    "SLDB",
    "SL",
    "SPK",
    "SAC",
    "SMS",
    "SLS",
    "SSL",
    "SENL",
    "SSS",
    "SAD",
    "THESE",
    "SMAI-JCM",
    "WBLN",
]


def cleanup_str(value: str):
    # Some "white space" characters aren't actual spaces, like \xa0 (NBSP);
    # NFKC normalization maps them to plain spaces.
    value = unicodedata.normalize("NFKC", value)
    # Remove DEL control characters (\x7f).
    value = re.sub(r"[\x7f]+", "", value)
    # Collapse consecutive newlines, tabs, and spaces into a single space.
    return re.sub(r"[\n\t\r ]+", " ", value).strip()
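
# Example: cleanup_str("A\xa0title\n\n  here ") == "A title here"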


def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced
    # by a file later on. Keep the pdf url as an Extlink if we want to propose
    # both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel = "article-pdf" if mimetype == "application/pdf" else "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
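
# Illustrative usage (xart is assumed to be an existing ResourceData):
# add_pdf_link_to_xarticle(xart, "https://example.org/article.pdf")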


def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:
        raise ValueError(error_msg)

    return issue_search.groupdict()
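
# Example (illustrative pattern): extract named groups from an issue string.
# regex_to_dict(r"Vol\. (?P<volume>\d+), No\. (?P<number>\d+)", "Vol. 12, No. 3")
# -> {"volume": "12", "number": "3"}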


try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:
    # Fallback when the test data-generation helpers are not installed:
    # a no-op decorator that returns the function unchanged.
    def skip_generation(func):
        return func