Coverage for src/crawler/base_crawler.py: 75%
467 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-08-29 13:43 +0000
1import logging
2import time
3from datetime import datetime, timedelta
4from email.policy import EmailPolicy
6import regex
7import requests
8from bs4 import BeautifulSoup
9from django.conf import settings
10from django.contrib.auth.models import User
11from django.utils import timezone
12from langcodes import standardize_tag
13from lingua import LanguageDetectorBuilder
14from opentelemetry import trace
15from ptf.cmds.xml.ckeditor.utils import (
16 build_jats_data_from_html_field,
17)
18from ptf.cmds.xml.jats.builder.references import (
19 get_article_title_xml,
20 get_author_xml,
21 get_fpage_xml,
22 get_lpage_xml,
23 get_source_xml,
24 get_year_xml,
25)
26from ptf.cmds.xml.jats.jats_parser import JatsBase
27from ptf.model_data import (
28 ArticleData,
29 ContributorDict,
30 IssueData,
31 ResourceData,
32 TitleDict,
33 create_abstract,
34 create_contributor,
35 create_extlink,
36 create_issuedata,
37 create_publisherdata,
38 create_titledata,
39)
40from ptf.model_data_converter import update_data_for_jats
41from pylatexenc.latex2text import LatexNodes2Text
42from pymongo.errors import DocumentTooLarge
43from pysolr import SolrError
44from requests_cache import CachedSession, MongoCache
46from crawler.cmds.xml_cmds import addOrUpdateGDMLIssueXmlCmd
47from crawler.models import Source
48from crawler.types import CitationLiteral
49from crawler.utils import add_pdf_link_to_xarticle, cleanup_str, get_or_create_collection
# TODO: pass a class factory instead of a dependency to a site
class CrawlerTitleDict(TitleDict):
    """TitleDict extended with the raw TeX form of the title (None when absent)."""

    title_tex: str | None
