return result
+ def _get_pool_params(self) -> Tuple[str, str]:
+ num_replicas = self.get_ceph_option('osd_pool_default_size')
+ assert type(num_replicas) is int
+
+ leaf_type_id = self.get_ceph_option('osd_crush_chooseleaf_type')
+ assert type(leaf_type_id) is int
+ crush = self.get('osd_map_crush')
+ leaf_type = 'host'
+ for t in crush['types']:
+ if t['type_id'] == leaf_type_id:
+ leaf_type = t['name']
+ break
+ return num_replicas, leaf_type
+
@handle_orch_error
def remove_service(self, service_name: str) -> str:
service_type, service_name = service_name.split('.', 1)
@handle_orch_error
def apply_rgw(self, spec):
# type: (RGWSpec) -> str
- num_replicas = self.get_ceph_option('osd_pool_default_size')
- assert type(num_replicas) is int
- leaf_type_id = self.get_ceph_option('osd_crush_chooseleaf_type')
- assert type(leaf_type_id) is int
- crush = self.get('osd_map_crush')
- leaf_type = 'host'
- for t in crush['types']:
- if t['type_id'] == leaf_type_id:
- leaf_type = t['name']
- break
+ num_replicas, leaf_type = self._get_pool_params()
return self.rook_cluster.apply_objectstore(spec, num_replicas, leaf_type)
@handle_orch_error