def for_daemon_type(cls, daemon_type: str) -> bool:
return cls.daemon_type == daemon_type
- def __init__(self,
- ctx,
- fsid,
- daemon_id,
- config_json,
- image=DEFAULT_IMAGE):
+ def __init__(
+ self, ctx, fsid, daemon_id, config_json, image=DEFAULT_IMAGE
+ ):
# type: (CephadmContext, str, Union[int, str], Dict, str) -> None
self.ctx = ctx
self.fsid = fsid
return cls(ctx, fsid, daemon_id, fetch_configs(ctx), ctx.image)
@classmethod
- def create(cls, ctx: CephadmContext, ident: DaemonIdentity) -> 'NFSGanesha':
+ def create(
+ cls, ctx: CephadmContext, ident: DaemonIdentity
+ ) -> 'NFSGanesha':
return cls.init(ctx, ident.fsid, ident.daemon_id)
@property
if self.rgw:
cluster = self.rgw.get('cluster', 'ceph')
rgw_user = self.rgw.get('user', 'admin')
- mounts[os.path.join(data_dir, 'keyring.rgw')] = \
- '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user)
+ mounts[
+ os.path.join(data_dir, 'keyring.rgw')
+ ] = '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user)
return mounts
def customize_container_mounts(
@staticmethod
def get_container_envs():
# type: () -> List[str]
- envs = [
- 'CEPH_CONF=%s' % (CEPH_DEFAULT_CONF)
- ]
+ envs = ['CEPH_CONF=%s' % (CEPH_DEFAULT_CONF)]
return envs
@staticmethod
def get_version(ctx, container_id):
# type: (CephadmContext, str) -> Optional[str]
version = None
- out, err, code = call(ctx,
- [ctx.container_engine.path, 'exec', container_id,
- NFSGanesha.entrypoint, '-v'],
- verbosity=CallVerbosity.QUIET)
+ out, err, code = call(
+ ctx,
+ [
+ ctx.container_engine.path,
+ 'exec',
+ container_id,
+ NFSGanesha.entrypoint,
+ '-v',
+ ],
+ verbosity=CallVerbosity.QUIET,
+ )
if code == 0:
match = re.search(r'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out)
if match:
if self.required_files:
for fname in self.required_files:
if fname not in self.files:
- raise Error('required file missing from config-json: %s' % fname)
+ raise Error(
+ 'required file missing from config-json: %s' % fname
+ )
# check for an RGW config
if self.rgw: