m_lock("Journal<I>::m_lock"), m_state(STATE_UNINITIALIZED),
m_error_result(0), m_replay_handler(this), m_close_pending(false),
m_event_lock("Journal<I>::m_event_lock"), m_event_tid(0),
- m_blocking_writes(false), m_journal_replay(NULL) {
+ m_blocking_writes(false), m_journal_replay(NULL),
+ m_metadata_listener(this) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
delete m_journal_replay;
m_journal_replay = NULL;
+ m_journaler->remove_listener(&m_metadata_listener);
+
transition_state(STATE_CLOSING, r);
m_journaler->shut_down(create_async_context_callback(
m_image_ctx, create_context_callback<
delete m_journal_replay;
m_journal_replay = NULL;
+ m_journaler->remove_listener(&m_metadata_listener);
+
transition_state(STATE_RESTARTING_REPLAY, r);
m_journaler->shut_down(create_async_context_callback(
m_image_ctx, create_context_callback<
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_get_tags>(this)));
m_journaler->get_tags(m_tag_class, &tags_ctx->tags, tags_ctx);
+
+ m_journaler->add_listener(&m_metadata_listener);
}
template <typename I>
m_wait_for_state_contexts.push_back(on_state);
}
+template <typename I>
+int Journal<I>::check_resync_requested(bool *do_resync) {
+ Mutex::Locker l(m_lock);
+ return check_resync_requested_internal(do_resync);
+}
+
+template <typename I>
+int Journal<I>::check_resync_requested_internal(bool *do_resync) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ assert(m_lock.is_locked());
+ assert(do_resync != nullptr);
+
+ cls::journal::Client client;
+ int r = m_journaler->get_cached_client(IMAGE_CLIENT_ID, &client);
+ if (r < 0) {
+ lderr(cct) << "failed to retrieve client: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ librbd::journal::ClientData client_data;
+ bufferlist::iterator bl_it = client.data.begin();
+ try {
+ ::decode(client_data, bl_it);
+ } catch (const buffer::error &err) {
+ lderr(cct) << "failed to decode client data: " << err << dendl;
+ return -EINVAL;
+ }
+
+ journal::ImageClientMeta *image_client_meta =
+ boost::get<journal::ImageClientMeta>(&client_data.client_meta);
+ if (image_client_meta == nullptr) {
+ lderr(cct) << "failed to access image client meta struct" << dendl;
+ return -EINVAL;
+ }
+
+ *do_resync = image_client_meta->resync_requested;
+
+ return 0;
+}
+
+template <typename I>
+void Journal<I>::handle_metadata_updated() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ std::list<journal::ResyncListener *> resync_private_list;
+
+ {
+ Mutex::Locker l(m_lock);
+
+ if (m_state == STATE_CLOSING || m_state == STATE_CLOSED ||
+ m_state == STATE_UNINITIALIZED || m_state == STATE_STOPPING) {
+ return;
+ }
+
+ bool do_resync = false;
+ int r = check_resync_requested_internal(&do_resync);
+ if (r < 0) {
+ lderr(cct) << "failed to check if a resync was requested" << dendl;
+ return;
+ }
+
+ if (do_resync) {
+ for (const auto& listener :
+ m_listener_map[journal::ListenerType::RESYNC]) {
+ journal::ResyncListener *rsync_listener =
+ boost::get<journal::ResyncListener *>(listener);
+ resync_private_list.push_back(rsync_listener);
+ }
+ }
+ }
+
+ for (const auto& listener : resync_private_list) {
+ listener->handle_resync();
+ }
+}
+
+template <typename I>
+void Journal<I>::add_listener(journal::ListenerType type,
+ journal::JournalListenerPtr listener) {
+ Mutex::Locker l(m_lock);
+ m_listener_map[type].push_back(listener);
+}
+
+template <typename I>
+void Journal<I>::remove_listener(journal::ListenerType type,
+ journal::JournalListenerPtr listener) {
+ Mutex::Locker l(m_lock);
+ m_listener_map[type].remove(listener);
+}
+
} // namespace librbd
template class librbd::Journal<librbd::ImageCtx>;
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
+#include "journal/JournalMetadataListener.h"
#include "journal/ReplayEntry.h"
#include "journal/ReplayHandler.h"
#include "librbd/journal/Types.h"
Context *on_start, Context *on_close_request);
void stop_external_replay();
+ void add_listener(journal::ListenerType type,
+ journal::JournalListenerPtr listener);
+ void remove_listener(journal::ListenerType type,
+ journal::JournalListenerPtr listener);
+
+ int check_resync_requested(bool *do_resync);
+
private:
ImageCtxT &m_image_ctx;
journal::Replay<ImageCtxT> *m_journal_replay;
Context *m_on_replay_close_request = nullptr;
+ struct MetadataListener : public ::journal::JournalMetadataListener {
+ Journal<ImageCtxT> *journal;
+
+ MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { }
+
+ void handle_update(::journal::JournalMetadata *) {
+ FunctionContext *ctx = new FunctionContext([this](int r) {
+ journal->handle_metadata_updated();
+ });
+ journal->m_work_queue->queue(ctx, 0);
+ }
+ } m_metadata_listener;
+
+ typedef std::map<journal::ListenerType,
+ std::list<journal::JournalListenerPtr> > ListenerMap;
+ ListenerMap m_listener_map;
+
uint64_t append_io_events(journal::EventType event_type,
const Bufferlists &bufferlists,
const AioObjectRequests &requests,
bool is_steady_state() const;
void wait_for_steady_state(Context *on_state);
+
+ int check_resync_requested_internal(bool *do_resync);
+
+ void handle_metadata_updated();
};
} // namespace librbd