Coverage for src/crawler/utils.py: 54%

82 statements  

coverage.py v7.9.0, created at 2025-10-08 15:14 +0000

import json
import os
import re
import unicodedata
from functools import lru_cache
from typing import Literal

import regex
from django.contrib.auth.models import User
from history.model_data import HistoryEventDict, HistoryEventType
from history.models import HistoryEvent

# from ptf.models import ResourceId
from history.utils import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import ResourceData, create_extlink, create_publicationdata
from ptf.models import Collection

from crawler.types import JSONCol


def insert_crawl_event_in_history(
    colid: str,
    source_domain: str,
    username: str,
    status: HistoryEvent.EventStatusEnum,
    tasks_count,
    message: str,
    event_type: HistoryEventType = "import",
    title=None,
):
    collection = model_helpers.get_collection(colid, sites=False)
    user = User.objects.get(username=username)

    event_data: HistoryEventDict = {
        "type": event_type,
        "pid": f"{colid}-{source_domain}",
        "col": collection,
        "source": source_domain,
        "status": status,
        "title": collection.title_html if collection else (title or colid),
        "userid": user.pk,
        "type_error": "",
        "data": {
            "message": message,
        },
    }

    insert_history_event(event_data)
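
# Usage sketch (all values below are placeholders; the exact EventStatusEnum
# member and an existing user are assumptions, not taken from this module):
#
#     insert_crawl_event_in_history(
#         colid="EXAMPLE",
#         source_domain="example.org",
#         username="crawler_bot",
#         status=HistoryEvent.EventStatusEnum.OK,
#         tasks_count=1,
#         message="Crawl finished",
#     )
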

def col_has_source(col: JSONCol, filter: str):
    return any(source for source in col["sources"] if source == filter)


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections for the given source
    @param source: str
    @return: list of collections
    """
    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]
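
# Usage sketch ("EXAMPLE" is a placeholder source name, not necessarily present
# in data/all_cols.json):
#
#     cols = get_cols_by_source("EXAMPLE")
#     # every returned col is a JSONCol whose "sources" list contains "EXAMPLE"
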

def get_all_cols_by_source():
    """
    Group all collections by source
    @return: dict of collections by source
    """
    data = get_all_cols()

    sources = {}
    for col in data.values():
        for source in col["sources"]:
            if source not in sources:
                sources[source] = []
            sources[source].append(col)

    return sources
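
# The returned mapping has one key per source found in data/all_cols.json,
# e.g. (shape only, keys depend on the file's "sources" lists):
#
#     {"source_a": [<JSONCol>, ...], "source_b": [<JSONCol>, ...]}
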

@lru_cache(maxsize=None)
def get_all_cols() -> dict[str, JSONCol]:
    with open(
        os.path.dirname(os.path.abspath(__file__)) + "/data/all_cols.json", encoding="utf8"
    ) as data_collections:
        return json.load(data_collections)
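
# Because of @lru_cache, data/all_cols.json is read once per process and the
# same dict is returned on every subsequent call. If the file changes at
# runtime, the cache can be reset explicitly:
#
#     get_all_cols.cache_clear()
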

def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections given by the Documentation team (a CSV later transformed into JSON).
    """

    all_collections = get_all_cols()

    if pid not in all_collections:  # coverage: branch never taken in the test run
        raise ValueError(f"{pid} is not listed in all_cols.csv")

    col_data = [item for item in all_collections.items() if item[0] == pid][0][1]

    collection: Collection | None = model_helpers.get_collection(pid, sites=False)

    if not collection:  # coverage: branch never taken in the test run
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = col_data["type"]
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

        # Commented out because it is too early: Taban has not validated the ISSNs yet
        # if col_data["ISSN_électronique"] != "":
        #     e_issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "e_issn",
        #         "id_value": col_data["ISSN_électronique"],
        #     }
        #     ResourceId.objects.create(**e_issn)
        #
        # if col_data["ISSN_papier"] != "":
        #     issn = {
        #         "resource_id": collection.resource_ptr_id,
        #         "id_type": "issn",
        #         "id_value": col_data["ISSN_papier"],
        #     }
        #     ResourceId.objects.create(**issn)

    if not collection:  # coverage: branch never taken in the test run
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
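
# Usage sketch ("EXAMPLE" is a placeholder pid; a real call needs a pid listed
# in data/all_cols.json and an existing "mathdoc-id" provider):
#
#     collection = get_or_create_collection("EXAMPLE")
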

def cleanup_str(input: str):
    # some white space characters aren't actual spaces, like \xa0: normalize them
    input = unicodedata.normalize("NFKC", input)
    # strip DEL (\x7f) control characters
    input = re.sub(r"[\x7f]+", "", input)
    # collapse consecutive \n, \t, \r and spaces into a single space
    return re.sub(r"[\n\t\r ]+", " ", input).strip()
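
# Example of the current behaviour (doctest-style):
#
#     >>> cleanup_str("Hello\xa0\n\n  world\t!")
#     'Hello world !'
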

def add_pdf_link_to_xarticle(
    xarticle: ResourceData,
    pdf_url: str,
    mimetype: Literal["application/pdf", "application/x-tex"] = "application/pdf",
):
    xarticle.streams.append(
        {
            "rel": "full-text",
            "mimetype": mimetype,
            "location": pdf_url,
            "base": "",
            "text": "Full Text",
        }
    )

    # The pdf url is already added as a stream (just above) but might be replaced by a file later on.
    # Keep the pdf url as an Extlink if we want to propose both options:
    # - direct download of a local PDF
    # - URL to the remote PDF
    rel = "article-pdf" if mimetype == "application/pdf" else "article-tex"
    ext_link = create_extlink(rel=rel, location=pdf_url)
    xarticle.ext_links.append(ext_link)
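
# Usage sketch (the URL is a placeholder and create_articledata is an assumed
# ptf.model_data helper, similar to create_publicationdata used above):
#
#     xarticle = create_articledata()
#     add_pdf_link_to_xarticle(xarticle, "https://example.org/article.pdf")
#     # xarticle.streams now ends with a "full-text" stream and
#     # xarticle.ext_links with an "article-pdf" extlink
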

def regex_to_dict(pattern: str, value: str, *args, error_msg="Regex failed to parse"):
    issue_search = regex.search(pattern, value)
    if not issue_search:  # coverage: branch never taken in the test run
        raise ValueError(error_msg)

    return issue_search.groupdict()
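
# Example with a made-up pattern (the regex module accepts the same named-group
# syntax as re):
#
#     >>> regex_to_dict(r"Vol\. (?P<volume>\d+), No\. (?P<number>\d+)", "Vol. 12, No. 3")
#     {'volume': '12', 'number': '3'}
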

try:
    from crawler.tests.data_generation.decorators import skip_generation
except ImportError:
    # Fallback when the test helpers aren't installed: a no-op decorator
    def skip_generation(func):
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapper