Coverage for src/crawler/by_source/bdim_crawler.py: 90%
242 statements
coverage.py v7.6.4, created at 2024-11-20 09:03 +0000
import re

import regex
from bs4 import BeautifulSoup
from bs4 import Tag
from crawler.base_crawler import BaseCollectionCrawler
from crawler.base_crawler import add_pdf_link_to_xarticle

from ptf.cmds.xml.jats.builder.citation import ContribAuthor
from ptf.cmds.xml.jats.builder.citation import get_all_authors_xml
from ptf.cmds.xml.jats.builder.citation import get_ext_link_xml
from ptf.cmds.xml.jats.builder.citation import get_publisher_xml
from ptf.cmds.xml.jats.builder.citation import get_source_xml
from ptf.cmds.xml.jats.builder.citation import get_volume_xml
from ptf.cmds.xml.jats.builder.citation import get_year_xml
from ptf.cmds.xml.jats.builder.issue import get_title_xml
from ptf.cmds.xml.xml_utils import escape
from ptf.model_data import AbstractDict
from ptf.model_data import create_articledata
from ptf.model_data import create_contributor
from ptf.model_data import create_issuedata

class BdimCrawler(BaseCollectionCrawler):
    source_name = "Biblioteca Digitale Italiana di Matematica"
    source_domain = "BDIM"
    source_website = "http://www.bdim.eu"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # TODO: create a cols.csv that supersedes cols_eudml.csv with the entire collection catalogue.

        self.source = self.get_or_create_source()

        self.issue_href = r"\?id=(?P<col>\w+)(?P<issue>_\d{1,4})"
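        # Issue hrefs look like "?id=<collection>_<year>_<series>_..._<volume>_<number>"
        # (e.g. "?id=RLINA_1965_8_39_5", an illustrative value); the "col" group captures
        # everything up to the last "_<digits>" segment.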
    def parse_collection_content(self, content):
        """
        Parse the HTML page of a BDIM collection and return a list of xissues.
        Each xissue has its pid/volume/number/year metadata + its url.

        self.periode is set at the end based on the xissue years found in the HTML page.
        """
        soup = BeautifulSoup(content, "html.parser")
        xissues = []

        reg_issue = regex.compile(self.issue_href)
        issue_nodes = [
            issue
            for issue in soup.select("div.listafascicoli a")
            if reg_issue.search(issue.get("href"))
        ]
        for issue_node in issue_nodes:
            # issue_text = issue_node.get_text()

            part_issue = issue_node.get("href").split("_")
            volume = part_issue[-2]
            number = part_issue[-1]
            year = part_issue[1]
            serie = part_issue[2]
            link = "/item" + issue_node.get("href")
            xissue = self.create_xissue(link, serie, volume, number, year)
            if xissue:
                xissues.append(xissue)
        self.periode_begin = self.get_year(xissues[0].year)
        self.periode_end = self.get_year(xissues[-1].year)

        self.periode = self.get_or_create_periode()

        return xissues
    def get_year(self, year):
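        # For a year range such as "1972/1973" (illustrative value), keep only the first year.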
77 if "/" in year: 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true
78 year = year.split("/")[0]
80 return year
    def create_xissue(self, url, serie, volume, number, dates):
        year = dates.replace("/", "-")

        xissue = create_issuedata()
        xissue.pid = f"{self.collection_id}_{year}_{serie}_{volume}_{number}"
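        # e.g. "RLINA_1965_8_39_5" (collection_year_series_volume_number; illustrative value).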
        xissue.year = year
        xissue.volume = volume
        xissue.number = number
        xissue.vseries = serie
        xissue.url = self.source_website + url

        return xissue
    def parse_issue_content(self, content, xissue):
        soup = BeautifulSoup(content, "html.parser")
        article_nodes = soup.find_all("div", {"class": "referenza"})

        for index_article, article_node in enumerate(article_nodes):
            article_link_node = article_node.find("a", text="referenza completa")
            if article_link_node:
                url = article_link_node.get("href")
                xarticle = create_articledata()
                xarticle.pid = "a" + str(index_article)
                xarticle.url = self.source_website + url

                xissue.articles.append(xarticle)
    def parse_article_content(self, content, xissue, xarticle, url, pid):
        """
        Parse the content with BeautifulSoup and return an ArticleData
        """
        xarticle = create_articledata()
        xarticle.pid = pid
        xarticle.lang = "it"
        soup = BeautifulSoup(content, "html.parser")
        # TITLE
        title_node = soup.find("span", {"class": "titolo"})
        if title_node:
            xarticle.title_tex = title_node.get_text()
            if xarticle.title_tex == "":
                xarticle.title_tex = " "

        reg_author_link = regex.compile(r"\?testo=\w+")
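        # Author links are identified by an href containing "?testo=..." ("testo" = "text" in Italian).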
        text_author_bloc = soup.select_one("div.referenza p")
        if text_author_bloc:
            authors = [
                link
                for link in text_author_bloc.select("a")
                if reg_author_link.search(link.get("href"))
            ]
            if authors:
                for contrib in authors:
                    role = "author"
                    contrib_node = contrib.find("span", {"class": "autore"})
                    if contrib_node is not None:
                        surname_node = contrib.find("span", {"class": "cognome"})
                        firstname_node = contrib.find("span", {"class": "nome"})
                        surname = ""
                        firstname = ""
                        author = create_contributor()
                        author["role"] = role

                        if surname_node is not None:
                            surname = surname_node.get_text()
                            author["last_name"] = surname

                        if firstname_node is not None:
                            firstname = firstname_node.get_text()
                            author["first_name"] = firstname

                        string_name = surname + ", " + firstname

                        # Fall back to the raw node text when neither name part is present.
                        if not surname and not firstname:
                            string_name = contrib_node.get_text()

                        author["string_name"] = string_name

                        xarticle.contributors.append(author)
        # ABSTRACT
        abstract_section_node = soup.find("div", {"class": "sunto"})
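        # "sunto" is Italian for "summary": this div holds the abstract text.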
        if abstract_section_node:
            abstract = str(abstract_section_node.get_text())
            xabstract: AbstractDict = {
                "tag": "abstract",
                "value_html": "",
                "value_tex": abstract,
                "value_xml": "",
                "lang": "en",
            }
            xarticle.abstracts.append(xabstract)
        # PDF
        pdf_url = soup.find_all("a", text="pdf")
        if len(pdf_url) > 0:
            pdf_url = self.source_website + pdf_url[0].get("href")
            add_pdf_link_to_xarticle(xarticle, pdf_url)
        # PAGES
        pages = soup.find("span", {"class": "pagine"})
        if pages:
            pages_to = re.compile(r"(\(?\d+\)?)?-?(\(?\d+\)?)").search(pages.get_text())
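            # Matches page ranges such as "183-190" or "(3)-(12)" (illustrative formats).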
            if pages_to:
                parts = pages_to[0].split("-")
                first_page = parts[0].replace("(", "").replace(")", "")
                if len(parts) > 1:
                    last_page = parts[1].replace("(", "").replace(")", "")
                    xarticle.lpage = last_page
                xarticle.fpage = first_page
        # Biblio
        bibitems_tags = soup.select("div.biblio div.bibitem")
        bibitems = [self.parse_bibitem_tag(item) for item in bibitems_tags]
        if len(bibitems) > 0:
            xarticle.abstracts.append(self.create_bibliography(bibitems))
        # metadata
        reg_zbl_id = re.compile(r"Zbl \w+")
        reg_mr_id = re.compile(r"MR \d+")
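        # External ids come from links whose text looks like "Zbl 0123.45678" or "MR 123456"
        # (illustrative values).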
        metadata_bloc = soup.select_one("div.referenza")
        if not metadata_bloc:
            raise ValueError("metadata_bloc cannot be found")
        mr_id = [link for link in metadata_bloc.find_all("a") if reg_mr_id.search(link.get_text())]
        zbl_id = [link for link in metadata_bloc.find_all("a") if reg_zbl_id.search(link.get_text())]

        if len(zbl_id) > 0:
            zblid = zbl_id[0].get("href")
            pos = zblid.find("?q=an:")
            if pos > 0:
                zblid = zblid[pos + 6 :]
            xarticle.extids.append(("zbl-item-id", zblid))
        if len(mr_id) > 0:
            mr_id = mr_id[0].get_text()
            mr_id = mr_id.split("MR ")
            mr_id = mr_id[1]
            xarticle.extids.append(("mr-item-id", mr_id))
        if xarticle.title_tex == " ":
            title = " "
            if xarticle.pid == "RLINA_1965_8_39_5_a17":
                title = "Eventi fasici nel midollo spinale quali prove di inibizione presinaptica durante il sonno desincronizzato"
            if xarticle.pid == "RLINA_1973_8_55_6_a0":
                title = "Complementarity between nilpotent selfmappings and periodic autohomeomorphisms."
            if xarticle.pid == "RLINA_1973_8_55_6_a2":
                title = "Sur une extension du lemme de Green."
            if xarticle.pid == "RLINA_1979_8_67_1-2_a6":
                title = "On the existence of an unbounded connected set of solutions for nonlinear equations in Banach spaces."
            if xarticle.pid == "RLINA_1972_8_52_2_a5":
                title = "Sul carattere proiettivo del rapporto plurisezionale."
            if xarticle.pid == "RLINA_1980_8_69_1-2_a6":
                title = "A note on a variational formulation of the Einstein equations for thermo-elastic materials."
            xarticle.title_tex = title

        return xarticle
    def parse_bibitem_tag(self, item: Tag):
        value_xml = ""
        # First pass: we create a semi-complete JATS XML string, except for the authors,
        # which we store inside authors_list to be serialized at the end.
        authors_list: list[ContribAuthor] = []
        for c in item.children:
            c_text = escape(c.text)
            if isinstance(c, str):
                value_xml += c_text
                continue

            if not isinstance(c, Tag):
                raise NotImplementedError("bibitem_tag is not a Tag or a string")
            if c.name == "a":
                a_xml, is_badge = self.parse_a_tag(c)
                if is_badge:
                    value_xml = regex.sub(r" \| $", "", value_xml)
                value_xml += a_xml
                continue

            child_class = c.get("class")
            if not child_class:
                value_xml += c_text
            elif "bautore" in child_class:
                # TODO: parse firstname and lastname
                author_data, author_xml = self.parse_biblio_author_tag(c, len(authors_list))
                authors_list.append(author_data)
                value_xml += author_xml

            elif "titolo" in child_class:
                value_xml += get_title_xml(c_text)
            elif "rivista" in child_class:
                value_xml += get_source_xml(c_text)
            elif "anno" in child_class:
                value_xml += get_year_xml(c_text)
            elif "volume" in child_class:
                value_xml += get_volume_xml(c_text)
            elif "publisher" in child_class:
                value_xml += get_publisher_xml(c_text)
            else:
                # booktitle
                value_xml += c_text
        # In order to have valid JATS XML, we have to group all authors into the person-group XML tag.
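        # value_xml contains one "{author_N}" placeholder per author; the span running from the
        # first to the last placeholder is handed to get_all_authors_xml below.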
        authors_occurence = regex.compile(r"{author_\d}").findall(value_xml)
        if len(authors_occurence) > 0:
            first_author = value_xml.index(authors_occurence[0])
            last_author = value_xml.index(authors_occurence[-1]) + len(authors_occurence[-1])
            value_xml = (
                value_xml[:first_author]
                + get_all_authors_xml(value_xml[first_author:last_author], authors_list)
                + value_xml[last_author:]
            )

        return self.create_crawled_bibitem(value_xml)
        # return self.create_crawled_bibitem([*bib_elements, *bib_link_elements])
    def parse_a_tag(self, a_tag: Tag):
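        # Converts an <a> tag into a JATS ext-link. MR / Zbl identifier links are flagged via the
        # second return value so that the caller can strip the " | " separator preceding them.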
        a_text = escape(a_tag.text)
        href = a_tag.get("href")
        if not href:
            return a_text, False
        elif isinstance(href, list):
            raise ValueError("a tag has multiple href values!")
        else:
            a_type = "uri"
            if a_text.startswith("MR "):
                a_type = "mr-item-id"
                a_text = a_text.removeprefix("MR ")
            elif a_text.startswith("Zbl "):
                a_type = "zbl-item-id"
                a_text = a_text.removeprefix("Zbl ")
            return get_ext_link_xml(escape(href), a_text, a_type), a_type != "uri"
    def parse_biblio_author_tag(self, author_tag: Tag, index: int = 0):
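        # Returns (author_data, value_xml), where value_xml holds a "{author_<index>}" placeholder
        # that parse_bibitem_tag later replaces with the serialized author XML.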
        value_xml = ""
        author_data: ContribAuthor = {"template_str": ""}
        for c in author_tag.children:
            c_text = escape(c.text)
            if isinstance(c, str):
                author_data["template_str"] += c_text
                continue

            if not isinstance(c, Tag):
                raise NotImplementedError("author_tag is not a Tag or a string")
            # "cognome" = surname (family name), "nome" = given name
            child_class = c.get("class")
            if not child_class:
                value_xml += c_text
            elif "cognome" in child_class:
                c.replace_with("{surname}")
                author_data["surname"] = c_text
                author_data["template_str"] += "{surname}"
            elif "nome" in child_class:
                c.replace_with("{given_names}")
                author_data["given_names"] = c_text
                author_data["template_str"] += "{given_names}"

        value_xml += "{author_" + str(index) + "}"

        return author_data, value_xml