class BaseCollectionCrawler:
    """
    Base collection for the crawlers.
    To create a crawler:
    1) derive a class from BaseCollectionCrawler and name it XXXCrawler
    2) override the functions parse_collection_content, parse_issue_content and parse_article_content
    3) update factory.py so that crawler_factory can return your new crawler
    """

    logger = logging.getLogger(__name__)
    tracer = trace.get_tracer(__name__)

    # Identity of the crawled source; concrete crawlers override these.
    source_name = ""
    source_domain = ""
    source_website = ""

    issue_href = ""

    # Filled in by initialize() (skipped during test data generation).
    collection = None
    source = None
    user = None
    session: requests.Session | CachedSession

    # Whether TLS certificates are verified for this source.
    verify = True
    headers = {
        "accept_encoding": "utf-8",
        "User-Agent": getattr(settings, "REQUESTS_USER_AGENT", "Mathdoc/1.0.0"),
        "From": getattr(settings, "REQUESTS_EMAIL", "accueil@listes.mathdoc.fr"),
    }

    # Class-level timestamp: rate limiting is shared until an instance
    # overwrites it via _wait_download_delay().
    next_allowed_request: float = time.time()

    # seconds to wait between two http requests
    requests_interval = getattr(settings, "REQUESTS_INTERVAL", 90)

    latext_parser = LatexNodes2Text()

    # Override the values in your concrete crawler if the formulas in text (titles, abstracts)
    # do not use the "$" to surround tex formulas
    delimiter_inline_formula = "$"
    delimiter_disp_formula = "$"

    # HACK : Workaround for tests (monkeypatching)
    # We store the class here, so we can monkeypatch it when running tests
    # subCrawlers = {
    #     LofplCrawler: None
    # }
    subCrawlers: dict[type["BaseCollectionCrawler"], "BaseCollectionCrawler | None"] = {}

    language_detector = LanguageDetectorBuilder.from_all_languages().build()

    force_refresh = False

    # Whether to include headers in the requests cache key
    match_headers = False
    orcid_re = r"https\:\/\/orcid\.org\/(?P<orcid>\d{4}-\d{4}-\d{4}-\d{4})"

    # Set this to False on a Crawler-basis to allow inserting articles without PDFs
    ignore_missing_pdf = True
    def __init__(
        self,
        *args,
        username: str,
        collection_id: str,
        collection_url: str,
        test_mode: bool = False,
        publisher: str = "mathdoc",
        force_refresh=False,
    ):
        """
        :param username: Django username used for database writes.
        :param collection_id: pid of the collection to crawl.
        :param collection_url: url of the collection. Ex: https://eudml.org/journal/10098
        :param test_mode: when True, crawl_issue skips the database insertion.
        :param publisher: default publisher name used when the page provides none.
        :param force_refresh: bypass the HTTP cache for every request.
        """
        # Instantiate every registered sub-crawler with the same settings.
        # NOTE(review): force_refresh is not forwarded to sub-crawlers — confirm intended.
        for CrawlerClass in self.subCrawlers:
            self.subCrawlers[CrawlerClass] = CrawlerClass(
                *args,
                username=username,
                collection_id=collection_id,
                collection_url=collection_url,
                test_mode=test_mode,
                publisher=publisher,
            )
        self.logger = logging.getLogger(__name__ + "." + self.source_domain)

        self.username = username

        self.collection_id = collection_id
        self.collection_url = (
            collection_url  # url of the collection. Ex: https://eudml.org/journal/10098
        )

        self.test_mode = test_mode
        self.publisher = publisher

        self.session = requests.session()

        # Skipped when running tests; replaces the plain session with a cached one.
        self.initialize()
        self.session.verify = self.verify
        self.force_refresh = force_refresh
    def initialize(self):
        """
        Acts as a "second" init function to skip model accesses during test data generation
        """
        self.collection = get_or_create_collection(self.collection_id)
        self.source = self.get_or_create_source()
        self.user = User.objects.get(username=self.username)
        # Swap the plain requests session for a Mongo-backed cache (30-day expiry).
        self.session = CachedSession(
            match_headers=self.match_headers,
            headers=self.headers,
            backend=MongoCache(
                host=getattr(settings, "MONGO_HOSTNAME", "localhost"),
            ),
            expire_after=timedelta(days=30),
        )
    @classmethod
    def can_crawl(cls, pid: str) -> bool:
        """Whether this crawler can handle the given pid. Override to restrict."""
        return True
    def parse_collection_content(self, content: str) -> list[IssueData]:
        """
        Parse the HTML content with BeautifulSoup
        returns a list of xissue.
        Override this function in a derived class
        """
        return []
    def parse_issue_content(self, content: str, xissue: IssueData):
        """
        Parse the HTML content with BeautifulSoup
        Fills the xissue.articles
        Override this function in a derived class.

        CAV : You are supposed to create articles there. Please assign a PID to each article.
        The PID can be `a + article_index`, like this : `a0` `a21`
        """
    def parse_article_content(
        self, content: str, xissue: IssueData, xarticle: ArticleData, url: str
    ) -> ArticleData | None:
        """
        Parse the HTML content with BeautifulSoup
        returns the xarticle.
        Override this function in a derived class.
        The xissue is passed to the function in case the article page has issue information (ex: publisher)
        The article url is also passed as a parameter

        CAV : You are supposed to assign articles pid again here
        """
        return xarticle
    @tracer.start_as_current_span("crawl_collection")
    def crawl_collection(self):
        # TODO: Comments, filter
        """
        Crawl an entire collection. ptf.models.Container objects are created.
        - get the HTML content of the collection_url
        - parse the HTML content with beautifulsoup to extract the list of issues
        - merge the xissues (some Source can have multiple pages for 1 volume/issue. We create only 1 container)
        - crawl each issue if col_only is False
        - Returns the list of merged issues.
        It is an OrderedDict {pid: {"issues": xissues}}
        The key is the pid of the merged issues.
        Ex: The source may have Ex: Volume 6 (2000) and Volume 6 (1999)
        the pid is then made with 1999-2000__6_
        """

        if self.source is None:
            raise RuntimeError("ERROR: the source is not set")

        content = self.download_file(self.collection_url)
        xissues = self.parse_collection_content(content)

        """
        Some collections split the same volumes in different pages
        Ex: Volume 6 (2000) and Volume 6 (1999)
        We merge the 2 xissues with the same volume number => Volume 6 (1999-2000)
        """
        # merged_xissues = self.merge_xissues(xissues)

        # NOTE(review): merging is currently disabled; issues are keyed by pid as-is.
        xissues_dict = {str(i.pid): i for i in xissues}

        return xissues_dict
    @tracer.start_as_current_span("crawl_issue")
    def crawl_issue(self, xissue: IssueData):
        """
        Crawl 1 web page of an issue.
        - get the HTML content of the issue
        - parse the HTML content with beautifulsoup to extract the list of articles and/or the issue metadata
        - crawl each article
        """

        # Some sources, like EuDML, do not have a separate HTML page for an issue's table of content.
        # The list of articles directly comes from the collection HTML page: the xissue has no url attribute
        issue_url = xissue.url
        if issue_url is not None:
            if issue_url.endswith(".pdf"):
                # The "issue page" is itself a PDF: attach it and clear the url.
                add_pdf_link_to_xarticle(xissue, issue_url)
                xissue.url = None
            else:
                content = self.download_file(issue_url)
                with self.tracer.start_as_current_span("parse_issue_content"):
                    self.parse_issue_content(content, xissue)

        xarticles = xissue.articles

        parsed_xarticles = []

        for xarticle in xarticles:
            parsed_xarticle = self.crawl_article(xarticle, xissue)
            if parsed_xarticle is not None:
                parsed_xarticles.append(parsed_xarticle)

        xissue.articles = parsed_xarticles

        # Checked before filtering: the issue-level PDF also counts.
        article_has_pdf = self.article_has_pdf(xissue)

        if self.ignore_missing_pdf:
            # Drop articles that have no PDF link.
            xissue.articles = [a for a in xissue.articles if self.article_has_pdf(a)]

        if not self.test_mode and (len(xissue.articles) > 0 or article_has_pdf):
            self.process_resource_metadata(xissue, resource_type="issue")
            self.add_xissue_into_database(xissue)
