+# flake8: noqa
from .module import Module
import re
-from typing import Any, Dict, Optional, Tuple, TYPE_CHECKING, Union
+from typing import Dict, Optional, Tuple, TYPE_CHECKING, Union
GLOBAL_POOL_KEY = (None, None)
+
class NotAuthorizedError(Exception):
pass
osd_map = module.get('osd_map')
return {pool['pool']: pool['pool_name'] for pool in osd_map['pools']
if 'rbd' in pool.get('application_metadata', {})}
-
import json
import rados
import rbd
-import re
import traceback
from datetime import datetime
from threading import Condition, Lock, Thread
-from typing import Any, Dict, List, NamedTuple, Optional, Sequence, Set, Tuple, Union
+from typing import Any, Dict, List, NamedTuple, Optional, Set, Tuple, Union
from .common import get_rbd_pools
-from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+from .schedule import LevelSpec, Schedules
+
def namespace_validator(ioctx: rados.Ioctx) -> None:
mode = rbd.RBD().mirror_mode_get(ioctx)
raise ValueError("namespace {} is not in mirror image mode".format(
ioctx.get_namespace()))
+
def image_validator(image: rbd.Image) -> None:
mode = image.mirror_image_get_mode()
if mode != rbd.RBD_MIRROR_IMAGE_MODE_SNAPSHOT:
pool_id, namespace, image_id, e))
self.close_image(image_spec, image)
-
def handle_create_snapshot(self,
image_spec: ImageSpec,
image: rbd.Image,
continue
image_name = self.images[pool_id][namespace][image_id]
scheduled_images.append({
- 'schedule_time' : schedule_time,
- 'image' : image_name
+ 'schedule_time': schedule_time,
+ 'image': image_name
})
- return 0, json.dumps({'scheduled_images' : scheduled_images},
+ return 0, json.dumps({'scheduled_images': scheduled_images},
indent=4, sort_keys=True), ""
import inspect
import rados
import rbd
-import traceback
from typing import cast, Any, Callable, Optional, Tuple, TypeVar
from mgr_module import CLIReadCommand, CLIWriteCommand, MgrModule, Option
ioctx: rados.Ioctx,
namespace_validator: Optional[Callable],
image_validator: Optional[Callable]) -> None:
- pool_id = ioctx.get_pool_id()
pool_name = ioctx.get_pool_name()
stale_keys = []
start_after = ''
ls = self.level_specs[level_spec_id]
if ls == parent or ls == level_spec or ls.is_child_of(level_spec):
result[level_spec_id] = {
- 'name' : schedule.name,
- 'schedule' : schedule.to_list(),
+ 'name': schedule.name,
+ 'schedule': schedule.to_list(),
}
return result
PoolSpecT = Tuple[str, str]
MigrationStatusT = Dict[str, str]
+
class TaskHandler:
lock = Lock()
condition = Condition(lock)
-import errno
import json
import rados
import rbd
-import re
import traceback
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from .common import get_rbd_pools
-from .schedule import LevelSpec, Interval, StartTime, Schedule, Schedules
+from .schedule import LevelSpec, Schedules
class TrashPurgeScheduleHandler:
continue
pool_name = self.pools[pool_id][namespace]
scheduled.append({
- 'schedule_time' : schedule_time,
- 'pool_id' : pool_id,
- 'pool_name' : pool_name,
- 'namespace' : namespace
+ 'schedule_time': schedule_time,
+ 'pool_id': pool_id,
+ 'pool_name': pool_name,
+ 'namespace': namespace
})
- return 0, json.dumps({'scheduled' : scheduled}, indent=4,
+ return 0, json.dumps({'scheduled': scheduled}, indent=4,
sort_keys=True), ""
nfs
orchestrator
prometheus
+ rbd_support
selftest
commands =
flake8 --config=tox.ini {posargs} \