# raise an ImportError
import sys
import mock
- sys.modules['ceph_module'] = mock.Mock()
mgr = mock.Mock()
mgr.get_frontend_path.side_effect = lambda: os.path.abspath("./frontend/dist")
+
+ def mock_ceph_modules():
+ class MockRadosError(Exception):
+ def __init__(self, message, errno=None):
+ super(MockRadosError, self).__init__(message)
+ self.errno = errno
+
+ def __str__(self):
+ msg = super(MockRadosError, self).__str__()
+ if self.errno is None:
+ return msg
+ return '[errno {0}] {1}'.format(self.errno, msg)
+
+ rbd = mock.Mock()
+ rbd.RBD_FEATURE_LAYERING = 1
+ rbd.RBD_FEATURE_STRIPINGV2 = 2
+ rbd.RBD_FEATURE_EXCLUSIVE_LOCK = 4
+ rbd.RBD_FEATURE_OBJECT_MAP = 8
+ rbd.RBD_FEATURE_FAST_DIFF = 16
+ rbd.RBD_FEATURE_DEEP_FLATTEN = 32
+ rbd.RBD_FEATURE_JOURNALING = 64
+ rbd.RBD_FEATURE_DATA_POOL = 128
+ rbd.RBD_FEATURE_OPERATIONS = 256
+
+ sys.modules.update({
+ 'rados': mock.Mock(Error=MockRadosError, OSError=MockRadosError),
+ 'rbd': rbd,
+ 'cephfs': mock.Mock(),
+ 'ceph_module': mock.Mock(),
+ })
+
+ mock_ceph_modules()
import json
import cherrypy
-import rados
-import rbd
+import rados # pylint: disable=import-error
+import rbd # pylint: disable=import-error
from . import ApiController, UiApiController, RESTController, BaseController, Endpoint,\
ReadPermission, UpdatePermission, Task
from functools import partial
import cherrypy
-import cephfs
+import cephfs # pylint: disable=import-error
from . import ApiController, RESTController, UiApiController, BaseController, \
Endpoint, Task, ReadPermission, ControllerDoc, EndpointDoc
import cherrypy
-import rbd
+import rbd # pylint: disable=import-error
from . import ApiController, RESTController, Task, UpdatePermission, \
DeletePermission, CreatePermission, ReadPermission, allow_empty_body
import cherrypy
-import rbd
+import rbd # pylint: disable=import-error
from . import ApiController, Endpoint, Task, BaseController, ReadPermission, \
RESTController
def code(self):
if self._code:
return str(self._code)
- return str(abs(self.errno))
+ return str(abs(self.errno)) if self.errno is not None else 'Error'
# access control module exceptions
import tempfile
import threading
import time
+
from uuid import uuid4
from OpenSSL import crypto
-import _strptime # pylint: disable=unused-import
from mgr_module import MgrModule, MgrStandbyModule, Option, CLIWriteCommand
from mgr_util import get_default_addr, ServerConfigException, verify_tls_files
+import _strptime # pylint: disable=unused-import
+
try:
import cherrypy
from cherrypy._cptools import HandlerWrapperTool
import json
-import rados
+import rados # pylint: disable=import-error
from mgr_module import CommandResult
from mgr_util import get_time_series_rates, get_most_recent_rate
from contextlib import contextmanager
-import cephfs
+import cephfs # pylint: disable=import-error
from .. import mgr, logger
import cherrypy
-import rbd
-import rados
+import rbd # pylint: disable=import-error
+import rados # pylint: disable=import-error
from .. import logger
from ..services.ceph_service import SendCommandError
import six
-import rbd
+import rbd # pylint: disable=import-error
from .. import mgr
from .ceph_service import CephService
import time
-import rados
+import rados # pylint: disable=import-error
from . import ControllerTestCase
from ..services.ceph_service import SendCommandError