Coverage for src / crawler / cmds / augment / augment_cmd.py: 20%

108 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-05-21 12:58 +0000

1import logging 

2from concurrent.futures import Future, ThreadPoolExecutor 

3from typing import TYPE_CHECKING, NotRequired, TypedDict 

4 

5from history.model_data import HistoryChildDict, MatchingEventResult 

6from history.models import HistoryEventStatus 

7from history.utils import insert_history_event 

8from matching_back.backends import get_backends 

9from matching_back.backends.matching_backend import MatchingBackend 

10from ptf import model_helpers 

11from ptf.cmds.base_cmds import baseCmd 

12from ptf.models import Article, Collection 

13 

14from crawler.cmds.augment import update_article_with_xarticle 

15from crawler.models.source import Source 

16 

17if TYPE_CHECKING: 

18 from collections.abc import Callable 

19 

20 from ptf.models import ArticleQuerySet 

21 

22 class AugmentArticlesParams(TypedDict): 

23 backend: MatchingBackend 

24 source_id: str 

25 collection_pid: NotRequired[str] 

26 issue_pids: NotRequired[list[str]] 

27 update_pdf_link: NotRequired[bool] 

28 

29 from typing import NotRequired, TypedDict 

30 

31 class MatchingOperationMessage(TypedDict): 

32 pid: str 

33 zbl_id: "NotRequired[str]" 

34 score: "NotRequired[int]" 

35 status: HistoryEventStatus 

36 message: "NotRequired[str]" 

37 status_message: "NotRequired[str]" 

38 

39 

40_logger = logging.getLogger(__name__) 

41 

42 

def compile_history_events(
    event_messages: "list[MatchingOperationMessage]",
) -> "list[HistoryChildDict]":
    """Convert matching operation messages into history child entries.

    Each message's ``pid`` is resolved with ``model_helpers.get_resource``.
    A non-empty ``zbl_id`` becomes a ``https://zbmath.org/<zbl_id>`` URL, and
    the optional ``score``, ``message`` and ``status_message`` keys are copied
    through when present.

    Args:
        event_messages: Per-article matching outcomes.

    Returns:
        One ``HistoryChildDict`` per message, in input order, each with
        ``type`` set to ``"zbl-id"``.

    Raises:
        ValueError: If a message's ``pid`` does not resolve to a resource.
    """
    children: "list[HistoryChildDict]" = []
    for msg in event_messages:
        pid = msg["pid"]
        resource = model_helpers.get_resource(pid)
        if not resource:
            raise ValueError(f"Resource not found: {pid}")
        data: "HistoryChildDict" = {
            "resource": resource,
            "type": "zbl-id",
            "status": msg.get("status"),
        }

        # Missing, empty or None ids must not produce a bogus zbMATH URL.
        zbl_id = msg.get("zbl_id")
        if zbl_id:
            data["url"] = f"https://zbmath.org/{zbl_id}"

        # Optional fields are copied only when the producer set them.
        if "score" in msg:
            data["score"] = msg["score"]

        if "message" in msg:
            data["message"] = msg["message"]

        if "status_message" in msg:
            data["status_message"] = msg["status_message"]

        children.append(data)
    return children

69 

70 

