m_local_mount(mount),
m_service_daemon(service_daemon),
m_asok_hook(new PeerReplayerAdminSocketHook(cct, filesystem, peer, this)),
- m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))) {
+ m_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::" + stringify(peer.uuid))),
+ smq_lock(ceph::make_mutex("cephfs::mirror::PeerReplayer::smq" + stringify(peer.uuid))) {
// reset sync stats sent via service daemon
m_service_daemon->add_or_update_peer_attribute(m_filesystem.fscid, m_peer,
SERVICE_DAEMON_FAILED_DIR_COUNT_KEY, (uint64_t)0);
m_cond.notify_all();
}
+void PeerReplayer::enqueue_syncm(const std::shared_ptr<SyncMechanism>& item) {
+ dout(20) << ": Enqueue syncm object=" << item << dendl;
+ std::lock_guard lock(smq_lock);
+ syncm_q.push(item);
+ smq_cv.notify_all();
+}
+
+
boost::optional<std::string> PeerReplayer::pick_directory() {
dout(20) << dendl;
return r == 0 ? 0 : r;
}
-int PeerReplayer::remote_file_op(SyncMechanism *syncm, const std::string &dir_root,
+int PeerReplayer::remote_file_op(std::shared_ptr<SyncMechanism>& syncm, const std::string &dir_root,
const std::string &epath, const struct ceph_statx &stx,
bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync) {
dout(10) << ": dir_root=" << dir_root << ", epath=" << epath << ", need_data_sync=" << need_data_sync
return r;
}
- SyncMechanism *syncm;
+ std::shared_ptr<SyncMechanism> syncm;
if (fh.p_mnt == m_local_mount) {
- syncm = new SnapDiffSync(dir_root, m_local_mount, m_remote_mount, &fh,
- m_peer, current, prev);
+ syncm = std::make_shared<SnapDiffSync>(dir_root, m_local_mount, m_remote_mount,
+ &fh, m_peer, current, prev);
+
} else {
- syncm = new RemoteSync(m_local_mount, m_remote_mount, &fh,
- m_peer, current, boost::none);
+ syncm = std::make_shared<RemoteSync>(m_local_mount, m_remote_mount, &fh,
+ m_peer, current, boost::none);
}
r = syncm->init_sync();
derr << ": failed to initialize sync mechanism" << dendl;
ceph_close(m_local_mount, fh.c_fd);
ceph_close(fh.p_mnt, fh.p_fd);
- delete syncm;
return r;
}
+ enqueue_syncm(syncm);
+
// starting from this point we shouldn't care about manual closing of fh.c_fd,
// it will be closed automatically when bound tdirp is closed.
while (true) {
}
syncm->finish_sync();
- delete syncm;
dout(20) << " cur:" << fh.c_fd
<< " prev:" << fh.p_fd
break;
}
- // TODO Wait and fetch syncm from SyncMechanism Queue
+ std::shared_ptr<SyncMechanism> syncm;
+ {
+ std::unique_lock lock(smq_lock);
+ dout(20) << ": snapshot data replayer waiting for syncm to process" << dendl;
+ smq_cv.wait(lock, [this]{return !syncm_q.empty();});
+ syncm = syncm_q.front();
+ dout(20) << ": snapshot data replayer woke up! syncm=" << syncm << dendl;
+ }
// TODO pre_sync and open handles
// TODO Wait and fetch files from syncm data queue and sync
+ // Dequeue syncm object after processing
+ {
+ std::unique_lock lock(smq_lock);
+ if (!syncm_q.empty() && syncm_q.front() == syncm) {
+ dout(20) << ": Dequeue syncm object=" << syncm << dendl;
+ syncm_q.pop();
+ smq_cv.notify_all();
+ }
+ }
+
//lock again to satify m_cond
locker.lock();
}
SnapshotDataReplayers m_data_replayers;
+ ceph::mutex smq_lock;
+ ceph::condition_variable smq_cv;
+ std::queue<std::shared_ptr<SyncMechanism>> syncm_q;
+
ServiceDaemonStats m_service_daemon_stats;
PerfCounters *m_perf_counters;
int do_sync_snaps(const std::string &dir_root);
int remote_mkdir(const std::string &epath, const struct ceph_statx &stx, const FHandles &fh);
- int remote_file_op(SyncMechanism *syncm, const std::string &dir_root,
+ int remote_file_op(std::shared_ptr<SyncMechanism>& syncm, const std::string &dir_root,
const std::string &epath, const struct ceph_statx &stx,
bool sync_check, const FHandles &fh, bool need_data_sync, bool need_attr_sync);
int copy_to_remote(const std::string &dir_root, const std::string &epath, const struct ceph_statx &stx,
const FHandles &fh, uint64_t num_blocks, struct cblock *b);
int sync_perms(const std::string& path);
+
+ // add syncm to syncm_q
+ void enqueue_syncm(const std::shared_ptr<SyncMechanism>& item);
};
} // namespace mirror