git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr/mirroring: interface to mirror CephFS directory snapshots
author Venky Shankar <vshankar@redhat.com>
Thu, 9 Jul 2020 10:43:47 +0000 (06:43 -0400)
committer Venky Shankar <vshankar@redhat.com>
Thu, 4 Feb 2021 04:49:15 +0000 (23:49 -0500)
Signed-off-by: Venky Shankar <vshankar@redhat.com>
(cherry picked from commit b7acf7fc77d8c7865935643c3eb484be2f9999a1)

17 files changed:
ceph.spec.in
debian/ceph-mgr-modules-core.install
src/pybind/mgr/mirroring/__init__.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/__init__.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/blocklist.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/dir_map/__init__.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/dir_map/create.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/dir_map/load.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/dir_map/policy.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/dir_map/state_transition.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/dir_map/update.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/exception.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/notify.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/snapshot_mirror.py [new file with mode: 0644]
src/pybind/mgr/mirroring/fs/utils.py [new file with mode: 0644]
src/pybind/mgr/mirroring/module.py [new file with mode: 0644]
src/pybind/mgr/tox.ini

index 4463ff758f6dc5f130f7fc86702a3ce7076099d0..2482328d3d9830f216dd2cb3e2fb19b82ec1527a 100644 (file)
@@ -1708,6 +1708,7 @@ fi
 %{_datadir}/ceph/mgr/iostat
 %{_datadir}/ceph/mgr/localpool
 %{_datadir}/ceph/mgr/mds_autoscaler
+%{_datadir}/ceph/mgr/mirroring
 %{_datadir}/ceph/mgr/orchestrator
 %{_datadir}/ceph/mgr/osd_perf_query
 %{_datadir}/ceph/mgr/osd_support
index a1a74c0af2d1f9a052557c769d2090e303317846..764038dd0a222423eb0671368ee5091a7af0c73e 100644 (file)
@@ -6,6 +6,7 @@ usr/share/ceph/mgr/influx
 usr/share/ceph/mgr/insights
 usr/share/ceph/mgr/iostat
 usr/share/ceph/mgr/localpool
+usr/share/ceph/mgr/mirroring
 usr/share/ceph/mgr/orchestrator
 usr/share/ceph/mgr/osd_perf_query
 usr/share/ceph/mgr/osd_support
diff --git a/src/pybind/mgr/mirroring/__init__.py b/src/pybind/mgr/mirroring/__init__.py
new file mode 100644 (file)
index 0000000..8f210ac
--- /dev/null
@@ -0,0 +1 @@
+from .module import Module
diff --git a/src/pybind/mgr/mirroring/fs/__init__.py b/src/pybind/mgr/mirroring/fs/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/mirroring/fs/blocklist.py b/src/pybind/mgr/mirroring/fs/blocklist.py
new file mode 100644 (file)
index 0000000..473b5f2
--- /dev/null
@@ -0,0 +1,10 @@
+import logging
+
+log = logging.getLogger(__name__)
+
+def blocklist(mgr, addr):
+    cmd = {'prefix': 'osd blocklist', 'blocklistop': 'add', 'addr': str(addr)}
+    r, outs, err = mgr.mon_command(cmd)
+    if r != 0:
+        log.error(f'blocklist error: {err}')
+    return r
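The blocklist() helper above is a thin wrapper over the 'osd blocklist add' mon command. A hedged caller sketch (illustrative only; 'mgr' is assumed to be a MgrModule handle and the address is made up):

    # illustrative sketch, not part of this commit
    import logging
    from .blocklist import blocklist   # assumes the mirroring package is importable

    log = logging.getLogger(__name__)

    r = blocklist(mgr, '192.168.0.10:0/123456')   # entity addr of a stale client
    if r != 0:
        # non-zero means the mon command failed; the caller decides whether to retry
        log.warning(f'failed to blocklist client: r={r}')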
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/__init__.py b/src/pybind/mgr/mirroring/fs/dir_map/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/create.py b/src/pybind/mgr/mirroring/fs/dir_map/create.py
new file mode 100644 (file)
index 0000000..963dfe9
--- /dev/null
@@ -0,0 +1,23 @@
+import errno
+import logging
+
+import rados
+
+from ..exception import MirrorException
+from ..utils import MIRROR_OBJECT_NAME
+
+log = logging.getLogger(__name__)
+
+def create_mirror_object(rados_inst, pool_id):
+    log.info(f'creating mirror object: {MIRROR_OBJECT_NAME}')
+    try:
+        with rados_inst.open_ioctx2(pool_id) as ioctx:
+            with rados.WriteOpCtx() as write_op:
+                write_op.new(rados.LIBRADOS_CREATE_EXCLUSIVE)
+                ioctx.operate_write_op(write_op, MIRROR_OBJECT_NAME)
+    except rados.Error as e:
+        if e.errno == errno.EEXIST:
+            # be graceful
+            return -e.errno
+        log.error(f'failed to create mirror object: {e}')
+        raise Exception(-e.args[0])
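create_mirror_object() returns None on success, returns -EEXIST when the mirror object already exists (mirroring was enabled earlier), and raises for anything else. A hedged caller sketch; rados_inst and pool_id stand in for the mgr module's rados handle and the metadata pool id:

    import errno

    # illustrative only -- rados_inst/pool_id come from the mgr module
    r = create_mirror_object(rados_inst, pool_id)
    if r == -errno.EEXIST:
        pass   # object left over from an earlier enable; safe to reuse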
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/load.py b/src/pybind/mgr/mirroring/fs/dir_map/load.py
new file mode 100644 (file)
index 0000000..42468b4
--- /dev/null
@@ -0,0 +1,74 @@
+import errno
+import pickle
+import logging
+from typing import Dict
+
+import rados
+
+from ..exception import MirrorException
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+    INSTANCE_ID_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_RETURN = 256
+
+def handle_dir_load(dir_mapping, dir_map):
+    for directory_str, encoded_map in dir_map.items():
+        dir_path = directory_str[len(DIRECTORY_MAP_PREFIX):]
+        decoded_map = pickle.loads(encoded_map)
+        log.debug(f'{dir_path} -> {decoded_map}')
+        dir_mapping[dir_path] = decoded_map
+
+def load_dir_map(ioctx):
+    dir_mapping = {} # type: Dict[str, Dict]
+    log.info('loading dir map...')
+    try:
+        with rados.ReadOpCtx() as read_op:
+            start = ""
+            while True:
+                iter, ret = ioctx.get_omap_vals(read_op, start, DIRECTORY_MAP_PREFIX, MAX_RETURN)
+                if not ret == 0:
+                    log.error(f'failed to fetch dir mapping omap')
+                    raise Exception(-errno.EINVAL)
+                ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME)
+                dir_map = dict(iter)
+                if not dir_map:
+                    break
+                handle_dir_load(dir_mapping, dir_map)
+                start = dir_map.popitem()[0]
+        log.info("loaded {0} directory mapping(s) from disk".format(len(dir_mapping)))
+        return dir_mapping
+    except rados.Error as e:
+        log.error(f'exception when loading directory mapping: {e}')
+        raise Exception(-e.errno)
+
+def handle_instance_load(instance_mapping, instance_map):
+    for instance, e_data in instance_map.items():
+        instance_id = instance[len(INSTANCE_ID_PREFIX):]
+        d_data = pickle.loads(e_data)
+        log.debug(f'{instance_id} -> {d_data}')
+        instance_mapping[instance_id] = d_data
+
+def load_instances(ioctx):
+    instance_mapping = {} # type: Dict[str, Dict]
+    log.info('loading instances...')
+    try:
+        with rados.ReadOpCtx() as read_op:
+            start = ""
+            while True:
+                iter, ret = ioctx.get_omap_vals(read_op, start, INSTANCE_ID_PREFIX, MAX_RETURN)
+                if not ret == 0:
+                    log.error(f'failed to fetch instance omap')
+                    raise Exception(-errno.EINVAL)
+                ioctx.operate_read_op(read_op, MIRROR_OBJECT_NAME)
+                instance_map = dict(iter)
+                if not instance_map:
+                    break
+                handle_instance_load(instance_mapping, instance_map)
+                start = instance_map.popitem()[0]
+        log.info("loaded {0} instance(s) from disk".format(len(instance_mapping)))
+        return instance_mapping
+    except rados.Error as e:
+        log.error(f'exception when loading instances: {e}')
+        raise Exception(-e.errno)
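Both loaders above page through the mirror object's omap MAX_RETURN entries at a time, strip the key prefix, and unpickle the value. A small sketch of what one decoded entry looks like (the path, instance id and timestamp are made-up values; the prefix constants come from ..utils as imported above):

    import pickle

    encoded = pickle.dumps({'version': 1, 'instance_id': '7234',
                            'last_shuffled': 1612345678.9})
    dir_mapping = {}
    handle_dir_load(dir_mapping, {DIRECTORY_MAP_PREFIX + '/backups': encoded})
    # dir_mapping == {'/backups': {'version': 1, 'instance_id': '7234',
    #                              'last_shuffled': 1612345678.9}}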
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/policy.py b/src/pybind/mgr/mirroring/fs/dir_map/policy.py
new file mode 100644 (file)
index 0000000..921372b
--- /dev/null
@@ -0,0 +1,363 @@
+import os
+import errno
+import logging
+import time
+from threading import Lock
+from typing import Dict
+
+from .state_transition import ActionType, PolicyAction, Transition, \
+    State, StateTransition
+from ..exception import MirrorException
+
+log = logging.getLogger(__name__)
+
+class DirectoryState:
+    def __init__(self, instance_id=None, mapped_time=None):
+        self.instance_id = instance_id
+        self.mapped_time = mapped_time
+        self.state = State.UNASSOCIATED
+        self.stalled = False
+        self.transition = Transition(ActionType.NONE)
+        self.next_state = None
+        self.purging = False
+
+    def __str__(self):
+        return f'[instance_id={self.instance_id}, mapped_time={self.mapped_time},'\
+            f' state={self.state}, transition={self.transition}, next_state={self.next_state},'\
+            f' purging={self.purging}]'
+
+class Policy:
+    def __init__(self):
+        self.dir_states = {}
+        self.instance_to_dir_map = {}
+        self.dead_instances = []
+        self.lock = Lock()
+
+    @staticmethod
+    def is_instance_action(action_type):
+        return action_type in (ActionType.ACQUIRE,
+                               ActionType.RELEASE)
+
+    def is_dead_instance(self, instance_id):
+        return instance_id in self.dead_instances
+
+    def is_state_scheduled(self, dir_state, state):
+        return dir_state.state == state or dir_state.next_state == state
+
+    def is_shuffling(self, dir_path):
+        log.debug(f'is_shuffling: {dir_path}')
+        return self.is_state_scheduled(self.dir_states[dir_path], State.SHUFFLING)
+
+    def can_shuffle_dir(self, dir_path):
+        """Right now, shuffle directories only based on idleness. Later, we
+        probably want to avoid shuffling directories that were recently shuffled.
+        """
+        log.debug(f'can_shuffle_dir: {dir_path}')
+        dir_state = self.dir_states[dir_path]
+        return StateTransition.is_idle(dir_state.state)
+
+    def set_state(self, dir_state, state, ignore_current_state=False):
+        if not ignore_current_state and dir_state.state == state:
+            return False
+        elif StateTransition.is_idle(dir_state.state):
+            dir_state.state = state
+            dir_state.next_state = None
+            dir_state.transition = StateTransition.transit(
+                dir_state.state, dir_state.transition.action_type)
+            return True
+        dir_state.next_state = state
+        return False
+
+    def init(self, dir_mapping):
+        with self.lock:
+            for dir_path, dir_map in dir_mapping.items():
+                instance_id = dir_map['instance_id']
+                if instance_id:
+                    if not instance_id in self.instance_to_dir_map:
+                        self.instance_to_dir_map[instance_id] = []
+                    self.instance_to_dir_map[instance_id].append(dir_path)
+                self.dir_states[dir_path] = DirectoryState(instance_id, dir_map['last_shuffled'])
+                dir_state = self.dir_states[dir_path]
+                state = State.INITIALIZING if instance_id else State.ASSOCIATING
+                if instance_id:
+                    purging = dir_map.get('purging', False)
+                    if purging:
+                        dir_state.purging = True
+                        state = State.DISASSOCIATING
+                    else:
+                        state = State.INITIALIZING
+                else:
+                    state = State.ASSOCIATING
+                log.debug(f'starting state: {dir_path} {state}')
+                self.set_state(dir_state, state)
+                log.debug(f'init dir_state: {dir_state}')
+
+    def lookup(self, dir_path):
+        log.debug(f'looking up {dir_path}')
+        with self.lock:
+            dir_state = self.dir_states.get(dir_path, None)
+            if dir_state:
+                return {'instance_id': dir_state.instance_id,
+                        'mapped_time': dir_state.mapped_time,
+                        'purging': dir_state.purging}
+            return None
+
+    def map(self, dir_path, dir_state):
+        log.debug(f'mapping {dir_path}')
+        min_instance_id = None
+        current_instance_id = dir_state.instance_id
+        if current_instance_id and not self.is_dead_instance(current_instance_id):
+            return True
+        if self.is_dead_instance(current_instance_id):
+            self.unmap(dir_path, dir_state)
+        for instance_id, dir_paths in self.instance_to_dir_map.items():
+            if self.is_dead_instance(instance_id):
+                continue
+            if not min_instance_id or len(dir_paths) < len(self.instance_to_dir_map[min_instance_id]):
+                min_instance_id = instance_id
+        if not min_instance_id:
+            log.debug(f'instance unavailable for {dir_path}')
+            return False
+        log.debug(f'dir_path {dir_path} maps to instance {min_instance_id}')
+        dir_state.instance_id = min_instance_id
+        dir_state.mapped_time = time.time()
+        self.instance_to_dir_map[min_instance_id].append(dir_path)
+        return True
+
+    def unmap(self, dir_path, dir_state):
+        instance_id = dir_state.instance_id
+        log.debug(f'unmapping {dir_path} from instance {instance_id}')
+        self.instance_to_dir_map[instance_id].remove(dir_path)
+        dir_state.instance_id = None
+        dir_state.mapped_time = None
+        if self.is_dead_instance(instance_id) and not self.instance_to_dir_map[instance_id]:
+            self.instance_to_dir_map.pop(instance_id)
+            self.dead_instances.remove(instance_id)
+
+    def shuffle(self, dirs_per_instance, include_stalled_dirs):
+        log.debug(f'directories per instance: {dirs_per_instance}')
+        shuffle_dirs = []
+        for instance_id, dir_paths in self.instance_to_dir_map.items():
+            cut_off = len(dir_paths) - dirs_per_instance
+            if cut_off > 0:
+                for dir_path in dir_paths:
+                    if cut_off == 0:
+                        break
+                    if self.is_shuffling(dir_path):
+                        cut_off -= 1
+                    elif self.can_shuffle_dir(dir_path):
+                        cut_off -= 1
+                        shuffle_dirs.append(dir_path)
+        if include_stalled_dirs:
+            for dir_path, dir_state in self.dir_states.items():
+                if dir_state.stalled:
+                    log.debug(f'{dir_path} is stalled: {dir_state} -- triggering kick')
+                    dir_state.stalled = False
+                    shuffle_dirs.append(dir_path)
+        return shuffle_dirs
+
+    def execute_policy_action(self, dir_path, dir_state, policy_action):
+        log.debug(f'executing for directory {dir_path} policy_action {policy_action}')
+
+        done = True
+        if policy_action == PolicyAction.MAP:
+            done = self.map(dir_path, dir_state)
+        elif policy_action == PolicyAction.UNMAP:
+            self.unmap(dir_path, dir_state)
+        elif policy_action == PolicyAction.REMOVE:
+            if dir_state.state == State.UNASSOCIATED:
+                self.dir_states.pop(dir_path)
+        else:
+            raise Exception()
+        return done
+
+    def start_action(self, dir_path):
+        log.debug(f'start action: {dir_path}')
+        with self.lock:
+            dir_state = self.dir_states.get(dir_path, None)
+            if not dir_state:
+                raise Exception()
+            log.debug(f'dir_state: {dir_state}')
+            if dir_state.transition.start_policy_action:
+                stalled = not self.execute_policy_action(dir_path, dir_state,
+                                                         dir_state.transition.start_policy_action)
+                if stalled:
+                    dir_state.stalled = True
+                    log.debug(f'state machine stalled')
+                    return ActionType.NONE
+            return dir_state.transition.action_type
+
+    def finish_action(self, dir_path, r):
+        log.debug(f'finish action {dir_path} r={r}')
+        with self.lock:
+            dir_state = self.dir_states.get(dir_path, None)
+            if not dir_state:
+                raise Exception()
+            if r < 0 and (not Policy.is_instance_action(dir_state.transition.action_type) or
+                          not dir_state.instance_id or
+                          not dir_state.instance_id in self.dead_instances):
+                return True
+            log.debug(f'dir_state: {dir_state}')
+            finish_policy_action = dir_state.transition.finish_policy_action
+            dir_state.transition = StateTransition.transit(
+                dir_state.state, dir_state.transition.action_type)
+            log.debug(f'transitioned to dir_state: {dir_state}')
+            if dir_state.transition.final_state:
+                log.debug('reached final state')
+                dir_state.state = dir_state.transition.final_state
+                dir_state.transition = Transition(ActionType.NONE)
+                log.debug(f'final dir_state: {dir_state}')
+            if StateTransition.is_idle(dir_state.state) and dir_state.next_state:
+                self.set_state(dir_state, dir_state.next_state)
+            pending = not dir_state.transition.action_type == ActionType.NONE
+            if finish_policy_action:
+                self.execute_policy_action(dir_path, dir_state, finish_policy_action)
+            return pending
+
+    def find_tracked_ancestor_or_subtree(self, dir_path):
+        for tracked_path, _ in self.dir_states.items():
+            comp = [dir_path, tracked_path]
+            cpath = os.path.commonpath(comp)
+            if cpath in comp:
+                what = 'subtree' if cpath == tracked_path else 'ancestor'
+                return (tracked_path, what)
+        return None
+
+    def add_dir(self, dir_path):
+        log.debug(f'adding dir_path {dir_path}')
+        with self.lock:
+            if dir_path in self.dir_states:
+                return False
+            as_info = self.find_tracked_ancestor_or_subtree(dir_path)
+            if as_info:
+                raise MirrorException(-errno.EINVAL, f'{dir_path} is a {as_info[1]} of tracked path {as_info[0]}')
+            self.dir_states[dir_path] = DirectoryState()
+            dir_state = self.dir_states[dir_path]
+            log.debug(f'add dir_state: {dir_state}')
+            if dir_state.state == State.INITIALIZING:
+                return False
+            return self.set_state(dir_state, State.ASSOCIATING)
+
+    def remove_dir(self, dir_path):
+        log.debug(f'removing dir_path {dir_path}')
+        with self.lock:
+            dir_state = self.dir_states.get(dir_path, None)
+            if not dir_state:
+                return False
+            log.debug(f'removing dir_state: {dir_state}')
+            dir_state.purging = True
+            return self.set_state(dir_state, State.DISASSOCIATING)
+
+    def add_instances_initial(self, instance_ids):
+        """Take care of figuring out instances which no longer exist
+        and remove them. This is to be done only once on startup to
+        identify instances which were previously removed but directories
+        are still mapped (on-disk) to them.
+        """
+        for instance_id in instance_ids:
+            if not instance_id in self.instance_to_dir_map:
+                self.instance_to_dir_map[instance_id] = []
+        dead_instances = []
+        for instance_id, _ in self.instance_to_dir_map.items():
+            if not instance_id in instance_ids:
+                dead_instances.append(instance_id)
+        if dead_instances:
+            self._remove_instances(dead_instances)
+
+    def add_instances(self, instance_ids, initial_update=False):
+        log.debug(f'adding instances: {instance_ids} initial_update {initial_update}')
+        with self.lock:
+            if initial_update:
+                self.add_instances_initial(instance_ids)
+            else:
+                nr_instances = len(self.instance_to_dir_map)
+                nr_dead_instances = len(self.dead_instances)
+                if nr_instances > 0:
+                    # adjust dead instances
+                    nr_instances -= nr_dead_instances
+                include_stalled_dirs = nr_instances == 0
+                for instance_id in instance_ids:
+                    if not instance_id in self.instance_to_dir_map:
+                        self.instance_to_dir_map[instance_id] = []
+                dirs_per_instance = int(len(self.dir_states) /
+                                        (len(self.instance_to_dir_map) - nr_dead_instances))
+                if dirs_per_instance == 0:
+                    dirs_per_instance += 1
+                shuffle_dirs = []
+                # superset of directories which are candidates for shuffling -- choose
+                # those which can be shuffled right away (others will be shuffled when
+                # they reach idle state).
+                shuffle_dirs_ss = self.shuffle(dirs_per_instance, include_stalled_dirs)
+                if include_stalled_dirs:
+                    return shuffle_dirs_ss
+                for dir_path in shuffle_dirs_ss:
+                    dir_state = self.dir_states[dir_path]
+                    if self.set_state(dir_state, State.SHUFFLING):
+                        shuffle_dirs.append(dir_path)
+                log.debug(f'remapping directories: {shuffle_dirs}')
+                return shuffle_dirs
+
+    def remove_instances(self, instance_ids):
+        with self.lock:
+            return self._remove_instances(instance_ids)
+
+    def _remove_instances(self, instance_ids):
+        log.debug(f'removing instances: {instance_ids}')
+        shuffle_dirs = []
+        for instance_id in instance_ids:
+            if not instance_id in self.instance_to_dir_map:
+                continue
+            if not self.instance_to_dir_map[instance_id]:
+                self.instance_to_dir_map.pop(instance_id)
+                continue
+            self.dead_instances.append(instance_id)
+            dir_paths = self.instance_to_dir_map[instance_id]
+            log.debug(f'force shuffling instance_id {instance_id}, directories {dir_paths}')
+            for dir_path in dir_paths:
+                dir_state = self.dir_states[dir_path]
+                if self.is_state_scheduled(dir_state, State.DISASSOCIATING):
+                    log.debug(f'dir_path {dir_path} is disassociating, ignoring...')
+                    continue
+                log.debug(f'shuffling dir_path {dir_path}')
+                if self.set_state(dir_state, State.SHUFFLING, True):
+                    shuffle_dirs.append(dir_path)
+        log.debug(f'shuffling {shuffle_dirs}')
+        return shuffle_dirs
+
+    def dir_status(self, dir_path):
+        with self.lock:
+            dir_state = self.dir_states.get(dir_path, None)
+            if not dir_state:
+                raise MirrorException(-errno.ENOENT, f'{dir_path} is not tracked')
+            res = {} # type: Dict
+            if dir_state.stalled:
+                res['state'] = 'stalled'
+                res['reason'] = 'no mirror daemons running'
+            elif dir_state.state == State.ASSOCIATING:
+                res['state'] = 'mapping'
+            else:
+                state = None
+                dstate = dir_state.state
+                if dstate == State.ASSOCIATING:
+                    state = 'mapping'
+                elif dstate == State.DISASSOCIATING:
+                    state = 'unmapping'
+                elif dstate == State.SHUFFLING:
+                    state = 'shuffling'
+                elif dstate == State.ASSOCIATED:
+                    state = 'mapped'
+                elif dstate == State.INITIALIZING:
+                    state = 'resolving'
+                res['state'] = state
+                res['instance_id'] = dir_state.instance_id
+                res['last_shuffled'] = dir_state.mapped_time
+            return res
+
+    def instance_summary(self):
+        with self.lock:
+            res = {
+                'mapping': {}
+            } # type: Dict
+            for instance_id, dir_paths in self.instance_to_dir_map.items():
+                res['mapping'][instance_id] = f'{len(dir_paths)} directories'
+            return res
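Policy is driven by repeatedly calling start_action()/finish_action() until no action is pending; a newly added directory moves UNASSOCIATED -> ASSOCIATING -> ASSOCIATED. A hedged sketch of that loop with a single made-up instance and directory (the omap persistence and daemon notification that snapshot_mirror.py performs between the two calls are elided):

    policy = Policy()
    policy.add_instances(['7234'], initial_update=True)   # one live mirror daemon
    policy.add_dir('/backups')                            # schedules the directory

    pending = True
    while pending:
        action = policy.start_action('/backups')    # MAP_UPDATE, then ACQUIRE
        # ... persist the omap update / notify the daemon, then ack with r=0 ...
        pending = policy.finish_action('/backups', 0)

    assert policy.lookup('/backups')['instance_id'] == '7234'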
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py b/src/pybind/mgr/mirroring/fs/dir_map/state_transition.py
new file mode 100644 (file)
index 0000000..ef59a6a
--- /dev/null
@@ -0,0 +1,94 @@
+import logging
+from enum import Enum, unique
+from typing import Dict
+
+log = logging.getLogger(__name__)
+
+@unique
+class State(Enum):
+    UNASSOCIATED   = 0
+    INITIALIZING   = 1
+    ASSOCIATING    = 2
+    ASSOCIATED     = 3
+    SHUFFLING      = 4
+    DISASSOCIATING = 5
+
+@unique
+class ActionType(Enum):
+    NONE       = 0
+    MAP_UPDATE = 1
+    MAP_REMOVE = 2
+    ACQUIRE    = 3
+    RELEASE    = 4
+
+@unique
+class PolicyAction(Enum):
+    MAP    = 0
+    UNMAP  = 1
+    REMOVE = 2
+
+class TransitionKey:
+    def __init__(self, state, action_type):
+        self.transition_key = [state, action_type]
+
+    def __hash__(self):
+        return hash(tuple(self.transition_key))
+
+    def __eq__(self, other):
+        return self.transition_key == other.transition_key
+
+    def __ne__(self, other):
+        return not (self == other)
+
+class Transition:
+    def __init__(self, action_type, start_policy_action=None,
+                 finish_policy_action=None, final_state=None):
+        self.action_type = action_type
+        self.start_policy_action = start_policy_action
+        self.finish_policy_action = finish_policy_action
+        self.final_state = final_state
+
+    def __str__(self):
+        return "[action_type={0}, start_policy_action={1}, finish_policy_action={2}, final_state={3}".format(
+            self.action_type, self.start_policy_action, self.finish_policy_action, self.final_state)
+
+class StateTransition:
+    transition_table = {} # type: Dict[TransitionKey, Transition]
+
+    @staticmethod
+    def transit(state, action_type):
+        try:
+            return StateTransition.transition_table[TransitionKey(state, action_type)]
+        except KeyError:
+            raise Exception()
+
+    @staticmethod
+    def is_idle(state):
+        return state in (State.UNASSOCIATED, State.ASSOCIATED)
+
+StateTransition.transition_table = {
+    TransitionKey(State.INITIALIZING, ActionType.NONE) : Transition(ActionType.ACQUIRE),
+    TransitionKey(State.INITIALIZING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+                                                                       final_state=State.ASSOCIATED),
+
+    TransitionKey(State.ASSOCIATING, ActionType.NONE) : Transition(ActionType.MAP_UPDATE,
+                                                                   start_policy_action=PolicyAction.MAP),
+    TransitionKey(State.ASSOCIATING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE),
+    TransitionKey(State.ASSOCIATING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+                                                                      final_state=State.ASSOCIATED),
+
+    TransitionKey(State.DISASSOCIATING, ActionType.NONE) : Transition(ActionType.RELEASE,
+                                                                      finish_policy_action=PolicyAction.UNMAP),
+    TransitionKey(State.DISASSOCIATING, ActionType.RELEASE) : Transition(ActionType.MAP_REMOVE,
+                                                                         finish_policy_action=PolicyAction.REMOVE),
+    TransitionKey(State.DISASSOCIATING, ActionType.MAP_REMOVE) : Transition(ActionType.NONE,
+                                                                            final_state=State.UNASSOCIATED),
+
+    TransitionKey(State.SHUFFLING, ActionType.NONE) : Transition(ActionType.RELEASE,
+                                                                 finish_policy_action=PolicyAction.UNMAP),
+    TransitionKey(State.SHUFFLING, ActionType.RELEASE) : Transition(ActionType.MAP_UPDATE,
+                                                                    start_policy_action=PolicyAction.MAP),
+    TransitionKey(State.SHUFFLING, ActionType.MAP_UPDATE) : Transition(ActionType.ACQUIRE),
+    TransitionKey(State.SHUFFLING, ActionType.ACQUIRE) : Transition(ActionType.NONE,
+                                                                    final_state=State.ASSOCIATED),
+    }
diff --git a/src/pybind/mgr/mirroring/fs/dir_map/update.py b/src/pybind/mgr/mirroring/fs/dir_map/update.py
new file mode 100644 (file)
index 0000000..a70baa0
--- /dev/null
@@ -0,0 +1,151 @@
+import errno
+import pickle
+import logging
+
+import rados
+
+from ..utils import MIRROR_OBJECT_NAME, DIRECTORY_MAP_PREFIX, \
+    INSTANCE_ID_PREFIX, MIRROR_OBJECT_PREFIX
+
+log = logging.getLogger(__name__)
+
+MAX_UPDATE = 256
+
+class UpdateDirMapRequest:
+    def __init__(self, ioctx, update_mapping, removals, on_finish_callback):
+        self.ioctx = ioctx
+        self.update_mapping = update_mapping
+        self.removals = removals
+        self.on_finish_callback = on_finish_callback
+
+    @staticmethod
+    def omap_key(dir_path):
+        return f'{DIRECTORY_MAP_PREFIX}{dir_path}'
+
+    def send(self):
+        log.info('updating directory map')
+        self.send_update()
+
+    def send_update(self):
+        log.debug(f'pending updates: {len(self.update_mapping)}+{len(self.removals)}')
+        try:
+            with rados.WriteOpCtx() as write_op:
+                keys = []
+                vals = []
+                dir_keys = list(self.update_mapping.keys())[0:MAX_UPDATE]
+                # gather updates
+                for dir_path in dir_keys:
+                    mapping = self.update_mapping.pop(dir_path)
+                    keys.append(UpdateDirMapRequest.omap_key(dir_path))
+                    vals.append(pickle.dumps(mapping))
+                    self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+                # gather deletes
+                slicept = MAX_UPDATE - len(dir_keys)
+                removals = [UpdateDirMapRequest.omap_key(dir_path) for dir_path in self.removals[0:slicept]]
+                self.removals = self.removals[slicept:]
+                self.ioctx.remove_omap_keys(write_op, tuple(removals))
+                log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+                self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+        except rados.Error as e:
+            log.error(f'UpdateDirMapRequest.send_update exception: {e}')
+            self.finish(-e.args[0])
+
+    def handle_update(self, completion):
+        r = completion.get_return_value()
+        log.debug(f'handle_update: r={r}')
+        if not r == 0:
+            self.finish(r)
+        elif self.update_mapping or self.removals:
+            self.send_update()
+        else:
+            self.finish(0)
+
+    def finish(self, r):
+        log.info(f'finish: r={r}')
+        self.on_finish_callback(r)
+
+class UpdateInstanceRequest:
+    def __init__(self, ioctx, instances_added, instances_removed, on_finish_callback):
+        self.ioctx = ioctx
+        self.instances_added = instances_added
+        # purge vs remove: the purge list is for removing on-disk instance
+        # objects; the remove list is for removing entries from the instance map.
+        self.instances_removed = instances_removed.copy()
+        self.instances_purge = instances_removed.copy()
+        self.on_finish_callback = on_finish_callback
+
+    @staticmethod
+    def omap_key(instance_id):
+        return f'{INSTANCE_ID_PREFIX}{instance_id}'
+
+    @staticmethod
+    def cephfs_mirror_object_name(instance_id):
+        assert instance_id != ''
+        return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+    def send(self):
+        log.info('updating instances')
+        self.send_update()
+
+    def send_update(self):
+        self.remove_instance_object()
+
+    def remove_instance_object(self):
+        log.debug(f'pending purges: {len(self.instances_purge)}')
+        if not self.instances_purge:
+            self.update_instance_map()
+            return
+        instance_id = self.instances_purge.pop()
+        self.ioctx.aio_remove(
+            UpdateInstanceRequest.cephfs_mirror_object_name(instance_id), oncomplete=self.handle_remove)
+
+    def handle_remove(self, completion):
+        r = completion.get_return_value()
+        log.debug(f'handle_remove: r={r}')
+        # cephfs-mirror instances remove their respective instance
+        # objects upon termination. so we handle ENOENT here. note
+        # that when an instance is blocklisted, it won't be able to
+        # purge its instance object, so we do it on its behalf.
+        if not r == 0 and not r == -errno.ENOENT:
+            self.finish(r)
+            return
+        self.remove_instance_object()
+
+    def update_instance_map(self):
+        log.debug(f'pending updates: {len(self.instances_added)}+{len(self.instances_removed)}')
+        try:
+            with rados.WriteOpCtx() as write_op:
+                keys = []
+                vals = []
+                instance_ids = list(self.instances_added.keys())[0:MAX_UPDATE]
+                # gather updates
+                for instance_id in instance_ids:
+                    data = self.instances_added.pop(instance_id)
+                    keys.append(UpdateInstanceRequest.omap_key(instance_id))
+                    vals.append(pickle.dumps(data))
+                    self.ioctx.set_omap(write_op, tuple(keys), tuple(vals))
+                # gather deletes
+                slicept = MAX_UPDATE - len(instance_ids)
+                removals = [UpdateInstanceRequest.omap_key(instance_id) \
+                            for instance_id in self.instances_removed[0:slicept]]
+                self.instances_removed = self.instances_removed[slicept:]
+                self.ioctx.remove_omap_keys(write_op, tuple(removals))
+                log.debug(f'applying {len(keys)} updates, {len(removals)} deletes')
+                self.ioctx.operate_aio_write_op(write_op, MIRROR_OBJECT_NAME, oncomplete=self.handle_update)
+        except rados.Error as e:
+            log.error(f'UpdateInstanceRequest.update_instance_map exception: {e}')
+            self.finish(-e.args[0])
+
+    def handle_update(self, completion):
+        r = completion.get_return_value()
+        log.debug(f'handle_update: r={r}')
+        if not r == 0:
+            self.finish(r)
+        elif self.instances_added or self.instances_removed:
+            self.update_instance_map()
+        else:
+            self.finish(0)
+
+    def finish(self, r):
+        log.info(f'finish: r={r}')
+        self.on_finish_callback(r)
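UpdateDirMapRequest batches up to MAX_UPDATE omap writes and removals per aio operation and invokes the callback once everything is flushed (or with a negative errno on failure). A hedged usage sketch; 'ioctx' is assumed to be an open ioctx on the filesystem's metadata pool and the paths/values are illustrative:

    def on_done(r):
        log.info(f'dir map update finished: r={r}')

    updates = {'/backups': {'version': 1, 'instance_id': '7234',
                            'last_shuffled': 1612345678.9}}
    removals = ['/scratch']
    UpdateDirMapRequest(ioctx, updates.copy(), removals.copy(), on_done).send()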
diff --git a/src/pybind/mgr/mirroring/fs/exception.py b/src/pybind/mgr/mirroring/fs/exception.py
new file mode 100644 (file)
index 0000000..d041b27
--- /dev/null
@@ -0,0 +1,3 @@
+class MirrorException(Exception):
+    def __init__(self, error_code, error_msg=''):
+        super().__init__(error_code, error_msg)
diff --git a/src/pybind/mgr/mirroring/fs/notify.py b/src/pybind/mgr/mirroring/fs/notify.py
new file mode 100644 (file)
index 0000000..992cba2
--- /dev/null
@@ -0,0 +1,121 @@
+import errno
+import json
+import logging
+import threading
+import time
+
+import rados
+
+from .utils import MIRROR_OBJECT_PREFIX, AsyncOpTracker
+
+log = logging.getLogger(__name__)
+
+class Notifier:
+    def __init__(self, ioctx):
+        self.ioctx = ioctx
+
+    @staticmethod
+    def instance_object(instance_id):
+        return f'{MIRROR_OBJECT_PREFIX}.{instance_id}'
+
+    def notify_cbk(self, dir_path, callback):
+        def cbk(_, r, acks, timeouts):
+            log.debug(f'Notifier.notify_cbk: ret {r} acks: {acks} timeouts: {timeouts}')
+            callback(dir_path, r)
+        return cbk
+
+    def notify(self, dir_path, message, callback):
+        try:
+            instance_id = message[0]
+            message = message[1]
+            log.debug(f'Notifier.notify: {instance_id} {message} for {dir_path}')
+            self.ioctx.aio_notify(
+                Notifier.instance_object(
+                    instance_id), self.notify_cbk(dir_path, callback), msg=message)
+        except rados.Error as e:
+            log.error(f'Notifier exception: {e}')
+            raise e
+
+class InstanceWatcher:
+    INSTANCE_TIMEOUT = 30
+    NOTIFY_INTERVAL = 1
+
+    class Listener:
+        def handle_instances(self, added, removed):
+            raise NotImplementedError()
+
+    def __init__(self, ioctx, instances, listener):
+        self.ioctx = ioctx
+        self.listener = listener
+        self.instances = {}
+        for instance_id, data in instances.items():
+            self.instances[instance_id] = {'addr': data['addr'],
+                                           'seen': time.time()}
+        self.lock = threading.Lock()
+        self.cond = threading.Condition(self.lock)
+        self.done = threading.Event()
+        self.waiting = threading.Event()
+        self.notify_task = None
+        self.schedule_notify_task()
+
+    def schedule_notify_task(self):
+        assert self.notify_task == None
+        self.notify_task = threading.Timer(InstanceWatcher.NOTIFY_INTERVAL, self.notify)
+        self.notify_task.start()
+
+    def wait_and_stop(self):
+        with self.lock:
+            log.info('InstanceWatcher.wait_and_stop')
+            self.waiting.set()
+            self.cond.wait_for(lambda: self.done.is_set())
+            log.info('waiting done')
+            assert self.notify_task == None
+
+    def handle_notify(self, _, r, acks, timeouts):
+        log.debug(f'InstanceWatcher.handle_notify r={r} acks={acks} timeouts={timeouts}')
+        with self.lock:
+            try:
+                added = {}
+                removed = {}
+                if acks is None:
+                    acks = []
+                ackd_instances = []
+                for ack in acks:
+                    instance_id = str(ack[0])
+                    ackd_instances.append(instance_id)
+                    # sender data is quoted
+                    notifier_data = json.loads(ack[2].decode('utf-8'))
+                    log.debug(f'InstanceWatcher.handle_notify: {instance_id}: {notifier_data}')
+                    if not instance_id in self.instances:
+                        self.instances[instance_id] = {}
+                        added[instance_id] = notifier_data['addr']
+                    self.instances[instance_id]['addr'] = notifier_data['addr']
+                    self.instances[instance_id]['seen'] = time.time()
+                # gather non responders
+                now = time.time()
+                for instance_id in list(self.instances.keys()):
+                    data = self.instances[instance_id]
+                    if (now - data['seen'] > InstanceWatcher.INSTANCE_TIMEOUT) or \
+                       (self.waiting.is_set() and instance_id not in ackd_instances):
+                        removed[instance_id] = data['addr']
+                        self.instances.pop(instance_id)
+                if added or removed:
+                    self.listener.handle_instances(added, removed)
+            except Exception as e:
+                log.warn(f'InstanceWatcher.handle_notify exception: {e}')
+            finally:
+                if not self.instances and self.waiting.is_set():
+                    self.done.set()
+                    self.cond.notifyAll()
+                else:
+                    self.schedule_notify_task()
+
+    def notify(self):
+        with self.lock:
+            self.notify_task = None
+            try:
+                log.debug('InstanceWatcher.notify')
+                self.ioctx.aio_notify(MIRROR_OBJECT_PREFIX, self.handle_notify)
+            except rados.Error as e:
+                log.warn(f'InstanceWatcher exception: {e}')
+                self.schedule_notify_task()
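Notifier sends a JSON acquire/release message to a cephfs-mirror daemon's per-instance object, while InstanceWatcher notifies the MIRROR_OBJECT_PREFIX object every NOTIFY_INTERVAL seconds and drops instances not seen for INSTANCE_TIMEOUT seconds. A hedged sketch of the acquire path ('ioctx', the instance id and the path are stand-ins):

    import json

    def on_ack(dir_path, r):
        log.debug(f'peer acked {dir_path}: r={r}')

    notifier = Notifier(ioctx)
    message = ('7234', json.dumps({'dir_path': '/backups', 'mode': 'acquire'}))
    notifier.notify('/backups', message, on_ack)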
diff --git a/src/pybind/mgr/mirroring/fs/snapshot_mirror.py b/src/pybind/mgr/mirroring/fs/snapshot_mirror.py
new file mode 100644 (file)
index 0000000..e9e3e32
--- /dev/null
@@ -0,0 +1,601 @@
+import errno
+import json
+import logging
+import os
+import pickle
+import re
+import stat
+import threading
+import uuid
+
+import cephfs
+import rados
+
+from mgr_util import RTimer, CephfsClient, open_filesystem,\
+    CephfsConnectionException
+from .blocklist import blocklist
+from .notify import Notifier, InstanceWatcher
+from .utils import INSTANCE_ID_PREFIX, MIRROR_OBJECT_NAME, Finisher, \
+    AsyncOpTracker, connect_to_filesystem, disconnect_from_filesystem
+from .exception import MirrorException
+from .dir_map.create import create_mirror_object
+from .dir_map.load import load_dir_map, load_instances
+from .dir_map.update import UpdateDirMapRequest, UpdateInstanceRequest
+from .dir_map.policy import Policy
+from .dir_map.state_transition import ActionType
+
+log = logging.getLogger(__name__)
+
+CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL = 1
+
+class FSPolicy:
+    class InstanceListener(InstanceWatcher.Listener):
+        def __init__(self, fspolicy):
+            self.fspolicy = fspolicy
+
+        def handle_instances(self, added, removed):
+            self.fspolicy.update_instances(added, removed)
+
+    def __init__(self, mgr, ioctx):
+        self.mgr = mgr
+        self.ioctx = ioctx
+        self.pending = []
+        self.policy = Policy()
+        self.lock = threading.Lock()
+        self.cond = threading.Condition(self.lock)
+        self.dir_paths = []
+        self.async_requests = {}
+        self.finisher = Finisher()
+        self.op_tracker = AsyncOpTracker()
+        self.notifier = Notifier(ioctx)
+        self.instance_listener = FSPolicy.InstanceListener(self)
+        self.instance_watcher = None
+        self.stopping = threading.Event()
+        self.timer_task = RTimer(CEPHFS_IMAGE_POLICY_UPDATE_THROTTLE_INTERVAL,
+                                 self.process_updates)
+        self.timer_task.start()
+
+    def schedule_action(self, dir_paths):
+        self.dir_paths.extend(dir_paths)
+
+    def init(self, dir_mapping, instances):
+        with self.lock:
+            self.policy.init(dir_mapping)
+            # we'll schedule action for all directories, so don't bother capturing
+            # directory names here.
+            self.policy.add_instances(list(instances.keys()), initial_update=True)
+            self.instance_watcher = InstanceWatcher(self.ioctx, instances,
+                                                    self.instance_listener)
+            self.schedule_action(list(dir_mapping.keys()))
+
+    def shutdown(self):
+        with self.lock:
+            log.debug('FSPolicy.shutdown')
+            self.stopping.set()
+            log.debug('canceling update timer task')
+            self.timer_task.cancel()
+            log.debug('update timer task canceled')
+        if self.instance_watcher:
+            log.debug('stopping instance watcher')
+            self.instance_watcher.wait_and_stop()
+            log.debug('instance watcher stopped')
+        self.op_tracker.wait_for_ops()
+        log.debug('FSPolicy.shutdown done')
+
+    def handle_update_mapping(self, updates, removals, request_id, callback, r):
+        log.info(f'handle_update_mapping: {updates} {removals} {request_id} {callback} {r}')
+        with self.lock:
+            try:
+                self.async_requests.pop(request_id)
+                if callback:
+                    callback(updates, removals, r)
+            finally:
+                self.op_tracker.finish_async_op()
+
+    def handle_update_instances(self, instances_added, instances_removed, request_id, r):
+        log.info(f'handle_update_instances: {instances_added} {instances_removed} {request_id} {r}')
+        with self.lock:
+            try:
+                self.async_requests.pop(request_id)
+                if self.stopping.is_set():
+                    log.debug(f'handle_update_instances: policy shutting down')
+                    return
+                schedules = []
+                if instances_removed:
+                    schedules.extend(self.policy.remove_instances(instances_removed))
+                if instances_added:
+                    schedules.extend(self.policy.add_instances(instances_added))
+                self.schedule_action(schedules)
+            finally:
+                self.op_tracker.finish_async_op()
+
+    def update_mapping(self, update_map, removals, callback=None):
+        log.info(f'updating directory map: {len(update_map)}+{len(removals)} updates')
+        request_id = str(uuid.uuid4())
+        def async_callback(r):
+            self.finisher.queue(self.handle_update_mapping,
+                                [list(update_map.keys()), removals, request_id, callback, r])
+        request = UpdateDirMapRequest(self.ioctx, update_map.copy(), removals.copy(), async_callback)
+        self.async_requests[request_id] = request
+        self.op_tracker.start_async_op()
+        log.debug(f'async request_id: {request_id}')
+        request.send()
+
+    def update_instances(self, added, removed):
+        log.debug(f'update_instances: added={added}, removed={removed}')
+        for instance_id, addr in removed.items():
+            log.info(f'blocklisting instance_id: {instance_id} addr: {addr}')
+            blocklist(self.mgr, addr)
+        with self.lock:
+            instances_added = {}
+            instances_removed = []
+            for instance_id, addr in added.items():
+                instances_added[instance_id] = {'version': 1, 'addr': addr}
+            instances_removed = list(removed.keys())
+            request_id = str(uuid.uuid4())
+            def async_callback(r):
+                self.finisher.queue(self.handle_update_instances,
+                                    [list(instances_added.keys()), instances_removed, request_id, r])
+            # blocklisted instances can be removed at this point. remapping directories
+            # mapped to blocklisted instances on module startup is handled in policy
+            # add_instances().
+            request = UpdateInstanceRequest(self.ioctx, instances_added.copy(),
+                                            instances_removed.copy(), async_callback)
+            self.async_requests[request_id] = request
+            log.debug(f'async request_id: {request_id}')
+            self.op_tracker.start_async_op()
+            request.send()
+
+    def continue_action(self, updates, removals, r):
+        log.debug(f'continuing action: {updates}+{removals} r={r}')
+        if self.stopping.is_set():
+            log.debug('continue_action: policy shutting down')
+            return
+        schedules = []
+        for dir_path in updates:
+            schedule = self.policy.finish_action(dir_path, r)
+            if schedule:
+                schedules.append(dir_path)
+        for dir_path in removals:
+            schedule = self.policy.finish_action(dir_path, r)
+            if schedule:
+                schedules.append(dir_path)
+        self.schedule_action(schedules)
+
+    def handle_peer_ack(self, dir_path, r):
+        log.info(f'handle_peer_ack: {dir_path} r={r}')
+        with self.lock:
+            try:
+                if self.stopping.is_set():
+                    log.debug(f'handle_peer_ack: policy shutting down')
+                    return
+                self.continue_action([dir_path], [], r)
+            finally:
+                self.op_tracker.finish_async_op()
+
+    def process_updates(self):
+        def acquire_message(dir_path):
+            return json.dumps({'dir_path': dir_path,
+                               'mode': 'acquire'
+                               })
+        def release_message(dir_path):
+            return json.dumps({'dir_path': dir_path,
+                               'mode': 'release'
+                               })
+        with self.lock:
+            if not self.dir_paths or self.stopping.is_set():
+                return
+            update_map = {}
+            removals = []
+            notifies = {}
+            instance_purges = []
+            for dir_path in self.dir_paths:
+                action_type = self.policy.start_action(dir_path)
+                lookup_info = self.policy.lookup(dir_path)
+                log.debug(f'processing action: dir_path: {dir_path}, lookup_info: {lookup_info}, action_type: {action_type}')
+                if action_type == ActionType.NONE:
+                    continue
+                elif action_type == ActionType.MAP_UPDATE:
+                    # take care to not overwrite purge status
+                    update_map[dir_path] = {'version': 1,
+                                            'instance_id': lookup_info['instance_id'],
+                                            'last_shuffled': lookup_info['mapped_time']
+                    }
+                    if lookup_info['purging']:
+                        update_map[dir_path]['purging'] = 1
+                elif action_type == ActionType.MAP_REMOVE:
+                    removals.append(dir_path)
+                elif action_type == ActionType.ACQUIRE:
+                    notifies[dir_path] = (lookup_info['instance_id'], acquire_message(dir_path))
+                elif action_type == ActionType.RELEASE:
+                    notifies[dir_path] = (lookup_info['instance_id'], release_message(dir_path))
+            if update_map or removals:
+                self.update_mapping(update_map, removals, callback=self.continue_action)
+            for dir_path, message in notifies.items():
+                self.op_tracker.start_async_op()
+                self.notifier.notify(dir_path, message, self.handle_peer_ack)
+            self.dir_paths.clear()
+
+    def add_dir(self, dir_path):
+        with self.lock:
+            lookup_info = self.policy.lookup(dir_path)
+            if lookup_info:
+                if lookup_info['purging']:
+                    raise MirrorException(-errno.EAGAIN, f'remove in-progress for {dir_path}')
+                else:
+                    raise MirrorException(-errno.EEXIST, f'directory {dir_path} is already tracked')
+            schedule = self.policy.add_dir(dir_path)
+            if not schedule:
+                return
+            update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
+            updated = False
+            def update_safe(updates, removals, r):
+                nonlocal updated
+                updated = True
+                self.cond.notifyAll()
+            self.update_mapping(update_map, [], callback=update_safe)
+            self.cond.wait_for(lambda: updated)
+            self.schedule_action([dir_path])
+
+    def remove_dir(self, dir_path):
+        with self.lock:
+            lookup_info = self.policy.lookup(dir_path)
+            if not lookup_info:
+                raise MirrorException(-errno.ENOENT, f'directory {dir_path} is not tracked')
+            if lookup_info['purging']:
+                raise MirrorException(-errno.EINVAL, f'directory {dir_path} is under removal')
+            update_map = {dir_path: {'version': 1,
+                                     'instance_id': lookup_info['instance_id'],
+                                     'last_shuffled': lookup_info['mapped_time'],
+                                     'purging': 1}}
+            updated = False
+            sync_lock = threading.Lock()
+            sync_cond = threading.Condition(sync_lock)
+            def update_safe(r):
+                with sync_lock:
+                    nonlocal updated
+                    updated = True
+                    sync_cond.notifyAll()
+            request = UpdateDirMapRequest(self.ioctx, update_map.copy(), [], update_safe)
+            request.send()
+            with sync_lock:
+                sync_cond.wait_for(lambda: updated)
+            schedule = self.policy.remove_dir(dir_path)
+            if schedule:
+                self.schedule_action([dir_path])
+
+    def status(self, dir_path):
+        with self.lock:
+            res = self.policy.dir_status(dir_path)
+            return 0, json.dumps(res, indent=4, sort_keys=True), ''
+
+    def summary(self):
+        with self.lock:
+            res = self.policy.instance_summary()
+            return 0, json.dumps(res, indent=4, sort_keys=True), ''
+
+class FSSnapshotMirror:
+    def __init__(self, mgr):
+        self.mgr = mgr
+        self.rados = mgr.rados
+        self.pool_policy = {}
+        self.fs_map = self.mgr.get('fs_map')
+        self.lock = threading.Lock()
+        self.refresh_pool_policy()
+        self.local_fs = CephfsClient(mgr)
+
+    def notify(self, notify_type):
+        log.debug(f'got notify type {notify_type}')
+        if notify_type == 'fs_map':
+            with self.lock:
+                self.fs_map = self.mgr.get('fs_map')
+                self.refresh_pool_policy_locked()
+
+    @staticmethod
+    def split_spec(spec):
+        try:
+            client_id, cluster_name = spec.split('@')
+            _, client_name = client_id.split('.')
+            return client_name, cluster_name
+        except ValueError:
+            raise MirrorException(-errno.EINVAL, f'invalid cluster spec {spec}')
+
+    @staticmethod
+    def get_metadata_pool(filesystem, fs_map):
+        for fs in fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == filesystem:
+                return fs['mdsmap']['metadata_pool']
+        return None
+
+    @staticmethod
+    def get_filesystem_id(filesystem, fs_map):
+        for fs in fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == filesystem:
+                return fs['id']
+        return None
+
+    def filesystem_exist(self, filesystem):
+        for fs in self.fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == filesystem:
+                return True
+        return False
+
+    def get_mirrored_filesystems(self):
+        return [fs['mdsmap']['fs_name'] for fs in self.fs_map['filesystems'] if fs.get('mirror_info', None)]
+
+    def get_filesystem_peers(self, filesystem):
+        """To be used when mirroring in enabled for the filesystem"""
+        for fs in self.fs_map['filesystems']:
+            if fs['mdsmap']['fs_name'] == filesystem:
+                return fs['mirror_info']['peers']
+        return None
+
+    def get_mirror_info(self, remote_fs):
+        try:
+            val = remote_fs.getxattr('/', 'ceph.mirror.info')
+            match = re.search(r'^cluster_id=([a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}) fs_id=(\d+)$',
+                              val.decode('utf-8'))
+            if match and len(match.groups()) == 2:
+                return {'cluster_id': match.group(1),
+                        'fs_id': int(match.group(2))
+                        }
+            return None
+        except cephfs.Error as e:
+            return None
+
+    def set_mirror_info(self, local_cluster_id, local_fsid, remote_fs):
+        log.info(f'setting {local_cluster_id}::{local_fsid} on remote')
+        try:
+            remote_fs.setxattr('/', 'ceph.mirror.info',
+                               f'cluster_id={local_cluster_id} fs_id={local_fsid}'.encode('utf-8'), os.XATTR_CREATE)
+        except cephfs.Error as e:
+            if e.errno == errno.EEXIST:
+                mi = self.get_mirror_info(remote_fs)
+                if not mi:
+                    log.error(f'error fetching mirror info when setting mirror info')
+                    raise Exception(-errno.EINVAL)
+                cluster_id = mi['cluster_id']
+                fs_id = mi['fs_id']
+                if not (cluster_id == local_cluster_id and fs_id == local_fsid):
+                    raise MirrorException(-errno.EEXIST, f'peer mirrored by: (cluster_id: {cluster_id}, fs_id: {fs_id})')
+            else:
+                log.error(f'error setting mirrored fsid: {e}')
+                raise Exception(-e.errno)
+
+    def resolve_peer(self, fs_name, peer_uuid):
+        peers = self.get_filesystem_peers(fs_name)
+        for peer, rem in peers.items():
+            if peer == peer_uuid:
+                return rem['remote']
+        return None
+
+    def purge_mirror_info(self, local_fs_name, peer_uuid):
+        log.debug(f'local fs={local_fs_name} peer_uuid={peer_uuid}')
+        rem = self.resolve_peer(local_fs_name, peer_uuid)
+        log.debug(f'peer_uuid={peer_uuid} resolved to {rem}')
+        if rem:
+            client_name = rem['client_name']
+            cluster_name = rem['cluster_name']
+            client_name, cluster_name = FSSnapshotMirror.split_spec(f'{client_name}@{cluster_name}')
+            remote_cluster, remote_fs = connect_to_filesystem(client_name,
+                                                              cluster_name,
+                                                              rem['fs_name'], 'remote')
+            try:
+                remote_fs.removexattr('/', 'ceph.mirror.info')
+            except cephfs.Error as e:
+                if not e.errno == errno.ENOENT:
+                    log.error('error removing mirror info')
+                    raise Exception(-e.errno)
+            finally:
+                disconnect_from_filesystem(cluster_name, rem['fs_name'], remote_cluster, remote_fs)
+
+    def verify_and_set_mirror_info(self, local_fs_name, remote_cluster_spec, remote_fs_name):
+        log.debug(f'local fs={local_fs_name} remote={remote_cluster_spec}/{remote_fs_name}')
+
+        client_name, cluster_name = FSSnapshotMirror.split_spec(remote_cluster_spec)
+        remote_cluster, remote_fs = connect_to_filesystem(client_name, cluster_name,
+                                                          remote_fs_name, 'remote')
+
+        local_fsid = FSSnapshotMirror.get_filesystem_id(local_fs_name, self.fs_map)
+        if local_fsid is None:
+            log.error(f'error looking up filesystem id for {local_fs_name}')
+            raise Exception(-errno.EINVAL)
+
+        # once the cluster ids have been compared, comparing filesystem names is
+        # enough to catch a filesystem being mirrored to itself
+        local_cluster_id = self.rados.get_fsid()
+        remote_cluster_id = remote_cluster.get_fsid()
+        log.debug(f'local_cluster_id={local_cluster_id} remote_cluster_id={remote_cluster_id}')
+        if local_cluster_id == remote_cluster_id and local_fs_name == remote_fs_name:
+            raise MirrorException(-errno.EINVAL, "'Source and destination cluster fsid and "\
+                                  "file-system name can't be the same")
+
+        try:
+            self.set_mirror_info(local_cluster_id, local_fsid, remote_fs)
+        finally:
+            disconnect_from_filesystem(cluster_name, remote_fs_name, remote_cluster, remote_fs)
+
+    def init_pool_policy(self, filesystem):
+        metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
+        if not metadata_pool_id:
+            log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
+            raise Exception(-errno.EINVAL)
+        try:
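+            # the directory map and mirror-daemon instances are persisted in the
+            # filesystem's metadata pool; load them and seed the policy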
+            ioctx = self.rados.open_ioctx2(metadata_pool_id)
+            # TODO: make async if required
+            dir_mapping = load_dir_map(ioctx)
+            instances = load_instances(ioctx)
+            # init policy
+            fspolicy = FSPolicy(self.mgr, ioctx)
+            log.debug(f'init policy for filesystem {filesystem}: pool-id {metadata_pool_id}')
+            fspolicy.init(dir_mapping, instances)
+            self.pool_policy[filesystem] = fspolicy
+        except rados.Error as e:
+            log.error(f'failed to access pool-id {metadata_pool_id} for filesystem {filesystem}: {e}')
+            raise Exception(-e.errno)
+
+    def refresh_pool_policy_locked(self):
+        filesystems = self.get_mirrored_filesystems()
+        log.debug(f'refreshing policy for {filesystems}')
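+        # shut down policies for filesystems that are no longer mirrored and
+        # initialize policies for newly mirrored ones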
+        for filesystem in list(self.pool_policy):
+            if filesystem not in filesystems:
+                log.info(f'shutdown pool policy for {filesystem}')
+                fspolicy = self.pool_policy.pop(filesystem)
+                fspolicy.shutdown()
+        for filesystem in filesystems:
+            if filesystem not in self.pool_policy:
+                log.info(f'init pool policy for {filesystem}')
+                self.init_pool_policy(filesystem)
+
+    def refresh_pool_policy(self):
+        with self.lock:
+            self.refresh_pool_policy_locked()
+
+    def enable_mirror(self, filesystem):
+        log.info(f'enabling mirror for filesystem {filesystem}')
+        with self.lock:
+            try:
+                metadata_pool_id = FSSnapshotMirror.get_metadata_pool(filesystem, self.fs_map)
+                if not metadata_pool_id:
+                    log.error(f'cannot find metadata pool-id for filesystem {filesystem}')
+                    raise Exception(-errno.EINVAL)
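+                # create the cephfs_mirror object in the filesystem's metadata pool
+                # before asking the monitors to flag the filesystem as mirrored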
+                create_mirror_object(self.rados, metadata_pool_id)
+                cmd = {'prefix': 'fs mirror enable', 'fs_name': filesystem}
+                r, outs, err = self.mgr.mon_command(cmd)
+                if r < 0:
+                    log.error(f'mon command to enable mirror failed: {err}')
+                    raise Exception(-errno.EINVAL)
+                return 0, json.dumps({}), ''
+            except MirrorException as me:
+                return me.args[0], '', me.args[1]
+            except Exception as e:
+                return e.args[0], '', 'failed to enable mirroring'
+
+    def disable_mirror(self, filesystem):
+        log.info(f'disabling mirror for filesystem {filesystem}')
+        try:
+            with self.lock:
+                cmd = {'prefix': 'fs mirror disable', 'fs_name': filesystem}
+                r, outs, err = self.mgr.mon_command(cmd)
+                if r < 0:
+                    log.error(f'mon command to disable mirror failed: {err}')
+                    raise Exception(-errno.EINVAL)
+                return 0, json.dumps({}), ''
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+        except Exception as e:
+            return e.args[0], '', 'failed to disable mirroring'
+
+    def peer_add(self, filesystem, remote_cluster_spec, remote_fs_name):
+        try:
+            if remote_fs_name is None:
+                remote_fs_name = filesystem
+            with self.lock:
+                fspolicy = self.pool_policy.get(filesystem, None)
+                if not fspolicy:
+                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+                self.verify_and_set_mirror_info(filesystem, remote_cluster_spec, remote_fs_name)
+                cmd = {'prefix': 'fs mirror peer_add',
+                       'fs_name': filesystem,
+                       'remote_cluster_spec': remote_cluster_spec,
+                       'remote_fs_name': remote_fs_name}
+                r, outs, err = self.mgr.mon_command(cmd)
+                if r < 0:
+                    log.error(f'mon command to add peer failed: {err}')
+                    raise Exception(-errno.EINVAL)
+                return 0, json.dumps({}), ''
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+        except Exception as e:
+            return e.args[0], '', 'failed to add peer'
+
+    def peer_remove(self, filesystem, peer_uuid):
+        try:
+            with self.lock:
+                fspolicy = self.pool_policy.get(filesystem, None)
+                if not fspolicy:
+                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+                # ok, this is being a bit lazy: remove the mirror info from the
+                # peer first, then purge the peer from the fsmap. if the mirror
+                # daemons' fsmap updates are laggy, they happily continue to
+                # synchronize. ideally, we should purge the peer from the fsmap
+                # here and purge the mirror info on fsmap update (in notify()),
+                # but that's not straightforward -- before purging the mirror
+                # info we would need to wait for all mirror daemons to catch up
+                # with fsmap updates, which involves the mirror daemons sending
+                # the fsmap epoch they have seen in reply to a notify request.
+                # TODO: fix this.
+                self.purge_mirror_info(filesystem, peer_uuid)
+                cmd = {'prefix': 'fs mirror peer_remove',
+                       'fs_name': filesystem,
+                       'uuid': peer_uuid}
+                r, outs, err = self.mgr.mon_command(cmd)
+                if r < 0:
+                    log.error(f'mon command to remove peer failed: {err}')
+                    raise Exception(-errno.EINVAL)
+                return 0, json.dumps({}), ''
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+        except Exception as e:
+            return e.args[0], '', 'failed to remove peer'
+
+    @staticmethod
+    def norm_path(dir_path):
+        if not os.path.isabs(dir_path):
+            raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
+        return os.path.normpath(dir_path)
+
+    def add_dir(self, filesystem, dir_path):
+        try:
+            with self.lock:
+                if not self.filesystem_exist(filesystem):
+                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+                fspolicy = self.pool_policy.get(filesystem, None)
+                if not fspolicy:
+                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+                dir_path = FSSnapshotMirror.norm_path(dir_path)
+                log.debug(f'path normalized to {dir_path}')
+                fspolicy.add_dir(dir_path)
+                return 0, json.dumps({}), ''
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+        except Exception as e:
+            return e.args[0], '', 'failed to add directory'
+
+    def remove_dir(self, filesystem, dir_path):
+        try:
+            with self.lock:
+                if not self.filesystem_exist(filesystem):
+                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+                fspolicy = self.pool_policy.get(filesystem, None)
+                if not fspolicy:
+                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+                dir_path = FSSnapshotMirror.norm_path(dir_path)
+                fspolicy.remove_dir(dir_path)
+                return 0, json.dumps({}), ''
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+        except Exception as e:
+            return e.args[0], '', 'failed to remove directory'
+
+    def status(self, filesystem, dir_path):
+        try:
+            with self.lock:
+                if not self.filesystem_exist(filesystem):
+                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+                fspolicy = self.pool_policy.get(filesystem, None)
+                if not fspolicy:
+                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+                dir_path = FSSnapshotMirror.norm_path(dir_path)
+                return fspolicy.status(dir_path)
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
+
+    def show_distribution(self, filesystem):
+        try:
+            with self.lock:
+                if not self.filesystem_exist(filesystem):
+                    raise MirrorException(-errno.ENOENT, f'filesystem {filesystem} does not exist')
+                fspolicy = self.pool_policy.get(filesystem, None)
+                if not fspolicy:
+                    raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
+                return fspolicy.summary()
+        except MirrorException as me:
+            return me.args[0], '', me.args[1]
diff --git a/src/pybind/mgr/mirroring/fs/utils.py b/src/pybind/mgr/mirroring/fs/utils.py
new file mode 100644 (file)
index 0000000..625e155
--- /dev/null
@@ -0,0 +1,143 @@
+import errno
+import logging
+import threading
+
+import rados
+import cephfs
+
+from .exception import MirrorException
+
+MIRROR_OBJECT_PREFIX = 'cephfs_mirror'
+MIRROR_OBJECT_NAME = MIRROR_OBJECT_PREFIX
+
+INSTANCE_ID_PREFIX = "instance_"
+DIRECTORY_MAP_PREFIX = "dir_map_"
+
+log = logging.getLogger(__name__)
+
+def connect_to_cluster(client_name, cluster_name, desc=''):
+    try:
+        log.debug(f'connecting to {desc} cluster: {client_name}/{cluster_name}')
+        r_rados = rados.Rados(rados_id=client_name, clustername=cluster_name)
+        r_rados.conf_read_file()
+        r_rados.connect()
+        log.debug(f'connected to {desc} cluster')
+        return r_rados
+    except rados.Error as e:
+        if e.errno == errno.ENOENT:
+            raise MirrorException(-e.errno, f'cluster {cluster_name} does not exist')
+        else:
+            log.error(f'error connecting to cluster: {e}')
+            raise Exception(-e.errno)
+
+def disconnect_from_cluster(cluster_name, cluster):
+    try:
+        log.debug(f'disconnecting from cluster {cluster_name}')
+        cluster.shutdown()
+        log.debug(f'disconnected from cluster {cluster_name}')
+    except Exception as e:
+        log.error(f'error disconnecting: {e}')
+
+def connect_to_filesystem(client_name, cluster_name, fs_name, desc):
+    try:
+        cluster = connect_to_cluster(client_name, cluster_name, desc)
+        log.debug(f'connecting to {desc} filesystem: {fs_name}')
+        fs = cephfs.LibCephFS(rados_inst=cluster)
+        log.debug('CephFS initializing...')
+        fs.init()
+        log.debug('CephFS mounting...')
+        fs.mount(filesystem_name=fs_name.encode('utf-8'))
+        log.debug(f'Connection to cephfs {fs_name} complete')
+        return (cluster, fs)
+    except cephfs.Error as e:
+        if e.errno == errno.ENOENT:
+            raise MirrorException(-e.errno, f'filesystem {fs_name} does not exist')
+        else:
+            log.error(f'error connecting to filesystem {fs_name}: {e}')
+            raise Exception(-e.errno)
+
+def disconnect_from_filesystem(cluster_name, fs_name, cluster, fs_handle):
+    try:
+        log.debug(f'disconnecting from filesystem {fs_name}')
+        fs_handle.shutdown()
+        log.debug(f'disconnected from filesystem {fs_name}')
+        disconnect_from_cluster(cluster_name, cluster)
+    except Exception as e:
+        log.error(f'error disconnecting: {e}')
+
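+# minimal worker thread: callbacks queued via queue() are invoked in order on
+# this thread, outside the queue lock; stop() wakes the thread and blocks
+# until it has exited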
+class _ThreadWrapper(threading.Thread):
+    def __init__(self, name):
+        self.q = []
+        self.stopping = threading.Event()
+        self.terminated = threading.Event()
+        self.lock = threading.Lock()
+        self.cond = threading.Condition(self.lock)
+        super().__init__(name=name)
+        super().start()
+
+    def run(self):
+        try:
+            with self.lock:
+                while True:
+                    self.cond.wait_for(lambda: self.q or self.stopping.is_set())
+                    if self.stopping.is_set():
+                        log.debug('thread exiting')
+                        self.terminated.set()
+                        self.cond.notify_all()
+                        return
+                    q = self.q.copy()
+                    self.q.clear()
+                    self.lock.release()
+                    try:
+                        for item in q:
+                            log.debug(f'calling {item[0]} params {item[1]}')
+                            item[0](*item[1])
+                    except Exception as e:
+                        log.warning(f'callback exception: {e}')
+                    self.lock.acquire()
+        except Exception as e:
+            log.info(f'threading exception: {e}')
+
+    def queue(self, cbk, args):
+        with self.lock:
+            self.q.append((cbk, args))
+            self.cond.notify_all()
+
+    def stop(self):
+        with self.lock:
+            self.stopping.set()
+            self.cond.notify_all()
+            self.cond.wait_for(lambda: self.terminated.is_set())
+
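+# runs queued completion callbacks on a single background worker thread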
+class Finisher:
+    def __init__(self):
+        self.lock = threading.Lock()
+        self.thread = _ThreadWrapper(name='finisher')
+
+    def queue(self, cbk, args=[]):
+        with self.lock:
+            self.thread.queue(cbk, args)
+
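+# tracks the number of in-flight asynchronous operations; wait_for_ops()
+# blocks the caller until every started operation has finished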
+class AsyncOpTracker:
+    def __init__(self):
+        self.ops_in_progress = 0
+        self.lock = threading.Lock()
+        self.cond = threading.Condition(self.lock)
+
+    def start_async_op(self):
+        with self.lock:
+            self.ops_in_progress += 1
+            log.debug(f'start_async_op: {self.ops_in_progress}')
+
+    def finish_async_op(self):
+        with self.lock:
+            self.ops_in_progress -= 1
+            log.debug(f'finish_async_op: {self.ops_in_progress}')
+            assert(self.ops_in_progress >= 0)
+            self.cond.notify_all()
+
+    def wait_for_ops(self):
+        with self.lock:
+            log.debug(f'wait_for_ops: {self.ops_in_progress}')
+            self.cond.wait_for(lambda: self.ops_in_progress == 0)
+            log.debug('done')
diff --git a/src/pybind/mgr/mirroring/module.py b/src/pybind/mgr/mirroring/module.py
new file mode 100644 (file)
index 0000000..5549e99
--- /dev/null
@@ -0,0 +1,70 @@
+from typing import List, Optional
+
+from mgr_module import MgrModule, CLIReadCommand, CLIWriteCommand, Option
+
+from .fs.snapshot_mirror import FSSnapshotMirror
+
+class Module(MgrModule):
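+    """Manager module exposing the `fs snapshot mirror ...` command family.
+
+    Typical usage (assuming the module has been enabled with
+    `ceph mgr module enable mirroring`):
+
+      ceph fs snapshot mirror enable <fs_name>
+      ceph fs snapshot mirror peer_add <fs_name> <client>@<cluster> [<remote_fs_name>]
+      ceph fs snapshot mirror add <fs_name> <path>
+    """
+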
+    MODULE_OPTIONS: List[Option] = []
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.fs_snapshot_mirror = FSSnapshotMirror(self)
+
+    def notify(self, notify_type, notify_id):
+        self.fs_snapshot_mirror.notify(notify_type)
+
+    @CLIWriteCommand('fs snapshot mirror enable')
+    def snapshot_mirror_enable(self,
+                               fs_name: str):
+        """Enable snapshot mirroring for a filesystem"""
+        return self.fs_snapshot_mirror.enable_mirror(fs_name)
+
+    @CLIWriteCommand('fs snapshot mirror disable')
+    def snapshot_mirror_disable(self,
+                                fs_name: str):
+        """Disable snapshot mirroring for a filesystem"""
+        return self.fs_snapshot_mirror.disable_mirror(fs_name)
+
+    @CLIWriteCommand('fs snapshot mirror peer_add')
+    def snapshot_mirror_peer_add(self,
+                                 fs_name: str,
+                                 remote_cluster_spec: str,
+                                 remote_fs_name: Optional[str] = None):
+        """Add a remote filesystem peer"""
+        return self.fs_snapshot_mirror.peer_add(fs_name, remote_cluster_spec,
+                                                remote_fs_name)
+
+    @CLIWriteCommand('fs snapshot mirror peer_remove')
+    def snapshot_mirror_peer_remove(self,
+                                    fs_name: str,
+                                    peer_uuid: str):
+        """Remove a filesystem peer"""
+        return self.fs_snapshot_mirror.peer_remove(fs_name, peer_uuid)
+
+    @CLIWriteCommand('fs snapshot mirror add')
+    def snapshot_mirror_add_dir(self,
+                                fs_name: str,
+                                path: str):
+        """Add a directory for snapshot mirroring"""
+        return self.fs_snapshot_mirror.add_dir(fs_name, path)
+
+    @CLIWriteCommand('fs snapshot mirror remove')
+    def snapshot_mirror_remove_dir(self,
+                                   fs_name: str,
+                                   path: str):
+        """Remove a snapshot mirrored directory"""
+        return self.fs_snapshot_mirror.remove_dir(fs_name, path)
+
+    @CLIReadCommand('fs snapshot mirror dirmap')
+    def snapshot_mirror_dirmap(self,
+                               fs_name: str,
+                               path: str):
+        """Get current mirror instance map for a directory"""
+        return self.fs_snapshot_mirror.status(fs_name, path)
+
+    @CLIReadCommand('fs snapshot mirror show distribution')
+    def snapshot_mirror_distribution(self,
+                                     fs_name: str):
+        """Get current instance to directory map for a filesystem"""
+        return self.fs_snapshot_mirror.show_distribution(fs_name)
index 5ea77228c623fd3f10d35c2704af2ba7ece13078..fcaeb91ca78680226d1276071d2b92da0d0d6885 100644 (file)
@@ -60,6 +60,7 @@ commands =
            -m mds_autoscaler \
            -m mgr_module \
            -m mgr_util \
+           -m mirroring \
            -m orchestrator \
            -m progress \
            -m prometheus \