Coverage for src/crawler/utils.py: 45%

80 statements  

« prev     ^ index     » next       coverage.py v7.6.4, created at 2025-02-14 14:36 +0000

import json
import os
import re
import unicodedata
import warnings
from functools import lru_cache

import requests
from django.contrib.auth.models import User

from history.views import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection

from crawler.types import JSONCol

17 

18# from ptf.models import ResourceId 

19 

20 

def insert_crawl_event_in_history(
    colid, source_name, username, status, tasks_count, message, event_type="import"
):
    """
    Record a crawl event for the collection `colid` in the history app.

    Silently returns (with a warning) if the collection is unknown.
    Raises User.DoesNotExist if `username` does not match a Django user.
    """
    collection = model_helpers.get_collection(colid)
    if collection is None:
        # BUG FIX: the original built `UserWarning(...)` without raising or
        # emitting it, which was a silent no-op. Emit a real warning instead
        # (still non-fatal, preserving the early-return behaviour).
        warnings.warn(f"Collection {colid} cannot be found inside model_helpers")
        return
    user = User.objects.get(username=username)

    event_data = {
        "type": event_type,
        "pid": f"{colid}-{source_name}",
        "col": colid,
        "source": source_name,
        "status": status,
        # collection is guaranteed non-None here thanks to the early return
        # above, so the original `if collection is not None` guard was dead.
        "title": collection.title_html,
        "userid": user.id,
        "type_error": "",
        "data": {
            "ids_count": tasks_count,
            "message": message,
            "target": "",
        },
    }

    insert_history_event(event_data)

47 

48 

def col_has_source(col: JSONCol, filter: str):
    """
    Return True if `filter` appears in the collection's "sources" list.

    (The parameter name `filter` shadows the builtin but is kept for
    backward compatibility with keyword-argument callers.)
    """
    # BUG FIX: the original `any(source for source in col["sources"] if
    # source == filter)` yielded the matched value itself, so a falsy match
    # (e.g. an empty-string source) made any() return False. A plain
    # membership test is both correct and clearer.
    return filter in col["sources"]

51 

52 

def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all cols by source
    @param source: str
    @return: list of collections
    """
    all_cols = get_all_cols()

    matching = []
    for col in all_cols.values():
        if col_has_source(col, source):
            matching.append(col)
    return matching

62 

63 

def get_all_cols_by_source():
    """
    Get all cols by source
    @return: dict of collections by source
    """
    grouped = {}
    for col in get_all_cols().values():
        for src in col["sources"]:
            # setdefault creates the bucket on first sight of a source
            grouped.setdefault(src, []).append(col)
    return grouped

79 

80 

@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    """Load (once, thanks to lru_cache) the collections JSON shipped next to this module."""
    json_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json"
    )
    with open(json_path, encoding="utf8") as fh:
        return json.load(fh)

87 

88 

def get_numdam_collections():
    """
    Returns a list of Numdam collection pids.

    Best effort: returns [] on any network failure, non-200 response or
    unexpected payload, matching the existing fallback behaviour.
    """

    url = "http://www.numdam.org/api-all-collections/"

    try:
        # FIX: a timeout prevents the crawler from hanging forever on a
        # stalled server; the original call had none.
        response = requests.get(url, timeout=30)
    except requests.exceptions.RequestException:
        # Connection errors were previously unhandled and crashed the caller,
        # although every other failure mode already fell back to [].
        return []

    if response.status_code != 200:
        return []

    data = response.json()
    if "collections" not in data:
        return []

    return data["collections"]

105 

106 

def get_or_create_collection(pid: str):
    """
    Get (or create) the Collection identified by `pid`.

    The pid has to be in the list of collections given by the Documentation
    team (CSV then transformed in JSON).

    Raises ValueError if the pid is not in that list, and
    ResourceDoesNotExist if the collection could not be created.
    """

    all_collections = get_all_cols()

    if pid not in all_collections:
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    # FIX: direct dict lookup instead of the original O(n) scan
    # `[item for item in all_collections.items() if item[0] == pid][0][1]`.
    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = "journal"
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: the ISSNs have not been
        # validated yet.
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it is too early: the ISSNs have not been
        # validated yet.
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection

161 

162 

# NOTE(review): appears unused within this module — verify callers before
# removing. Hard-coded list of Numdam collection pids (see also the live
# list fetched by get_numdam_collections()).
NUMDAM_COLLECTIONS = [
    "ACIRM",
    "ALCO",
    "AFST",
    "AIHPC",
    "AIHPA",
    "AIHPB",
    "AIF",
    "AIHP",
    "AUG",
    "AMPA",
    "AHL",
    "AMBP",
    "ASENS",
    "ASCFPA",
    "ASCFM",
    "ASNSP",
    "AST",
    "BSMF",
    "BSMA",
    "CTGDC",
    "BURO",
    "CSHM",
    "CG",
    "CM",
    "CRMATH",
    "CML",
    "CJPS",
    "CIF",
    "DIA",
    "COCV",
    "M2AN",
    "PS",
    "GAU",
    "GEA",
    "STS",
    "TAN",
    "JSFS",
    "JEP",
    "JMPA",
    "JTNB",
    "JEDP",
    "CAD",
    "CCIRM",
    "RCP25",
    "MSIA",
    "MRR",
    "MSH",
    "MSMF",
    "MSM",
    "NAM",
    "OJMO",
    "PHSC",
    "PSMIR",
    "PDML",
    "PMB",
    "PMIHES",
    "PMIR",
    "RO",
    "RCP",
    "ITA",
    "RSMUP",
    "RSA",
    "RHM",
    "SG",
    "SB",
    "SBCD",
    "SC",
    "SCC",
    "SAF",
    "SDPP",
    "SMJ",
    "SPHM",
    "SPS",
    "STNB",
    "STNG",
    "TSG",
    "SD",
    "SE",
    "SEDP",
    "SHC",
    "SJ",
    "SJL",
    "SLSEDP",
    "SLDB",
    "SL",
    "SPK",
    "SAC",
    "SMS",
    "SLS",
    "SSL",
    "SENL",
    "SSS",
    "SAD",
    "THESE",
    "SMAI-JCM",
    "WBLN",
]

262 

263 

def cleanup_str(input: str):
    """Normalize whitespace: NFKC-fold, drop DEL chars, collapse runs of blanks."""
    # Some white spaces aren't actual space characters, like \xa0;
    # NFKC normalization folds them into plain spaces.
    text = unicodedata.normalize("NFKC", input)
    # Strip stray DEL (0x7f) control characters.
    text = text.replace("\x7f", "")
    # Collapse consecutive newlines/tabs/returns/spaces into single spaces.
    return re.sub(r"[\n\t\r ]+", " ", text).strip()

271 

272 

def add_pdf_link_to_xarticle(xarticle: ResourceData, pdf_url: str):
    """Attach `pdf_url` to `xarticle` as both a full-text stream and an ext link."""
    stream = {
        "rel": "full-text",
        "mimetype": "application/pdf",
        "location": pdf_url,
        "base": "",
        "text": "Full Text",
    }
    xarticle.streams.append(stream)

    # The pdf url is already added as a stream (just above) but might be
    # replaced by a file later on. Keep the pdf url as an Extlink if we want
    # to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    xarticle.ext_links.append(create_extlink(rel="article-pdf", location=pdf_url))