return -EIO;
}
- // cannot expire tags if a client hasn't committed yet
- if (client.commit_position.entry_positions.empty()) {
- return 0;
- }
-
- for (auto entry_position : client.commit_position.entry_positions) {
- minimum_tag_tid = MIN(minimum_tag_tid, entry_position.tag_tid);
+ for (auto object_position : client.commit_position.object_positions) {
+ minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
}
}
if (!vals.empty()) {
}
} while (r == MAX_KEYS_READ);
+ // cannot expire tags if a client hasn't committed yet
+ if (minimum_tag_tid == std::numeric_limits<uint64_t>::max()) {
+ return 0;
+ }
+
// compute the minimum in-use tag for each class
std::map<uint64_t, uint64_t> minimum_tag_class_to_tids;
typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
if (r < 0) {
return r;
}
- if (commit_position.entry_positions.size() > splay_width) {
- CLS_ERR("too many entry positions");
+ if (commit_position.object_positions.size() > splay_width) {
+ CLS_ERR("too many object positions");
return -EINVAL;
}
return r;
}
- for (auto entry_position : client.commit_position.entry_positions) {
- minimum_tag_tid = MIN(minimum_tag_tid, entry_position.tag_tid);
+ for (auto object_position : client.commit_position.object_positions) {
+ minimum_tag_tid = MIN(minimum_tag_tid, object_position.tag_tid);
}
// compute minimum tags in use per-class
typedef enum { TAG_PASS_CALCULATE_MINIMUMS,
TAG_PASS_LIST,
TAG_PASS_DONE } TagPass;
- int tag_pass = (client.commit_position.entry_positions.empty() ?
+ int tag_pass = (minimum_tag_tid == std::numeric_limits<uint64_t>::max() ?
TAG_PASS_LIST : TAG_PASS_CALCULATE_MINIMUMS);
std::string last_read = HEADER_KEY_TAG_PREFIX;
do {
namespace cls {
namespace journal {
-void EntryPosition::encode(bufferlist& bl) const {
+void ObjectPosition::encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
+ ::encode(object_number, bl);
::encode(tag_tid, bl);
::encode(entry_tid, bl);
ENCODE_FINISH(bl);
}
-void EntryPosition::decode(bufferlist::iterator& iter) {
+void ObjectPosition::decode(bufferlist::iterator& iter) {
DECODE_START(1, iter);
+ ::decode(object_number, iter);
::decode(tag_tid, iter);
::decode(entry_tid, iter);
DECODE_FINISH(iter);
}
-void EntryPosition::dump(Formatter *f) const {
+void ObjectPosition::dump(Formatter *f) const {
+ f->dump_unsigned("object_number", object_number);
f->dump_unsigned("tag_tid", tag_tid);
f->dump_unsigned("entry_tid", entry_tid);
}
-void EntryPosition::generate_test_instances(std::list<EntryPosition *> &o) {
- o.push_back(new EntryPosition());
- o.push_back(new EntryPosition(1, 2));
+void ObjectPosition::generate_test_instances(std::list<ObjectPosition *> &o) {
+ o.push_back(new ObjectPosition());
+ o.push_back(new ObjectPosition(1, 2, 3));
}
void ObjectSetPosition::encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(object_number, bl);
- ::encode(entry_positions, bl);
+ ::encode(object_positions, bl);
ENCODE_FINISH(bl);
}
void ObjectSetPosition::decode(bufferlist::iterator& iter) {
DECODE_START(1, iter);
- ::decode(object_number, iter);
- ::decode(entry_positions, iter);
+ ::decode(object_positions, iter);
DECODE_FINISH(iter);
}
void ObjectSetPosition::dump(Formatter *f) const {
- f->dump_unsigned("object_number", object_number);
- f->open_array_section("entry_positions");
- for (EntryPositions::const_iterator it = entry_positions.begin();
- it != entry_positions.end(); ++it) {
- f->open_object_section("entry_position");
- it->dump(f);
+ f->open_array_section("object_positions");
+ for (auto &pos : object_positions) {
+ f->open_object_section("object_position");
+ pos.dump(f);
f->close_section();
}
f->close_section();
void ObjectSetPosition::generate_test_instances(
std::list<ObjectSetPosition *> &o) {
o.push_back(new ObjectSetPosition());
- o.push_back(new ObjectSetPosition(1, {{1, 120}, {2, 121}}));
+ o.push_back(new ObjectSetPosition({{0, 1, 120}, {121, 2, 121}}));
}
void Client::encode(bufferlist& bl) const {
o.push_back(new Client());
o.push_back(new Client("id", data));
- o.push_back(new Client("id", data, {1, {{1, 120}, {2, 121}}}));
+ o.push_back(new Client("id", data, {{{1, 2, 120}, {2, 3, 121}}}));
}
void Tag::encode(bufferlist& bl) const {
}
std::ostream &operator<<(std::ostream &os,
- const EntryPosition &entry_position) {
- os << "[tag_tid=" << entry_position.tag_tid << ", entry_tid="
- << entry_position.entry_tid << "]";
+ const ObjectPosition &object_position) {
+ os << "["
+ << "object_number=" << object_position.object_number << ", "
+ << "tag_tid=" << object_position.tag_tid << ", "
+ << "entry_tid=" << object_position.entry_tid << "]";
return os;
}
std::ostream &operator<<(std::ostream &os,
const ObjectSetPosition &object_set_position) {
- os << "[object_number=" << object_set_position.object_number << ", "
- << "positions=[";
+ os << "[positions=[";
std::string delim;
- for (auto &entry_position : object_set_position.entry_positions) {
- os << entry_position << delim;
+ for (auto &object_position : object_set_position.object_positions) {
+ os << object_position << delim;
delim = ", ";
}
os << "]]";
namespace cls {
namespace journal {
-struct EntryPosition {
+struct ObjectPosition {
+ uint64_t object_number;
uint64_t tag_tid;
uint64_t entry_tid;
- EntryPosition() : tag_tid(0), entry_tid(0) {}
- EntryPosition(uint64_t _tag_tid, uint64_t _entry_tid)
- : tag_tid(_tag_tid), entry_tid(_entry_tid) {}
+ ObjectPosition() : object_number(0), tag_tid(0), entry_tid(0) {}
+ ObjectPosition(uint64_t _object_number, uint64_t _tag_tid,
+ uint64_t _entry_tid)
+ : object_number(_object_number), tag_tid(_tag_tid), entry_tid(_entry_tid) {}
- inline bool operator==(const EntryPosition& rhs) const {
- return (tag_tid == rhs.tag_tid && entry_tid == rhs.entry_tid);
+ inline bool operator==(const ObjectPosition& rhs) const {
+ return (object_number == rhs.object_number &&
+ tag_tid == rhs.tag_tid &&
+ entry_tid == rhs.entry_tid);
}
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& iter);
void dump(Formatter *f) const;
- inline bool operator<(const EntryPosition &rhs) const {
- if (tag_tid != rhs.tag_tid) {
+ inline bool operator<(const ObjectPosition &rhs) const {
+ if (object_number != rhs.object_number) {
+ return object_number < rhs.object_number;
+ } else if (tag_tid != rhs.tag_tid) {
return tag_tid < rhs.tag_tid;
}
return entry_tid < rhs.entry_tid;
}
- static void generate_test_instances(std::list<EntryPosition *> &o);
+ static void generate_test_instances(std::list<ObjectPosition *> &o);
};
-typedef std::list<EntryPosition> EntryPositions;
+typedef std::list<ObjectPosition> ObjectPositions;
struct ObjectSetPosition {
- uint64_t object_number;
- EntryPositions entry_positions;
+ // stored in most-recent -> least recent committed entry order
+ ObjectPositions object_positions;
- ObjectSetPosition() : object_number(0) {}
- ObjectSetPosition(uint64_t _object_number,
- const EntryPositions &_entry_positions)
- : object_number(_object_number), entry_positions(_entry_positions) {}
+ ObjectSetPosition() {}
+ ObjectSetPosition(const ObjectPositions &_object_positions)
+ : object_positions(_object_positions) {}
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& iter);
void dump(Formatter *f) const;
inline bool operator==(const ObjectSetPosition &rhs) const {
- return (object_number == rhs.object_number &&
- entry_positions == rhs.entry_positions);
+ return (object_positions == rhs.object_positions);
}
static void generate_test_instances(std::list<ObjectSetPosition *> &o);
static void generate_test_instances(std::list<Tag *> &o);
};
-WRITE_CLASS_ENCODER(EntryPosition);
+WRITE_CLASS_ENCODER(ObjectPosition);
WRITE_CLASS_ENCODER(ObjectSetPosition);
WRITE_CLASS_ENCODER(Client);
WRITE_CLASS_ENCODER(Tag);
std::ostream &operator<<(std::ostream &os,
- const EntryPosition &entry_position);
+ const ObjectPosition &object_position);
std::ostream &operator<<(std::ostream &os,
const ObjectSetPosition &object_set_position);
std::ostream &operator<<(std::ostream &os,
namespace {
// does not compare object number
-inline bool entry_positions_less_equal(const ObjectSetPosition &lhs,
+inline bool object_positions_less_equal(const ObjectSetPosition &lhs,
const ObjectSetPosition &rhs) {
- if (lhs.entry_positions == rhs.entry_positions) {
+ if (lhs.object_positions == rhs.object_positions) {
return true;
}
- if (lhs.entry_positions.size() < rhs.entry_positions.size()) {
- return true;
- } else if (lhs.entry_positions.size() > rhs.entry_positions.size()) {
- return false;
+ if (lhs.object_positions.size() != rhs.object_positions.size()) {
+ return lhs.object_positions.size() < rhs.object_positions.size();
}
std::map<uint64_t, uint64_t> rhs_tids;
- for (EntryPositions::const_iterator it = rhs.entry_positions.begin();
- it != rhs.entry_positions.end(); ++it) {
+ for (ObjectPositions::const_iterator it = rhs.object_positions.begin();
+ it != rhs.object_positions.end(); ++it) {
rhs_tids[it->tag_tid] = it->entry_tid;
}
- for (EntryPositions::const_iterator it = lhs.entry_positions.begin();
- it != lhs.entry_positions.end(); ++it) {
- const EntryPosition &entry_position = *it;
- if (entry_position.entry_tid < rhs_tids[entry_position.tag_tid]) {
+ for (ObjectPositions::const_iterator it = lhs.object_positions.begin();
+ it != lhs.object_positions.end(); ++it) {
+ const ObjectPosition &object_position = *it;
+ if (object_position.entry_tid < rhs_tids[object_position.tag_tid]) {
return true;
}
}
Mutex::Locker locker(m_lock);
ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
<< ", new=" << commit_position << dendl;
- if (entry_positions_less_equal(commit_position, m_client.commit_position) ||
- entry_positions_less_equal(commit_position, m_commit_position)) {
+ if (object_positions_less_equal(commit_position, m_client.commit_position) ||
+ object_positions_less_equal(commit_position, m_commit_position)) {
stale_ctx = on_safe;
} else {
stale_ctx = m_commit_position_ctx;
commit_entry.committed = true;
}
- if (!m_commit_position.entry_positions.empty()) {
+ if (!m_commit_position.object_positions.empty()) {
*object_set_position = m_commit_position;
} else {
*object_set_position = m_client.commit_position;
break;
}
- object_set_position->object_number = commit_entry.object_num;
- if (!object_set_position->entry_positions.empty() &&
- object_set_position->entry_positions.front().tag_tid ==
- commit_entry.tag_tid) {
- object_set_position->entry_positions.front() = EntryPosition(
- commit_entry.tag_tid, commit_entry.entry_tid);
- } else {
- object_set_position->entry_positions.push_front(EntryPosition(
- commit_entry.tag_tid, commit_entry.entry_tid));
- }
- m_pending_commit_tids.erase(it);
+ // TODO
update_commit_position = true;
}
if (update_commit_position) {
// prune the position to have unique tags in commit-order
std::set<uint64_t> in_use_tag_tids;
- EntryPositions::iterator it = object_set_position->entry_positions.begin();
- while (it != object_set_position->entry_positions.end()) {
+ ObjectPositions::iterator it = object_set_position->object_positions.begin();
+ while (it != object_set_position->object_positions.end()) {
if (!in_use_tag_tids.insert(it->tag_tid).second) {
- it = object_set_position->entry_positions.erase(it);
+ it = object_set_position->object_positions.erase(it);
} else {
++it;
}
class JournalMetadata : public RefCountedObject, boost::noncopyable {
public:
- typedef cls::journal::EntryPosition EntryPosition;
- typedef cls::journal::EntryPositions EntryPositions;
+ typedef cls::journal::ObjectPosition ObjectPosition;
+ typedef cls::journal::ObjectPositions ObjectPositions;
typedef cls::journal::ObjectSetPosition ObjectSetPosition;
typedef cls::journal::Client Client;
typedef cls::journal::Tag Tag;
ObjectSetPosition commit_position;
m_journal_metadata->get_commit_position(&commit_position);
- if (!commit_position.entry_positions.empty()) {
+
+ // TODO
+ /*
+ if (!commit_position.object_positions.empty()) {
uint8_t splay_width = m_journal_metadata->get_splay_width();
m_splay_offset = commit_position.object_number % splay_width;
m_commit_object = commit_position.object_number;
- m_commit_tag_tid = commit_position.entry_positions.front().tag_tid;
+ m_commit_tag_tid = commit_position.object_positions.front().tag_tid;
- for (EntryPositions::const_iterator it =
- commit_position.entry_positions.begin();
- it != commit_position.entry_positions.end(); ++it) {
- const EntryPosition &entry_position = *it;
- m_commit_tids[entry_position.tag_tid] = entry_position.entry_tid;
+ for (ObjectPositions::const_iterator it =
+ commit_position.object_positions.begin();
+ it != commit_position.object_positions.end(); ++it) {
+ const ObjectPosition &object_position = *it;
+ m_commit_tids[object_position.tag_tid] = object_position.entry_tid;
}
}
+ */
}
JournalPlayer::~JournalPlayer() {
class JournalPlayer {
public:
- typedef cls::journal::EntryPosition EntryPosition;
- typedef cls::journal::EntryPositions EntryPositions;
+ typedef cls::journal::ObjectPosition ObjectPosition;
+ typedef cls::journal::ObjectPositions ObjectPositions;
typedef cls::journal::ObjectSetPosition ObjectSetPosition;
JournalPlayer(librados::IoCtx &ioctx, const std::string &object_oid_prefix,
Mutex::Locker locker(m_lock);
if (r == 0) {
+ // TODO
+ /*
uint8_t splay_width = m_journal_metadata->get_splay_width();
uint64_t object_set = object_set_position.object_number / splay_width;
if (trim_permitted) {
trim_objects(object_set_position.object_number / splay_width);
}
+ */
}
}
ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
librados::ObjectWriteOperation op1;
- client::client_commit(&op1, "id1", {1, {{2, 120}}});
+ client::client_commit(&op1, "id1", {{{1, 2, 120}}});
ASSERT_EQ(0, ioctx.operate(oid, &op1));
ASSERT_EQ(0, client::client_unregister(ioctx, oid, "id2"));
ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", bufferlist()));
- cls::journal::EntryPositions entry_positions;
- entry_positions = {
- cls::journal::EntryPosition(234, 120),
- cls::journal::EntryPosition(235, 121)};
+ cls::journal::ObjectPositions object_positions;
+ object_positions = {
+ cls::journal::ObjectPosition(0, 234, 120),
+ cls::journal::ObjectPosition(3, 235, 121)};
cls::journal::ObjectSetPosition object_set_position(
- 1, entry_positions);
+ object_positions);
librados::ObjectWriteOperation op2;
client::client_commit(&op2, "id1", object_set_position);
ASSERT_EQ(0, client::create(ioctx, oid, 2, 2, ioctx.get_id()));
ASSERT_EQ(0, client::client_register(ioctx, oid, "id1", bufferlist()));
- cls::journal::EntryPositions entry_positions;
- entry_positions = {
- cls::journal::EntryPosition(234, 120),
- cls::journal::EntryPosition(234, 121),
- cls::journal::EntryPosition(235, 121)};
+ cls::journal::ObjectPositions object_positions;
+ object_positions = {
+ cls::journal::ObjectPosition(0, 234, 120),
+ cls::journal::ObjectPosition(4, 234, 121),
+ cls::journal::ObjectPosition(5, 235, 121)};
cls::journal::ObjectSetPosition object_set_position(
- 1, entry_positions);
+ object_positions);
librados::ObjectWriteOperation op2;
client::client_commit(&op2, "id1", object_set_position);
ASSERT_EQ(0, client::tag_create(ioctx, oid, 2, 1, bufferlist()));
librados::ObjectWriteOperation op1;
- client::client_commit(&op1, "id1", {1, {{2, 120}}});
+ client::client_commit(&op1, "id1", {{{1, 2, 120}}});
ASSERT_EQ(0, ioctx.operate(oid, &op1));
ASSERT_EQ(0, client::tag_create(ioctx, oid, 3, 0, bufferlist()));
TYPE(cls_user_complete_stats_sync_op)
#include "cls/journal/cls_journal_types.h"
-TYPE(cls::journal::EntryPosition)
+TYPE(cls::journal::ObjectPosition)
TYPE(cls::journal::ObjectSetPosition)
TYPE(cls::journal::Client)
metadata1->get_commit_position(&read_commit_position);
ASSERT_EQ(commit_position, read_commit_position);
- journal::JournalMetadata::EntryPositions entry_positions;
- entry_positions = {
- cls::journal::EntryPosition(123, 122)};
- commit_position = journal::JournalMetadata::ObjectSetPosition(1, entry_positions);
+ journal::JournalMetadata::ObjectPositions object_positions;
+ object_positions = {
+ cls::journal::ObjectPosition(1, 123, 122)};
+ commit_position = journal::JournalMetadata::ObjectSetPosition(object_positions);
C_SaferCond cond;
metadata1->set_commit_position(commit_position, &cond);
TEST_F(TestJournalPlayer, Prefetch) {
std::string oid = get_temp_oid();
- journal::JournalPlayer::EntryPositions positions;
+ journal::JournalPlayer::ObjectPositions positions;
positions = {
- cls::journal::EntryPosition(234, 122) };
- cls::journal::ObjectSetPosition commit_position(0, positions);
+ cls::journal::ObjectPosition(0, 234, 122) };
+ cls::journal::ObjectSetPosition commit_position(positions);
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
TEST_F(TestJournalPlayer, PrefetchSkip) {
std::string oid = get_temp_oid();
- journal::JournalPlayer::EntryPositions positions;
+ journal::JournalPlayer::ObjectPositions positions;
positions = {
- cls::journal::EntryPosition(234, 125) };
- cls::journal::ObjectSetPosition commit_position(0, positions);
+ cls::journal::ObjectPosition(0, 234, 125) };
+ cls::journal::ObjectSetPosition commit_position(positions);
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
TEST_F(TestJournalPlayer, PrefetchMultipleTags) {
std::string oid = get_temp_oid();
- journal::JournalPlayer::EntryPositions positions;
+ journal::JournalPlayer::ObjectPositions positions;
positions = {
- cls::journal::EntryPosition(234, 122),
- cls::journal::EntryPosition(345, 1)};
- cls::journal::ObjectSetPosition commit_position(0, positions);
+ cls::journal::ObjectPosition(0, 234, 122),
+ cls::journal::ObjectPosition(1, 345, 1)};
+ cls::journal::ObjectSetPosition commit_position(positions);
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
TEST_F(TestJournalPlayer, PrefetchAndWatch) {
std::string oid = get_temp_oid();
- journal::JournalPlayer::EntryPositions positions;
+ journal::JournalPlayer::ObjectPositions positions;
positions = {
- cls::journal::EntryPosition(234, 122)};
- cls::journal::ObjectSetPosition commit_position(0, positions);
+ cls::journal::ObjectPosition(0, 234, 122)};
+ cls::journal::ObjectSetPosition commit_position(positions);
ASSERT_EQ(0, create(oid));
ASSERT_EQ(0, client_register(oid));
*tid = -1;
return;
}
- cls::journal::EntryPositions entry_positions =
- c->commit_position.entry_positions;
- cls::journal::EntryPositions::const_iterator p;
- for (p = entry_positions.begin(); p != entry_positions.end(); ++p) {
+ cls::journal::ObjectPositions object_positions =
+ c->commit_position.object_positions;
+ cls::journal::ObjectPositions::const_iterator p;
+ for (p = object_positions.begin(); p != object_positions.end(); p++) {
if (p->tag_tid == 0) {
break;
}
}
- *tid = p == entry_positions.end() ? -1 : p->entry_tid;
+ *tid = p == object_positions.end() ? -1 : p->entry_tid;
C_SaferCond open_cond;
ictx->journal = new librbd::Journal<>(*ictx);
ASSERT_EQ(-EINVAL, ictx2->operations->flatten(no_op));
}
-TEST_F(TestJournalReplay, EntryPosition) {
+TEST_F(TestJournalReplay, ObjectPosition) {
REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
librbd::ImageCtx *ictx;
std::set<cls::journal::Client>::const_iterator c;
for (c = registered_clients.begin(); c != registered_clients.end(); c++) {
std::cout << __func__ << ": client: " << *c << std::endl;
- cls::journal::EntryPositions entry_positions =
- c->commit_position.entry_positions;
- cls::journal::EntryPositions::const_iterator p;
- for (p = entry_positions.begin(); p != entry_positions.end(); p++) {
+ cls::journal::ObjectPositions object_positions =
+ c->commit_position.object_positions;
+ cls::journal::ObjectPositions::const_iterator p;
+ for (p = object_positions.begin(); p != object_positions.end(); p++) {
if (c->id == master_client_id) {
ASSERT_EQ(-1, master_tid);
master_tid = p->entry_tid;