284 @staticmethod
285 def article_has_source(art: ArticleData | IssueData):
286 return (
287 next(
288 (e_link for e_link in art.ext_links if e_link["rel"] == "source"),
289 None,
290 )
291 is not None
292 )
294 @staticmethod
295 def article_has_pdf(art: ArticleData | IssueData):
296 return (
297 next((link for link in art.ext_links if link["rel"] == "article-pdf"), None)
298 is not None
299 )
    def crawl_article(self, xarticle: ArticleData, xissue: IssueData):
        """Download and parse one article page; returns the processed article,
        or None if parse_article_content rejected it."""
        # ARTICLE URL as an ExtLink (to display the link in the article page)
        if xarticle.url is None:
            # No article page to fetch: link back to the issue (or collection) instead.
            if not self.article_has_source(xarticle):
                if xissue.url:
                    article_source = xissue.url
                else:
                    article_source = self.collection_url
                ext_link = create_extlink()
                ext_link["rel"] = "source"
                ext_link["location"] = article_source
                ext_link["metadata"] = self.source_domain
                xarticle.ext_links.append(ext_link)
            return self.process_article_metadata(xarticle)

        content = self.download_file(xarticle.url)

        # Make the article pid globally unique by prefixing the issue pid.
        xarticle.pid = f"{xissue.pid}_{xarticle.pid}"

        with self.tracer.start_as_current_span("parse_article_content"):
            parsed_xarticle = self.parse_article_content(content, xissue, xarticle, xarticle.url)
        if parsed_xarticle is None:
            return None

        if parsed_xarticle.doi:
            # Prefer a pid derived from the DOI when one exists.
            parsed_xarticle.pid = (
                parsed_xarticle.doi.replace("/", "_").replace(".", "_").replace("-", "_")
            )

        if not self.article_has_source(parsed_xarticle) and parsed_xarticle.url:
            ext_link = create_extlink()
            ext_link["rel"] = "source"
            ext_link["location"] = parsed_xarticle.url
            ext_link["metadata"] = self.source_domain
            parsed_xarticle.ext_links.append(ext_link)

        # The article title may have formulas surrounded with '$'
        return self.process_article_metadata(parsed_xarticle)
    def process_resource_metadata(self, xresource: ResourceData, resource_type="article"):
        """Convert the resource's TeX title and abstracts into HTML and JATS XML.

        :param xresource: article or issue, rewritten in place.
        :param resource_type: "article" or "issue"; selects the JATS title tag.
        :return: the same xresource, for chaining.
        """
        tag = "article-title" if resource_type == "article" else "issue-title"

        # Process title tex
        ckeditor_data = build_jats_data_from_html_field(
            xresource.title_tex,
            tag=tag,
            text_lang=xresource.lang,
            delimiter_inline=self.delimiter_inline_formula,
            delimiter_disp=self.delimiter_disp_formula,
        )

        xresource.title_html = ckeditor_data["value_html"]
        # xresource.title_tex = ckeditor_data["value_tex"]
        xresource.title_xml = ckeditor_data["value_xml"]

        # Process trans_title tex (deprecated path, kept for older crawlers)
        if xresource.trans_title_tex:
            self.logger.warning(
                "Deprecation Notice : prefer using xresource.title directly instead of xresource.trans_title_tex"
            )
            trans_title = self.create_trans_title(
                xresource_lang=xresource.lang,
                resource_type=resource_type,
                title_tex=xresource.trans_title_tex,
                lang=xresource.trans_lang,
            )
            xresource.titles.append(trans_title)

        abstracts_to_parse = [
            xabstract for xabstract in xresource.abstracts if xabstract["tag"] == "abstract"
        ]
        # abstract may have formulas surrounded with '$'
        if len(abstracts_to_parse) > 0:
            for xabstract in abstracts_to_parse:
                ckeditor_data = build_jats_data_from_html_field(
                    xabstract["value_tex"],
                    tag="abstract",
                    text_lang=xabstract["lang"],
                    resource_lang=xresource.lang,
                    field_type="abstract",
                    delimiter_inline=self.delimiter_inline_formula,
                    delimiter_disp=self.delimiter_disp_formula,
                )

                xabstract["value_html"] = ckeditor_data["value_html"]
                # xabstract["value_tex"] = ckeditor_data["value_tex"]
                xabstract["value_xml"] = ckeditor_data["value_xml"]

        return xresource
