import logging
from typing import List, Any, Tuple, Dict, cast, Optional
-import os
-import base64
+from copy import copy
from orchestrator import DaemonDescription
-from ceph.deployment.service_spec import OAuth2ProxySpec
+from ceph.deployment.service_spec import OAuth2ProxySpec, MgmtGatewaySpec
from cephadm.services.cephadmservice import CephadmService, CephadmDaemonDeploySpec
from .service_registry import register_cephadm_service
def get_service_ips_and_hosts(self, service_name: str) -> List[str]:
entries = set()
+ mgmt_gw_spec = cast(MgmtGatewaySpec, self.mgr.spec_store['mgmt-gateway'].spec)
+ if mgmt_gw_spec.virtual_ip is not None:
+ entries.add(mgmt_gw_spec.virtual_ip)
for dd in self.mgr.cache.get_daemons_by_service(service_name):
assert dd.hostname is not None
addr = dd.ip if dd.ip else self.mgr.inventory.get_addr(dd.hostname)
def get_certificates(self, svc_spec: OAuth2ProxySpec, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[str, str]:
cert = self.mgr.cert_mgr.get_cert('oauth2_proxy_cert')
key = self.mgr.cert_mgr.get_key('oauth2_proxy_key')
+ user_made = False
if not (cert and key):
# not available on store, check if provided on the spec
if svc_spec.ssl_certificate and svc_spec.ssl_certificate_key:
+ user_made = True
cert = svc_spec.ssl_certificate
key = svc_spec.ssl_certificate_key
else:
cert, key = self.mgr.cert_mgr.generate_cert(host_fqdn, addr)
# save certificates
if cert and key:
- self.mgr.cert_mgr.save_cert('oauth2_proxy_cert', cert)
- self.mgr.cert_mgr.save_key('oauth2_proxy_key', key)
+ self.mgr.cert_mgr.save_cert('oauth2_proxy_cert', cert, user_made=user_made)
+ self.mgr.cert_mgr.save_key('oauth2_proxy_key', key, user_made=user_made)
else:
logger.error("Failed to obtain certificate and key from mgmt-gateway.")
return cert, key
- def generate_random_secret(self) -> str:
- random_bytes = os.urandom(32)
- base64_secret = base64.urlsafe_b64encode(random_bytes).rstrip(b'=').decode('utf-8')
- return base64_secret
-
def generate_config(self, daemon_spec: CephadmDaemonDeploySpec) -> Tuple[Dict[str, Any], List[str]]:
assert self.TYPE == daemon_spec.daemon_type
svc_spec = cast(OAuth2ProxySpec, self.mgr.spec_store[daemon_spec.service_name].spec)
- allowlist_domains = svc_spec.allowlist_domains or []
+ allowlist_domains = copy(svc_spec.allowlist_domains) or []
allowlist_domains += self.get_service_ips_and_hosts('mgmt-gateway')
context = {
'spec': svc_spec,
- 'cookie_secret': svc_spec.cookie_secret or self.generate_random_secret(),
+ 'cookie_secret': svc_spec.cookie_secret,
'allowlist_domains': allowlist_domains,
'redirect_url': svc_spec.redirect_url or self.get_redirect_url()
}
enable_auth=True,
virtual_ip=virtual_ip)
+ allowed_domain = '192.168.100.1:8080'
oauth2_spec = OAuth2ProxySpec(provider_display_name='my_idp_provider',
client_id='my_client_id',
client_secret='my_client_secret',
oidc_issuer_url='http://192.168.10.10:8888/dex',
cookie_secret='kbAEM9opAmuHskQvt0AW8oeJRaOM2BYy5Loba0kZ0SQ=',
ssl_certificate=ceph_generated_cert,
- ssl_certificate_key=ceph_generated_key)
+ ssl_certificate_key=ceph_generated_key,
+ allowlist_domains=[allowed_domain])
+ whitelist_domains = f"{allowed_domain},1::4,ceph-node" if virtual_ip is None else f"{allowed_domain},{virtual_ip},1::4,ceph-node"
redirect_url = f"https://{virtual_ip if virtual_ip else 'host_fqdn'}:5555/oauth2/callback"
expected = {
"fsid": "fsid",
# Secret value for encrypting cookies.
cookie_secret= "kbAEM9opAmuHskQvt0AW8oeJRaOM2BYy5Loba0kZ0SQ="
email_domains= "*"
- whitelist_domains= "1::4,ceph-node\""""),
+ whitelist_domains= "{whitelist_domains}\""""),
"oauth2-proxy.crt": f"{ceph_generated_cert}",
"oauth2-proxy.key": f"{ceph_generated_key}",
}
self.redirect_url = redirect_url
#: The secret key used for signing cookies. Its length must be 16,
# 24, or 32 bytes to create an AES cipher.
- self.cookie_secret = cookie_secret
+ self.cookie_secret = cookie_secret or self.generate_random_secret()
#: The multi-line SSL certificate for encrypting communications.
self.ssl_certificate = ssl_certificate
#: The multi-line SSL certificate private key for decrypting communications.
self.allowlist_domains = allowlist_domains
self.unmanaged = unmanaged
+ def generate_random_secret(self) -> str:
+ import base64
+ random_bytes = os.urandom(32)
+ base64_secret = base64.urlsafe_b64encode(random_bytes).decode('utf-8')
+ return base64_secret
+
def get_port_start(self) -> List[int]:
ports = [4180]
return ports