]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cephfs-mirror: carve out (and implement) mirroring snapshots to peers
authorVenky Shankar <vshankar@redhat.com>
Sun, 25 Oct 2020 07:04:47 +0000 (03:04 -0400)
committerVenky Shankar <vshankar@redhat.com>
Tue, 12 Jan 2021 10:56:52 +0000 (05:56 -0500)
Remote file synchronization by @mchangir.

Signed-off-by: Milind Changire <mchangir@redhat.com>
Signed-off-by: Venky Shankar <vshankar@redhat.com>
src/common/options.cc
src/tools/cephfs_mirror/CMakeLists.txt
src/tools/cephfs_mirror/PeerReplayer.cc [new file with mode: 0644]
src/tools/cephfs_mirror/PeerReplayer.h [new file with mode: 0644]

index 1f15939a6783f155d0e45ecc9c2c06709abae229..0f5261150fdf75739c9c432f93f48b587d3136db 100644 (file)
@@ -8910,7 +8910,18 @@ std::vector<Option> get_cephfs_mirror_options() {
     .set_description("interval to restart blocklisted instances")
     .set_long_description("Interval in seconds to restart blocklisted mirror instances. Setting to zero (0) disables restarting blocklisted instances."),
 
-    });
+    Option("cephfs_mirror_max_snapshot_sync_per_cycle", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(3)
+    .set_min(1)
+    .set_description("number of snapshots to mirror in one cycle")
+    .set_long_description("maximum number of snapshots to mirror when a directory is picked up for mirroring by worker threads."),
+
+    Option("cephfs_mirror_directory_scan_interval", Option::TYPE_UINT, Option::LEVEL_ADVANCED)
+    .set_default(10)
+    .set_min(1)
+    .set_description("interval to scan directories to mirror snapshots")
+    .set_long_description("interval in seconds to scan configured directories for snapshot mirroring."),
+   });
 }
 
 static std::vector<Option> build_options()