391 def process_article_metadata(self, xarticle: ArticleData):
392 self.process_resource_metadata(xarticle)
393 for bibitem in xarticle.bibitems:
394 bibitem.type = "unknown"
395 update_data_for_jats(xarticle, with_label=False)
397 return xarticle
    def _wait_download_delay(self, url: str, force_refresh=False):
        """Sleep so consecutive HTTP requests are spaced by requests_interval.

        A URL already present in the cache skips the delay (no network hit expected).
        """
        # If we already have a cached entry, we can skip the timeout
        if isinstance(self.session, CachedSession):
            if self.session.cache.contains(url=url) and not force_refresh:
                return

        delta = self.next_allowed_request - time.time()
        if delta > 0:
            self.logger.debug(f"Waiting {int(delta)}s before making another request")
            time.sleep(delta)
        # Shadows the class attribute with a per-instance timestamp.
        self.next_allowed_request = time.time() + self.requests_interval
411 def get(self, url: str, force_refresh=False, headers={}) -> requests.Response:
412 """
413 Wrapper around requests.get with delay based on the crawler class instance
414 """
415 response: requests.Response
416 self._wait_download_delay(url, force_refresh)
418 # self.session.cache.delete(urls=[url])
419 if isinstance(self.session, CachedSession): 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true
420 response = self.session.get(
421 url,
422 headers={**self.headers, **headers},
423 force_refresh=force_refresh,
424 )
425 else:
426 try:
427 response = self.session.get(url, headers={**self.headers, **headers})
428 except DocumentTooLarge as e:
429 self.logger.error(e)
430 response = requests.get(url, headers={**self.headers, **headers})
431 if not response.ok: 431 ↛ 432line 431 didn't jump to line 432 because the condition on line 431 was never true
432 raise requests.exceptions.HTTPError(
433 f"Endpoint answered with code {response.status_code} : {url}",
434 response=response,
435 )
437 return response
439 def download_file(self, url: str, force_refresh=False, headers={}):
440 """
441 Downloads a page and returns its content (decoded string).
442 This function handles retries and decoding
443 """
444 attempts = 0
445 while True:
446 try:
447 if attempts > 0: 447 ↛ 448line 447 didn't jump to line 448 because the condition on line 447 was never true
448 force_refresh = True
449 response = self.get(
450 url, force_refresh=force_refresh or self.force_refresh, headers=headers
451 )
452 content = self.decode_response(response)
453 if content == "" or not content: 453 ↛ 454line 453 didn't jump to line 454 because the condition on line 453 was never true
454 raise requests.exceptions.HTTPError(response)
455 if isinstance(self.session, CachedSession): 455 ↛ 456line 455 didn't jump to line 456 because the condition on line 455 was never true
456 self.session.cache.save_response(response)
457 return content
458 except (
459 requests.ConnectionError,
460 requests.ConnectTimeout,
461 requests.exceptions.HTTPError,
462 ) as e:
463 if attempts > 3:
464 raise e
465 self.logger.debug(f"Caught error : {e}", extra={"url": url})
466 attempts += 1
467 # 15 mins, 30 mins, 45 mins
468 delay_minutes = attempts * 15
469 self.logger.debug(
470 f"Retrying in {delay_minutes}mins ({(datetime.now() + timedelta(minutes=delay_minutes)).time()})",
471 extra={"url": url},
472 )
473 time.sleep(delay_minutes * 60)
    def decode_response(self, response: requests.Response, encoding: str | None = None):
        """Override this if the content-type headers from the sources are advertising something else than the actual content
        SASA needs this"""
        # A forced encoding wins over any detection.
        if encoding:
            response.encoding = encoding
            return response.text

        # Attempt to get encoding using HTTP headers
        content_type_tag = response.headers.get("Content-Type", None)

        if content_type_tag:
            charset = self.parse_content_type_charset(content_type_tag)
            if charset:
                response.encoding = charset
                return response.text

        # Attempt to get encoding using HTML meta charset tag
        soup = BeautifulSoup(response.text, "html5lib")
        charset = soup.select_one("meta[charset]")
        if charset:
            htmlencoding = charset.get("charset")
            if isinstance(htmlencoding, str):
                response.encoding = htmlencoding
                return response.text

        # Attempt to get encoding using HTML meta content type tag
        content_type_tag = soup.select_one('meta[http-equiv="Content-Type"]')
        if content_type_tag:
            content_type = content_type_tag.get("content")
            if isinstance(content_type, str):
                charset = self.parse_content_type_charset(content_type)
                if charset:
                    response.encoding = charset
                    return response.text

        # Fall back to requests' own encoding detection.
        return response.text
