# TODO: RGW, NFS
raise NotImplementedError(service_type)
+ def remove_stateless_service(self, service_type, service_id):
+ return RookWriteCompletion(
+ lambda: self.rook_cluster.rm_service(service_type, service_id), None,
+ "Removing {0} services for {1}".format(service_type, service_id))
+
def create_osds(self, spec):
# Validate spec.node
if not self.rook_cluster.node_exists(spec.node):
def rook_api_get(self, path, **kwargs):
return self.rook_api_call("GET", path, **kwargs)
+ def rook_api_delete(self, path):
+ return self.rook_api_call("DELETE", path)
+
def rook_api_patch(self, path, **kwargs):
return self.rook_api_call("PATCH", path,
header_params={"Content-Type": "application/json-patch+json"},
else:
raise
+ def rm_service(self, service_type, service_id):
+ assert service_type in ("mds", "rgw")
+
+ if service_type == "mds":
+ rooktype = "filesystems"
+ elif service_type == "rgw":
+ rooktype = "objectstores"
+
+ objpath = "{0}/{1}".format(rooktype, service_id)
+
+ try:
+ self.rook_api_delete(objpath)
+ except ApiException as e:
+ if e.status == 404:
+ log.info("{0} service '{1}' does not exist".format(service_type, service_id))
+ # Idempotent, succeed.
+ else:
+ raise
+
def can_create_osd(self):
current_cluster = self.rook_api_get(
"clusters/{0}".format(self.cluster_name))