class AugmentArticlesCmd(baseCmd[None]):
    """
    Metadata augment command.
    Calls the desired Matching Backend defined in ptf-back.

    ``__init__`` selects the articles to process from the optional
    ``collection_pid``, ``source_id`` and ``issue_pids`` filters.

    For each backend returned by ``get_backends``, ``internal_do`` looks every
    article up, trying external IDs first and then fuzzy matching. It records
    one event message per article and hands matched articles to
    ``update_article_with_xarticle`` on a single worker thread.

    ``insert_history_event`` stores the collected messages as one
    ``matching`` history event.
    """

    required_params = ["backend", "source_id"]

    # Value handed to get_backends() in __init__. NOTE(review): internal_do
    # rebinds this attribute to each backend object it iterates over, so its
    # runtime type later differs from the one annotated here.
    backend: str
    # Source domain used to filter articles (matched against Source.domain).
    source_id: str
    collection_pid: str | None = None
    # Class-level default list; it is only read here, never mutated.
    issue_pids: list[str] = []
    # Forwarded to update_article_with_xarticle.
    update_pdf_link: bool = False
    collection: "Collection | None" = None
    queryset: "ArticleQuerySet"
    # Event dicts built by _make_event, accumulated across all backends.
    event_messages: list[dict]
    # Progress hook. It is called with no argument for unmatched articles, and
    # with the finished Future (as a done-callback) for submitted updates.
    # The default is a staticmethod so it is not bound to the instance: a
    # plain lambda class attribute would receive ``self`` as an extra
    # argument and raise TypeError when invoked as a done-callback.
    callback: "Callable[..., object]" = staticmethod(lambda *_args: None)

    def __init__(self, params: "AugmentArticlesParams"):
        """Resolve the backends and build the article queryset.

        Filters are applied cumulatively: by collection (``collection_pid``,
        which also loads ``self.collection``), by the container origin's
        source (``source_id``, looked up by ``Source.domain``) and by issue
        pid (``issue_pids``).
        """
        # Presumably baseCmd.__init__ copies ``params`` onto attributes such as
        # self.backend / self.source_id — not visible here, verify in ptf.
        super().__init__(params)
        self.backends = get_backends(self.backend)
        self.event_messages = []
        self.queryset = Article.objects.all()

        if self.collection_pid:
            self.queryset = self.queryset.filter(
                my_container__my_collection__pid=self.collection_pid
            )
            self.collection = Collection.objects.get(pid=self.collection_pid)

        if self.source_id:
            source = Source.objects.get(domain=self.source_id)
            self.queryset = self.queryset.filter(my_container__origin__source=source)

        if self.issue_pids:
            self.queryset = self.queryset.filter(my_container__pid__in=self.issue_pids)

    def internal_do(self) -> None:
        """Match every selected article with each backend and apply the results.

        For each backend, articles are looked up on the calling thread, while
        ``update_article_with_xarticle`` runs on a single worker thread. Every
        lookup's event message is appended to ``event_messages``.

        Raises:
            Any exception raised by a submitted update. It is re-raised as
            soon as it is noticed inside the loop, in which case pending
            updates are cancelled. At the latest, it is re-raised once all of
            that backend's updates have finished.
        """
        for backend in self.backends:
            # _find_article and _make_event read the current backend from
            # self.backend, so keep the attribute in sync with the loop.
            self.backend = backend
            _logger.info(
                f"Start augment [{self.backend.name}] "
                f"source={self.source_id} collection={self.collection_pid} "
                f"({self.queryset.count()} articles)"
            )
            promises: "set[Future]" = set()
            with ThreadPoolExecutor(max_workers=1) as executor:
                for article in self.queryset.iterator(chunk_size=2000):
                    xarticle, event = self._find_article(article)

                    if event:
                        self.event_messages.append(event)

                    if not xarticle:
                        self.callback()
                        continue

                    # Handle asynchronous results and exceptions as soon as possible
                    try:
                        promises = self._check_finished(promises)
                    except BaseException:
                        executor.shutdown(wait=False, cancel_futures=True)
                        raise

                    # Runs the database insertion in parallel
                    promise = executor.submit(
                        update_article_with_xarticle,
                        article,
                        xarticle,
                        merge_titles=False,
                        update_pdf_link=self.update_pdf_link,
                    )
                    promise.add_done_callback(self.callback)
                    promises.add(promise)

            # Leaving the ``with`` block waited for the remaining updates;
            # re-raise a failure among them instead of silently dropping it.
            self._check_finished(promises)
            _logger.info(f"Augment [{self.backend.name}] over")

    @staticmethod
    def _check_finished(promises: "set[Future]") -> "set[Future]":
        """Return the futures of *promises* that are not done yet.

        Raises:
            The exception of the first finished future (in set iteration
            order) that failed.
        """
        pending: "set[Future]" = set()
        for promise in promises:
            if not promise.done():
                pending.add(promise)
                continue
            exc = promise.exception()
            if exc:
                raise exc
        return pending

    def _find_article(self, article: Article) -> tuple:
        """Look *article* up with the current backend (``self.backend``).

        External identifiers are tried first, then fuzzy matching.

        Returns:
            A ``(xarticle, event)`` pair. ``xarticle`` is the backend's
            result, or ``None`` when nothing matched. ``event`` is built by
            ``_make_event``:

            - ``ALREADY_PRESENT`` for an external-ID hit;
            - ``ADDED`` for a fuzzy match;
            - ``NOT_FOUND`` with a ``WARNING`` status otherwise.
        """
        # With external ID
        xarticle = self.backend.find_by_extids(article=article)
        if xarticle:
            _logger.debug(f"[{self.backend.name}] {article.pid}: found via External ID")
            return xarticle, self._make_event(article, MatchingEventResult.ALREADY_PRESENT)

        # Matching fuzzy
        xarticle = self.backend.find_by_matching(article)
        if xarticle:
            _logger.debug(f"[{self.backend.name}] {article.pid}: found via Matching")
            return xarticle, self._make_event(article, MatchingEventResult.ADDED)

        _logger.info(
            f"[{self.backend.name}] {article.pid}: not found with External ID nor Matching"
        )
        return None, self._make_event(
            article, MatchingEventResult.NOT_FOUND, status=HistoryEventStatus.WARNING
        )

    def _make_event(
        self,
        article: Article,
        status_message: "MatchingEventResult | str",
        status: HistoryEventStatus = HistoryEventStatus.OK,
    ) -> dict:
        """Build the event message for *article*, tagged with the current backend name."""
        return {
            "pid": article.pid,
            "status": status,
            "status_message": status_message,
            "backend": self.backend.name,
        }

    def insert_history_event(self):
        """Store the collected event messages as one ``matching`` history event.

        Each message becomes a child entry. Its ``type`` is the backend name
        recorded in the message, falling back to the current backend's name.

        Raises:
            ValueError: If a message's ``pid`` does not resolve to a resource.
        """
        children = []
        for msg in self.event_messages:
            resource = model_helpers.get_resource(msg["pid"])
            if not resource:
                raise ValueError(f"Resource not found: {msg['pid']}")
            data: "HistoryChildDict" = {
                "resource": resource,
                "type": msg.get("backend", self.backend.name),
                "status": msg.get("status"),
            }
            if "score" in msg:
                data["score"] = msg["score"]
            if "message" in msg:
                data["message"] = msg["message"]
            if "status_message" in msg:
                data["status_message"] = msg["status_message"]
            children.append(data)

        # This calls the module-level ``insert_history_event`` imported from
        # history.utils, not this method: class attributes are not in scope
        # inside method bodies.
        # NOTE(review): the pid uses ``self.backend.name``, i.e. the last
        # backend processed by internal_do. With several backends, all their
        # messages are stored under that one name. Calling this before
        # internal_do likely fails, because self.backend still holds the
        # constructor value.
        insert_history_event(
            {
                "pid": f"{self.source_id}_{self.collection_pid or ''}_{self.backend.name}_matching",
                "col": self.collection,
                "source": self.source_id,
                "status": HistoryEventStatus.OK,
                "type": "matching",
                "children": children,
            }
        )