Fixes: https://tracker.ceph.com/issues/46542
Signed-off-by: Avan Thakkar <athakkar@redhat.com>
if use_ssl:
# SSL initialization
- cert = self.get_store("crt") # type: ignore
+ cert = self.get_localized_store("crt") # type: ignore
if cert is not None:
self.cert_tmp = tempfile.NamedTemporaryFile()
self.cert_tmp.write(cert.encode('utf-8'))
else:
cert_fname = self.get_localized_module_option('crt_file') # type: ignore
- pkey = self.get_store("key") # type: ignore
+ pkey = self.get_localized_store("key") # type: ignore
if pkey is not None:
self.pkey_tmp = tempfile.NamedTemporaryFile()
self.pkey_tmp.write(pkey.encode('utf-8'))
"""
return self._ceph_get_store(key)
+ def get_localized_store(self, key: str, default: Optional[str] = None) -> Optional[str]:
+ r = self._ceph_get_store(_get_localized_key(self.get_mgr_id(), key))
+ if r is None:
+ r = self._ceph_get_store(key)
+ if r is None:
+ r = default
+ return r
+
def get_active_uri(self) -> str:
return self._ceph_get_active_uri()