513 @staticmethod
514 def parse_content_type_charset(content_type: str):
515 header = EmailPolicy.header_factory("content-type", content_type)
516 if "charset" in header.params:
517 return header.params.get("charset")
    @tracer.start_as_current_span("add_xissue_to_database")
    def add_xissue_into_database(self, xissue: IssueData):
        """Persist the issue (and its articles) via addOrUpdateGDMLIssueXmlCmd,
        retrying up to 3 times on Solr errors.

        :raises ValueError: if the issue has no year.
        :raises ConnectionRefusedError: if Solr stays unreachable after 3 attempts.
        """
        xissue.journal = self.collection
        xissue.source = self.source_domain

        if xissue.year == "":
            raise ValueError("Failsafe : Cannot insert issue without a year")

        xpub = create_publisherdata()
        xpub.name = self.publisher
        xissue.publisher = xpub
        xissue.last_modified_iso_8601_date_str = timezone.now().isoformat()

        attempt = 1
        success = False

        while not success and attempt < 4:
            try:
                params = {"xissue": xissue, "use_body": False}
                cmd = addOrUpdateGDMLIssueXmlCmd(params)
                cmd.do()
                success = True
                self.logger.debug(f"Issue {xissue.pid} inserted in database")
            except SolrError:
                self.logger.warning(
                    f"Encoutered SolrError while inserting issue {xissue.pid} in database"
                )
                attempt += 1
                self.logger.debug(f"Attempt {attempt}. sleeping 10 seconds.")
                time.sleep(10)

        if success is False:
            raise ConnectionRefusedError("Cannot connect to SolR")
