From: Jason Dillaman Date: Tue, 23 Feb 2016 20:32:26 +0000 (-0500) Subject: journal: update JournalMetadata to support new commit handling X-Git-Tag: v10.1.0~258^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=569738701b4bb5a9d0124df77fa1e7ce2321ad8e;p=ceph.git journal: update JournalMetadata to support new commit handling Signed-off-by: Jason Dillaman --- diff --git a/src/journal/JournalMetadata.cc b/src/journal/JournalMetadata.cc index 99d6871cef40..19bd8b0c33c9 100644 --- a/src/journal/JournalMetadata.cc +++ b/src/journal/JournalMetadata.cc @@ -20,33 +20,6 @@ using namespace cls::journal; namespace { -// does not compare object number -inline bool object_positions_less_equal(const ObjectSetPosition &lhs, - const ObjectSetPosition &rhs) { - if (lhs.object_positions == rhs.object_positions) { - return true; - } - - if (lhs.object_positions.size() != rhs.object_positions.size()) { - return lhs.object_positions.size() < rhs.object_positions.size(); - } - - std::map rhs_tids; - for (ObjectPositions::const_iterator it = rhs.object_positions.begin(); - it != rhs.object_positions.end(); ++it) { - rhs_tids[it->tag_tid] = it->entry_tid; - } - - for (ObjectPositions::const_iterator it = lhs.object_positions.begin(); - it != lhs.object_positions.end(); ++it) { - const ObjectPosition &object_position = *it; - if (object_position.entry_tid < rhs_tids[object_position.tag_tid]) { - return true; - } - } - return false; -} - struct C_AllocateTag : public Context { CephContext *cct; librados::IoCtx &ioctx; @@ -491,34 +464,6 @@ void JournalMetadata::flush_commit_position(Context *on_safe) { handle_commit_position_task(); } -void JournalMetadata::set_commit_position( - const ObjectSetPosition &commit_position, Context *on_safe) { - assert(on_safe != NULL); - - Context *stale_ctx = nullptr; - { - Mutex::Locker timer_locker(m_timer_lock); - Mutex::Locker locker(m_lock); - ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position - << ", new=" << commit_position << dendl; - if (object_positions_less_equal(commit_position, m_client.commit_position) || - object_positions_less_equal(commit_position, m_commit_position)) { - stale_ctx = on_safe; - } else { - stale_ctx = m_commit_position_ctx; - - m_client.commit_position = commit_position; - m_commit_position = commit_position; - m_commit_position_ctx = on_safe; - schedule_commit_task(); - } - } - - if (stale_ctx != nullptr) { - stale_ctx->complete(-ESTALE); - } -} - void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) { Mutex::Locker locker(m_lock); uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid]; @@ -696,58 +641,84 @@ uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num, ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " [" << "object_num=" << object_num << ", " - << "tag_tid=" << tag_tid << ", entry_tid=" << entry_tid << "]" + << "tag_tid=" << tag_tid << ", " + << "entry_tid=" << entry_tid << "]" << dendl; return commit_tid; } -bool JournalMetadata::committed(uint64_t commit_tid, - ObjectSetPosition *object_set_position) { +void JournalMetadata::committed(uint64_t commit_tid, + const CreateContext &create_context) { ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl; - Mutex::Locker locker(m_lock); + ObjectSetPosition commit_position; + Context *stale_ctx = nullptr; { + Mutex::Locker timer_locker(m_timer_lock); + Mutex::Locker locker(m_lock); + assert(commit_tid > m_commit_position_tid); + + if (!m_commit_position.object_positions.empty()) { + // in-flight commit position update + commit_position = m_commit_position; + } else { + // safe commit position + commit_position = m_client.commit_position; + } + CommitTids::iterator it = m_pending_commit_tids.find(commit_tid); assert(it != m_pending_commit_tids.end()); CommitEntry &commit_entry = it->second; commit_entry.committed = true; - } - if (!m_commit_position.object_positions.empty()) { - *object_set_position = m_commit_position; - } else { - *object_set_position = m_client.commit_position; - } + bool update_commit_position = false; + while (!m_pending_commit_tids.empty()) { + CommitTids::iterator it = m_pending_commit_tids.begin(); + CommitEntry &commit_entry = it->second; + if (!commit_entry.committed) { + break; + } - bool update_commit_position = false; - while (!m_pending_commit_tids.empty()) { - CommitTids::iterator it = m_pending_commit_tids.begin(); - CommitEntry &commit_entry = it->second; - if (!commit_entry.committed) { - break; + commit_position.object_positions.emplace_front( + commit_entry.object_num, commit_entry.tag_tid, + commit_entry.entry_tid); + m_pending_commit_tids.erase(it); + update_commit_position = true; } - // TODO - update_commit_position = true; - } + if (!update_commit_position) { + return; + } - if (update_commit_position) { - // prune the position to have unique tags in commit-order - std::set in_use_tag_tids; - ObjectPositions::iterator it = object_set_position->object_positions.begin(); - while (it != object_set_position->object_positions.end()) { - if (!in_use_tag_tids.insert(it->tag_tid).second) { - it = object_set_position->object_positions.erase(it); + // prune the position to have one position per splay offset + std::set in_use_splay_offsets; + ObjectPositions::iterator ob_it = commit_position.object_positions.begin(); + while (ob_it != commit_position.object_positions.end()) { + uint8_t splay_offset = ob_it->object_number % m_splay_width; + if (!in_use_splay_offsets.insert(splay_offset).second) { + ob_it = commit_position.object_positions.erase(ob_it); } else { - ++it; + ++ob_it; } } - ldout(m_cct, 20) << "updated object set position: " << *object_set_position + stale_ctx = m_commit_position_ctx; + m_commit_position_ctx = create_context(); + m_commit_position = commit_position; + m_commit_position_tid = commit_tid; + + ldout(m_cct, 20) << "updated commit position: " << commit_position << ", " + << "on_safe=" << m_commit_position_ctx << dendl; + schedule_commit_task(); + } + + + if (stale_ctx != nullptr) { + ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx << dendl; + stale_ctx->complete(-ESTALE); } - return update_commit_position; } void JournalMetadata::notify_update() { diff --git a/src/journal/JournalMetadata.h b/src/journal/JournalMetadata.h index 049329b409c8..5d21d71556e7 100644 --- a/src/journal/JournalMetadata.h +++ b/src/journal/JournalMetadata.h @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -30,6 +31,7 @@ typedef boost::intrusive_ptr JournalMetadataPtr; class JournalMetadata : public RefCountedObject, boost::noncopyable { public: + typedef std::function CreateContext; typedef cls::journal::ObjectPosition ObjectPosition; typedef cls::journal::ObjectPositions ObjectPositions; typedef cls::journal::ObjectSetPosition ObjectSetPosition; @@ -106,8 +108,6 @@ public: void flush_commit_position(); void flush_commit_position(Context *on_safe); - void set_commit_position(const ObjectSetPosition &commit_position, - Context *on_safe); void get_commit_position(ObjectSetPosition *commit_position) const { Mutex::Locker locker(m_lock); *commit_position = m_client.commit_position; @@ -127,7 +127,7 @@ public: uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid, uint64_t entry_tid); - bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position); + void committed(uint64_t commit_tid, const CreateContext &create_context); void notify_update(); void async_notify_update(); @@ -306,6 +306,7 @@ private: size_t m_update_notifications; Cond m_update_cond; + uint64_t m_commit_position_tid = 0; ObjectSetPosition m_commit_position; Context *m_commit_position_ctx; Context *m_commit_position_task_ctx; diff --git a/src/journal/JournalTrimmer.cc b/src/journal/JournalTrimmer.cc index 05a767285167..0998c259fb0c 100644 --- a/src/journal/JournalTrimmer.cc +++ b/src/journal/JournalTrimmer.cc @@ -63,19 +63,8 @@ int JournalTrimmer::remove_objects(bool force) { void JournalTrimmer::committed(uint64_t commit_tid) { ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl; - - ObjectSetPosition object_set_position; - if (!m_journal_metadata->committed(commit_tid, &object_set_position)) { - return; - } - - { - Mutex::Locker locker(m_lock); - m_async_op_tracker.start_op(); - } - - Context *ctx = new C_CommitPositionSafe(this, object_set_position); - m_journal_metadata->set_commit_position(object_set_position, ctx); + m_journal_metadata->committed(commit_tid, + m_create_commit_position_safe_context); } void JournalTrimmer::trim_objects(uint64_t minimum_set) { @@ -121,10 +110,8 @@ void JournalTrimmer::remove_set(uint64_t object_set) { } } -void JournalTrimmer::handle_commit_position_safe( - int r, const ObjectSetPosition &object_set_position) { - ldout(m_cct, 20) << __func__ << ": r=" << r << ", pos=" - << object_set_position << dendl; +void JournalTrimmer::handle_commit_position_safe(int r) { + ldout(m_cct, 20) << __func__ << ": r=" << r << dendl; Mutex::Locker locker(m_lock); if (r == 0) { diff --git a/src/journal/JournalTrimmer.h b/src/journal/JournalTrimmer.h index 46db1c51bab4..349592b36e42 100644 --- a/src/journal/JournalTrimmer.h +++ b/src/journal/JournalTrimmer.h @@ -11,6 +11,7 @@ #include "journal/AsyncOpTracker.h" #include "journal/JournalMetadata.h" #include "cls/journal/cls_journal_types.h" +#include namespace journal { @@ -26,18 +27,22 @@ public: void committed(uint64_t commit_tid); private: + typedef std::function CreateContext; + struct C_CommitPositionSafe : public Context { JournalTrimmer *journal_trimmer; - ObjectSetPosition object_set_position; - C_CommitPositionSafe(JournalTrimmer *_journal_trimmer, - const ObjectSetPosition &_object_set_position) - : journal_trimmer(_journal_trimmer), - object_set_position(_object_set_position) {} + C_CommitPositionSafe(JournalTrimmer *_journal_trimmer) + : journal_trimmer(_journal_trimmer) { + Mutex::Locker locker(journal_trimmer->m_lock); + journal_trimmer->m_async_op_tracker.start_op(); + } + virtual ~C_CommitPositionSafe() { + journal_trimmer->m_async_op_tracker.finish_op(); + } virtual void finish(int r) { - journal_trimmer->handle_commit_position_safe(r, object_set_position); - journal_trimmer->m_async_op_tracker.finish_op(); + journal_trimmer->handle_commit_position_safe(r); } }; struct C_RemoveSet : public Context { @@ -70,10 +75,14 @@ private: uint64_t m_remove_set; Context *m_remove_set_ctx; + CreateContext m_create_commit_position_safe_context = [this]() { + return new C_CommitPositionSafe(this); + }; + void trim_objects(uint64_t minimum_set); void remove_set(uint64_t object_set); - void handle_commit_position_safe(int r, const ObjectSetPosition &position); + void handle_commit_position_safe(int r); void handle_set_removed(int r, uint64_t object_set); }; diff --git a/src/test/journal/test_JournalMetadata.cc b/src/test/journal/test_JournalMetadata.cc index be9628e2941c..91ed2d7ca19f 100644 --- a/src/test/journal/test_JournalMetadata.cc +++ b/src/test/journal/test_JournalMetadata.cc @@ -18,9 +18,10 @@ public: } journal::JournalMetadataPtr create_metadata(const std::string &oid, - const std::string &client_id) { + const std::string &client_id, + double commit_internal = 0.1) { journal::JournalMetadataPtr metadata(new journal::JournalMetadata( - m_ioctx, oid, client_id, 0.1)); + m_ioctx, oid, client_id, commit_internal)); m_metadata_list.push_back(metadata); metadata->add_listener(&m_listener); return metadata; @@ -50,36 +51,49 @@ TEST_F(TestJournalMetadata, ClientDNE) { ASSERT_EQ(-ENOENT, init_metadata(metadata2)); } -TEST_F(TestJournalMetadata, SetCommitPositions) { +TEST_F(TestJournalMetadata, Committed) { std::string oid = get_temp_oid(); ASSERT_EQ(0, create(oid, 14, 2)); ASSERT_EQ(0, client_register(oid, "client1", "")); - journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1"); + journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1", 600); ASSERT_EQ(0, init_metadata(metadata1)); journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1"); ASSERT_EQ(0, init_metadata(metadata2)); ASSERT_TRUE(wait_for_update(metadata2)); - journal::JournalMetadata::ObjectSetPosition commit_position; + journal::JournalMetadata::ObjectSetPosition expect_commit_position; journal::JournalMetadata::ObjectSetPosition read_commit_position; metadata1->get_commit_position(&read_commit_position); - ASSERT_EQ(commit_position, read_commit_position); + ASSERT_EQ(expect_commit_position, read_commit_position); - journal::JournalMetadata::ObjectPositions object_positions; - object_positions = { - cls::journal::ObjectPosition(1, 123, 122)}; - commit_position = journal::JournalMetadata::ObjectSetPosition(object_positions); + uint64_t commit_tid1 = metadata1->allocate_commit_tid(0, 0, 0); + uint64_t commit_tid2 = metadata1->allocate_commit_tid(0, 1, 0); + uint64_t commit_tid3 = metadata1->allocate_commit_tid(1, 0, 1); + uint64_t commit_tid4 = metadata1->allocate_commit_tid(0, 0, 2); - C_SaferCond cond; - metadata1->set_commit_position(commit_position, &cond); - ASSERT_EQ(0, cond.wait()); - ASSERT_TRUE(wait_for_update(metadata2)); + // cannot commit until tid1 + 2 committed + metadata1->committed(commit_tid2, []() { return nullptr; }); + metadata1->committed(commit_tid3, []() { return nullptr; }); + + C_SaferCond cond1; + metadata1->committed(commit_tid1, [&cond1]() { return &cond1; }); + // given our 10 minute commit internal, this should override the + // in-flight commit + C_SaferCond cond2; + metadata1->committed(commit_tid4, [&cond2]() { return &cond2; }); + + ASSERT_EQ(-ESTALE, cond1.wait()); + metadata1->flush_commit_position(); + ASSERT_EQ(0, cond2.wait()); + + ASSERT_TRUE(wait_for_update(metadata2)); metadata2->get_commit_position(&read_commit_position); - ASSERT_EQ(commit_position, read_commit_position); + expect_commit_position = {{{0, 0, 2}, {1, 0, 1}}}; + ASSERT_EQ(expect_commit_position, read_commit_position); } TEST_F(TestJournalMetadata, UpdateActiveObject) {