From: Jason Dillaman Date: Sat, 11 Jul 2015 02:56:27 +0000 (-0400) Subject: journal: added additional callbacks for async ops X-Git-Tag: v10.0.1~102^2~25 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=d8f7e96ea15bbf749b8bfd6b16eadb5b4eb9fa76;p=ceph.git journal: added additional callbacks for async ops Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index e5087e5ebc16..499100e356e0 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -43,40 +43,27 @@ JournalMetadata::~JournalMetadata() { rados.watch_flush(); } -int JournalMetadata::init() { +void JournalMetadata::init(Context *on_init) { assert(!m_initialized); m_initialized = true; - int r = client::get_immutable_metadata(m_ioctx, m_oid, &m_order, - &m_splay_width); - if (r < 0) { - lderr(m_cct) << __func__ << ": failed to retrieve journal metadata: " - << cpp_strerror(r) << dendl; - return r; - } - m_timer = new SafeTimer(m_cct, m_timer_lock, false); m_timer->init(); - r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); + int r = m_ioctx.watch2(m_oid, &m_watch_handle, &m_watch_ctx); if (r < 0) { lderr(m_cct) << __func__ << ": failed to watch journal" << cpp_strerror(r) << dendl; - return r; + on_init->complete(r); + return; } - C_SaferCond cond; - refresh(&cond); - r = cond.wait(); - if (r < 0) { - return r; - } - return 0; + C_ImmutableMetadata *ctx = new C_ImmutableMetadata(this, on_init); + client::get_immutable_metadata(m_ioctx, m_oid, &m_order, &m_splay_width, + ctx); } int JournalMetadata::register_client(const std::string &description) { - assert(!m_client_id.empty()); - ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl; int r = client::client_register(m_ioctx, m_oid, m_client_id, description); if (r < 0) { @@ -211,8 +198,20 @@ bool JournalMetadata::get_last_allocated_tid(const std::string &tag, return true; } +void JournalMetadata::handle_immutable_metadata(int r, Context *on_init) { + if (r < 0) { + lderr(m_cct) << "failed to initialize immutable metadata: " + << cpp_strerror(r) << dendl; + on_init->complete(r); + return; + } + + ldout(m_cct, 10) << "initialized immutable metadata" << dendl; + refresh(on_init); +} + void JournalMetadata::refresh(Context *on_complete) { - ldout(m_cct, 10) << "refreshing journal metadata" << dendl; + ldout(m_cct, 10) << "refreshing mutable metadata" << dendl; C_Refresh *refresh = new C_Refresh(this, on_complete); client::get_mutable_metadata(m_ioctx, m_oid, &refresh->minimum_set, &refresh->active_set, @@ -220,7 +219,7 @@ void JournalMetadata::refresh(Context *on_complete) { } void JournalMetadata::handle_refresh_complete(C_Refresh *refresh, int r) { - ldout(m_cct, 10) << "refreshed journal metadata: r=" << r << dendl; + ldout(m_cct, 10) << "refreshed mutable metadata: r=" << r << dendl; if (r == 0) { Mutex::Locker locker(m_lock); diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 81a539576f4a..8763edd3edb0 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -42,7 +42,7 @@ public: const std::string &client_id, double commit_interval); ~JournalMetadata(); - int init(); + void init(Context *on_init); void add_listener(Listener *listener); void remove_listener(Listener *listener); @@ -159,6 +159,19 @@ private: } }; + struct C_ImmutableMetadata : public Context { + JournalMetadataPtr journal_metadata; + Context *on_finish; + + C_ImmutableMetadata(JournalMetadata *_journal_metadata, Context *_on_finish) + : journal_metadata(_journal_metadata), on_finish(_on_finish) { + } + + virtual void finish(int r) { + journal_metadata->handle_immutable_metadata(r, on_finish); + } + + }; struct C_Refresh : public Context { JournalMetadataPtr journal_metadata; uint64_t minimum_set; @@ -209,6 +222,8 @@ private: ObjectSetPosition m_commit_position; Context *m_commit_position_ctx; + void handle_immutable_metadata(int r, Context *on_init); + void refresh(Context *on_finish); void handle_refresh_complete(C_Refresh *refresh, int r); diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index 82b2ef926c7a..efca93d688af 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -31,7 +31,8 @@ static const std::string JOURNAL_OBJECT_PREFIX = "journal_data."; using namespace cls::journal; Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, - uint64_t journal_id, const std::string &client_id) + const std::string &journal_id, + const std::string &client_id) : m_client_id(client_id), m_metadata(NULL), m_player(NULL), m_recorder(NULL), m_trimmer(NULL) { @@ -39,8 +40,8 @@ Journaler::Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, m_data_ioctx.dup(data_ioctx); m_cct = reinterpret_cast(m_header_ioctx.cct()); - m_header_oid = JOURNAL_HEADER_PREFIX + stringify(journal_id); - m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + stringify(journal_id) + "."; + m_header_oid = JOURNAL_HEADER_PREFIX + journal_id; + m_object_oid_prefix = JOURNAL_OBJECT_PREFIX + journal_id + "."; // TODO configurable commit interval m_metadata = new JournalMetadata(m_header_ioctx, m_header_oid, m_client_id, @@ -60,8 +61,8 @@ Journaler::~Journaler() { assert(m_recorder == NULL); } -int Journaler::init() { - return m_metadata->init(); +void Journaler::init(Context *on_init) { + m_metadata->init(on_init); } int Journaler::create(uint8_t order, uint8_t splay_width) { @@ -136,17 +137,30 @@ void Journaler::start_append() { m_metadata, 0, 0, 0); } -void Journaler::stop_append() { +int Journaler::stop_append() { assert(m_recorder != NULL); - m_recorder->flush(); + + C_SaferCond cond; + flush(&cond); + int r = cond.wait(); + if (r < 0) { + return r; + } + delete m_recorder; m_recorder = NULL; + return 0; } Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) { return m_recorder->append(tag, payload_bl); } +void Journaler::flush(Context *on_safe) { + // TODO pass ctx + m_recorder->flush(); +} + void Journaler::create_player(ReplayHandler *replay_handler) { assert(m_player == NULL); m_player = new JournalPlayer(m_data_ioctx, m_object_oid_prefix, m_metadata, diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index 7de2bbcf7418..c6abf711007c 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -13,6 +13,7 @@ #include #include "include/assert.h" +class Context; class SafeTimer; namespace journal { @@ -26,13 +27,13 @@ class ReplayHandler; class Journaler { public: Journaler(librados::IoCtx &header_ioctx, librados::IoCtx &data_ioctx, - uint64_t journal_id, const std::string &client_id); + const std::string &journal_id, const std::string &client_id); ~Journaler(); - int init(); - int create(uint8_t order, uint8_t splay_width); + void init(Context *on_init); + int register_client(const std::string &description); int unregister_client(); @@ -45,7 +46,8 @@ public: void start_append(); Future append(const std::string &tag, const bufferlist &bl); - void stop_append(); + void flush(Context *on_safe); + int stop_append(); private: librados::IoCtx m_header_ioctx;