namespace {
-// does not compare object number
-inline bool object_positions_less_equal(const ObjectSetPosition &lhs,
- const ObjectSetPosition &rhs) {
- if (lhs.object_positions == rhs.object_positions) {
- return true;
- }
-
- if (lhs.object_positions.size() != rhs.object_positions.size()) {
- return lhs.object_positions.size() < rhs.object_positions.size();
- }
-
- std::map<uint64_t, uint64_t> rhs_tids;
- for (ObjectPositions::const_iterator it = rhs.object_positions.begin();
- it != rhs.object_positions.end(); ++it) {
- rhs_tids[it->tag_tid] = it->entry_tid;
- }
-
- for (ObjectPositions::const_iterator it = lhs.object_positions.begin();
- it != lhs.object_positions.end(); ++it) {
- const ObjectPosition &object_position = *it;
- if (object_position.entry_tid < rhs_tids[object_position.tag_tid]) {
- return true;
- }
- }
- return false;
-}
-
struct C_AllocateTag : public Context {
CephContext *cct;
librados::IoCtx &ioctx;
handle_commit_position_task();
}
-void JournalMetadata::set_commit_position(
- const ObjectSetPosition &commit_position, Context *on_safe) {
- assert(on_safe != NULL);
-
- Context *stale_ctx = nullptr;
- {
- Mutex::Locker timer_locker(m_timer_lock);
- Mutex::Locker locker(m_lock);
- ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
- << ", new=" << commit_position << dendl;
- if (object_positions_less_equal(commit_position, m_client.commit_position) ||
- object_positions_less_equal(commit_position, m_commit_position)) {
- stale_ctx = on_safe;
- } else {
- stale_ctx = m_commit_position_ctx;
-
- m_client.commit_position = commit_position;
- m_commit_position = commit_position;
- m_commit_position_ctx = on_safe;
- schedule_commit_task();
- }
- }
-
- if (stale_ctx != nullptr) {
- stale_ctx->complete(-ESTALE);
- }
-}
-
void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
Mutex::Locker locker(m_lock);
uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
<< "object_num=" << object_num << ", "
- << "tag_tid=" << tag_tid << ", entry_tid=" << entry_tid << "]"
+ << "tag_tid=" << tag_tid << ", "
+ << "entry_tid=" << entry_tid << "]"
<< dendl;
return commit_tid;
}
-bool JournalMetadata::committed(uint64_t commit_tid,
- ObjectSetPosition *object_set_position) {
+void JournalMetadata::committed(uint64_t commit_tid,
+ const CreateContext &create_context) {
ldout(m_cct, 20) << "committed tid=" << commit_tid << dendl;
- Mutex::Locker locker(m_lock);
+ ObjectSetPosition commit_position;
+ Context *stale_ctx = nullptr;
{
+ Mutex::Locker timer_locker(m_timer_lock);
+ Mutex::Locker locker(m_lock);
+ assert(commit_tid > m_commit_position_tid);
+
+ if (!m_commit_position.object_positions.empty()) {
+ // in-flight commit position update
+ commit_position = m_commit_position;
+ } else {
+ // safe commit position
+ commit_position = m_client.commit_position;
+ }
+
CommitTids::iterator it = m_pending_commit_tids.find(commit_tid);
assert(it != m_pending_commit_tids.end());
CommitEntry &commit_entry = it->second;
commit_entry.committed = true;
- }
- if (!m_commit_position.object_positions.empty()) {
- *object_set_position = m_commit_position;
- } else {
- *object_set_position = m_client.commit_position;
- }
+ bool update_commit_position = false;
+ while (!m_pending_commit_tids.empty()) {
+ CommitTids::iterator it = m_pending_commit_tids.begin();
+ CommitEntry &commit_entry = it->second;
+ if (!commit_entry.committed) {
+ break;
+ }
- bool update_commit_position = false;
- while (!m_pending_commit_tids.empty()) {
- CommitTids::iterator it = m_pending_commit_tids.begin();
- CommitEntry &commit_entry = it->second;
- if (!commit_entry.committed) {
- break;
+ commit_position.object_positions.emplace_front(
+ commit_entry.object_num, commit_entry.tag_tid,
+ commit_entry.entry_tid);
+ m_pending_commit_tids.erase(it);
+ update_commit_position = true;
}
- // TODO
- update_commit_position = true;
- }
+ if (!update_commit_position) {
+ return;
+ }
- if (update_commit_position) {
- // prune the position to have unique tags in commit-order
- std::set<uint64_t> in_use_tag_tids;
- ObjectPositions::iterator it = object_set_position->object_positions.begin();
- while (it != object_set_position->object_positions.end()) {
- if (!in_use_tag_tids.insert(it->tag_tid).second) {
- it = object_set_position->object_positions.erase(it);
+ // prune the position to have one position per splay offset
+ std::set<uint8_t> in_use_splay_offsets;
+ ObjectPositions::iterator ob_it = commit_position.object_positions.begin();
+ while (ob_it != commit_position.object_positions.end()) {
+ uint8_t splay_offset = ob_it->object_number % m_splay_width;
+ if (!in_use_splay_offsets.insert(splay_offset).second) {
+ ob_it = commit_position.object_positions.erase(ob_it);
} else {
- ++it;
+ ++ob_it;
}
}
- ldout(m_cct, 20) << "updated object set position: " << *object_set_position
+ stale_ctx = m_commit_position_ctx;
+ m_commit_position_ctx = create_context();
+ m_commit_position = commit_position;
+ m_commit_position_tid = commit_tid;
+
+ ldout(m_cct, 20) << "updated commit position: " << commit_position << ", "
+ << "on_safe=" << m_commit_position_ctx << dendl;
+ schedule_commit_task();
+ }
+
+
+ if (stale_ctx != nullptr) {
+ ldout(m_cct, 20) << "canceling stale commit: on_safe=" << stale_ctx
<< dendl;
+ stale_ctx->complete(-ESTALE);
}
- return update_commit_position;
}
void JournalMetadata::notify_update() {
#include <boost/intrusive_ptr.hpp>
#include <boost/noncopyable.hpp>
#include <boost/optional.hpp>
+#include <functional>
#include <list>
#include <map>
#include <string>
class JournalMetadata : public RefCountedObject, boost::noncopyable {
public:
+ typedef std::function<Context*()> CreateContext;
typedef cls::journal::ObjectPosition ObjectPosition;
typedef cls::journal::ObjectPositions ObjectPositions;
typedef cls::journal::ObjectSetPosition ObjectSetPosition;
void flush_commit_position();
void flush_commit_position(Context *on_safe);
- void set_commit_position(const ObjectSetPosition &commit_position,
- Context *on_safe);
void get_commit_position(ObjectSetPosition *commit_position) const {
Mutex::Locker locker(m_lock);
*commit_position = m_client.commit_position;
uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
uint64_t entry_tid);
- bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position);
+ void committed(uint64_t commit_tid, const CreateContext &create_context);
void notify_update();
void async_notify_update();
size_t m_update_notifications;
Cond m_update_cond;
+ uint64_t m_commit_position_tid = 0;
ObjectSetPosition m_commit_position;
Context *m_commit_position_ctx;
Context *m_commit_position_task_ctx;
void JournalTrimmer::committed(uint64_t commit_tid) {
ldout(m_cct, 20) << __func__ << ": commit_tid=" << commit_tid << dendl;
-
- ObjectSetPosition object_set_position;
- if (!m_journal_metadata->committed(commit_tid, &object_set_position)) {
- return;
- }
-
- {
- Mutex::Locker locker(m_lock);
- m_async_op_tracker.start_op();
- }
-
- Context *ctx = new C_CommitPositionSafe(this, object_set_position);
- m_journal_metadata->set_commit_position(object_set_position, ctx);
+ m_journal_metadata->committed(commit_tid,
+ m_create_commit_position_safe_context);
}
void JournalTrimmer::trim_objects(uint64_t minimum_set) {
}
}
-void JournalTrimmer::handle_commit_position_safe(
- int r, const ObjectSetPosition &object_set_position) {
- ldout(m_cct, 20) << __func__ << ": r=" << r << ", pos="
- << object_set_position << dendl;
+void JournalTrimmer::handle_commit_position_safe(int r) {
+ ldout(m_cct, 20) << __func__ << ": r=" << r << dendl;
Mutex::Locker locker(m_lock);
if (r == 0) {
#include "journal/AsyncOpTracker.h"
#include "journal/JournalMetadata.h"
#include "cls/journal/cls_journal_types.h"
+#include <functional>
namespace journal {
void committed(uint64_t commit_tid);
private:
+ typedef std::function<Context*()> CreateContext;
+
struct C_CommitPositionSafe : public Context {
JournalTrimmer *journal_trimmer;
- ObjectSetPosition object_set_position;
- C_CommitPositionSafe(JournalTrimmer *_journal_trimmer,
- const ObjectSetPosition &_object_set_position)
- : journal_trimmer(_journal_trimmer),
- object_set_position(_object_set_position) {}
+ C_CommitPositionSafe(JournalTrimmer *_journal_trimmer)
+ : journal_trimmer(_journal_trimmer) {
+ Mutex::Locker locker(journal_trimmer->m_lock);
+ journal_trimmer->m_async_op_tracker.start_op();
+ }
+ virtual ~C_CommitPositionSafe() {
+ journal_trimmer->m_async_op_tracker.finish_op();
+ }
virtual void finish(int r) {
- journal_trimmer->handle_commit_position_safe(r, object_set_position);
- journal_trimmer->m_async_op_tracker.finish_op();
+ journal_trimmer->handle_commit_position_safe(r);
}
};
struct C_RemoveSet : public Context {
uint64_t m_remove_set;
Context *m_remove_set_ctx;
+ CreateContext m_create_commit_position_safe_context = [this]() {
+ return new C_CommitPositionSafe(this);
+ };
+
void trim_objects(uint64_t minimum_set);
void remove_set(uint64_t object_set);
- void handle_commit_position_safe(int r, const ObjectSetPosition &position);
+ void handle_commit_position_safe(int r);
void handle_set_removed(int r, uint64_t object_set);
};
}
journal::JournalMetadataPtr create_metadata(const std::string &oid,
- const std::string &client_id) {
+ const std::string &client_id,
+ double commit_internal = 0.1) {
journal::JournalMetadataPtr metadata(new journal::JournalMetadata(
- m_ioctx, oid, client_id, 0.1));
+ m_ioctx, oid, client_id, commit_internal));
m_metadata_list.push_back(metadata);
metadata->add_listener(&m_listener);
return metadata;
ASSERT_EQ(-ENOENT, init_metadata(metadata2));
}
-TEST_F(TestJournalMetadata, SetCommitPositions) {
+TEST_F(TestJournalMetadata, Committed) {
std::string oid = get_temp_oid();
ASSERT_EQ(0, create(oid, 14, 2));
ASSERT_EQ(0, client_register(oid, "client1", ""));
- journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1");
+ journal::JournalMetadataPtr metadata1 = create_metadata(oid, "client1", 600);
ASSERT_EQ(0, init_metadata(metadata1));
journal::JournalMetadataPtr metadata2 = create_metadata(oid, "client1");
ASSERT_EQ(0, init_metadata(metadata2));
ASSERT_TRUE(wait_for_update(metadata2));
- journal::JournalMetadata::ObjectSetPosition commit_position;
+ journal::JournalMetadata::ObjectSetPosition expect_commit_position;
journal::JournalMetadata::ObjectSetPosition read_commit_position;
metadata1->get_commit_position(&read_commit_position);
- ASSERT_EQ(commit_position, read_commit_position);
+ ASSERT_EQ(expect_commit_position, read_commit_position);
- journal::JournalMetadata::ObjectPositions object_positions;
- object_positions = {
- cls::journal::ObjectPosition(1, 123, 122)};
- commit_position = journal::JournalMetadata::ObjectSetPosition(object_positions);
+ uint64_t commit_tid1 = metadata1->allocate_commit_tid(0, 0, 0);
+ uint64_t commit_tid2 = metadata1->allocate_commit_tid(0, 1, 0);
+ uint64_t commit_tid3 = metadata1->allocate_commit_tid(1, 0, 1);
+ uint64_t commit_tid4 = metadata1->allocate_commit_tid(0, 0, 2);
- C_SaferCond cond;
- metadata1->set_commit_position(commit_position, &cond);
- ASSERT_EQ(0, cond.wait());
- ASSERT_TRUE(wait_for_update(metadata2));
+ // tid2 and tid3 cannot advance the commit position until tid1 is committed
+ metadata1->committed(commit_tid2, []() { return nullptr; });
+ metadata1->committed(commit_tid3, []() { return nullptr; });
+
+ C_SaferCond cond1;
+ metadata1->committed(commit_tid1, [&cond1]() { return &cond1; });
+ // given our 10 minute commit interval, this should override the
+ // in-flight commit
+ C_SaferCond cond2;
+ metadata1->committed(commit_tid4, [&cond2]() { return &cond2; });
+
+ ASSERT_EQ(-ESTALE, cond1.wait());
+ metadata1->flush_commit_position();
+ ASSERT_EQ(0, cond2.wait());
+
+ ASSERT_TRUE(wait_for_update(metadata2));
metadata2->get_commit_position(&read_commit_position);
- ASSERT_EQ(commit_position, read_commit_position);
+ expect_commit_position = {{{0, 0, 2}, {1, 0, 1}}};
+ ASSERT_EQ(expect_commit_position, read_commit_position);
}
TEST_F(TestJournalMetadata, UpdateActiveObject) {