553 def get_metadata_using_citation_meta(
554 self,
555 xarticle: ArticleData,
556 xissue: IssueData,
557 soup: BeautifulSoup,
558 what: list[CitationLiteral] = [],
559 ):
560 """
561 :param xarticle: the xarticle that will collect the metadata
562 :param xissue: the xissue that will collect the publisher
563 :param soup: the BeautifulSoup object of tha article page
564 :param what: list of citation_ items to collect.
565 :return: None. The given article is modified
566 """
568 if "title" in what:
569 # TITLE
570 citation_title_node = soup.select_one("meta[name='citation_title']")
571 if citation_title_node: 571 ↛ 576line 571 didn't jump to line 576 because the condition on line 571 was always true
572 title = citation_title_node.get("content")
573 if isinstance(title, str): 573 ↛ 576line 573 didn't jump to line 576 because the condition on line 573 was always true
574 xarticle.title_tex = title
576 if "author" in what: 576 ↛ 605line 576 didn't jump to line 605 because the condition on line 576 was always true
577 # AUTHORS
578 citation_author_nodes = soup.select("meta[name^='citation_author']")
579 current_author: ContributorDict | None = None
580 for citation_author_node in citation_author_nodes:
581 if citation_author_node.get("name") == "citation_author":
582 text_author = citation_author_node.get("content")
583 if not isinstance(text_author, str): 583 ↛ 584line 583 didn't jump to line 584 because the condition on line 583 was never true
584 raise ValueError("Cannot parse author")
585 if text_author == "": 585 ↛ 586line 585 didn't jump to line 586 because the condition on line 585 was never true
586 current_author = None
587 continue
588 current_author = create_contributor(role="author", string_name=text_author)
589 xarticle.contributors.append(current_author)
590 continue
591 if current_author is None: 591 ↛ 592line 591 didn't jump to line 592 because the condition on line 591 was never true
592 self.logger.warning("Couldn't parse citation author")
593 continue
594 if citation_author_node.get("name") == "citation_author_institution":
595 text_institution = citation_author_node.get("content")
596 if not isinstance(text_institution, str): 596 ↛ 597line 596 didn't jump to line 597 because the condition on line 596 was never true
597 continue
598 current_author["addresses"].append(text_institution)
599 if citation_author_node.get("name") == "citation_author_ocrid": 599 ↛ 600line 599 didn't jump to line 600 because the condition on line 599 was never true
600 text_orcid = citation_author_node.get("content")
601 if not isinstance(text_orcid, str):
602 continue
603 current_author["orcid"] = text_orcid
605 if "pdf" in what:
606 # PDF
607 citation_pdf_node = soup.select_one('meta[name="citation_pdf_url"]')
608 if citation_pdf_node:
609 pdf_url = citation_pdf_node.get("content")
610 if isinstance(pdf_url, str): 610 ↛ 613line 610 didn't jump to line 613 because the condition on line 610 was always true
611 add_pdf_link_to_xarticle(xarticle, pdf_url)
613 if "lang" in what:
614 # LANG
615 citation_lang_node = soup.select_one("meta[name='citation_language']")
616 if citation_lang_node: 616 ↛ 622line 616 didn't jump to line 622 because the condition on line 616 was always true
617 # TODO: check other language code
618 content_text = citation_lang_node.get("content")
619 if isinstance(content_text, str): 619 ↛ 622line 619 didn't jump to line 622 because the condition on line 619 was always true
620 xarticle.lang = standardize_tag(content_text)
622 if "abstract" in what:
623 # ABSTRACT
624 abstract_node = soup.select_one("meta[name='citation_abstract']")
625 if abstract_node is not None:
626 abstract = abstract_node.get("content")
627 if not isinstance(abstract, str): 627 ↛ 628line 627 didn't jump to line 628 because the condition on line 627 was never true
628 raise ValueError("Couldn't parse abstract from meta")
629 abstract = BeautifulSoup(abstract, "html.parser").text
630 lang = abstract_node.get("lang")
631 if not isinstance(lang, str):
632 lang = self.detect_language(abstract, xarticle)
633 xarticle.abstracts.append(create_abstract(lang=lang, value_tex=abstract))
635 if "page" in what:
636 # PAGES
637 citation_fpage_node = soup.select_one("meta[name='citation_firstpage']")
638 if citation_fpage_node:
639 page = citation_fpage_node.get("content")
640 if isinstance(page, str): 640 ↛ 645line 640 didn't jump to line 645 because the condition on line 640 was always true
641 page = page.split("(")[0]
642 if len(page) < 32: 642 ↛ 645line 642 didn't jump to line 645 because the condition on line 642 was always true
643 xarticle.fpage = page
645 citation_lpage_node = soup.select_one("meta[name='citation_lastpage']")
646 if citation_lpage_node:
647 page = citation_lpage_node.get("content")
648 if isinstance(page, str): 648 ↛ 653line 648 didn't jump to line 653 because the condition on line 648 was always true
649 page = page.split("(")[0]
650 if len(page) < 32: 650 ↛ 653line 650 didn't jump to line 653 because the condition on line 650 was always true
651 xarticle.lpage = page
653 if "doi" in what:
654 # DOI
655 citation_doi_node = soup.select_one("meta[name='citation_doi']")
656 if citation_doi_node:
657 doi = citation_doi_node.get("content")
658 if isinstance(doi, str): 658 ↛ 665line 658 didn't jump to line 665 because the condition on line 658 was always true
659 doi = doi.strip()
660 pos = doi.find("10.")
661 if pos > 0:
662 doi = doi[pos:]
663 xarticle.doi = doi
665 if "mr" in what:
666 # MR
667 citation_mr_node = soup.select_one("meta[name='citation_mr']")
668 if citation_mr_node:
669 mr = citation_mr_node.get("content")
670 if isinstance(mr, str): 670 ↛ 676line 670 didn't jump to line 676 because the condition on line 670 was always true
671 mr = mr.strip()
672 if mr.find("MR") == 0: 672 ↛ 676line 672 didn't jump to line 676 because the condition on line 672 was always true
673 mr = mr[2:]
674 xarticle.extids.append(("mr-item-id", mr))
676 if "zbl" in what:
677 # ZBL
678 citation_zbl_node = soup.select_one("meta[name='citation_zbl']")
679 if citation_zbl_node:
680 zbl = citation_zbl_node.get("content")
681 if isinstance(zbl, str): 681 ↛ 687line 681 didn't jump to line 687 because the condition on line 681 was always true
682 zbl = zbl.strip()
683 if zbl.find("Zbl") == 0: 683 ↛ 687line 683 didn't jump to line 687 because the condition on line 683 was always true
684 zbl = zbl[3:].strip()
685 xarticle.extids.append(("zbl-item-id", zbl))
687 if "publisher" in what:
688 # PUBLISHER
689 citation_publisher_node = soup.select_one("meta[name='citation_publisher']")
690 if citation_publisher_node:
691 pub = citation_publisher_node.get("content")
692 if isinstance(pub, str): 692 ↛ 699line 692 didn't jump to line 699 because the condition on line 692 was always true
693 pub = pub.strip()
694 if pub != "": 694 ↛ 699line 694 didn't jump to line 699 because the condition on line 694 was always true
695 xpub = create_publisherdata()
696 xpub.name = pub
697 xissue.publisher = xpub
699 if "keywords" in what:
700 # KEYWORDS
701 citation_kwd_nodes = soup.select("meta[name='citation_keywords']")
702 for kwd_node in citation_kwd_nodes:
703 kwds = kwd_node.get("content")
704 if isinstance(kwds, str): 704 ↛ 702line 704 didn't jump to line 702 because the condition on line 704 was always true
705 kwds = kwds.split(",")
706 for kwd in kwds:
707 if kwd == "":
708 continue
709 kwd = kwd.strip()
710 xarticle.kwds.append({"type": "", "lang": xarticle.lang, "value": kwd})
712 if "references" in what:
713 citation_references = soup.select("meta[name='citation_reference']")
714 for index, tag in enumerate(citation_references):
715 content = tag.get("content")
716 if not isinstance(content, str): 716 ↛ 717line 716 didn't jump to line 717 because the condition on line 716 was never true
717 raise ValueError("Cannot parse citation_reference meta")
718 label = str(index + 1)
719 if regex.match(r"^\[\d+\].*", content): 719 ↛ 720line 719 didn't jump to line 720 because the condition on line 719 was never true
720 label = None
721 xarticle.bibitems.append(self.__parse_meta_citation_reference(content, label))
    def create_xissue(
        self, url: str | None, year: str, volume_number: str | None, issue_number: str | None = "1"
    ):
        """Build an IssueData with a normalised pid, volume and number.

        :param url: issue URL (trailing slash stripped); may be None.
        :param year: publication year (required later for database insertion).
        :param volume_number: raw volume string; non-alphanumerics become "_".
        :param issue_number: raw issue string; commas become "-".
        """
        if url is not None and url.endswith("/"):
            url = url[:-1]
        xissue = create_issuedata()
        xissue.url = url

        xissue.pid = self.get_issue_pid(self.collection_id, year, volume_number, issue_number)

        xissue.year = year

        if volume_number is not None:
            xissue.volume = regex.sub(r"[^a-zA-Z0-9-]+", "_", volume_number)

        if issue_number is not None:
            xissue.number = issue_number.replace(",", "-")
        return xissue
