def from_dict(cls, fsal_dict: Dict[str, Any]) -> 'FSAL':
if fsal_dict.get('name') == 'CEPH':
return CephFSFSAL.from_dict(fsal_dict)
+ if fsal_dict.get('name') == 'RGW':
+ return RGWFSAL.from_dict(fsal_dict)
raise NFSInvalidOperation(f'Unknown FSAL {fsal_dict.get("name")}')
@classmethod
def from_fsal_block(cls, fsal_block: Dict[str, Any]) -> 'FSAL':
if fsal_block.get('name') == 'CEPH':
return CephFSFSAL.from_fsal_block(fsal_block)
+ if fsal_block.get('name') == 'RGW':
+ return RGWFSAL.from_fsal_block(fsal_block)
raise NFSInvalidOperation(f'Unknown FSAL {fsal_block.get("name")}')
def to_fsal_block(self) -> Dict[str, Any]:
return r
+class RGWFSAL(FSAL):
+ def __init__(self,
+ name: str,
+ user_id: Optional[str] = None,
+ access_key_id: Optional[str] = None,
+ secret_access_key: Optional[str] = None
+ ):
+ super().__init__(name)
+ assert name == 'RGW'
+ self.user_id = user_id
+ self.access_key_id = access_key_id
+ self.secret_access_key = secret_access_key
+
+ @classmethod
+ def from_fsal_block(cls, fsal_block: Dict[str, str]) -> 'RGWFSAL':
+ return cls(fsal_block['name'],
+ fsal_block.get('user_id'),
+ fsal_block.get('access_key'),
+ fsal_block.get('secret_access_key'))
+
+ def to_fsal_block(self) -> Dict[str, str]:
+ result = {
+ 'block_name': 'FSAL',
+ 'name': self.name,
+ }
+ if self.user_id:
+ result['user_id'] = self.user_id
+ if self.fs_name:
+ result['access_key_id'] = self.access_key_id
+ if self.secret_access_key:
+ result['secret_access_key'] = self.secret_access_key
+ return result
+
+ @classmethod
+ def from_dict(cls, fsal_dict: Dict[str, str]) -> 'RGWFSAL':
+ return cls(fsal_dict['name'],
+ fsal_dict.get('user_id'),
+ fsal_dict.get('access_key_id'),
+ fsal_dict.get('secret_access_key'))
+
+ def to_dict(self) -> Dict[str, str]:
+ r = {'name': self.name}
+ if self.user_id:
+ r['user_id'] = self.user_id
+ if self.access_key_id:
+ r['access_key_id'] = self.access_key_id
+ if self.secret_access_key:
+ r['secret_access_key'] = self.secret_access_key
+ return r
+
+
class Client:
def __init__(self,
addresses: List[str],
fs = cast(CephFSFSAL, self.fsal)
if not check_fs(mgr, fs.fs_name):
raise FSNotFound(fs.fs_name)
+ elif self.fsal.name == 'RGW':
+ rgw = cast(RGWFSAL, self.fsal)
+ pass
else:
raise NFSInvalidOperation('FSAL {self.fsal.name} not supported')