Coverage for src/crawler/utils.py: 27%
80 statements
coverage.py v7.6.4, created at 2024-11-20 09:03 +0000
import json
import os
import re
import unicodedata
import warnings

import requests
from django.conf import settings
from django.contrib.auth.models import User
from history.views import insert_history_event
from ptf import model_helpers
from ptf.cmds import ptf_cmds
from ptf.display import resolver
from ptf.exceptions import ResourceDoesNotExist
from ptf.model_data import create_publicationdata
from ptf.models import Collection

from crawler.crawler_types import JSONCol

# from ptf.models import ResourceId


def insert_crawl_event_in_history(
    colid, source_name, username, status, tasks_count, message, event_type="import"
):
    collection = model_helpers.get_collection(colid)
    if collection is None:
        warnings.warn(f"Collection {colid} cannot be found inside model_helpers")
        return
    user = User.objects.get(username=username)

    event_data = {
        "type": event_type,
        "pid": f"{colid}-{source_name}",
        "col": colid,
        "source": source_name,
        "status": status,
        "title": collection.title_html,
        "userid": user.id,
        "type_error": "",
        "data": {
            "ids_count": tasks_count,
            "message": message,
            "target": "",
        },
    }

    insert_history_event(event_data)
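
# Example usage sketch (all argument values are illustrative; the collection pid and the
# username must exist in the database for the event to be recorded):
#
# insert_crawl_event_in_history(
#     colid="AIF",
#     source_name="numdam",
#     username="admin",
#     status="OK",
#     tasks_count=42,
#     message="42 tasks created",
# )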


def get_cached_html_folder(collection_id, source_id, container_id=None, article_id=None):
    """
    Web crawling (of collections, issues and articles) saves the downloaded files on disk
    under settings.HTML_ROOT_FOLDER.
    The cache is reused when the same HTML content is fetched a second time.
    """

    folder = resolver.get_relative_folder(collection_id, container_id, article_id)

    if container_id is None and article_id is None:
        folder = os.path.join(settings.HTML_ROOT_FOLDER, folder, "html", source_id)
    else:
        folder = os.path.join(settings.HTML_ROOT_FOLDER, folder, "html")

    return folder
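
# Example usage sketch (pids are hypothetical). Without a container or article id the
# source id is appended to the cache path; with one, it is not:
#
# collection_folder = get_cached_html_folder("AIF", "numdam")
# issue_folder = get_cached_html_folder("AIF", "numdam", container_id="AIF_2020__70_1")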


def col_has_source(col: JSONCol, filter: str):
    return filter in col["sources"]


def get_cols_by_source(source: str) -> list[JSONCol]:
    """
    Get all collections that declare the given source.
    @param source: source name to filter on
    @return: list of matching collections
    """

    data = get_all_cols()

    return [col for col in data.values() if col_has_source(col, source)]


def get_all_cols() -> dict[str, JSONCol]:
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data", "all_cols.json")
    with open(path, encoding="utf8") as data_collections:
        return json.load(data_collections)
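
# Example usage sketch ("eudml" is an assumed source name; the real values are whatever
# appears in the "sources" lists of data/all_cols.json):
#
# eudml_cols = get_cols_by_source("eudml")
# all_cols = get_all_cols()  # dict keyed by collection pid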


def read_json(path):
    with open(path, encoding="utf8") as data_collections:
        return json.load(data_collections)


def get_numdam_collections():
    """
    Returns a list of Numdam collection pids.
    """

    url = "http://www.numdam.org/api-all-collections/"

    response = requests.get(url)
    if response.status_code != 200:
        return []

    data = response.json()
    if "collections" not in data:
        return []

    return data["collections"]
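
# Example usage sketch (requires network access to numdam.org; returns [] when the API
# answer is unusable):
#
# numdam_pids = get_numdam_collections()
# if "AIF" in numdam_pids:
#     ...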


def get_or_create_collection(pid: str):
    """
    Creates a Collection based on its pid.
    The pid has to be in the list of collections provided by the Documentation team
    (a CSV later converted to JSON).
    """

    all_collections = get_all_cols()

    if pid == "DA2":
        col_data = {
            "pid": "DA2",
            "url": "https://discreteanalysisjournal.com",
            "title": "Discrete Analysis",
        }
    elif pid == "ARSIA":
        col_data = {
            "pid": "ARSIA",
            "url": "https://ars-inveniendi-analytica.com",
            "title": "Ars Inveniendi Analytica",
        }
    elif pid == "AMC":
        col_data = {
            "pid": "AMC",
            "url": "https://amc-journal.eu/index.php/amc/issue/archive",
            "title": "Ars Mathematica Contemporanea",
        }
    else:
        if pid not in all_collections:
            raise ValueError(f"{pid} is not listed in the Eudml collections")

        col_data = all_collections[pid]

    collection: Collection | None = model_helpers.get_collection(pid)

    if not collection:
        p = model_helpers.get_provider("mathdoc-id")

        xcol = create_publicationdata()
        xcol.coltype = "journal"
        xcol.pid = pid
        xcol.title_tex = col_data["title"]
        # Commented out for now: Taban has not yet validated the ISSNs
        # xcol.e_issn = col_data["ISSN_électronique"]
        # xcol.issn = col_data["ISSN_papier"]
        xcol.title_html = col_data["title"]
        xcol.title_xml = f"<title-group><title>{col_data['title']}</title></title-group>"
        xcol.lang = "en"

        cmd = ptf_cmds.addCollectionPtfCmd({"xobj": xcol})
        cmd.set_provider(p)
        collection = cmd.do()

    # Commented out for now: Taban has not yet validated the ISSNs
    # if col_data["ISSN_électronique"] != "":
    #     e_issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "e_issn",
    #         "id_value": col_data["ISSN_électronique"],
    #     }
    #     ResourceId.objects.create(**e_issn)
    #
    # if col_data["ISSN_papier"] != "":
    #     issn = {
    #         "resource_id": collection.resource_ptr_id,
    #         "id_type": "issn",
    #         "id_value": col_data["ISSN_papier"],
    #     }
    #     ResourceId.objects.create(**issn)

    if not collection:
        raise ResourceDoesNotExist(f"Resource {pid} does not exist")

    return collection
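
# Example usage sketch ("DA2" is one of the pids hard-coded above; any other pid must be
# present in data/all_cols.json, otherwise a ValueError is raised):
#
# collection = get_or_create_collection("DA2")
# print(collection.pid, collection.title_html)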


NUMDAM_COLLECTIONS = [
    "ACIRM",
    "ALCO",
    "AFST",
    "AIHPC",
    "AIHPA",
    "AIHPB",
    "AIF",
    "AIHP",
    "AUG",
    "AMPA",
    "AHL",
    "AMBP",
    "ASENS",
    "ASCFPA",
    "ASCFM",
    "ASNSP",
    "AST",
    "BSMF",
    "BSMA",
    "CTGDC",
    "BURO",
    "CSHM",
    "CG",
    "CM",
    "CRMATH",
    "CML",
    "CJPS",
    "CIF",
    "DIA",
    "COCV",
    "M2AN",
    "PS",
    "GAU",
    "GEA",
    "STS",
    "TAN",
    "JSFS",
    "JEP",
    "JMPA",
    "JTNB",
    "JEDP",
    "CAD",
    "CCIRM",
    "RCP25",
    "MSIA",
    "MRR",
    "MSH",
    "MSMF",
    "MSM",
    "NAM",
    "OJMO",
    "PHSC",
    "PSMIR",
    "PDML",
    "PMB",
    "PMIHES",
    "PMIR",
    "RO",
    "RCP",
    "ITA",
    "RSMUP",
    "RSA",
    "RHM",
    "SG",
    "SB",
    "SBCD",
    "SC",
    "SCC",
    "SAF",
    "SDPP",
    "SMJ",
    "SPHM",
    "SPS",
    "STNB",
    "STNG",
    "TSG",
    "SD",
    "SE",
    "SEDP",
    "SHC",
    "SJ",
    "SJL",
    "SLSEDP",
    "SLDB",
    "SL",
    "SPK",
    "SAC",
    "SMS",
    "SLS",
    "SSL",
    "SENL",
    "SSS",
    "SAD",
    "THESE",
    "SMAI-JCM",
    "WBLN",
]


def cleanup_str(input: str):
    # some white-space characters aren't actual spaces, e.g. \xa0 (no-break space)
    input = unicodedata.normalize("NFKD", input)
    # collapse consecutive newlines, tabs and spaces into a single space
    return re.sub(r"[\n\t ]+", " ", input).strip()
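
# Example usage sketch (the input string is made up; \xa0 is a no-break space that NFKD
# normalization turns into a regular space):
#
# cleanup_str("Annales\xa0de   l'institut\n\tFourier")
# # -> "Annales de l'institut Fourier"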