# src/crawler/utils.py

import json
import os
import re
import unicodedata
from functools import lru_cache

import requests
from django.contrib.auth.models import User
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection

from crawler.types import JSONCol

# from ptf.models import ResourceId

# Make the dependency on the history app optional
try:
    from history.views import insert_history_event
except ImportError:

    def insert_history_event(_new_event):
        pass


def insert_crawl_event_in_history(
    colid, source_name, username, status, tasks_count, message, event_type="import", title=None
):
    """
    Record a crawl event for the given collection and source in the history app
    """
    collection = model_helpers.get_collection(colid)
    user = User.objects.get(username=username)

    event_data = {
        "type": event_type,
        "pid": f"{colid}-{source_name}",
        "col": colid,
        "source": source_name,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.id,
        "type_error": "",
        "data": {
            "ids_count": tasks_count,
            "message": message,
            "target": "",
        },
    }

    insert_history_event(event_data)
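
# Example usage (hypothetical ids and user; assumes the user exists and the
# history app is installed, otherwise the event is silently dropped):
#
#     insert_crawl_event_in_history(
#         "AIF", "numdam", "admin", "OK", tasks_count=12, message="12 articles crawled"
#     )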


def col_has_source(col: JSONCol, filter: str) -> bool:
    return filter in col["sources"]


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections crawled from the given source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]


def get_all_cols_by_source():
    """
    Get all collections, grouped by source
    @return: dict mapping each source name to its list of collections
    """
    data = get_all_cols()

    sources = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
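
# Example usage (hypothetical source name; the actual keys depend on the
# "sources" lists in data/all_cols.json):
#
#     >>> [col["title"] for col in get_cols_by_source("numdam")]
#     ['Annales de l'institut Fourier', ...]
#     >>> list(get_all_cols_by_source())
#     ['numdam', ...]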


@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    """
    Load the collections listed in data/all_cols.json (read at most once per process)
    """
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)
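
# get_all_cols() is cached with lru_cache for the lifetime of the process; if
# data/all_cols.json is regenerated, the standard lru_cache API can be used to
# force a reload:
#
#     get_all_cols.cache_clear()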


def get_numdam_collections():
    """
    Returns a list of Numdam collection pids
    """

    url = "http://www.numdam.org/api-all-collections/"

    # A timeout prevents the crawler from hanging on a slow endpoint
    response = requests.get(url, timeout=30)
    if response.status_code != 200:
        return []

    data = response.json()
    if "collections" not in data:
        return []

    return data["collections"]
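
# Example usage (requires network access; any HTTP error or malformed payload
# yields []):
#
#     >>> pids = get_numdam_collections()
#     >>> "AIF" in pids  # hypothetical check; "AIF" also appears in NUMDAM_COLLECTIONS below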


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV file, transformed into JSON)
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = "journal"
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
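
# Example usage (hypothetical pid; requires a configured Django database and
# the pid to be listed in data/all_cols.json):
#
#     collection = get_or_create_collection("AIF")
#     # returns the existing Collection, or creates one via addCollectionPtfCmd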


# TODO: check whether this constant is still used
NUMDAM_COLLECTIONS = [
    "ACIRM",
    "ALCO",
    "AFST",
    "AIHPC",
    "AIHPA",
    "AIHPB",
    "AIF",
    "AIHP",
    "AUG",
    "AMPA",
    "AHL",
    "AMBP",
    "ASENS",
    "ASCFPA",
    "ASCFM",
    "ASNSP",
    "AST",
    "BSMF",
    "BSMA",
    "CTGDC",
    "BURO",
    "CSHM",
    "CG",
    "CM",
    "CRMATH",
    "CML",
    "CJPS",
    "CIF",
    "DIA",
    "COCV",
    "M2AN",
    "PS",
    "GAU",
    "GEA",
    "STS",
    "TAN",
    "JSFS",
    "JEP",
    "JMPA",
    "JTNB",
    "JEDP",
    "CAD",
    "CCIRM",
    "RCP25",
    "MSIA",
    "MRR",
    "MSH",
    "MSMF",
    "MSM",
    "NAM",
    "OJMO",
    "PHSC",
    "PSMIR",
    "PDML",
    "PMB",
    "PMIHES",
    "PMIR",
    "RO",
    "RCP",
    "ITA",
    "RSMUP",
    "RSA",
    "RHM",
    "SG",
    "SB",
    "SBCD",
    "SC",
    "SCC",
    "SAF",
    "SDPP",
    "SMJ",
    "SPHM",
    "SPS",
    "STNB",
    "STNG",
    "TSG",
    "SD",
    "SE",
    "SEDP",
    "SHC",
    "SJ",
    "SJL",
    "SLSEDP",
    "SLDB",
    "SL",
    "SPK",
    "SAC",
    "SMS",
    "SLS",
    "SSL",
    "SENL",
    "SSS",
    "SAD",
    "THESE",
    "SMAI-JCM",
    "WBLN",
]


def cleanup_str(input: str):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # strip DEL (\x7f) control characters
    input = re.sub(r"[\x7f]+", "", input)
    # collapse consecutive newlines, tabs, carriage returns and spaces into a single space
    return re.sub(r"[\n\t\r ]+", " ", input).strip()
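
# Example (doctest-style): NFKC turns the non-breaking space into a regular
# space, which the final substitution then collapses along with the newline
# and tab:
#
#     >>> cleanup_str("An article\xa0title,\n\t spread over lines ")
#     'An article title, spread over lines'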


def add_pdf_link_to_xarticle(xarticle: ResourceData, pdf_url: str):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": "application/pdf",
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced
    # by a file later on. Keep the pdf url as an Extlink if we want to propose
    # both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    ext_link = create_extlink(rel="article-pdf", location=pdf_url)
    xarticle.ext_links.append(ext_link)
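
# Example usage (hypothetical xarticle built elsewhere with the ptf.model_data
# factories, and a hypothetical URL):
#
#     add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")
#     # xarticle.streams now ends with a "full-text" PDF stream, and
#     # xarticle.ext_links with the matching "article-pdf" Extlink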


# Make the test data generation dependency optional
try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper
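
# With the fallback above, @skip_generation is a transparent pass-through, so
# crawler code can always apply it (hypothetical method shown):
#
#     @skip_generation
#     def download_file(self, url):
#         ...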