index 2eda68b351fd07ff8abeff5ec0c6a6e913109094..d922615713e55ff83477c37adc28b80763771f48 100644 (file)
@@ -4,6 +4,7 @@ set(cephfs_mirror_internal
   FSMirror.cc
   InstanceWatcher.cc
   MirrorWatcher.cc
+  PeerReplayer.cc
   Types.cc
   Utils.cc
   Watcher.cc
diff --git a/src/tools/cephfs_mirror/PeerReplayer.cc b/src/tools/cephfs_mirror/PeerReplayer.cc
new file mode 100644 (file)
index 0000000..0d7aca7
--- /dev/null
@@ -0,0 +1,1016 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <stack>
+#include <fcntl.h>
+#include <algorithm>
+#include <sys/time.h>
+
+#include "common/admin_socket.h"
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "FSMirror.h"
+#include "PeerReplayer.h"
+#include "Utils.h"
+
+#define dout_context g_ceph_context
+#define dout_subsys ceph_subsys_cephfs_mirror
+#undef dout_prefix
+#define dout_prefix *_dout << "cephfs::mirror::PeerReplayer("   \
+                           << m_peer.uuid << ") " << __func__
+
+namespace cephfs {
+namespace mirror {
+
+namespace {
+
+std::string snapshot_dir_path(CephContext *cct, const std::string &path) {
+  return path + "/" + cct->_conf->client_snapdir;
+}
+
+std::string snapshot_path(const std::string &snap_dir, const std::string &snap_name) {
+  return snap_dir + "/" + snap_name;
+}
+
+std::string snapshot_path(CephContext *cct, const std::string &path, const std::string &snap_name) {
+  return path + "/" + cct->_conf->client_snapdir + "/" + snap_name;
+}
+
+std::string entry_path(const std::string &dir, const std::string &name) {
+  return dir + "/" + name;
+}
+
+std::map<std::string, std::string> decode_snap_metadata(snap_metadata *snap_metadata,
+                                                        size_t nr_snap_metadata) {
+  std::map<std::string, std::string> metadata;
+  for (size_t i = 0; i < nr_snap_metadata; ++i) {
+    metadata.emplace(snap_metadata[i].key, snap_metadata[i].value);
+  }
+
+  return metadata;
+}
+
+class PeerAdminSocketCommand {
+public:
+  virtual ~PeerAdminSocketCommand() {
+  }
+  virtual int call(Formatter *f) = 0;
+};
+
+class StatusCommand : public PeerAdminSocketCommand {
+public:
+  explicit StatusCommand(PeerReplayer *peer_replayer)
+    : peer_replayer(peer_replayer) {
+  }
+
+  int call(Formatter *f) override {
+    peer_replayer->peer_status(f);
+    return 0;
+  }
+
+private:
+  PeerReplayer *peer_replayer;
+};
+
+} // anonymous namespace
+
+class PeerReplayerAdminSocketHook : public AdminSocketHook {
+public:
+  PeerReplayerAdminSocketHook(CephContext *cct, const Filesystem &filesystem,
+                              const Peer &peer, PeerReplayer *peer_replayer)
+    : admin_socket(cct->get_admin_socket()) {
+    int r;
+    std::string cmd;
+
+    // mirror peer status format is name@id uuid
+    cmd = "fs mirror peer status "
+          + stringify(filesystem.fs_name) + "@" + stringify(filesystem.fscid)
+          + " "
+          + stringify(peer.uuid);
+    r = admin_socket->register_command(
+      cmd, this, "get peer mirror status");
+    if (r == 0) {
+      commands[cmd] = new StatusCommand(peer_replayer);
+    }
+  }
+
+  ~PeerReplayerAdminSocketHook() override {
+    admin_socket->unregister_commands(this);
+    for (auto &[command, cmdptr] : commands) {
+      delete cmdptr;
+    }
+  }
+
+  int call(std::string_view command, const cmdmap_t& cmdmap,
+           Formatter *f, std::ostream &errss, bufferlist &out) override {
+    auto p = commands.at(std::string(command));
+    return p->call(f);
+  }
+
+private:
+  typedef std::map<std::string, PeerAdminSocketCommand*, std::less<>> Commands;
+
+  AdminSocket *admin_socket;
+  Commands commands;
+};
+
+PeerReplayer::PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
+                           const Filesystem &filesystem, const Peer &peer,
+                           const std::set<std::string, std::less<>> &directories,
+                           MountRef mount)
+  : m_cct(cct),
+    m_fs_mirror(fs_mirror),
+    m_peer(peer),
+    m_directories(directories.begin(), directories.end()),
+    m_local_mount(mount),
+    m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
+    m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+}
+
+PeerReplayer::~PeerReplayer() {
+  delete m_asok_hook;
+}
+
+int PeerReplayer::init() {
+  dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
+
+  auto &remote_client = m_peer.remote.client_name;
+  auto &remote_cluster = m_peer.remote.cluster_name;
+  auto remote_filesystem = Filesystem{0, m_peer.remote.fs_name};
+
+  int r = connect(remote_client, remote_cluster, &m_remote_cluster);
+  if (r < 0) {
+    derr << ": error connecting to remote cluster: " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  r = mount(m_remote_cluster, remote_filesystem, false, &m_remote_mount);
+  if (r < 0) {
+    m_remote_cluster.reset();
+    derr << ": error mounting remote filesystem=" << remote_filesystem << dendl;
+    return r;
+  }
+
+  std::scoped_lock locker(m_lock);
+  auto nr_replayers = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_max_concurrent_directory_syncs");
+  dout(20) << ": spawning " << nr_replayers << " snapshot replayer(s)" << dendl;
+
+  while (nr_replayers-- > 0) {
+    std::unique_ptr<SnapshotReplayerThread> replayer(
+      new SnapshotReplayerThread(this));
+    std::string name("replayer-" + stringify(nr_replayers));
+    replayer->create(name.c_str());
+    m_replayers.push_back(std::move(replayer));
+  }
+
+  return 0;
+}
+
+void PeerReplayer::shutdown() {
+  dout(20) << dendl;
+
+  {
+    std::scoped_lock locker(m_lock);
+    ceph_assert(!m_stopping);
+    m_stopping = true;
+    m_cond.notify_all();
+  }
+
+  for (auto &replayer : m_replayers) {
+    replayer->join();
+  }
+  m_replayers.clear();
+  ceph_unmount(m_remote_mount);
+  m_remote_mount = nullptr;
+  m_remote_cluster.reset();
+}
+
+void PeerReplayer::add_directory(string_view dir_path) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  std::scoped_lock locker(m_lock);
+  m_directories.emplace_back(dir_path);
+  m_cond.notify_all();
+}
+
+void PeerReplayer::remove_directory(string_view dir_path) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  std::scoped_lock locker(m_lock);
+  auto it = std::find(m_directories.begin(), m_directories.end(), dir_path);
+  if (it != m_directories.end()) {
+    m_directories.erase(it);
+  }
+  m_cond.notify_all();
+}
+
+boost::optional<std::string> PeerReplayer::pick_directory() {
+  dout(20) << dendl;
+
+  boost::optional<std::string> candidate;
+  for (auto &dir_path : m_directories) {
+    if (!m_registered.count(dir_path)) {
+      candidate = dir_path;
+      break;
+    }
+  }
+
+  std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end());
+  return candidate;
+}
+
+int PeerReplayer::register_directory(const std::string &dir_path,
+                                     SnapshotReplayerThread *replayer) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+  ceph_assert(m_registered.find(dir_path) == m_registered.end());
+
+  DirRegistry registry;
+  int r = try_lock_directory(dir_path, replayer, &registry);
+  if (r < 0) {
+    return r;
+  }
+
+  dout(5) << ": dir_path=" << dir_path << " registered with replayer="
+          << replayer << dendl;
+  m_registered.emplace(dir_path, std::move(registry));
+  return 0;
+}
+
+void PeerReplayer::unregister_directory(const std::string &dir_path) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  auto it = m_registered.find(dir_path);
+  ceph_assert(it != m_registered.end());
+
+  unlock_directory(it->first, it->second);
+  m_registered.erase(it);
+}
+
+int PeerReplayer::try_lock_directory(const std::string &dir_path,
+                                     SnapshotReplayerThread *replayer, DirRegistry *registry) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  int r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
+  if (r < 0 && r != -ENOENT) {
+    derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  if (r == -ENOENT) {
+    // we snap under dir_path, so mode does not matter much
+    r = ceph_mkdirs(m_remote_mount, dir_path.c_str(), 0755);
+    if (r < 0) {
+      derr << ": failed to create remote directory=" << dir_path << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    r = ceph_open(m_remote_mount, dir_path.c_str(), O_RDONLY | O_DIRECTORY, 0);
+    if (r < 0) {
+      derr << ": failed to open remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+  }
+
+  int fd = r;
+  r = ceph_flock(m_remote_mount, fd, LOCK_EX | LOCK_NB, (uint64_t)replayer->get_thread_id());
+  if (r != 0) {
+    if (r == -EWOULDBLOCK) {
+      dout(5) << ": dir_path=" << dir_path << " is locked by cephfs-mirror, "
+              << "will retry again" << dendl;
+    } else {
+      derr << ": failed to lock dir_path=" << dir_path << ": " << cpp_strerror(r)
+           << dendl;
+    }
+
+    if (ceph_close(m_remote_mount, fd) < 0) {
+      derr << ": failed to close (cleanup) remote dir_path=" << dir_path << ": "
+           << cpp_strerror(r) << dendl;
+    }
+    return r;
+  }
+
+  dout(10) << ": dir_path=" << dir_path << " locked" << dendl;
+
+  registry->fd = fd;
+  registry->replayer = replayer;
+  return 0;
+}
+
+void PeerReplayer::unlock_directory(const std::string &dir_path, const DirRegistry &registry) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  int r = ceph_flock(m_remote_mount, registry.fd, LOCK_UN,
+                     (uint64_t)registry.replayer->get_thread_id());
+  if (r < 0) {
+    derr << ": failed to unlock remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+         << dendl;
+    return;
+  }
+
+  r = ceph_close(m_remote_mount, registry.fd);
+  if (r < 0) {
+    derr << ": failed to close remote dir_path=" << dir_path << ": " << cpp_strerror(r)
+         << dendl;
+  }
+
+  dout(10) << ": dir_path=" << dir_path << " unlocked" << dendl;
+}
+
+int PeerReplayer::build_snap_map(const std::string &dir_path,
+                                 std::map<uint64_t, std::string> *snap_map, bool is_remote) {
+  auto snap_dir = snapshot_dir_path(m_cct, dir_path);
+  dout(20) << ": dir_path=" << dir_path << ", snap_dir=" << snap_dir
+           << ", is_remote=" << is_remote << dendl;
+
+  auto lr_str = is_remote ? "remote" : "local";
+  auto mnt = is_remote ? m_remote_mount : m_local_mount;
+
+  ceph_dir_result *dirp = nullptr;
+  int r = ceph_opendir(mnt, snap_dir.c_str(), &dirp);
+  if (r < 0) {
+    if (is_remote && r == -ENOENT) {
+      return 0;
+    }
+    derr << ": failed to open " << lr_str << " snap directory=" << snap_dir
+         << ": " << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  std::set<std::string> snaps;
+  auto entry = ceph_readdir(mnt, dirp);
+  while (entry != NULL) {
+    auto d_name = std::string(entry->d_name);
+    dout(20) << ": entry=" << d_name << dendl;
+    if (d_name != "." && d_name != "..") {
+      snaps.emplace(d_name);
+    }
+
+    entry = ceph_readdir(mnt, dirp);
+  }
+
+  int rv = 0;
+  for (auto &snap : snaps) {
+    snap_info info;
+    auto snap_path = snapshot_path(snap_dir, snap);
+    r = ceph_get_snap_info(mnt, snap_path.c_str(), &info);
+    if (r < 0) {
+      derr << ": failed to fetch " << lr_str << " snap info for snap_path=" << snap_path
+           << ": " << cpp_strerror(r) << dendl;
+      rv = r;
+      break;
+    }
+
+    uint64_t snap_id;
+    if (is_remote) {
+      if (!info.nr_snap_metadata) {
+        derr << ": snap_path=" << snap_path << " has invalid metadata in remote snapshot"
+             << dendl;
+        rv = -EINVAL;
+      } else {
+        auto metadata = decode_snap_metadata(info.snap_metadata, info.nr_snap_metadata);
+        dout(20) << ": snap_path=" << snap_path << ", metadata=" << metadata << dendl;
+        auto it = metadata.find(PRIMARY_SNAP_ID_KEY);
+        if (it == metadata.end()) {
+          derr << ": snap_path=" << snap_path << " has missing \"" << PRIMARY_SNAP_ID_KEY
+               << "\" in metadata" << dendl;
+          rv = -EINVAL;
+        } else {
+          snap_id = std::stoull(it->second);
+        }
+        ceph_free_snap_info_buffer(&info);
+      }
+    } else {
+      snap_id = info.id;
+    }
+
+    if (rv != 0) {
+      break;
+    }
+    snap_map->emplace(snap_id, snap);
+  }
+
+  r = ceph_closedir(mnt, dirp);
+  if (r < 0) {
+    derr << ": failed to close " << lr_str << " snap directory=" << snap_dir
+         << ": " << cpp_strerror(r) << dendl;
+  }
+
+  dout(10) << ": " << lr_str << " snap_map=" << *snap_map << dendl;
+  return rv;
+}
+
+int PeerReplayer::propagate_snap_deletes(const std::string &dir_path,
+                                         const std::set<std::string> &snaps) {
+  dout(5) << ": dir_path=" << dir_path << ", deleted snapshots=" << snaps << dendl;
+
+  for (auto &snap : snaps) {
+    dout(20) << ": deleting dir_path=" << dir_path << ", snapshot=" << snap
+             << dendl;
+    int r = ceph_rmsnap(m_remote_mount, dir_path.c_str(), snap.c_str());
+    if (r < 0) {
+      derr << ": failed to delete remote snap dir_path=" << dir_path
+           << ", snapshot=" << snaps << ": " << cpp_strerror(r) << dendl;
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+int PeerReplayer::propagate_snap_renames(
+    const std::string &dir_path,
+    const std::set<std::pair<std::string,std::string>> &snaps) {
+  dout(10) << ": dir_path=" << dir_path << ", renamed snapshots=" << snaps << dendl;
+
+  for (auto &snapp : snaps) {
+    auto from = snapshot_path(m_cct, dir_path, snapp.first);
+    auto to = snapshot_path(m_cct, dir_path, snapp.second);
+    dout(20) << ": renaming dir_path=" << dir_path << ", snapshot from="
+             << from << ", to=" << to << dendl;
+    int r = ceph_rename(m_remote_mount, from.c_str(), to.c_str());
+    if (r < 0) {
+      derr << ": failed to rename remote snap dir_path=" << dir_path
+           << ", snapshot from =" << from << ", to=" << to << ": "
+           << cpp_strerror(r) << dendl;
+      return r;
+    }
+  }
+
+  return 0;
+}
+
+int PeerReplayer::remote_mkdir(const std::string &local_path,
+                               const std::string &remote_path,
+                               const struct ceph_statx &stx) {
+  dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
+           << dendl;
+
+  int r = ceph_mkdir(m_remote_mount, remote_path.c_str(), stx.stx_mode & ~S_IFDIR);
+  if (r < 0 && r != -EEXIST) {
+    derr << ": failed to create remote directory=" << remote_path << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
+  if (r < 0) {
+    derr << ": failed to chown remote directory=" << remote_path << ": " << cpp_strerror(r)
+         << dendl;
+    return r;
+  }
+
+  struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
+                            {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
+  r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
+  if (r < 0) {
+    derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+#define NR_IOVECS 8 // # iovecs
+#define IOVEC_SIZE (8 * 1024 * 1024) // buffer size for each iovec
+int PeerReplayer::remote_copy(const std::string &local_path,
+                              const std::string &remote_path,
+                              const struct ceph_statx &stx) {
+  dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
+           << dendl;
+  int l_fd;
+  int r_fd;
+  void *ptr;
+  struct iovec iov[NR_IOVECS];
+
+  int r = ceph_open(m_local_mount, local_path.c_str(), O_RDONLY, 0);
+  if (r < 0) {
+    derr << ": failed to open local file path=" << local_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  l_fd = r;
+  r = ceph_open(m_remote_mount, remote_path.c_str(),
+                O_CREAT | O_TRUNC | O_WRONLY, stx.stx_mode);
+  if (r < 0) {
+    derr << ": failed to create remote file path=" << remote_path << ": "
+         << cpp_strerror(r) << dendl;
+    goto close_local_fd;
+  }
+
+  r_fd = r;
+  ptr = malloc(NR_IOVECS * IOVEC_SIZE);
+  if (!ptr) {
+    r = -ENOMEM;
+    derr << ": failed to allocate memory" << dendl;
+    goto close_remote_fd;
+  }
+
+  while (true) {
+    for (int i = 0; i < NR_IOVECS; ++i) {
+      iov[i].iov_base = (char*)ptr + IOVEC_SIZE*i;
+      iov[i].iov_len = IOVEC_SIZE;
+    }
+
+    r = ceph_preadv(m_local_mount, l_fd, iov, NR_IOVECS, -1);
+    if (r < 0) {
+      derr << ": failed to read local file path=" << local_path << ": "
+           << cpp_strerror(r) << dendl;
+      break;
+    }
+    if (r == 0) {
+      break;
+    }
+
+    int iovs = (int)(r / IOVEC_SIZE);
+    int t = r % IOVEC_SIZE;
+    if (t) {
+      iov[iovs].iov_len = t;
+      ++iovs;
+    }
+
+    r = ceph_pwritev(m_remote_mount, r_fd, iov, iovs, -1);
+    if (r < 0) {
+      derr << ": failed to write remote file path=" << remote_path << ": "
+           << cpp_strerror(r) << dendl;
+      break;
+    }
+  }
+
+  if (r == 0) {
+    r = ceph_fsync(m_remote_mount, r_fd, 0);
+    if (r < 0) {
+      derr << ": failed to sync data for dir_path=" << remote_path << ": "
+           << cpp_strerror(r) << dendl;
+    }
+  }
+
+  free(ptr);
+
+close_remote_fd:
+  if (ceph_close(m_remote_mount, r_fd) < 0) {
+    derr << ": failed to close remote fd path=" << remote_path << ": " << cpp_strerror(r)
+         << dendl;
+    return -EINVAL;
+  }
+
+close_local_fd:
+  if (ceph_close(m_local_mount, l_fd) < 0) {
+    derr << ": failed to close local fd path=" << local_path << ": " << cpp_strerror(r)
+         << dendl;
+    return -EINVAL;
+  }
+
+  return r == 0 ? 0 : r;
+}
+
+int PeerReplayer::remote_file_op(const std::string &local_path,
+                                 const std::string &remote_path,
+                                 const struct ceph_statx &stx) {
+  dout(10) << ": local_path=" << local_path << ", remote_path=" << remote_path
+           << dendl;
+
+  int r;
+  if (S_ISREG(stx.stx_mode)) {
+    r = remote_copy(local_path, remote_path, stx);
+    if (r < 0) {
+      derr << ": failed to copy path=" << local_path << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+  } else if (S_ISLNK(stx.stx_mode)) {
+    char *target = (char *)alloca(stx.stx_size+1);
+    r = ceph_readlink(m_local_mount, local_path.c_str(), target, stx.stx_size);
+    if (r < 0) {
+      derr << ": failed to readlink local path=" << local_path << ": " << cpp_strerror(r)
+           << dendl;
+      return r;
+    }
+
+    r = ceph_symlink(m_remote_mount, target, remote_path.c_str());
+    if (r < 0 && r != EEXIST) {
+      derr << ": failed to symlink remote path=" << remote_path << " to target=" << target
+           << ": " << cpp_strerror(r) << dendl;
+      return r;
+    }
+  } else {
+    dout(5) << ": skipping entry=" << local_path << ": unsupported mode=" << stx.stx_mode
+            << dendl;
+    return 0;
+  }
+
+  r = ceph_lchown(m_remote_mount, remote_path.c_str(), stx.stx_uid, stx.stx_gid);
+  if (r < 0) {
+    derr << ": failed to chown remote directory=" << remote_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  struct timeval times[] = {{stx.stx_atime.tv_sec, stx.stx_atime.tv_nsec / 1000},
+                            {stx.stx_mtime.tv_sec, stx.stx_mtime.tv_nsec / 1000}};
+  r = ceph_lutimes(m_remote_mount, remote_path.c_str(), times);
+  if (r < 0) {
+    derr << ": failed to change [am]time on remote directory=" << remote_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+int PeerReplayer::cleanup_remote_dir(const std::string &dir_path) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  std::stack<SyncEntry> rm_stack;
+  ceph_dir_result *tdirp;
+  int r = ceph_opendir(m_remote_mount, dir_path.c_str(), &tdirp);
+  if (r < 0) {
+    derr << ": failed to open remote directory=" << dir_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  struct ceph_statx tstx;
+  r = ceph_statx(m_remote_mount, dir_path.c_str(), &tstx,
+                 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+  if (r < 0) {
+    derr << ": failed to stat remote directory=" << dir_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  rm_stack.emplace(SyncEntry(dir_path, tdirp, tstx));
+  while (!rm_stack.empty()) {
+    dout(20) << ": " << rm_stack.size() << " entries in stack" << dendl;
+    std::string e_name;
+    auto &entry = rm_stack.top();
+    dout(20) << ": top of stack path=" << entry.epath << dendl;
+    if (entry.is_directory()) {
+      struct ceph_statx stx;
+      struct dirent de;
+      while (true) {
+        r = ceph_readdirplus_r(m_remote_mount, entry.dirp, &de, &stx,
+                               CEPH_STATX_MODE, AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+        if (r < 0) {
+          derr << ": failed to read remote directory=" << entry.epath << dendl;
+          break;
+        }
+        if (r == 0) {
+          break;
+        }
+
+        auto d_name = std::string(de.d_name);
+        if (d_name != "." && d_name != "..") {
+          e_name = d_name;
+          break;
+        }
+      }
+
+      if (r == 0) {
+        if (rm_stack.size() > 1) {
+          r = ceph_rmdir(m_remote_mount, entry.epath.c_str());
+          if (r < 0) {
+            derr << ": failed to remove remote directory=" << entry.epath << ": "
+                 << cpp_strerror(r) << dendl;
+            break;
+          }
+        }
+
+        dout(10) << ": done for remote directory=" << entry.epath << dendl;
+        if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
+          derr << ": failed to close remote directory=" << entry.epath << dendl;
+        }
+        rm_stack.pop();
+        continue;
+      }
+      if (r < 0) {
+        break;
+      }
+
+      auto epath = entry_path(entry.epath, e_name);
+      if (S_ISDIR(stx.stx_mode)) {
+        ceph_dir_result *dirp;
+        r = ceph_opendir(m_remote_mount, epath.c_str(), &dirp);
+        if (r < 0) {
+          derr << ": failed to open remote directory=" << epath << ": "
+               << cpp_strerror(r) << dendl;
+          break;
+        }
+        rm_stack.emplace(SyncEntry(epath, dirp, stx));
+      } else {
+        rm_stack.emplace(SyncEntry(epath, stx));
+      }
+    } else {
+      r = ceph_unlink(m_remote_mount, entry.epath.c_str());
+      if (r < 0) {
+        derr << ": failed to remove remote directory=" << entry.epath << ": "
+             << cpp_strerror(r) << dendl;
+        break;
+      }
+      dout(10) << ": done for remote file=" << entry.epath << dendl;
+      rm_stack.pop();
+    }
+  }
+
+  while (!rm_stack.empty()) {
+    auto &entry = rm_stack.top();
+    if (entry.is_directory()) {
+      dout(20) << ": closing remote directory=" << entry.epath << dendl;
+      if (ceph_closedir(m_remote_mount, entry.dirp) < 0) {
+        derr << ": failed to close remote directory=" << entry.epath << dendl;
+      }
+    }
+
+    rm_stack.pop();
+  }
+
+  return r;
+}
+
+int PeerReplayer::do_synchronize(const std::string &dir_path, const std::string &snap_name) {
+  dout(20) << ": dir_path=" << dir_path << ", snap_name=" << snap_name << dendl;
+
+  auto snap_path = snapshot_path(m_cct, dir_path, snap_name);
+  std::stack<SyncEntry> sync_stack;
+
+  ceph_dir_result *tdirp;
+  int r = ceph_opendir(m_local_mount, snap_path.c_str(), &tdirp);
+  if (r < 0) {
+    derr << ": failed to open local directory=" << snap_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  struct ceph_statx tstx;
+  r = ceph_statx(m_local_mount, snap_path.c_str(), &tstx,
+                 CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                 CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                 AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW);
+  if (r < 0) {
+    derr << ": failed to stat local directory=" << snap_path << ": "
+         << cpp_strerror(r) << dendl;
+    return r;
+  }
+
+  sync_stack.emplace(SyncEntry("/", tdirp, tstx));
+  while (!sync_stack.empty()) {
+    dout(20) << ": " << sync_stack.size() << " entries in stack" << dendl;
+    std::string e_name;
+    auto &entry = sync_stack.top();
+    dout(20) << ": top of stack path=" << entry.epath << dendl;
+    if (entry.is_directory()) {
+      struct ceph_statx stx;
+      struct dirent de;
+      while (true) {
+        r = ceph_readdirplus_r(m_local_mount, entry.dirp, &de, &stx,
+                               CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
+                               CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
+                               AT_NO_ATTR_SYNC | AT_SYMLINK_NOFOLLOW, NULL);
+        if (r < 0) {
+          derr << ": failed to local read directory=" << entry.epath << dendl;
+          break;
+        }
+        if (r == 0) {
+          break;
+        }
+
+        auto d_name = std::string(de.d_name);
+        if (d_name != "." && d_name != "..") {
+          e_name = d_name;
+          break;
+        }
+      }
+
+      if (r == 0) {
+        dout(10) << ": done for directory=" << entry.epath << dendl;
+        if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
+          derr << ": failed to close local directory=" << entry.epath << dendl;
+        }
+        sync_stack.pop();
+        continue;
+      }
+      if (r < 0) {
+        break;
+      }
+
+      auto epath = entry_path(entry.epath, e_name);
+      auto l_path = entry_path(snap_path, epath);
+      auto r_path = entry_path(dir_path, epath);
+      if (S_ISDIR(stx.stx_mode)) {
+        r = remote_mkdir(l_path, r_path, stx);
+        if (r < 0) {
+          break;
+        }
+        ceph_dir_result *dirp;
+        r = ceph_opendir(m_local_mount, l_path.c_str(), &dirp);
+        if (r < 0) {
+          derr << ": failed to open local directory=" << l_path << ": "
+               << cpp_strerror(r) << dendl;
+          break;
+        }
+        sync_stack.emplace(SyncEntry(epath, dirp, stx));
+      } else {
+        sync_stack.emplace(SyncEntry(epath, stx));
+      }
+    } else {
+      auto l_path = entry_path(snap_path, entry.epath);
+      auto r_path = entry_path(dir_path, entry.epath);
+      r = remote_file_op(l_path, r_path, entry.stx);
+      if (r < 0) {
+        break;
+      }
+      dout(10) << ": done for file=" << entry.epath << dendl;
+      sync_stack.pop();
+    }
+  }
+
+  while (!sync_stack.empty()) {
+    auto &entry = sync_stack.top();
+    if (entry.is_directory()) {
+      dout(20) << ": closing local directory=" << entry.epath << dendl;
+      if (ceph_closedir(m_local_mount, entry.dirp) < 0) {
+        derr << ": failed to close local directory=" << entry.epath << dendl;
+      }
+    }
+
+    sync_stack.pop();
+  }
+
+  return r;
+}
+
+int PeerReplayer::synchronize(const std::string &dir_path, uint64_t snap_id,
+                              const std::string &snap_name) {
+  dout(20) << ": dir_path=" << dir_path << ", snap_id=" << snap_id
+           << ", snap_name=" << snap_name << dendl;
+
+  auto snap_path = snapshot_path(m_cct, dir_path, snap_name);
+
+  int r = cleanup_remote_dir(dir_path);
+  if (r < 0) {
+    derr << ": failed to cleanup remote directory=" << dir_path << dendl;
+    return r;
+  }
+
+  r = do_synchronize(dir_path, snap_name);
+  if (r < 0) {
+    derr << ": failed to synchronize dir_path=" << dir_path << ", snapshot="
+         << snap_path << dendl;
+    return r;
+  }
+
+  snap_metadata snap_meta[] = {{PRIMARY_SNAP_ID_KEY.c_str(), stringify(snap_id).c_str()}};
+  r = ceph_mksnap(m_remote_mount, dir_path.c_str(), snap_name.c_str(), 0755,
+                  snap_meta, 1);
+  if (r < 0) {
+    derr << ": failed to snap remote directory dir_path=" << dir_path
+         << ": " << cpp_strerror(r) << dendl;
+  }
+
+  return r;
+}
+
+int PeerReplayer::do_sync_snaps(const std::string &dir_path) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+
+  std::map<uint64_t, std::string> local_snap_map;
+  std::map<uint64_t, std::string> remote_snap_map;
+
+  int r = build_snap_map(dir_path, &local_snap_map);
+  if (r < 0) {
+    derr << ": failed to build local snap map" << dendl;
+    return r;
+  }
+
+  r = build_snap_map(dir_path, &remote_snap_map, true);
+  if (r < 0) {
+    derr << ": failed to build remote snap map" << dendl;
+    return r;
+  }
+
+  // infer deleted and renamed snapshots from local and remote
+  // snap maps
+  std::set<std::string> snaps_deleted;
+  std::set<std::pair<std::string,std::string>> snaps_renamed;
+  for (auto &[primary_snap_id, snap_name] : remote_snap_map) {
+    auto it = local_snap_map.find(primary_snap_id);
+    if (it == local_snap_map.end()) {
+      snaps_deleted.emplace(snap_name);
+    } else if (it->second != snap_name) {
+      snaps_renamed.emplace(std::make_pair(snap_name, it->second));
+    }
+  }
+
+  r = propagate_snap_deletes(dir_path, snaps_deleted);
+  if (r < 0) {
+    derr << ": failed to propgate deleted snapshots" << dendl;
+    return r;
+  }
+
+  r = propagate_snap_renames(dir_path, snaps_renamed);
+  if (r < 0) {
+    derr << ": failed to propgate renamed snapshots" << dendl;
+    return r;
+  }
+
+  // start mirroring snapshots from the last snap-id synchronized
+  uint64_t last_snap_id = 0;
+  if (!remote_snap_map.empty()) {
+    last_snap_id = remote_snap_map.rbegin()->first;
+  }
+
+  dout(5) << ": last snap-id transferred=" << last_snap_id << dendl;
+  auto it = local_snap_map.upper_bound(last_snap_id);
+  if (it == local_snap_map.end()) {
+    dout(20) << ": nothing to synchronize" << dendl;
+    return 0;
+  }
+
+  auto snaps_per_cycle = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_max_snapshot_sync_per_cycle");
+
+  dout(10) << ": synzhronizing from snap-id=" << it->first << dendl;
+  for (; it != local_snap_map.end(); ++it) {
+    r = synchronize(dir_path, it->first, it->second);
+    if (r < 0) {
+      derr << ": failed to synchronize dir_path=" << dir_path
+           << ", snapshot=" << it->second << dendl;
+      return r;
+    }
+    if (--snaps_per_cycle == 0) {
+      break;
+    }
+  }
+
+  return 0;
+}
+
+void PeerReplayer::sync_snaps(const std::string &dir_path,
+                              std::unique_lock<ceph::mutex> &locker) {
+  dout(20) << ": dir_path=" << dir_path << dendl;
+  locker.unlock();
+  int r = do_sync_snaps(dir_path);
+  if (r < 0) {
+    derr << ": failed to sync snapshots for dir_path=" << dir_path << dendl;
+  }
+  locker.lock();
+}
+
+void PeerReplayer::run(SnapshotReplayerThread *replayer) {
+  dout(10) << ": snapshot replayer=" << replayer << dendl;
+
+  time last_directory_scan = clock::zero();
+  auto scan_interval = g_ceph_context->_conf.get_val<uint64_t>(
+    "cephfs_mirror_directory_scan_interval");
+
+  std::unique_lock locker(m_lock);
+  while (true) {
+    // do not check if client is blocklisted under lock
+    m_cond.wait_for(locker, 1s, [this]{return is_stopping();});
+    if (is_stopping()) {
+      dout(5) << ": exiting" << dendl;
+      break;
+    }
+
+    locker.unlock();
+
+    if (m_fs_mirror->is_blocklisted()) {
+      dout(5) << ": exiting as client is blocklisted" << dendl;
+      break;
+    }
+
+    locker.lock();
+
+    auto now = clock::now();
+    std::chrono::duration<double> timo = now - last_directory_scan;
+    if (timo.count() >= scan_interval && m_directories.size()) {
+      dout(20) << ": trying to pick from " << m_directories.size() << " directories" << dendl;
+      auto dir_path = pick_directory();
+      if (dir_path) {
+        dout(5) << ": picked dir_path=" << *dir_path << dendl;
+        int r = register_directory(*dir_path, replayer);
+        if (r == 0) {
+          sync_snaps(*dir_path, locker);
+          unregister_directory(*dir_path);
+        }
+      }
+
+      last_directory_scan = now;
+    }
+  }
+}
+
+void PeerReplayer::peer_status(Formatter *f) {
+}
+
+} // namespace mirror
+} // namespace cephfs
diff --git a/src/tools/cephfs_mirror/PeerReplayer.h b/src/tools/cephfs_mirror/PeerReplayer.h
new file mode 100644 (file)
index 0000000..69e5fe1
--- /dev/null
@@ -0,0 +1,143 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPHFS_MIRROR_PEER_REPLAYER_H
+#define CEPHFS_MIRROR_PEER_REPLAYER_H
+
+#include "common/Formatter.h"
+#include "common/Thread.h"
+#include "mds/FSMap.h"
+#include "Types.h"
+
+class CephContext;
+
+namespace cephfs {
+namespace mirror {
+
+class FSMirror;
+class PeerReplayerAdminSocketHook;
+
+class PeerReplayer {
+public:
+  PeerReplayer(CephContext *cct, FSMirror *fs_mirror,
+               const Filesystem &filesystem, const Peer &peer,
+               const std::set<std::string, std::less<>> &directories,
+               MountRef mount);
+  ~PeerReplayer();
+
+  // initialize replayer for a peer
+  int init();
+
+  // shutdown replayer for a peer
+  void shutdown();
+
+  // add a directory to mirror queue
+  void add_directory(string_view dir_path);
+
+  // remove a directory from queue
+  void remove_directory(string_view dir_path);
+
+  // admin socket helpers
+  void peer_status(Formatter *f);
+
+private:
+  inline static const std::string PRIMARY_SNAP_ID_KEY = "primary_snap_id";
+
+  bool is_stopping() {
+    return m_stopping;
+  }
+
+  struct Replayer;
+  class SnapshotReplayerThread : public Thread {
+  public:
+    SnapshotReplayerThread(PeerReplayer *peer_replayer)
+      : m_peer_replayer(peer_replayer) {
+    }
+
+    void *entry() override {
+      m_peer_replayer->run(this);
+      return 0;
+    }
+
+  private:
+    PeerReplayer *m_peer_replayer;
+  };
+
+  struct DirRegistry {
+    int fd;
+    SnapshotReplayerThread *replayer;
+  };
+
+  struct SyncEntry {
+    std::string epath;
+    ceph_dir_result *dirp; // valid for directories
+    struct ceph_statx stx;
+
+    SyncEntry(std::string_view path,
+              const struct ceph_statx &stx)
+      : epath(path),
+        stx(stx) {
+    }
+    SyncEntry(std::string_view path,
+              ceph_dir_result *dirp,
+              const struct ceph_statx &stx)
+      : epath(path),
+        dirp(dirp),
+        stx(stx) {
+    }
+
+    bool is_directory() const {
+      return S_ISDIR(stx.stx_mode);
+    }
+  };
+
+  typedef std::vector<std::unique_ptr<SnapshotReplayerThread>> SnapshotReplayers;
+
+  CephContext *m_cct;
+  FSMirror *m_fs_mirror;
+  Peer m_peer;
+  // probably need to be encapsulated when supporting cancelations
+  std::map<std::string, DirRegistry> m_registered;
+  std::vector<std::string> m_directories;
+  MountRef m_local_mount;
+  PeerReplayerAdminSocketHook *m_asok_hook = nullptr;
+
+  ceph::mutex m_lock;
+  ceph::condition_variable m_cond;
+  RadosRef m_remote_cluster;
+  MountRef m_remote_mount;
+  bool m_stopping = false;
+  SnapshotReplayers m_replayers;
+
+  void run(SnapshotReplayerThread *replayer);
+
+  boost::optional<std::string> pick_directory();
+  int register_directory(const std::string &dir_path, SnapshotReplayerThread *replayer);
+  void unregister_directory(const std::string &dir_path);
+  int try_lock_directory(const std::string &dir_path, SnapshotReplayerThread *replayer,
+                         DirRegistry *registry);
+  void unlock_directory(const std::string &dir_path, const DirRegistry &registry);
+  void sync_snaps(const std::string &dir_path, std::unique_lock<ceph::mutex> &locker);
+
+  int do_sync_snaps(const std::string &dir_path);
+  int build_snap_map(const std::string &dir_path, std::map<uint64_t, std::string> *snap_map,
+                     bool is_remote=false);
+  int propagate_snap_deletes(const std::string &dir_name, const std::set<std::string> &snaps);
+  int propagate_snap_renames(const std::string &dir_name,
+                             const std::set<std::pair<std::string,std::string>> &snaps);
+  int synchronize(const std::string &dir_path, uint64_t snap_id, const std::string &snap_name);
+  int do_synchronize(const std::string &path, const std::string &snap_name);
+
+  int cleanup_remote_dir(const std::string &dir_path);
+  int remote_mkdir(const std::string &local_path, const std::string &remote_path,
+                   const struct ceph_statx &stx);
+  int remote_file_op(const std::string &local_path, const std::string &remote_path,
+                     const struct ceph_statx &stx);
+  int remote_copy(const std::string &local_path,const std::string &remote_path,
+                  const struct ceph_statx &local_stx);
+};
+
+} // namespace mirror
+} // namespace cephfs
+
+#endif // CEPHFS_MIRROR_PEER_REPLAYER_H