-from typing import Any, Callable, Iterator, Tuple
+from typing import Any, Callable, Iterator, Tuple, no_type_check
import contextlib
import errno
import functools
import object_format
-from mgr_module import CLICommand
+from mgr_module import CLICommandBase
from . import resourcelib
from .proto import Self
return self._func(*args, **kwargs)
-class SMBCommand:
+class SMBCLICommandBase(CLICommandBase):
"""A combined decorator and descriptor. Sets up the common parts of the
CLICommand and object formatter.
As a descriptor, it returns objects that can be called and wrap the
Example:
>>> class Example:
- ... @SMBCommand('share foo', perm='r')
+ ... @SMBCLICommand('share foo', perm='r')
... def foo(self):
... return {'test': 1}
...
"""
def __init__(self, name: str, perm: str) -> None:
- self._name = name
- self._perm = perm
+ super().__init__(f"smb {name}", perm)
+ @no_type_check
def __call__(self, func: Callable) -> Self:
self._func = func
- cc = CLICommand(f'smb {self._name}', perm=self._perm)
# the smb module assumes that it will always be used with python
# versions sufficiently new enough to always use ordered dicts
# (builtin). We dont want the json/yaml sorted by keys losing our
sort_json=False,
sort_yaml=False,
)
+
rsp = object_format.Responder(_fmt)
ewrap = error_wrapper()
- self._command = cc(rsp(ewrap(func)))
+ self._command = super().__call__(rsp(ewrap(func)))
+
return self
+ @no_type_check
def __get__(self, obj: Any, objtype: Any = None) -> _cmdlet:
return _cmdlet(
self._func.__get__(obj, objtype),
)
+SMBCLICommand = SMBCLICommandBase.make_registry_subtype("SMBCLICommand")
+
+
class InvalidInputValue(object_format.ErrorResponseBase):
def format_response(self) -> Tuple[int, str, str]:
return -errno.EINVAL, "", str(self)
sqlite_store,
utils,
)
+from .cli import SMBCLICommand
from .enums import (
AuthMode,
InputPasswordFilter,
class Module(orchestrator.OrchestratorClientMixin, MgrModule):
+ CLICommand = SMBCLICommand
MODULE_OPTIONS: List[Option] = [
Option(
'update_orchestration',
all_results = all_results.convert_results(out_op)
return all_results
- @cli.SMBCommand('apply', perm='rw')
+ @SMBCLICommand('apply', perm='rw')
def apply_resources(
self,
inbuf: str,
[results.InvalidResourceResult(err.resource_data, str(err))]
)
- @cli.SMBCommand('cluster ls', perm='r')
+ @SMBCLICommand('cluster ls', perm='r')
def cluster_ls(self) -> List[str]:
"""List smb clusters by ID"""
return [cid for cid in self._handler.cluster_ids()]
- @cli.SMBCommand('cluster create', perm='rw')
+ @SMBCLICommand('cluster create', perm='rw')
def cluster_create(
self,
cluster_id: str,
password_filter_out=password_filter_out,
).squash(cluster)
- @cli.SMBCommand('cluster rm', perm='rw')
+ @SMBCLICommand('cluster rm', perm='rw')
def cluster_rm(
self,
cluster_id: str,
[cluster], password_filter_out=password_filter
).one()
- @cli.SMBCommand('share ls', perm='r')
+ @SMBCLICommand('share ls', perm='r')
def share_ls(self, cluster_id: str) -> List[str]:
"""List smb shares in a cluster by ID"""
return [
if cid == cluster_id
]
- @cli.SMBCommand('share create', perm='rw')
+ @SMBCLICommand('share create', perm='rw')
def share_create(
self,
cluster_id: str,
)
return self._apply_res([share], create_only=True).one()
- @cli.SMBCommand('share rm', perm='rw')
+ @SMBCLICommand('share rm', perm='rw')
def share_rm(self, cluster_id: str, share_id: str) -> results.Result:
"""Remove an smb share"""
share = resources.RemovedShare(
)
return self._apply_res([share]).one()
- @cli.SMBCommand("show", perm="r")
+ @SMBCLICommand("show", perm="r")
def show(
self,
resource_names: Optional[List[str]] = None,