ClusterEntry,
JoinAuthEntry,
ShareEntry,
+ TLSCredentialEntry,
UsersAndGroupsEntry,
)
from .proto import (
)
from .resources import SMBResource
from .results import ErrorResult, Result, ResultGroup
-from .staging import Staging, auth_refs, cross_check_resource, ug_refs
+from .staging import (
+ Staging,
+ auth_refs,
+ cross_check_resource,
+ tls_refs,
+ ug_refs,
+)
from .utils import checked, ynbool
ClusterRef = Union[resources.Cluster, resources.RemovedCluster]
_DOMAIN = 'domain'
_CLUSTERED = 'clustered'
_CEPHFS_PROXY = 'cephfs-proxy'
+_REMOTE_CONTROL = 'remote-control'
log = logging.getLogger(__name__)
shares: List[resources.Share],
join_auths: List[resources.JoinAuth],
users_and_groups: List[resources.UsersAndGroups],
+ tls_credentials: List[resources.TLSCredential],
):
self.cluster = cluster
self.shares = shares
self.join_auths = join_auths
self.users_and_groups = users_and_groups
+ self.tls_credentials = tls_credentials
# a cache for modified entries
self.cache = config_store.EntryCache()
self._users_and_groups_entry(_id).get_users_and_groups()
for _id in ug_refs(cluster)
],
+ [
+ TLSCredentialEntry.from_store(
+ self.internal_store, _id
+ ).get_tls_credential()
+ for _id in tls_refs(cluster)
+ ],
)
change_groups.append(change_group)
for change_group in change_groups:
chg_cluster_ids: Set[str] = set()
chg_join_ids: Set[str] = set()
chg_ug_ids: Set[str] = set()
+ chg_tls_ids: Set[str] = set()
for result in updated:
state = (result.status or {}).get('state', None)
if state in (State.PRESENT, State.NOT_PRESENT):
chg_join_ids.add(result.src.auth_id)
elif isinstance(result.src, resources.UsersAndGroups):
chg_ug_ids.add(result.src.users_groups_id)
+ elif isinstance(result.src, resources.TLSCredential):
+ chg_tls_ids.add(result.src.tls_credential_id)
# TODO: here's a lazy bit. if any join auths or users/groups changed we
# will regen all clusters because these can be shared by >1 cluster.
# In future, make this only pick clusters using the named resources.
- if chg_join_ids or chg_ug_ids:
+ if chg_join_ids or chg_ug_ids or chg_tls_ids:
chg_cluster_ids.update(ClusterEntry.ids(self.internal_store))
return chg_cluster_ids
)
_save_pending_join_auths(self.priv_store, change_group)
_save_pending_users_and_groups(self.priv_store, change_group)
+ _save_pending_tls_credentials(self.priv_store, change_group)
_save_pending_config(
self.public_store,
change_group,
change_group.cache, cluster.cluster_id
)
]
+ tls_credential_entries = {
+ tc.tls_credential_id: change_group.cache[
+ external.tls_credential_key(
+ cluster.cluster_id,
+ tc.tls_credential_id,
+ checked(tc.credential_type),
+ )
+ ]
+ for tc in change_group.tls_credentials
+ }
smb_spec = _generate_smb_service_spec(
cluster,
config_entries=config_entries,
join_source_entries=join_source_entries,
user_source_entries=user_source_entries,
+ tls_credential_entries=tls_credential_entries,
data_entity=data_entity,
needs_proxy=_has_proxied_vfs(change_group),
)
config_entries: List[ConfigEntry],
join_source_entries: List[ConfigEntry],
user_source_entries: List[ConfigEntry],
+ tls_credential_entries: Dict[str, ConfigEntry],
data_entity: str = '',
needs_proxy: bool = False,
) -> SMBSpec:
features.append(_CLUSTERED)
if needs_proxy:
features.append(_CEPHFS_PROXY)
+ if cluster.remote_control_is_enabled:
+ features.append(_REMOTE_CONTROL)
# only one config uri can be used, the input list should be
# ordered from lowest to highest priority and the highest priority
# item that exists in the store will be used.
user_entities: Optional[List[str]] = None
if data_entity:
user_entities = [data_entity]
+ rc_cert = rc_key = rc_ca_cert = None
+ if cluster.remote_control_is_enabled:
+ assert cluster.remote_control
+ rc_cert = _tls_uri(
+ cluster.remote_control.cert, tls_credential_entries
+ )
+ rc_key = _tls_uri(cluster.remote_control.key, tls_credential_entries)
+ rc_ca_cert = _tls_uri(
+ cluster.remote_control.ca_cert, tls_credential_entries
+ )
return SMBSpec(
service_id=cluster.cluster_id,
placement=cluster.placement,
cluster_public_addrs=cluster.service_spec_public_addrs(),
custom_ports=cluster.custom_ports,
bind_addrs=cluster.service_spec_bind_addrs(),
+ remote_control_ssl_cert=rc_cert,
+ remote_control_ssl_key=rc_key,
+ remote_control_ca_cert=rc_ca_cert,
)
change_group.cache_updated_entry(ugentry)
+def _save_pending_tls_credentials(
+ store: ConfigStore,
+ change_group: ClusterChangeGroup,
+) -> None:
+ cluster = change_group.cluster
+ assert isinstance(cluster, resources.Cluster)
+ refs = tls_refs(cluster)
+ tls_creds = {t.tls_credential_id: t for t in change_group.tls_credentials}
+ for ref in refs:
+ tc = tls_creds[ref]
+ ext_key = external.tls_credential_key(
+ cluster.cluster_id, ref, str(tc.credential_type)
+ )
+ tc_entry = store[ext_key]
+ if hasattr(tc_entry, 'set_data'):
+ tc_entry.set_data(tc.value)
+ else:
+ raise ValueError('store does not support raw entries')
+ change_group.cache_updated_entry(tc_entry)
+
+
def _save_pending_config(
store: ConfigStore,
change_group: ClusterChangeGroup,
def _smb_port(cluster: resources.Cluster, default: int = 445) -> int:
return (cluster.custom_ports or {}).get("smb", default)
+
+
+def _tls_uri(
+ src: Optional[resources.TLSSource],
+ tls_credential_entries: Dict[str, ConfigEntry],
+) -> Optional[str]:
+ if src is None:
+ return None
+ uri = tls_credential_entries[src.ref].uri
+ return f'URI:{uri}'