Coverage for src/crawler/utils.py: 27%

80 statements  

coverage.py v7.6.4, created at 2024-11-20 09:03 +0000

import json
import os
import re
import unicodedata
import warnings

import requests
from django.conf import settings
from django.contrib.auth.models import User
from history.views import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.display import resolver
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import create_publicationdata
from ptf.models import Collection

from crawler.crawler_types import JSONCol

# from ptf.models import ResourceId


def insert_crawl_event_in_history(
    colid, source_name, username, status, tasks_count, message, event_type="import"
):
    collection = model_helpers.get_collection(colid)
    if collection is None:
        warnings.warn(f"Collection {colid} cannot be found inside model_helpers")
        return
    user = User.objects.get(username=username)

    event_data = {
        "type": event_type,
        "pid": f"{colid}-{source_name}",
        "col": colid,
        "source": source_name,
        "status": status,
        "title": collection.title_html,
        "userid": user.id,
        "type_error": "",
        "data": {
            "ids_count": tasks_count,
            "message": message,
            "target": "",
        },
    }

    insert_history_event(event_data)

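# Illustrative usage sketch: the argument values below are hypothetical, only the
# signature comes from the function above.
#
#   insert_crawl_event_in_history(
#       "AMC", "EUDML", "admin", "OK", tasks_count=42, message="Crawl finished"
#   )
#   # builds the event_data dict and hands it to history.views.insert_history_event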

def get_cached_html_folder(collection_id, source_id, container_id=None, article_id=None):
    """
    Web crawling (collections, issues, articles) saves the downloaded files on disk
    under settings.HTML_ROOT_FOLDER.
    The cache is used in case of a second attempt to get the HTML content.
    """

    folder = resolver.get_relative_folder(collection_id, container_id, article_id)

    if container_id is None and article_id is None:
        folder = os.path.join(settings.HTML_ROOT_FOLDER, folder, "html", source_id)
    else:
        folder = os.path.join(settings.HTML_ROOT_FOLDER, folder, "html")

    return folder

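# Illustrative sketch of the resulting paths; the collection, source and container ids
# are hypothetical and "<... folder>" stands for resolver.get_relative_folder().
#
#   get_cached_html_folder("AMC", "EUDML")
#   # -> HTML_ROOT_FOLDER/<collection folder>/html/EUDML
#   get_cached_html_folder("AMC", "EUDML", container_id="AMC_2020__18_1")
#   # -> HTML_ROOT_FOLDER/<issue folder>/html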

def col_has_source(col: JSONCol, filter: str):
    return any(source for source in col["sources"] if source == filter)

def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections that declare the given source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]

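# Illustrative usage sketch; "EUDML" is an assumed source name, any value present in a
# collection's "sources" list works.
#
#   eudml_cols = get_cols_by_source("EUDML")
#   # every returned JSONCol has "EUDML" in col["sources"]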

def get_all_cols() -> dict[str, JSONCol]:
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json")
    with open(path, encoding="utf8") as data_collections:
        return json.load(data_collections)

def read_json(path):
    with open(path, encoding="utf8") as data_collections:
        return json.load(data_collections)

def get_numdam_collections():
    """
    Returns a list of Numdam collection pids
    """

    url = "http://www.numdam.org/api-all-collections/"

    response = requests.get(url)
    if response.status_code != 200:
        return []

    data = response.json()
    if "collections" not in data:
        return []

    return data["collections"]

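# Illustrative sketch of the payload shape the parsing above assumes; the pids shown
# are examples, the real list comes from the Numdam API.
#
#   # GET http://www.numdam.org/api-all-collections/
#   # -> {"collections": ["AIF", "BSMF", ...]}
#   pids = get_numdam_collections()
#   # -> ["AIF", "BSMF", ...], or [] if the response is not 200 or lacks "collections"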

def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team
    (a CSV later converted to JSON)
    """

    all_collections = get_all_cols()

    if pid == "DA2":
        col_data = {
            "pid": "DA2",
            "url": "https://discreteanalysisjournal.com",
            "title": "Discrete Analysis",
        }
    elif pid == "ARSIA":
        col_data = {
            "pid": "ARSIA",
            "url": "https://ars-inveniendi-analytica.com",
            "title": "Ars Inveniendi Analytica",
        }
    elif pid == "AMC":
        col_data = {
            "pid": "AMC",
            "url": "https://amc-journal.eu/index.php/amc/issue/archive",
            "title": "Ars Mathematica Contemporanea",
        }
    else:
        if pid not in all_collections:
            raise ValueError(f"{pid} is not listed in the Eudml collections")

        col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = "journal"
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection

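# Illustrative usage sketch; "DA2" is one of the hard-coded pids handled above.
#
#   collection = get_or_create_collection("DA2")
#   # the first call creates the Collection through ptf_cmds.addCollectionPtfCmd,
#   # subsequent calls return the existing resource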

NUMDAM_COLLECTIONS = [
    "ACIRM",
    "ALCO",
    "AFST",
    "AIHPC",
    "AIHPA",
    "AIHPB",
    "AIF",
    "AIHP",
    "AUG",
    "AMPA",
    "AHL",
    "AMBP",
    "ASENS",
    "ASCFPA",
    "ASCFM",
    "ASNSP",
    "AST",
    "BSMF",
    "BSMA",
    "CTGDC",
    "BURO",
    "CSHM",
    "CG",
    "CM",
    "CRMATH",
    "CML",
    "CJPS",
    "CIF",
    "DIA",
    "COCV",
    "M2AN",
    "PS",
    "GAU",
    "GEA",
    "STS",
    "TAN",
    "JSFS",
    "JEP",
    "JMPA",
    "JTNB",
    "JEDP",
    "CAD",
    "CCIRM",
    "RCP25",
    "MSIA",
    "MRR",
    "MSH",
    "MSMF",
    "MSM",
    "NAM",
    "OJMO",
    "PHSC",
    "PSMIR",
    "PDML",
    "PMB",
    "PMIHES",
    "PMIR",
    "RO",
    "RCP",
    "ITA",
    "RSMUP",
    "RSA",
    "RHM",
    "SG",
    "SB",
    "SBCD",
    "SC",
    "SCC",
    "SAF",
    "SDPP",
    "SMJ",
    "SPHM",
    "SPS",
    "STNB",
    "STNG",
    "TSG",
    "SD",
    "SE",
    "SEDP",
    "SHC",
    "SJ",
    "SJL",
    "SLSEDP",
    "SLDB",
    "SL",
    "SPK",
    "SAC",
    "SMS",
    "SLS",
    "SSL",
    "SENL",
    "SSS",
    "SAD",
    "THESE",
    "SMAI-JCM",
    "WBLN",
]


def cleanup_str(input: str):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKD", input)
    # remove useless continuous \n and spaces from the string
    return re.sub(r"[\n\t ]+", " ", input).strip()

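# Illustrative sketch of the normalisation above; the input string is hypothetical.
#
#   cleanup_str("Title\xa0with\n\n   odd   spacing\t")
#   # -> "Title with odd spacing"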