742 def detect_language(self, text: str, article: ArticleData | None = None):
743 if article and article.lang is not None and article.lang != "und":
744 return article.lang
746 language = self.language_detector.detect_language_of(text)
748 if not language: 748 ↛ 749line 748 didn't jump to line 749 because the condition on line 748 was never true
749 return "und"
750 return language.iso_code_639_1.name.lower()
752 def create_trans_title(
753 self,
754 resource_type: str,
755 title_tex: str,
756 lang: str,
757 xresource_lang: str,
758 title_type: str = "main",
759 ):
760 tag = "trans-article" if resource_type == "article" else "issue-title"
762 ckeditor_data = build_jats_data_from_html_field(
763 title_tex,
764 tag=tag,
765 text_lang=lang,
766 resource_lang=xresource_lang,
767 delimiter_inline=self.delimiter_inline_formula,
768 delimiter_disp=self.delimiter_disp_formula,
769 )
771 titledata = create_titledata(
772 lang=lang,
773 type="main",
774 title_html=ckeditor_data["value_html"],
775 title_xml=ckeditor_data["value_xml"],
776 )
778 return titledata
    # Maps Highwire citation_* keys found in citation_reference meta tags to the
    # JATS builder that renders the corresponding XML fragment.
    references_mapping = {
        "citation_title": get_article_title_xml,
        "citation_journal_title": get_source_xml,
        "citation_publication_date": get_year_xml,
        "citation_firstpage": get_fpage_xml,
        "citation_lastpage": get_lpage_xml,
    }
    @classmethod
    def __parse_meta_citation_reference(cls, content: str, label=None):
        """Convert one citation_reference meta content string into a JATS ref.

        Plain strings (no ";"-separated key=value parts) are baked as-is;
        otherwise each known citation_* key is rendered via references_mapping.
        """
        categories = content.split(";")

        # No structured data: bake the raw string.
        if len(categories) == 1:
            return JatsBase.bake_ref(content, label=label)

        citation_data = [c.split("=") for c in categories if "=" in c]
        del categories

        xml_string = ""
        authors_parsed = False
        authors_strings = []
        for data in citation_data:
            key = data[0].strip()
            citation_content = data[1]
            if key == "citation_author":
                authors_strings.append(get_author_xml(template_str=citation_content))
                continue
            elif not authors_parsed:
                # Flush collected authors when the first non-author key is seen.
                # NOTE(review): if every entry is an author, the authors are
                # never flushed and an empty ref is baked — confirm intended.
                xml_string += ", ".join(authors_strings)
                authors_parsed = True

            if key in cls.references_mapping:
                xml_string += " " + cls.references_mapping[key](citation_content)

        return JatsBase.bake_ref(xml_string, label=label)
    @classmethod
    def get_or_create_source(cls):
        """Fetch (or create) the Source row matching this crawler's domain."""
        source, created = Source.objects.get_or_create(
            domain=cls.source_domain,
            defaults={
                "name": cls.source_name,
                "website": cls.source_website,
            },
        )
        if created:
            # NOTE(review): get_or_create already persists the new row; this
            # extra save() looks redundant — confirm before removing.
            source.save()
        return source
