def process_updates(self):
def acquire_message(dir_path):
+ lookup_info = self.policy.lookup(dir_path)
+ sync_latest_snapshot = False
+ sync_from_snapshot = None
+ if lookup_info:
+ sync_latest_snapshot = lookup_info['sync_latest_snapshot']
+ sync_from_snapshot = lookup_info['sync_from_snapshot']
return json.dumps({'dir_path': dir_path,
- 'mode': 'acquire'
+ 'mode': 'acquire',
+ 'sync_latest_snapshot': sync_latest_snapshot,
+ 'sync_from_snapshot': sync_from_snapshot
})
def release_message(dir_path):
return json.dumps({'dir_path': dir_path,
# take care to not overwrite purge status
update_map[dir_path] = {'version': 1,
'instance_id': lookup_info['instance_id'],
- 'last_shuffled': lookup_info['mapped_time']
+ 'last_shuffled': lookup_info['mapped_time'],
+ 'sync_latest_snapshot': lookup_info['sync_latest_snapshot'],
+ 'sync_from_snapshot': lookup_info['sync_from_snapshot']
}
if lookup_info['purging']:
update_map[dir_path]['purging'] = 1
self.notifier.notify(dir_path, message, self.handle_peer_ack)
self.dir_paths.clear()
- def add_dir(self, dir_path):
+ def add_dir(self, dir_path, sync_latest_snapshot, sync_from_snapshot):
with self.lock:
lookup_info = self.policy.lookup(dir_path)
if lookup_info:
schedule = self.policy.add_dir(dir_path)
if not schedule:
return
- update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0}}
+ update_map = {dir_path: {'version': 1, 'instance_id': '', 'last_shuffled': 0.0,
+ 'sync_latest_snapshot': sync_latest_snapshot,
+ 'sync_from_snapshot': sync_from_snapshot}}
updated = False
def update_safe(updates, removals, r):
nonlocal updated
raise MirrorException(-errno.EINVAL, f'{dir_path} should be an absolute path')
return os.path.normpath(dir_path)
- def add_dir(self, filesystem, dir_path):
+ def add_dir(self, filesystem, dir_path, sync_latest_snapshot, sync_from_snapshot):
try:
with self.lock:
if not self.filesystem_exist(filesystem):
raise MirrorException(-errno.EINVAL, f'filesystem {filesystem} is not mirrored')
dir_path = FSSnapshotMirror.norm_path(dir_path)
log.debug(f'path normalized to {dir_path}')
- fspolicy.add_dir(dir_path)
+ fspolicy.add_dir(dir_path, sync_latest_snapshot, sync_from_snapshot)
return 0, json.dumps({}), ''
except MirrorException as me:
return me.args[0], '', me.args[1]
@CLIWriteCommand('fs snapshot mirror add')
def snapshot_mirror_add_dir(self,
fs_name: str,
- path: str):
+ path: str,
+ sync_latest_snapshot: Optional[bool] = False,
+ sync_from_snapshot: Optional[str] = None):
"""Add a directory for snapshot mirroring"""
- return self.fs_snapshot_mirror.add_dir(fs_name, path)
+ return self.fs_snapshot_mirror.add_dir(fs_name, path, sync_latest_snapshot,
+ sync_from_snapshot)
@CLIWriteCommand('fs snapshot mirror remove')
def snapshot_mirror_remove_dir(self,
}
}
-void FSMirror::handle_acquire_directory(string_view dir_path) {
- dout(5) << ": dir_path=" << dir_path << dendl;
+void FSMirror::handle_acquire_directory(string_view dir_path,
+ bool sync_latest_snapshot,
+ string_view sync_from_snapshot) {
+ dout(5) << ": dir_path=" << dir_path << ", sync_latest_snapshot=" << sync_latest_snapshot
+ << ", sync_from_snapshot=" << sync_from_snapshot << dendl;
{
std::scoped_lock locker(m_lock);
for (auto &[peer, peer_replayer] : m_peer_replayers) {
dout(10) << ": peer=" << peer << dendl;
- peer_replayer->add_directory(dir_path);
+ peer_replayer->add_directory(dir_path, sync_latest_snapshot, sync_from_snapshot);
}
}
if (m_perf_counters) {
: fs_mirror(fs_mirror) {
}
- void acquire_directory(std::string_view dir_path) override {
- fs_mirror->handle_acquire_directory(dir_path);
+ void acquire_directory(std::string_view dir_path,
+ bool sync_latest_snapshot = false,
+ std::string_view sync_from_snapshot = "") override {
+ fs_mirror->handle_acquire_directory(dir_path, sync_latest_snapshot, sync_from_snapshot);
}
void release_directory(std::string_view dir_path) override {
void shutdown_instance_watcher();
void handle_shutdown_instance_watcher(int r);
- void handle_acquire_directory(std::string_view dir_path);
+ void handle_acquire_directory(std::string_view dir_path, bool sync_latest_snapshot,
+ std::string_view sync_from_snapshot);
void handle_release_directory(std::string_view dir_path);
};
std::string dir_path;
std::string mode;
+ bool sync_latest_snapshot;
+ std::string sync_from_snapshot;
+
try {
JSONDecoder jd(bl);
JSONDecoder::decode_json("dir_path", dir_path, &jd.parser, true);
JSONDecoder::decode_json("mode", mode, &jd.parser, true);
+ JSONDecoder::decode_json("sync_latest_snapshot", sync_latest_snapshot, &jd.parser, true);
+ JSONDecoder::decode_json("sync_from_snapshot", sync_from_snapshot, &jd.parser, true);
} catch (const JSONDecoder::err &e) {
derr << ": failed to decode notify json: " << e.what() << dendl;
}
dout(20) << ": notifier_id=" << notifier_id << ", dir_path=" << dir_path
- << ", mode=" << mode << dendl;
+ << ", mode=" << mode << ", sync_latest_snapshot=" << sync_latest_snapshot
+ << ", sync_from_snapshot=" << sync_from_snapshot << dendl;
if (mode == "acquire") {
- m_listener.acquire_directory(dir_path);
+ if (sync_from_snapshot == "null") {
+ m_listener.acquire_directory(dir_path, sync_latest_snapshot);
+ } else {
+ m_listener.acquire_directory(dir_path, sync_latest_snapshot, sync_from_snapshot);
+ }
} else if (mode == "release") {
m_listener.release_directory(dir_path);
} else {
virtual ~Listener() {
}
- virtual void acquire_directory(std::string_view dir_path) = 0;
+ virtual void acquire_directory(std::string_view dir_path,
+ bool sync_latest_snapshot = false,
+ std::string_view sync_from_snapshot = "") = 0;
virtual void release_directory(std::string_view dir_path) = 0;
};
m_local_cluster(local_cluster),
m_filesystem(filesystem),
m_peer(peer),
- m_directories(directories.begin(), directories.end()),
m_local_mount(mount),
m_service_daemon(service_daemon),
m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+ std::for_each(directories.begin(), directories.end(), [this](const auto& dir_root) {
+ m_directories[dir_root] = SnapSyncFrom();
+ });
+
// reset sync stats sent via service daemon
m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
}
int PeerReplayer::init() {
- dout(20) << ": initial dir list=[" << m_directories << "]" << dendl;
+ std::string str_list = ": initial dir list=[";
for (auto &dir_root : m_directories) {
- m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
+ m_snap_sync_stats.emplace(dir_root.first, SnapSyncStat());
+ str_list += dir_root.first + " ";
}
+ dout(20) << str_list << "]" << dendl;
auto &remote_client = m_peer.remote.client_name;
auto &remote_cluster = m_peer.remote.cluster_name;
m_remote_cluster.reset();
}
-void PeerReplayer::add_directory(string_view dir_root) {
- dout(20) << ": dir_root=" << dir_root << dendl;
+void PeerReplayer::add_directory(string_view dir_root, bool sync_latest_snapshot,
+ string_view sync_from_snapshot) {
+ dout(20) << ": dir_root=" << dir_root << ", sync_latest_snapshot=" << sync_latest_snapshot
+ << ", sync_from_snapshot=" << sync_from_snapshot << dendl;
std::scoped_lock locker(m_lock);
- m_directories.emplace_back(dir_root);
+ m_directories[std::string(dir_root)] = SnapSyncFrom(sync_latest_snapshot, sync_from_snapshot);
m_snap_sync_stats.emplace(dir_root, SnapSyncStat());
m_cond.notify_all();
}
auto _dir_root = std::string(dir_root);
std::scoped_lock locker(m_lock);
- auto it = std::find(m_directories.begin(), m_directories.end(), _dir_root);
+ auto it = m_directories.find(_dir_root);
if (it != m_directories.end()) {
m_directories.erase(it);
}
boost::optional<std::string> candidate;
for (auto &dir_root : m_directories) {
- auto &sync_stat = m_snap_sync_stats.at(dir_root);
+ auto &sync_stat = m_snap_sync_stats.at(dir_root.first);
if (sync_stat.failed) {
std::chrono::duration<double> d = now - *sync_stat.last_failed;
if (d.count() < retry_timo) {
continue;
}
}
- if (!m_registered.count(dir_root)) {
- candidate = dir_root;
+ if (!m_registered.count(dir_root.first)) {
+ candidate = dir_root.first;
break;
}
}
- std::rotate(m_directories.begin(), m_directories.begin() + 1, m_directories.end());
return candidate;
}
unlock_directory(it->first, it->second);
m_registered.erase(it);
- if (std::find(m_directories.begin(), m_directories.end(), dir_root) == m_directories.end()) {
+ if (m_directories.find(dir_root) == m_directories.end()) {
m_snap_sync_stats.erase(dir_root);
}
}
dout(10) << ": dir_root=" << dir_root << " unlocked" << dendl;
}
+ // Look up the snapshot named `snap_name` under `dir_root` on the local mount
+ // and return its snapshot id.
+ //
+ // Returns the snapshot id on success, or the negative error code reported by
+ // ceph_get_snap_info() on failure (the failure is also logged).
+ //
+ // NOTE(review): callers store the result in a uint64_t and snapshot maps are
+ // keyed by uint64_t, so the `int` return type narrows the id -- confirm that
+ // snapshot ids cannot exceed INT_MAX, or widen the return type.
+ int PeerReplayer::get_snap_id(const std::string &dir_root, const std::string& snap_name) {
+   snap_info info;
+   auto snap_dir = snapshot_dir_path(m_cct, dir_root);
+   auto snap_path = snapshot_path(snap_dir, snap_name);
+   int r = ceph_get_snap_info(m_local_mount, snap_path.c_str(), &info);
+   if (r < 0) {
+     // `info` is not populated on failure, so there is nothing to release here.
+     derr << ": failed to fetch " << snap_name << ", snap info for snap_path=" << snap_path
+          << ": " << cpp_strerror(r) << dendl;
+     return r;
+   }
+
+   // Only the id is needed. Copy it out, then release any metadata buffers
+   // that ceph_get_snap_info() allocated so they are not leaked.
+   const auto snap_id = info.id;
+   ceph_free_snap_info_buffer(&info);
+   return snap_id;
+ }
+
int PeerReplayer::build_snap_map(const std::string &dir_root,
std::map<uint64_t, std::string> *snap_map, bool is_remote) {
auto snap_dir = snapshot_dir_path(m_cct, dir_root);
return r;
}
+ auto sync_stat = m_snap_sync_stats.at(dir_root);
std::set<std::string> snaps;
auto entry = ceph_readdir(mnt, dirp);
while (entry != NULL) {
entry = ceph_readdir(mnt, dirp);
}
+ uint64_t snap_id_in = 0;
+ std::string snap_name = m_directories.at(dir_root).sync_from_snapshot;
+ if (!is_remote && !snap_name.empty()) {
+ r = get_snap_id(dir_root, snap_name);
+ if (r < 0) {
+ derr << ": defaulting to first snapshot for syncing" << dendl;
+ } else {
+ snap_id_in = r;
+ }
+ }
+
int rv = 0;
for (auto &snap : snaps) {
snap_info info;
if (rv != 0) {
break;
}
- snap_map->emplace(snap_id, snap);
+ if (snap_id >= snap_id_in) {
+ snap_map->emplace(snap_id, snap);
+ }
+ }
+
+ if (!is_remote && m_directories.at(dir_root).sync_latest_snapshot) {
+ // Now we know the highest snap_id object emplaced.
+ // Reset the snap_map to the highest snap_id element.
+ if (!sync_stat.current_syncing_snap) { // skip if there's a sync in progress
+ auto it = snap_map->rbegin();
+ auto sid = it->first;
+ auto sname = it->second;
+ snap_map->clear();
+ snap_map->emplace(sid, sname);
+ }
}
r = ceph_closedir(mnt, dirp);
}
}
- r = propagate_snap_deletes(dir_root, snaps_deleted);
- if (r < 0) {
- derr << ": failed to propgate deleted snapshots" << dendl;
- return r;
+ if (!m_directories.at(dir_root).sync_latest_snapshot) {
+ r = propagate_snap_deletes(dir_root, snaps_deleted);
+ if (r < 0) {
+ derr << ": failed to propgate deleted snapshots" << dendl;
+ return r;
+ }
}
r = propagate_snap_renames(dir_root, snaps_renamed);
void shutdown();
// add a directory to mirror queue
- void add_directory(std::string_view dir_root);
+ void add_directory(std::string_view dir_root, bool sync_latest_snapshot,
+ std::string_view sync_from_snapshot);
// remove a directory from queue
void remove_directory(std::string_view dir_root);
};
struct SnapSyncStat {
- uint64_t nr_failures = 0; // number of consecutive failures
+ uint64_t nr_failures; // number of consecutive failures
boost::optional<monotime> last_failed; // lat failed timestamp
boost::optional<std::string> last_failed_reason;
- bool failed = false; // hit upper cap for consecutive failures
+ bool failed; // hit upper cap for consecutive failures
boost::optional<std::pair<uint64_t, std::string>> last_synced_snap;
boost::optional<std::pair<uint64_t, std::string>> current_syncing_snap;
- uint64_t synced_snap_count = 0;
- uint64_t deleted_snap_count = 0;
- uint64_t renamed_snap_count = 0;
- monotime last_synced = clock::zero();
+ uint64_t synced_snap_count;
+ uint64_t deleted_snap_count;
+ uint64_t renamed_snap_count;
+ monotime last_synced;
boost::optional<double> last_sync_duration;
boost::optional<uint64_t> last_sync_bytes; //last sync bytes for display in status
- uint64_t sync_bytes = 0; //sync bytes counter, independently for each directory sync.
+ uint64_t sync_bytes; //sync bytes counter, independently for each directory sync.
+
+ SnapSyncStat() {
+ nr_failures = 0;
+ failed = false;
+ synced_snap_count = 0;
+ deleted_snap_count = 0;
+ renamed_snap_count = 0;
+ last_synced = clock::zero();
+ sync_bytes = 0;
+ }
+ };
+
+ // Per-directory snapshot selection options recorded when a directory is
+ // queued for mirroring.
+ struct SnapSyncFrom {
+   // When set, build_snap_map() reduces the local snapshot map to the single
+   // entry with the highest snapshot id (unless a sync is already in progress).
+   bool sync_latest_snapshot = false;
+   // Name of the snapshot whose id is the lower bound for local snapshots
+   // added to the snapshot map; empty means no lower bound is applied.
+   std::string sync_from_snapshot;
+
+   // Default: mirror every snapshot, with no starting point.
+   SnapSyncFrom() = default;
+
+   // `sf` is a cheap view and is taken by value; its contents are copied into
+   // the owned string member, so the caller's buffer need not outlive this.
+   SnapSyncFrom(bool sl, std::string_view sf)
+     : sync_latest_snapshot(sl), sync_from_snapshot(sf) {
+   }
};
void _inc_failed_count(const std::string &dir_root) {
Peer m_peer;
// probably need to be encapsulated when supporting cancelations
std::map<std::string, DirRegistry> m_registered;
- std::vector<std::string> m_directories;
+ std::map<std::string, SnapSyncFrom> m_directories;
std::map<std::string, SnapSyncStat> m_snap_sync_stats;
MountRef m_local_mount;
ServiceDaemon *m_service_daemon;
int build_snap_map(const std::string &dir_root, std::map<uint64_t, std::string> *snap_map,
bool is_remote=false);
+ int get_snap_id(const std::string &dir_root, const std::string& snap_name);
int propagate_snap_deletes(const std::string &dir_root, const std::set<std::string> &snaps);
int propagate_snap_renames(const std::string &dir_root,
const std::set<std::pair<std::string,std::string>> &snaps);