version = CephIscsi.get_version(ctx, container_id)
if daemon_type == CephNvmeof.daemon_type:
version = CephNvmeof.get_version(ctx, container_id)
+ if daemon_type == SMB.daemon_type:
+ version = SMB.get_version(ctx, container_id)
elif not version:
if daemon_type in ceph_daemons():
out, err, code = call(ctx,
import json
import logging
import pathlib
+import re
import socket
from typing import List, Dict, Tuple, Optional, Any
from .. import data_utils
from .. import deployment_utils
from .. import file_utils
+from ..call_wrappers import call, CallVerbosity
from ..constants import DEFAULT_SMB_IMAGE
from ..container_daemon_form import ContainerDaemonForm, daemon_to_container
from ..container_engines import Podman
"""Provides a form for SMB containers."""
daemon_type = 'smb'
+ daemon_base = '/usr/sbin/smbd'
default_image = DEFAULT_SMB_IMAGE
@classmethod
self.smb_port = 445
logger.debug('Created SMB ContainerDaemonForm instance')
+ @staticmethod
+ def get_version(ctx: CephadmContext, container_id: str) -> Optional[str]:
+ version = None
+ out, _, ret = call(
+ ctx,
+ [
+ ctx.container_engine.path,
+ 'exec',
+ container_id,
+ SMB.daemon_base,
+ '-V',
+ ],
+ verbosity=CallVerbosity.QUIET,
+ )
+
+ if ret == 0:
+ match = re.search(r'Version\s*([\d.]+)', out)
+ if match:
+ version = match.group(1)
+ return version
+
def validate(self) -> None:
if self._instance_cfg is not None:
return