From: Jason Dillaman Date: Fri, 17 Jul 2015 15:26:54 +0000 (-0400) Subject: journal: simplified commit position tracking X-Git-Tag: v10.0.1~102^2~10 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=352194d90a58a8703b8e816824a67410061d23bd;p=ceph.git journal: simplified commit position tracking Now the journal player and recorder will allocate a tid to represent the associated journal entry. The order of these allocations are tracked so that the commit position can be moved only when all prior commits are safely on disk. Signed-off-by: Jason Dillaman --- diff --git a/src/journal/Future.h b/src/journal/Future.h index 88ad436ef2fb..dbc7d4d68776 100644 --- a/src/journal/Future.h +++ b/src/journal/Future.h @@ -34,8 +34,13 @@ public: int get_return_value() const; private: + friend class Journaler; friend std::ostream& operator<<(std::ostream&, const Future&); + inline FutureImplPtr get_future_impl() const { + return m_future_impl; + } + FutureImplPtr m_future_impl; }; diff --git a/src/journal/FutureImpl.cc b/src/journal/FutureImpl.cc index 4b966f0a8a82..59494a17f449 100644 --- a/src/journal/FutureImpl.cc +++ b/src/journal/FutureImpl.cc @@ -7,8 +7,10 @@ namespace journal { -FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid) +FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid, + uint64_t commit_tid) : RefCountedObject(NULL, 0), m_finisher(finisher), m_tag(tag), m_tid(tid), + m_commit_tid(commit_tid), m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false), m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE), m_consistent_ack(this) { diff --git a/src/journal/FutureImpl.h b/src/journal/FutureImpl.h index 69ea4461e149..d936805acdc0 100644 --- a/src/journal/FutureImpl.h +++ b/src/journal/FutureImpl.h @@ -31,7 +31,8 @@ public: }; typedef boost::intrusive_ptr FlushHandlerPtr; - FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid); + FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid, + uint64_t commit_tid); void init(const FutureImplPtr &prev_future); @@ -41,6 +42,9 @@ public: inline uint64_t get_tid() const { return m_tid; } + inline uint64_t get_commit_tid() const { + return m_commit_tid; + } void flush(Context *on_safe = NULL); void wait(Context *on_safe); @@ -94,6 +98,7 @@ private: Finisher &m_finisher; std::string m_tag; uint64_t m_tid; + uint64_t m_commit_tid; mutable Mutex m_lock; FutureImplPtr m_prev_future; diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index 56c0db32e3f8..bd486f5cae90 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -7,6 +7,7 @@ #include "common/Finisher.h" #include "common/Timer.h" #include "cls/journal/cls_journal_client.h" +#include #define dout_subsys ceph_subsys_journaler #undef dout_prefix @@ -24,9 +25,10 @@ JournalMetadata::JournalMetadata(librados::IoCtx &ioctx, m_client_id(client_id), m_commit_interval(commit_interval), m_order(0), m_splay_width(0), m_initialized(false), m_finisher(NULL), m_timer(NULL), m_timer_lock("JournalMetadata::m_timer_lock"), - m_lock("JournalMetadata::m_lock"), m_watch_ctx(this), m_watch_handle(0), - m_minimum_set(0), m_active_set(0), m_update_notifications(0), - m_commit_position_pending(false), m_commit_position_ctx(NULL) { + m_lock("JournalMetadata::m_lock"), m_commit_tid(0), m_watch_ctx(this), + m_watch_handle(0), m_minimum_set(0), m_active_set(0), + m_update_notifications(0), m_commit_position_ctx(NULL), + m_commit_position_task_ctx(NULL) { m_ioctx.dup(ioctx); m_cct = reinterpret_cast(m_ioctx.cct()); } @@ -178,6 +180,20 @@ void JournalMetadata::set_active_set(uint64_t object_set) { m_active_set = object_set; } +void JournalMetadata::flush_commit_position() { + { + Mutex::Locker locker(m_lock); + if (m_commit_position_task_ctx == NULL) { + return; + } + + Mutex::Locker timer_locker(m_timer_lock); + m_timer->cancel_event(m_commit_position_task_ctx); + m_commit_position_task_ctx = NULL; + } + handle_commit_position_task(); +} + void JournalMetadata::set_commit_position( const ObjectSetPosition &commit_position, Context *on_safe) { assert(on_safe != NULL); @@ -281,9 +297,9 @@ void JournalMetadata::schedule_commit_task() { assert(m_lock.is_locked()); Mutex::Locker timer_locker(m_timer_lock); - if (!m_commit_position_pending) { - m_commit_position_pending = true; - m_timer->add_event_after(m_commit_interval, new C_CommitPositionTask(this)); + if (m_commit_position_task_ctx == NULL) { + m_commit_position_task_ctx = new C_CommitPositionTask(this); + m_timer->add_event_after(m_commit_interval, m_commit_position_task_ctx); } } @@ -335,6 +351,77 @@ void JournalMetadata::handle_watch_error(int err) { schedule_watch_reset(); } +uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, + const std::string &tag, + uint64_t tid) { + Mutex::Locker locker(m_lock); + uint64_t commit_tid = ++m_commit_tid; + m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag, tid); + + ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " [" + << "object_num=" << object_num << ", " + << "tag=" << tag << ", tid=" << tid << "]" << dendl; + return commit_tid; +} + +bool JournalMetadata::committed(uint64_t commit_tid, + ObjectSetPosition *object_set_position) { + ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl; + + Mutex::Locker locker(m_lock); + { + CommitTids::iterator it = m_pending_commit_tids.find(commit_tid); + assert(it != m_pending_commit_tids.end()); + + CommitEntry &commit_entry = it->second; + commit_entry.committed = true; + } + + if (!m_commit_position.entry_positions.empty()) { + *object_set_position = m_commit_position; + } else { + *object_set_position = m_client.commit_position; + } + + bool update_commit_position = false; + while (!m_pending_commit_tids.empty()) { + CommitTids::iterator it = m_pending_commit_tids.begin(); + CommitEntry &commit_entry = it->second; + if (!commit_entry.committed) { + break; + } + + object_set_position->object_number = commit_entry.object_num; + if (!object_set_position->entry_positions.empty() && + object_set_position->entry_positions.front().tag == commit_entry.tag) { + object_set_position->entry_positions.front() = EntryPosition( + commit_entry.tag, commit_entry.tid); + } else { + object_set_position->entry_positions.push_front(EntryPosition( + commit_entry.tag, commit_entry.tid)); + } + m_pending_commit_tids.erase(it); + update_commit_position = true; + } + + if (update_commit_position) { + // prune the position to have unique tags in commit-order + std::set in_use_tags; + EntryPositions::iterator it = object_set_position->entry_positions.begin(); + while (it != object_set_position->entry_positions.end()) { + if (!in_use_tags.insert(it->tag).second) { + it = object_set_position->entry_positions.erase(it); + } else { + ++it; + } + } + + ldout(m_cct, 20) << "updated object set position: " << *object_set_position + << dendl; + } + return update_commit_position; +} + void JournalMetadata::notify_update() { ldout(m_cct, 10) << "notifying journal header update" << dendl; diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 89ddcf1296b0..0a933b019563 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -87,6 +87,7 @@ public: return m_active_set; } + void flush_commit_position(); void set_commit_position(const ObjectSetPosition &commit_position, Context *on_safe); void get_commit_position(ObjectSetPosition *commit_position) const { @@ -106,6 +107,10 @@ public: void reserve_tid(const std::string &tag, uint64_t tid); bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const; + uint64_t allocate_commit_tid(uint64_t object_num, const std::string &tag, + uint64_t tid); + bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position); + void notify_update(); void async_notify_update(); @@ -113,6 +118,20 @@ private: typedef std::map AllocatedTids; typedef std::list Listeners; + struct CommitEntry { + uint64_t object_num; + std::string tag; + uint64_t tid; + bool committed; + + CommitEntry() : object_num(0), tid(0), committed(false) { + } + CommitEntry(uint64_t _object_num, const std::string &_tag, uint64_t _tid) + : object_num(_object_num), tag(_tag), tid(_tid), committed(false) { + } + }; + typedef std::map CommitTids; + struct C_WatchCtx : public librados::WatchCtx2 { JournalMetadata *journal_metadata; @@ -186,8 +205,8 @@ private: virtual void finish(int r) { journal_metadata->handle_immutable_metadata(r, on_finish); } - }; + struct C_Refresh : public Context { JournalMetadata* journal_metadata; uint64_t minimum_set; @@ -225,6 +244,9 @@ private: mutable Mutex m_lock; + uint64_t m_commit_tid; + CommitTids m_pending_commit_tids; + Listeners m_listeners; C_WatchCtx m_watch_ctx; @@ -240,9 +262,9 @@ private: size_t m_update_notifications; Cond m_update_cond; - bool m_commit_position_pending; ObjectSetPosition m_commit_position; Context *m_commit_position_ctx; + Context *m_commit_position_task_ctx; AsyncOpTracker m_async_op_tracker; diff --git a/src/journal/JournalPlayer.cc b/src/journal/JournalPlayer.cc index 421a38b121e1..db92590b434b 100644 --- a/src/journal/JournalPlayer.cc +++ b/src/journal/JournalPlayer.cc @@ -3,6 +3,7 @@ #include "journal/JournalPlayer.h" #include "common/Finisher.h" +#include "journal/Entry.h" #include "journal/ReplayHandler.h" #include "journal/Utils.h" @@ -67,8 +68,10 @@ JournalPlayer::JournalPlayer(librados::IoCtx &ioctx, m_commit_object = commit_position.object_number; m_commit_tag = commit_position.entry_positions.front().tag; - for (size_t i=0; ireserve_tid(entry->get_tag(), entry->get_tid()); + *commit_tid = m_journal_metadata->allocate_commit_tid( + object_player->get_object_number(), entry->get_tag(), entry->get_tid()); return true; } diff --git a/src/journal/JournalPlayer.h b/src/journal/JournalPlayer.h index 613c9e305b61..7d4855960d71 100644 --- a/src/journal/JournalPlayer.h +++ b/src/journal/JournalPlayer.h @@ -18,6 +18,7 @@ class SafeTimer; namespace journal { +class Entry; class ReplayHandler; class JournalPlayer { @@ -35,7 +36,7 @@ public: void prefetch_and_watch(double interval); void unwatch(); - bool try_pop_front(Entry *entry, ObjectSetPosition *object_set_position); + bool try_pop_front(Entry *entry, uint64_t *commit_tid); private: typedef std::map AllocatedTids; diff --git a/src/journal/JournalRecorder.cc b/src/journal/JournalRecorder.cc index a921af26c7ef..4fb7765c59f5 100644 --- a/src/journal/JournalRecorder.cc +++ b/src/journal/JournalRecorder.cc @@ -48,8 +48,10 @@ Future JournalRecorder::append(const std::string &tag, uint8_t splay_offset = tid % splay_width; ObjectRecorderPtr object_ptr = get_object(splay_offset); + uint64_t commit_tid = m_journal_metadata->allocate_commit_tid( + object_ptr->get_object_number(), tag, tid); FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(), - tag, tid)); + tag, tid, commit_tid)); future->init(m_prev_future); m_prev_future = future; @@ -60,8 +62,6 @@ Future JournalRecorder::append(const std::string &tag, append_buffers.push_back(std::make_pair(future, entry_bl)); bool object_full = object_ptr->append(append_buffers); - // TODO populate the object_set_position - if (object_full) { ldout(m_cct, 10) << "object " << object_ptr->get_oid() << " now full" << dendl; diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index cdb517b9b50f..2af67a829839 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -25,6 +25,7 @@ JournalTrimmer::JournalTrimmer(librados::IoCtx &ioctx, } JournalTrimmer::~JournalTrimmer() { + m_journal_metadata->flush_commit_position(); m_async_op_tracker.wait_for_ops(); } @@ -53,10 +54,13 @@ int JournalTrimmer::remove_objects() { return ctx.wait(); } -void JournalTrimmer::update_commit_position( - const ObjectSetPosition &object_set_position) { - ldout(m_cct, 20) << __func__ << ": pos=" << object_set_position - << dendl; +void JournalTrimmer::committed(uint64_t commit_tid) { + ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl; + + ObjectSetPosition object_set_position; + if (!m_journal_metadata->committed(commit_tid, &object_set_position)) { + return; + } { Mutex::Locker locker(m_lock); diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h index 1ae994da57dc..937340733d70 100644 --- a/src/journal/JournalTrimmer.h +++ b/src/journal/JournalTrimmer.h @@ -23,7 +23,7 @@ public: ~JournalTrimmer(); int remove_objects(); - void update_commit_position(const ObjectSetPosition &object_set_position); + void committed(uint64_t commit_tid); private: struct C_CommitPositionSafe : public Context { diff --git a/src/journal/Journaler.cc b/src/journal/Journaler.cc index b7ca392fc97f..eaa4ce15d9cf 100644 --- a/src/journal/Journaler.cc +++ b/src/journal/Journaler.cc @@ -10,7 +10,7 @@ #include "journal/JournalPlayer.h" #include "journal/JournalRecorder.h" #include "journal/JournalTrimmer.h" -#include "journal/PayloadImpl.h" +#include "journal/ReplayEntry.h" #include "journal/ReplayHandler.h" #include "cls/journal/cls_journal_client.h" #include "cls/journal/cls_journal_types.h" @@ -131,16 +131,16 @@ void Journaler::start_live_replay(ReplayHandler *replay_handler, m_player->prefetch_and_watch(interval); } -bool Journaler::try_pop_front(Payload *payload) { +bool Journaler::try_pop_front(ReplayEntry *replay_entry) { assert(m_player != NULL); Entry entry; - ObjectSetPosition object_set_position; - if (!m_player->try_pop_front(&entry, &object_set_position)) { + uint64_t commit_tid; + if (!m_player->try_pop_front(&entry, &commit_tid)) { return false; } - *payload = Payload(new PayloadImpl(entry.get_data(), object_set_position)); + *replay_entry = ReplayEntry(entry.get_data(), commit_tid); return true; } @@ -151,9 +151,13 @@ void Journaler::stop_replay() { m_player = NULL; } -void Journaler::update_commit_position(const Payload &payload) { - PayloadImplPtr payload_impl = payload.get_payload_impl(); - m_trimmer->update_commit_position(payload_impl->get_object_set_position()); +void Journaler::committed(const ReplayEntry &replay_entry) { + m_trimmer->committed(replay_entry.get_commit_tid()); +} + +void Journaler::committed(const Future &future) { + FutureImplPtr future_impl = future.get_future_impl(); + m_trimmer->committed(future_impl->get_commit_tid()); } void Journaler::start_append() { diff --git a/src/journal/Journaler.h b/src/journal/Journaler.h index f27038d5cf03..4b4959a87983 100644 --- a/src/journal/Journaler.h +++ b/src/journal/Journaler.h @@ -8,7 +8,6 @@ #include "include/buffer.h" #include "include/rados/librados.hpp" #include "journal/Future.h" -#include "journal/Payload.h" #include #include #include "include/assert.h" @@ -22,6 +21,7 @@ class JournalMetadata; class JournalPlayer; class JournalRecorder; class JournalTrimmer; +class ReplayEntry; class ReplayHandler; class Journaler { @@ -40,16 +40,17 @@ public: void start_replay(ReplayHandler *replay_handler); void start_live_replay(ReplayHandler *replay_handler, double interval); - bool try_pop_front(Payload *payload); + bool try_pop_front(ReplayEntry *replay_entry); void stop_replay(); - void update_commit_position(const Payload &payload); - void start_append(); Future append(const std::string &tag, const bufferlist &bl); void flush(Context *on_safe); void stop_append(Context *on_safe); + void committed(const ReplayEntry &replay_entry); + void committed(const Future &future); + private: librados::IoCtx m_header_ioctx; librados::IoCtx m_data_ioctx; diff --git a/src/journal/Makefile.am b/src/journal/Makefile.am index 99b84b148769..4f0cd18c0146 100644 --- a/src/journal/Makefile.am +++ b/src/journal/Makefile.am @@ -13,8 +13,6 @@ libjournal_la_SOURCES = \ journal/JournalTrimmer.cc \ journal/ObjectPlayer.cc \ journal/ObjectRecorder.cc \ - journal/Payload.cc \ - journal/PayloadImpl.cc \ journal/Utils.cc noinst_LTLIBRARIES += libjournal.la @@ -30,8 +28,7 @@ noinst_HEADERS += \ journal/JournalTrimmer.h \ journal/ObjectPlayer.h \ journal/ObjectRecorder.h \ - journal/Payload.h \ - journal/PayloadImpl.h \ + journal/ReplayEntry.h \ journal/ReplayHandler.h \ journal/Utils.h DENCODER_DEPS += libjournal.la diff --git a/src/journal/Payload.cc b/src/journal/Payload.cc deleted file mode 100644 index 8015528b4b9a..000000000000 --- a/src/journal/Payload.cc +++ /dev/null @@ -1,21 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "journal/Payload.h" -#include "journal/PayloadImpl.h" - -namespace journal { - -const bufferlist &Payload::get_data() const { - return m_payload_impl->get_data(); -} - -void intrusive_ptr_add_ref(PayloadImpl *p) { - p->get(); -} - -void intrusive_ptr_release(PayloadImpl *p) { - p->put(); -} - -} // namespace journal diff --git a/src/journal/Payload.h b/src/journal/Payload.h deleted file mode 100644 index 70f774af190d..000000000000 --- a/src/journal/Payload.h +++ /dev/null @@ -1,43 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_JOURNAL_PAYLOAD_H -#define CEPH_JOURNAL_PAYLOAD_H - -#include "include/int_types.h" -#include "include/buffer.h" -#include - -namespace journal { - -class PayloadImpl; - -class Payload { -public: - typedef boost::intrusive_ptr PayloadImplPtr; - - Payload() {} - Payload(const PayloadImplPtr &payload) : m_payload_impl(payload) {} - - inline bool is_valid() const { - return m_payload_impl; - } - - const bufferlist &get_data() const; - -private: - friend class Journaler; - - inline PayloadImplPtr get_payload_impl() const { - return m_payload_impl; - } - - PayloadImplPtr m_payload_impl; -}; - -void intrusive_ptr_add_ref(PayloadImpl *p); -void intrusive_ptr_release(PayloadImpl *p); - -} // namespace journal - -#endif // CEPH_JOURNAL_PAYLOAD_H diff --git a/src/journal/PayloadImpl.cc b/src/journal/PayloadImpl.cc deleted file mode 100644 index 6bb134f91f9c..000000000000 --- a/src/journal/PayloadImpl.cc +++ /dev/null @@ -1,22 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#include "journal/PayloadImpl.h" - -namespace journal { - -PayloadImpl::PayloadImpl(const bufferlist &data, - const ObjectSetPosition &object_set_position) - : m_data(data), m_object_set_position(object_set_position) { -} - -const bufferlist &PayloadImpl::get_data() const { - return m_data; -} - -const PayloadImpl::ObjectSetPosition & -PayloadImpl::get_object_set_position() const { - return m_object_set_position; -} - -} // namespace journal diff --git a/src/journal/PayloadImpl.h b/src/journal/PayloadImpl.h deleted file mode 100644 index 9c4d0b596e56..000000000000 --- a/src/journal/PayloadImpl.h +++ /dev/null @@ -1,37 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab - -#ifndef CEPH_JOURNAL_PAYLOAD_IMPL_H -#define CEPH_JOURNAL_PAYLOAD_IMPL_H - -#include "include/int_types.h" -#include "include/buffer.h" -#include "common/RefCountedObj.h" -#include "cls/journal/cls_journal_types.h" -#include -#include -#include "include/assert.h" - -namespace journal { - -class PayloadImpl; -typedef boost::intrusive_ptr PayloadImplPtr; - -class PayloadImpl : public RefCountedObject, boost::noncopyable { -public: - typedef cls::journal::ObjectSetPosition ObjectSetPosition; - - PayloadImpl(const bufferlist &data, - const ObjectSetPosition &object_set_position); - - const bufferlist &get_data() const; - const ObjectSetPosition &get_object_set_position() const; - -private: - bufferlist m_data; - ObjectSetPosition m_object_set_position; -}; - -} // namespace journal - -#endif // CEPH_JOURNAL_PAYLOAD_IMPL_H diff --git a/src/journal/ReplayEntry.h b/src/journal/ReplayEntry.h new file mode 100644 index 000000000000..4dd3ba4758e8 --- /dev/null +++ b/src/journal/ReplayEntry.h @@ -0,0 +1,34 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_JOURNAL_REPLAY_ENTRY_H +#define CEPH_JOURNAL_REPLAY_ENTRY_H + +#include "include/int_types.h" +#include "include/buffer.h" + +namespace journal { + +class ReplayEntry { +public: + ReplayEntry() : m_commit_tid(0) { + } + ReplayEntry(const bufferlist &data, uint64_t commit_tid) + : m_data(data), m_commit_tid(commit_tid) { + } + + inline const bufferlist &get_data() const { + return m_data; + } + inline uint64_t get_commit_tid() const { + return m_commit_tid; + } + +private: + bufferlist m_data; + uint64_t m_commit_tid; +}; + +} // namespace journal + +#endif // CEPH_JOURNAL_REPLAY_ENTRY_H