Coverage for nexusLIMS/exporters/destinations/cdcs.py: 100%

69 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2026-03-24 05:23 +0000

1"""CDCS export destination plugin. 

2 

3Exports NexusLIMS XML records to a CDCS (Configurable Data Curation System) 

4instance using the CDCS REST API. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from http import HTTPStatus 

11from urllib.parse import urljoin 

12 

13from nexusLIMS.config import settings 

14from nexusLIMS.exporters.base import ExportContext, ExportResult 

15from nexusLIMS.utils.cdcs import AuthenticationError 

16from nexusLIMS.utils.network import nexus_req 

17 

18_logger = logging.getLogger(__name__) 

19 

20 

21class CDCSDestination: 

22 """CDCS export destination plugin. 

23 

24 Uploads NexusLIMS XML records to a CDCS instance and assigns them 

25 to the configured workspace. 

26 

27 Attributes 

28 ---------- 

29 name : str 

30 Destination identifier: "cdcs" 

31 priority : int 

32 Export priority: 100 (high priority, runs first) 

33 """ 

34 

35 name = "cdcs" 

36 priority = 100 

37 

38 @property 

39 def enabled(self) -> bool: 

40 """Check if CDCS is configured and enabled. 

41 

42 Returns 

43 ------- 

44 bool 

45 True if both NX_CDCS_TOKEN and NX_CDCS_URL are configured 

46 """ 

47 return ( 

48 hasattr(settings, "NX_CDCS_TOKEN") 

49 and hasattr(settings, "NX_CDCS_URL") 

50 and settings.NX_CDCS_TOKEN is not None 

51 and settings.NX_CDCS_URL is not None 

52 ) 

53 

54 def validate_config(self) -> tuple[bool, str | None]: # noqa: PLR0911 

55 """Validate CDCS configuration. 

56 

57 Tests: 

58 - NX_CDCS_TOKEN is configured 

59 - NX_CDCS_URL is configured 

60 - Can authenticate to CDCS API 

61 

62 Returns 

63 ------- 

64 tuple[bool, str | None] 

65 (is_valid, error_message) 

66 """ 

67 if not hasattr(settings, "NX_CDCS_TOKEN"): 

68 return False, "NX_CDCS_TOKEN not configured" 

69 if not settings.NX_CDCS_TOKEN: 

70 return False, "NX_CDCS_TOKEN is empty" 

71 if not hasattr(settings, "NX_CDCS_URL"): 

72 return False, "NX_CDCS_URL not configured" 

73 if not settings.NX_CDCS_URL: 

74 return False, "NX_CDCS_URL is empty" 

75 

76 # Test authentication by getting workspace ID 

77 try: 

78 self._get_workspace_id() 

79 except AuthenticationError as e: 

80 return False, f"CDCS authentication failed: {e}" 

81 except Exception as e: 

82 return False, f"CDCS configuration error: {e}" 

83 

84 return True, None 

85 

86 def export(self, context: ExportContext) -> ExportResult: 

87 """Export record to CDCS. 

88 

89 Reads the XML file, uploads it to CDCS, and assigns it to the 

90 configured workspace. Never raises exceptions - all errors are 

91 caught and returned as ExportResult with success=False. 

92 

93 Parameters 

94 ---------- 

95 context 

96 Export context with file path and session metadata 

97 

98 Returns 

99 ------- 

100 ExportResult 

101 Result of the export attempt 

102 """ 

103 try: 

104 # Read XML content 

105 with context.xml_file_path.open(encoding="utf-8") as f: 

106 xml_content = f.read() 

107 

108 # Upload to CDCS 

109 title = context.xml_file_path.stem 

110 record_id, record_url = self._upload_to_cdcs(xml_content, title) 

111 

112 return ExportResult( 

113 success=True, 

114 destination_name=self.name, 

115 record_id=str(record_id), 

116 record_url=record_url, 

117 ) 

118 

119 except Exception as e: 

120 _logger.exception( 

121 "Failed to export to CDCS: %s", 

122 context.xml_file_path.name, 

123 ) 

124 return ExportResult( 

125 success=False, 

126 destination_name=self.name, 

127 error_message=str(e), 

128 ) 

129 

130 def _upload_to_cdcs(self, xml_content: str, title: str) -> tuple[int, str]: 

131 """Upload XML to CDCS and return (record_id, record_url). 

132 

133 Parameters 

134 ---------- 

135 xml_content 

136 XML content to upload 

137 title 

138 Title for the record 

139 

140 Returns 

141 ------- 

142 tuple[int, str] 

143 (record_id, record_url) 

144 

145 Raises 

146 ------ 

147 RuntimeError 

148 If upload fails 

149 """ 

150 endpoint = urljoin(str(settings.NX_CDCS_URL), "rest/data/") 

151 

152 payload = { 

153 "template": self._get_template_id(), 

154 "title": title, 

155 "xml_content": xml_content, 

156 } 

157 

158 post_r = nexus_req( 

159 endpoint, "POST", json=payload, token_auth=settings.NX_CDCS_TOKEN 

160 ) 

161 

162 if post_r.status_code != HTTPStatus.CREATED: 

163 msg = f"CDCS upload failed: {post_r.text}" 

164 raise RuntimeError(msg) 

165 

166 record_id = post_r.json()["id"] 

167 

168 # Assign to workspace 

169 wrk_endpoint = urljoin( 

170 str(settings.NX_CDCS_URL), 

171 f"rest/data/{record_id}/assign/{self._get_workspace_id()}", 

172 ) 

173 _ = nexus_req(wrk_endpoint, "PATCH", token_auth=settings.NX_CDCS_TOKEN) 

174 

175 record_url = urljoin(str(settings.NX_CDCS_URL), f"data?id={record_id}") 

176 _logger.info('Record "%s" available at %s', title, record_url) 

177 

178 return record_id, record_url 

179 

180 def _get_template_id(self) -> str: 

181 """Get current template ID from CDCS. 

182 

183 Returns 

184 ------- 

185 str 

186 Template ID 

187 

188 Raises 

189 ------ 

190 AuthenticationError 

191 If authentication fails 

192 """ 

193 endpoint = urljoin( 

194 str(settings.NX_CDCS_URL), "rest/template-version-manager/global/" 

195 ) 

196 r = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN) 

197 

198 if r.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN): 

199 msg = "Could not authenticate to CDCS" 

200 raise AuthenticationError(msg) 

201 

202 return r.json()[0]["current"] 

203 

204 def _get_workspace_id(self) -> int | None: 

205 """Get workspace ID from CDCS. 

206 

207 Returns 

208 ------- 

209 int or None 

210 Workspace ID, or ``None`` if no workspaces are available (which 

211 still means authentication succeeded). 

212 

213 Raises 

214 ------ 

215 AuthenticationError 

216 If authentication fails 

217 """ 

218 endpoint = urljoin(str(settings.NX_CDCS_URL), "rest/workspace/read_access/") 

219 r = nexus_req(endpoint, "GET", token_auth=settings.NX_CDCS_TOKEN) 

220 

221 if r.status_code in (HTTPStatus.UNAUTHORIZED, HTTPStatus.FORBIDDEN): 

222 msg = "Could not authenticate to CDCS" 

223 raise AuthenticationError(msg) 

224 

225 workspaces = r.json() 

226 return workspaces[0]["id"] if workspaces else None