from ceph.deployment.drive_selection import DriveSelection
import orchestrator
+from cephadm.utils import forall_hosts
from orchestrator import OrchestratorError
from mgr_module import MonCommandFailed
def create_from_spec(self, drive_group: DriveGroupSpec) -> str:
logger.debug(f"Processing DriveGroup {drive_group}")
- ret = []
osd_id_claims = self.find_destroyed_osds()
logger.info(f"Found osd claims for drivegroup {drive_group.service_id} -> {osd_id_claims}")
- for host, drive_selection in self.prepare_drivegroup(drive_group):
+
+ @forall_hosts
+ def create_from_spec_one(host: str, drive_selection: DriveSelection) -> Optional[str]:
logger.info('Applying %s on host %s...' % (drive_group.service_id, host))
cmd = self.driveselection_to_ceph_volume(drive_selection,
osd_id_claims.get(host, []))
if not cmd:
logger.debug("No data_devices, skipping DriveGroup: {}".format(drive_group.service_id))
- continue
+ return None
env_vars: List[str] = [f"CEPH_VOLUME_OSDSPEC_AFFINITY={drive_group.service_id}"]
ret_msg = self.create_single_host(
host, cmd, replace_osd_ids=osd_id_claims.get(host, []), env_vars=env_vars
)
- ret.append(ret_msg)
- return ", ".join(ret)
+ return ret_msg
+
+ ret = create_from_spec_one(self.prepare_drivegroup(drive_group))
+ return ", ".join(filter(None, ret))
def create_single_host(self, host: str, cmd: str, replace_osd_ids=None, env_vars: Optional[List[str]] = None) -> str:
out, err, code = self._run_ceph_volume_command(host, cmd, env_vars=env_vars)