m_args(args),
m_work_queue(work_queue),
m_snap_listener(this),
+ m_ts_listener(this),
m_asok_hook(new MirrorAdminSocketHook(cct, filesystem, this)) {
m_service_daemon->add_or_update_fs_attribute(m_filesystem.fscid, SERVICE_DAEMON_DIR_COUNT_KEY,
(uint64_t)0);
Context *ctx = new C_CallbackAdapter<
FSMirror, &FSMirror::handle_init_instance_watcher>(this);
- m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_work_queue);
+ m_instance_watcher = InstanceWatcher::create(m_ioctx, m_snap_listener, m_ts_listener, m_work_queue);
m_instance_watcher->init(ctx);
}
std::scoped_lock locker(m_lock);
Context *ctx = new C_CallbackAdapter<
FSMirror, &FSMirror::handle_init_mirror_watcher>(this);
- m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_work_queue);
+ m_mirror_watcher = MirrorWatcher::create(m_ioctx, this, m_ts_listener, m_work_queue);
m_mirror_watcher->init(ctx);
}
monotime get_failed_ts() {
std::scoped_lock locker(m_lock);
- if (m_instance_watcher) {
- return m_instance_watcher->get_failed_ts();
- }
- if (m_mirror_watcher) {
- return m_mirror_watcher->get_failed_ts();
- }
+ return m_failed_ts;
+ }
- return clock::now();
+ void set_failed_ts() {
+ std::scoped_lock locker(m_lock);
+ m_failed_ts = clock::now();
}
bool is_blocklisted() {
monotime get_blocklisted_ts() {
std::scoped_lock locker(m_lock);
- if (m_instance_watcher) {
- return m_instance_watcher->get_blocklisted_ts();
- }
- if (m_mirror_watcher) {
- return m_mirror_watcher->get_blocklisted_ts();
- }
+ return m_blocklisted_ts;
+ }
- return clock::now();
+ void set_blocklisted_ts() {
+ std::scoped_lock locker(m_lock);
+ m_blocklisted_ts = clock::now();
}
Peers get_peers() {
void release_directory(std::string_view dir_path) override {
fs_mirror->handle_release_directory(dir_path);
}
+
+ };
+
+ struct TimestampListener: public Watcher::ErrorListener {
+ FSMirror *fs_mirror;
+ TimestampListener(FSMirror *fs_mirror)
+ : fs_mirror(fs_mirror) {
+ }
+ void set_blocklisted_ts() {
+ fs_mirror->set_blocklisted_ts();
+ }
+ void set_failed_ts() {
+ fs_mirror->set_failed_ts();
+ }
};
+ monotime m_blocklisted_ts;
+ monotime m_failed_ts;
CephContext *m_cct;
Filesystem m_filesystem;
uint64_t m_pool_id;
ceph::mutex m_lock = ceph::make_mutex("cephfs::mirror::fs_mirror");
SnapListener m_snap_listener;
+ TimestampListener m_ts_listener;
std::set<std::string, std::less<>> m_directories;
Peers m_all_peers;
std::map<Peer, std::unique_ptr<PeerReplayer>> m_peer_replayers;
} // anonymous namespace
InstanceWatcher::InstanceWatcher(librados::IoCtx &ioctx,
- Listener &listener, ContextWQ *work_queue)
+ Listener &listener, ErrorListener &elistener, ContextWQ *work_queue)
: Watcher(ioctx, instance_oid(stringify(ioctx.get_instance_id())), work_queue),
m_ioctx(ioctx),
m_listener(listener),
+ m_elistener(elistener),
m_work_queue(work_queue),
m_lock(ceph::make_mutex("cephfs::mirror::instance_watcher")) {
}
dout(0) << ": client blocklisted" <<dendl;
std::scoped_lock locker(m_lock);
m_blocklisted = true;
- m_blocklisted_ts = clock::now();
+ m_elistener.set_blocklisted_ts();
} else if (r == -ENOENT) {
derr << ": mirroring object deleted" << dendl;
m_failed = true;
- m_failed_ts = clock::now();
+ m_elistener.set_failed_ts();
} else if (r < 0) {
derr << ": rewatch error: " << cpp_strerror(r) << dendl;
m_failed = true;
- m_failed_ts = clock::now();
+ m_elistener.set_failed_ts();
}
}
};
static InstanceWatcher *create(librados::IoCtx &ioctx,
- Listener &listener, ContextWQ *work_queue) {
- return new InstanceWatcher(ioctx, listener, work_queue);
+ Listener &listener, ErrorListener &elistener, ContextWQ *work_queue) {
+ return new InstanceWatcher(ioctx, listener, elistener, work_queue);
}
- InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ContextWQ *work_queue);
+ InstanceWatcher(librados::IoCtx &ioctx, Listener &listener, ErrorListener &elistener, ContextWQ *work_queue);
~InstanceWatcher();
void init(Context *on_finish);
return m_blocklisted;
}
- monotime get_blocklisted_ts() {
- std::scoped_lock locker(m_lock);
- return m_blocklisted_ts;
- }
-
bool is_failed() {
std::scoped_lock locker(m_lock);
return m_failed;
}
- monotime get_failed_ts() {
- std::scoped_lock locker(m_lock);
- return m_failed_ts;
- }
-
private:
librados::IoCtx &m_ioctx;
Listener &m_listener;
+ ErrorListener &m_elistener;
ContextWQ *m_work_queue;
ceph::mutex m_lock;
bool m_blocklisted = false;
bool m_failed = false;
- monotime m_blocklisted_ts;
- monotime m_failed_ts;
-
void create_instance();
void handle_create_instance(int r);
namespace mirror {
MirrorWatcher::MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
- ContextWQ *work_queue)
+ ErrorListener &elistener, ContextWQ *work_queue)
: Watcher(ioctx, CEPHFS_MIRROR_OBJECT, work_queue),
m_ioctx(ioctx),
m_fs_mirror(fs_mirror),
+ m_elistener(elistener),
m_work_queue(work_queue),
m_lock(ceph::make_mutex("cephfs::mirror::mirror_watcher")),
m_instance_id(stringify(m_ioctx.get_instance_id())) {
dout(0) << ": client blocklisted" <<dendl;
std::scoped_lock locker(m_lock);
m_blocklisted = true;
- m_blocklisted_ts = clock::now();
+ m_elistener.set_blocklisted_ts();
} else if (r == -ENOENT) {
derr << ": mirroring object deleted" << dendl;
m_failed = true;
- m_failed_ts = clock::now();
+ m_elistener.set_failed_ts();
} else if (r < 0) {
derr << ": rewatch error: " << cpp_strerror(r) << dendl;
m_failed = true;
- m_failed_ts = clock::now();
+ m_elistener.set_failed_ts();
}
}
class MirrorWatcher : public Watcher {
public:
static MirrorWatcher *create(librados::IoCtx &ioctx, FSMirror *fs_mirror,
- ContextWQ *work_queue) {
- return new MirrorWatcher(ioctx, fs_mirror, work_queue);
+ ErrorListener &elistener, ContextWQ *work_queue) {
+ return new MirrorWatcher(ioctx, fs_mirror, elistener, work_queue);
}
- MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror,
+ MirrorWatcher(librados::IoCtx &ioctx, FSMirror *fs_mirror, ErrorListener &elistener,
ContextWQ *work_queue);
~MirrorWatcher();
return m_blocklisted;
}
- monotime get_blocklisted_ts() {
- std::scoped_lock locker(m_lock);
- return m_blocklisted_ts;
- }
-
bool is_failed() {
std::scoped_lock locker(m_lock);
return m_failed;
}
- monotime get_failed_ts() {
- std::scoped_lock locker(m_lock);
- return m_failed_ts;
- }
-
private:
librados::IoCtx &m_ioctx;
FSMirror *m_fs_mirror;
+ ErrorListener &m_elistener;
ContextWQ *m_work_queue;
ceph::mutex m_lock;
bool m_blocklisted = false;
bool m_failed = false;
- monotime m_blocklisted_ts;
- monotime m_failed_ts;
-
void register_watcher();
void handle_register_watcher(int r);
void register_watch(Context *on_finish);
void unregister_watch(Context *on_finish);
+ struct ErrorListener {
+ virtual ~ErrorListener() {
+ }
+ virtual void set_blocklisted_ts() = 0;
+ virtual void set_failed_ts() = 0;
+ };
+
protected:
std::string m_oid;