829 @staticmethod
830 def get_issue_pid(
831 collection_id: str,
832 year: str,
833 volume_number: str | None = None,
834 issue_number: str | None = None,
835 ):
836 # Replace any non-word character with an underscore
837 pid = f"{collection_id}_{year}"
838 if volume_number is not None:
839 pid += f"_{volume_number}"
840 if issue_number is not None:
841 pid += f"_{issue_number}"
842 pid = regex.sub(r"[^a-zA-Z0-9-]+", "_", cleanup_str(pid))
843 return pid
845 @staticmethod
846 def set_pages(article: ArticleData, pages: str, separator: str = "-"):
847 pages_split = pages.split(separator)
848 if len(pages_split) == 0: 848 ↛ 849line 848 didn't jump to line 849 because the condition on line 848 was never true
849 article.page_range = pages
850 if len(pages_split) > 0: 850 ↛ exitline 850 didn't return from function 'set_pages' because the condition on line 850 was always true
851 if pages[0].isnumeric(): 851 ↛ exitline 851 didn't return from function 'set_pages' because the condition on line 851 was always true
852 article.fpage = pages_split[0]
853 if (
854 len(pages_split) > 1
855 and pages_split[0] != pages_split[1]
856 and pages_split[1].isnumeric()
857 ):
858 article.lpage = pages_split[1]