-import os
+from .module import CephadmOrchestrator
+
+__all__ = [
+ "CephadmOrchestrator",
+]
+import os
if 'UNITTEST' in os.environ:
import tests
-
-from .module import CephadmOrchestrator
+ __all__.append(tests.__name__)
import logging
-from typing import TYPE_CHECKING, Iterator, Union
+from typing import TYPE_CHECKING, Iterator
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from cephadm.schedule import HostAssignment
import shlex
from collections import defaultdict
from configparser import ConfigParser
-from contextlib import contextmanager
from functools import wraps
from tempfile import TemporaryDirectory
from threading import Event
import string
from typing import List, Dict, Optional, Callable, Tuple, TypeVar, \
- Any, Set, TYPE_CHECKING, cast, Iterator, NamedTuple, Sequence
+ Any, Set, TYPE_CHECKING, cast, NamedTuple, Sequence
import datetime
import os
from ceph.deployment.drive_group import DriveGroupSpec
from ceph.deployment.service_spec import \
NFSServiceSpec, ServiceSpec, PlacementSpec, assert_valid_host, \
- HostPlacementSpec, HA_RGWSpec
+ HostPlacementSpec
from ceph.utils import str_to_datetime, datetime_to_str, datetime_now
from cephadm.serve import CephadmServe
from cephadm.services.cephadmservice import CephadmDaemonSpec
from mgr_module import MgrModule, HandleCommandResult, Option
-from mgr_util import create_self_signed_cert, verify_tls, ServerConfigException
+from mgr_util import create_self_signed_cert
import secrets
import orchestrator
from orchestrator import OrchestratorError, OrchestratorValidationError, HostSpec, \
- CLICommandMeta, OrchestratorEvent, set_exception_subject, DaemonDescription
+ CLICommandMeta, DaemonDescription
from orchestrator._interface import GenericSpec
-from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
+from orchestrator._interface import daemon_type_to_service
from . import remotes
from . import utils
from .services.iscsi import IscsiService
from .services.ha_rgw import HA_RGWService
from .services.nfs import NFSService
-from .services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
+from .services.osd import OSDRemovalQueue, OSDService, OSD, NotFoundError
from .services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
from .schedule import HostAssignment
from remoto.backends import BaseConnection
BaseConnection.has_connection = remoto_has_connection
import remoto.process
- import execnet.gateway_bootstrap
except ImportError as e:
remoto = None
remoto_import_error = str(e)
from typing import List, Optional, Callable, Iterable, TypeVar, Set
import orchestrator
-from ceph.deployment.service_spec import PlacementSpec, HostPlacementSpec, ServiceSpec
+from ceph.deployment.service_spec import HostPlacementSpec, ServiceSpec
from orchestrator._interface import DaemonDescription
from orchestrator import OrchestratorValidationError
-import datetime
import json
import logging
from collections import defaultdict
import json
import re
import logging
-import secrets
import subprocess
-import collections
from abc import ABCMeta, abstractmethod
from typing import TYPE_CHECKING, List, Callable, Any, TypeVar, Generic, \
Optional, Dict, Any, Tuple, NewType
from ceph.deployment.service_spec import ServiceSpec, RGWSpec
from ceph.deployment.utils import is_ipv6, unwrap_ipv6
from orchestrator import OrchestratorError, DaemonDescription
-from orchestrator._interface import daemon_type_to_service, service_to_daemon_types
+from orchestrator._interface import daemon_type_to_service
from cephadm import utils
-from mgr_util import create_self_signed_cert, ServerConfigException, verify_tls
+from mgr_util import ServerConfigException, verify_tls
if TYPE_CHECKING:
from cephadm.module import CephadmOrchestrator
-import json
import logging
from typing import List, cast, Tuple, Dict, Any
from ceph.deployment.service_spec import HA_RGWSpec
-from orchestrator import DaemonDescription, OrchestratorError
from .cephadmservice import CephadmDaemonSpec, CephService
-from ..utils import CephadmNoImage, cephadmNoImage, resolve_ip
+from ..utils import resolve_ip
logger = logging.getLogger(__name__)
import logging
from typing import List, cast
-from mgr_module import MonCommandFailed
from ceph.deployment.service_spec import IscsiServiceSpec
-from orchestrator import DaemonDescription, OrchestratorError
+from orchestrator import DaemonDescription
from .cephadmservice import CephadmDaemonSpec, CephService
from .. import utils
import logging
-from typing import TYPE_CHECKING, Dict, Tuple, Any, List
+from typing import Dict, Tuple, Any, List
from ceph.deployment.service_spec import NFSServiceSpec
import rados
-from orchestrator import OrchestratorError, DaemonDescription
+from orchestrator import DaemonDescription
-from cephadm import utils
from cephadm.services.cephadmservice import AuthEntity, CephadmDaemonSpec, CephService
-if TYPE_CHECKING:
- from cephadm.module import CephadmOrchestrator
-
logger = logging.getLogger(__name__)
import json
import logging
from threading import Lock
-from typing import List, Dict, Any, Set, Union, Tuple, cast, Optional, TYPE_CHECKING
+from typing import List, Dict, Any, Set, Tuple, cast, Optional, TYPE_CHECKING
from ceph.deployment import translate
from ceph.deployment.drive_group import DriveGroupSpec
import copy
from typing import Optional, TYPE_CHECKING
-from jinja2 import Environment, PackageLoader, select_autoescape, StrictUndefined, Template
+from jinja2 import Environment, PackageLoader, select_autoescape, StrictUndefined
from jinja2 import exceptions as j2_exceptions
if TYPE_CHECKING:
--- /dev/null
+import pytest
+
+from cephadm.services.osd import RemoveUtil, OSD
+from tests import mock
+
+from .fixtures import with_cephadm_module
+
+
+@pytest.fixture()
+def cephadm_module():
+ with with_cephadm_module({}) as m:
+ yield m
+
+
+@pytest.fixture()
+def rm_util():
+ with with_cephadm_module({}) as m:
+ r = RemoveUtil.__new__(RemoveUtil)
+ r.__init__(m)
+ yield r
+
+
+@pytest.fixture()
+def osd_obj():
+ with mock.patch("cephadm.services.osd.RemoveUtil"):
+ o = OSD(0, mock.MagicMock())
+ yield o
from typing import Any, Iterator, List
except ImportError:
pass
-import pytest
from cephadm import CephadmOrchestrator
-from cephadm.services.osd import RemoveUtil, OSD
from orchestrator import raise_if_exception, Completion, HostSpec
from tests import mock
yield m
-@pytest.fixture()
-def cephadm_module():
- with with_cephadm_module({}) as m:
- yield m
-
-
-@pytest.fixture()
-def rm_util():
- with with_cephadm_module({}) as m:
- r = RemoveUtil.__new__(RemoveUtil)
- r.__init__(m)
- yield r
-
-
-@pytest.fixture()
-def osd_obj():
- with mock.patch("cephadm.services.osd.RemoveUtil"):
- o = OSD(0, mock.MagicMock())
- yield o
-
-
def wait(m, c):
# type: (CephadmOrchestrator, Completion) -> Any
m.process([c])
try:
- import pydevd # if in debugger
+ # if in debugger
+ import pydevd # noqa: F401
in_debug = True
except ImportError:
in_debug = False
from ceph.deployment.drive_group import DriveGroupSpec, DeviceSelection
from cephadm.serve import CephadmServe
from cephadm.services.osd import OSD, OSDRemovalQueue
-from cephadm.utils import CephadmNoImage
try:
- from typing import Any, List
+ from typing import List
except ImportError:
pass
from ceph.deployment.drive_selection.selector import DriveSelection
from ceph.deployment.inventory import Devices, Device
from ceph.utils import datetime_to_str, datetime_now
-from orchestrator import ServiceDescription, DaemonDescription, InventoryHost, \
+from orchestrator import DaemonDescription, InventoryHost, \
HostSpec, OrchestratorError
from tests import mock
-from .fixtures import cephadm_module, wait, _run_cephadm, match_glob, with_host, \
+from .fixtures import wait, _run_cephadm, match_glob, with_host, \
with_cephadm_module, with_service, assert_rm_service, _deploy_cephadm_binary
from cephadm.module import CephadmOrchestrator
-import sys
-import time
-
-
-try:
- from typing import Any
-except ImportError:
- pass
-
import pytest
-
-from tests import mock
-from .fixtures import cephadm_module, wait
+from .fixtures import wait
from ..module import trivial_completion, forall_hosts
import json
-import pytest
-
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec, HostPlacementSpec
from ceph.utils import datetime_to_str, datetime_now
from cephadm import CephadmOrchestrator
from cephadm.inventory import SPEC_STORE_PREFIX
-from cephadm.tests.fixtures import _run_cephadm, cephadm_module, wait, with_host
-from orchestrator import OrchestratorError
+from cephadm.tests.fixtures import _run_cephadm, wait, with_host
from cephadm.serve import CephadmServe
from tests import mock
import json
-from cephadm.services.osd import RemoveUtil, OSDRemovalQueue, OSD
+from cephadm.services.osd import OSDRemovalQueue, OSD
import pytest
-from .fixtures import rm_util, osd_obj, cephadm_module
from tests import mock
from datetime import datetime
from unittest.mock import MagicMock, call
from cephadm.services.cephadmservice import MonService, MgrService, MdsService, RgwService, \
- RbdMirrorService, CrashService, CephadmService, AuthEntity, CephadmExporter
+ RbdMirrorService, CrashService, CephadmExporter
from cephadm.services.iscsi import IscsiService
from cephadm.services.nfs import NFSService
-from cephadm.services.osd import RemoveUtil, OSDRemovalQueue, OSDService, OSD, NotFoundError
+from cephadm.services.osd import OSDService
from cephadm.services.monitoring import GrafanaService, AlertmanagerService, PrometheusService, \
NodeExporterService
from ceph.deployment.service_spec import IscsiServiceSpec
import yaml
from ceph.deployment.service_spec import ServiceSpec, NFSServiceSpec, RGWSpec, \
- IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec, \
- HA_RGWSpec
+ IscsiServiceSpec, AlertManagerSpec, HostPlacementSpec, CustomContainerSpec
from orchestrator import DaemonDescription, OrchestratorError
import pytest
-from cephadm.tests.fixtures import cephadm_module
from cephadm.template import TemplateMgr, UndefinedError, TemplateNotFoundError
from cephadm import CephadmOrchestrator
from cephadm.upgrade import CephadmUpgrade
from cephadm.serve import CephadmServe
-from .fixtures import _run_cephadm, wait, cephadm_module, with_host, with_service
+from .fixtures import _run_cephadm, wait, with_host, with_service
@mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}'))
import orchestrator
from cephadm.serve import CephadmServe
from cephadm.utils import ceph_release_to_major, name_to_config_section, CEPH_UPGRADE_ORDER
-from orchestrator import OrchestratorError, DaemonDescription, daemon_type_to_service, service_to_daemon_types
+from orchestrator import OrchestratorError, DaemonDescription, daemon_type_to_service
if TYPE_CHECKING:
from .module import CephadmOrchestrator
from __future__ import absolute_import
-import os
-
-if 'UNITTEST' in os.environ:
- import tests
from .module import OrchestratorCli
OrchestratorEvent, set_exception_subject, \
InventoryHost, DeviceLightLoc, \
UpgradeStatusSpec, daemon_type_to_service, service_to_daemon_types
+
+__all__ = [
+ 'OrchestratorCli',
+ 'Completion',
+ 'TrivialReadCompletion',
+ 'raise_if_exception',
+ 'ProgressReference',
+ 'pretty_print',
+ '_Promise',
+ 'CLICommand',
+ '_cli_write_command',
+ '_cli_read_command',
+ 'CLICommandMeta',
+ 'Orchestrator',
+ 'OrchestratorClientMixin',
+ 'OrchestratorValidationError',
+ 'OrchestratorError',
+ 'NoOrchestrator',
+ 'ServiceDescription',
+ 'InventoryFilter',
+ 'HostSpec',
+ 'DaemonDescription',
+ 'OrchestratorEvent',
+ 'set_exception_subject',
+ 'InventoryHost',
+ 'DeviceLightLoc',
+ 'UpgradeStatusSpec',
+ 'daemon_type_to_service',
+ 'service_to_daemon_types',
+
+]
+
+import os
+if 'UNITTEST' in os.environ:
+ import tests
+ __all__.append(tests.__name__)
from ceph.deployment.service_spec import PlacementSpec, ServiceSpec
from ceph.utils import datetime_now
-from mgr_util import format_bytes, to_pretty_timedelta, format_dimless
+from mgr_util import to_pretty_timedelta, format_dimless
from mgr_module import MgrModule, HandleCommandResult, Option
from ._interface import OrchestratorClientMixin, DeviceLightLoc, _cli_read_command, \
raise_if_exception, _cli_write_command, TrivialReadCompletion, OrchestratorError, \
- NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, HA_RGWSpec, \
+ NoOrchestrator, OrchestratorValidationError, NFSServiceSpec, \
RGWSpec, InventoryFilter, InventoryHost, HostSpec, CLICommandMeta, \
- ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec, GenericSpec
+ ServiceDescription, DaemonDescription, IscsiServiceSpec, json_to_generic_spec
def nice_delta(now: datetime.datetime, t: Optional[datetime.datetime], suffix: str = '') -> str:
max-line-length = 100
ignore =
E501,
- F401,
F811,
W503,
exclude =