Coverage for src/crawler/utils.py: 55%

99 statements  

coverage.py v7.12.0, created at 2025-12-11 14:57 +0000

import json
import os
import re
import unicodedata
from datetime import timedelta
from functools import lru_cache
from typing import Literal

import regex
from bs4 import BeautifulSoup
from django.conf import settings
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEventStatus

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection
from requests_cache import CachedSession, MongoCache

from crawler.types import JSONCol

def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEventStatus,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "message": message,
    }

    insert_history_event(event_data)
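# Illustrative usage (all values below are hypothetical, not taken from the project):
#
#     insert_crawl_event_in_history(
#         colid="AIF",                  # pid of a collection known to model_helpers
#         source_domain="example.org",  # domain the articles were crawled from
#         username="crawler-bot",       # must match an existing Django User
#         status=status,                # an already-chosen HistoryEventStatus value
#         tasks_count=0,
#         message="Crawl finished",
#     )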

def col_has_source(col: JSONCol, filter: str):
    return any(source for source in col["sources"] if source == filter)

def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections that declare the given source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]
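# Illustrative sketch (the source name is hypothetical):
#
#     cols = get_cols_by_source("EUDML")
#     print(len(cols), "collections declare this source")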

def get_all_cols_by_source():
    """
    Group all collections by source
    @return: dict mapping each source to its list of collections
    """
    data = get_all_cols()

    sources = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
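# Illustrative sketch of the returned mapping:
#
#     by_source = get_all_cols_by_source()
#     for source, cols in by_source.items():
#         print(source, len(cols))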

@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)
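# The JSON file is read once per process thanks to lru_cache; keys are collection pids.
# Illustrative sketch (the pid is hypothetical):
#
#     all_cols = get_all_cols()
#     col = all_cols.get("AIF")
#     get_all_cols.cache_clear()  # force a re-read of data/all_cols.json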

def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team (a CSV later converted to JSON)
    """

    all_collections = get_all_cols()

    if pid not in all_collections:  # coverage: branch never taken (condition never true)
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:  # coverage: branch never taken (condition never true)
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it's too early: Taban hasn't validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it's too early: Taban hasn't validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:  # coverage: branch never taken (condition never true)
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
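# Illustrative usage (the pid is hypothetical; it must exist in data/all_cols.json):
#
#     collection = get_or_create_collection("AIF")
#     print(collection.title_html)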

def cleanup_str(input: str):
    # some white spaces aren't actual space characters, like \xa0
    input = unicodedata.normalize("NFKC", input)
    # remove DEL (\x7f) control characters
    input = re.sub(r"[\x7f]+", "", input)
    # remove useless continuous \n and spaces from the string
    return re.sub(r"[\n\t\r ]+", " ", input).strip()
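# Illustrative example:
#
#     cleanup_str("Some\xa0title\n  with   spacing\x7f")
#     # -> "Some title with spacing"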

def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel = "article-pdf" if mimetype == "application/pdf" else "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
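# Illustrative usage, given an already-built ResourceData instance named xarticle
# (the URL is hypothetical):
#
#     add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")
#     # xarticle.streams now holds the full-text stream and
#     # xarticle.ext_links the matching "article-pdf" extlink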

def regex_to_dict(pattern: str, value: str, *, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:  # coverage: branch never taken (condition never true)
        raise ValueError(error_msg)

    return issue_search.groupdict()
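# Illustrative example with named groups (pattern and value are hypothetical):
#
#     parts = regex_to_dict(
#         r"Vol\. (?P<volume>\d+), No\. (?P<number>\d+)",
#         "Vol. 12, No. 3",
#         error_msg="Cannot parse issue reference",
#     )
#     # parts == {"volume": "12", "number": "3"}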

def get_base(soup: BeautifulSoup, default: str):
    base_tag = soup.select_one("head base")
    if not base_tag:  # coverage: branch never taken (condition never true)
        return default
    base = base_tag.get("href")
    if not isinstance(base, str):  # coverage: branch never taken (condition never true)
        raise ValueError("Cannot parse base href")
    return base
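# Illustrative example (the markup is hypothetical):
#
#     soup = BeautifulSoup(
#         "<html><head><base href='https://example.org/'></head></html>", "html.parser"
#     )
#     get_base(soup, "https://fallback.example")  # -> "https://example.org/"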

_session = None


def get_session(headers={}, match_headers=None):
    global _session
    if not _session:
        _session = CachedSession(
            match_headers=match_headers,
            headers=headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"), decode_content=False
            ),
            expire_after=timedelta(days=30),
        )
    return _session
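# The session is created once per process and backed by MongoDB, so headers and
# match_headers only take effect on the first call. Illustrative usage (URL hypothetical):
#
#     session = get_session(headers={"User-Agent": "crawler"})
#     response = session.get("https://example.org/journal/issue-1")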

try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:

    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper
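# When the test helpers are not installed, skip_generation falls back to a no-op
# decorator. Illustrative usage (the decorated function is hypothetical):
#
#     @skip_generation
#     def crawl_fixture_pages():
#         ...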