#include "cls/journal/cls_journal_types.h"
#include "common/Formatter.h"
-#include <set>
namespace cls {
namespace journal {
void EntryPosition::encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- ::encode(tag, bl);
- ::encode(tid, bl);
+ ::encode(tag_tid, bl);
+ ::encode(entry_tid, bl);
ENCODE_FINISH(bl);
}
void EntryPosition::decode(bufferlist::iterator& iter) {
DECODE_START(1, iter);
- ::decode(tag, iter);
- ::decode(tid, iter);
+ ::decode(tag_tid, iter);
+ ::decode(entry_tid, iter);
DECODE_FINISH(iter);
}
void EntryPosition::dump(Formatter *f) const {
- f->dump_string("tag", tag);
- f->dump_unsigned("tid", tid);
+ f->dump_unsigned("tag_tid", tag_tid);
+ f->dump_unsigned("entry_tid", entry_tid);
}
void EntryPosition::generate_test_instances(std::list<EntryPosition *> &o) {
o.push_back(new EntryPosition());
- o.push_back(new EntryPosition("id", 2));
-}
-
-bool ObjectSetPosition::operator<(const ObjectSetPosition& rhs) const {
- if (entry_positions.size() < rhs.entry_positions.size()) {
- return true;
- } else if (entry_positions.size() > rhs.entry_positions.size()) {
- return false;
- }
-
- std::map<std::string, uint64_t> rhs_tids;
- for (EntryPositions::const_iterator it = rhs.entry_positions.begin();
- it != rhs.entry_positions.end(); ++it) {
- rhs_tids[it->tag] = it->tid;
- }
-
- for (EntryPositions::const_iterator it = entry_positions.begin();
- it != entry_positions.end(); ++it) {
- const EntryPosition &entry_position = *it;
- if (entry_position.tid < rhs_tids[entry_position.tag]) {
- return true;
- }
- }
- return false;
+ o.push_back(new EntryPosition(1, 2));
}
void ObjectSetPosition::encode(bufferlist& bl) const {
o.push_back(new ObjectSetPosition());
EntryPositions entry_positions;
- entry_positions.push_back(EntryPosition("tag1", 120));
- entry_positions.push_back(EntryPosition("tag2", 121));
+ entry_positions.push_back(EntryPosition(1, 120));
+ entry_positions.push_back(EntryPosition(2, 121));
o.push_back(new ObjectSetPosition(1, entry_positions));
}
o.push_back(new Client("id", "desc"));
EntryPositions entry_positions;
- entry_positions.push_back(EntryPosition("tag1", 120));
- entry_positions.push_back(EntryPosition("tag1", 121));
+ entry_positions.push_back(EntryPosition(1, 120));
+ entry_positions.push_back(EntryPosition(2, 121));
o.push_back(new Client("id", "desc", ObjectSetPosition(1, entry_positions)));
}
std::ostream &operator<<(std::ostream &os,
const EntryPosition &entry_position) {
- os << "[tag=" << entry_position.tag << ", tid="
- << entry_position.tid << "]";
+ os << "[tag_tid=" << entry_position.tag_tid << ", entry_tid="
+ << entry_position.entry_tid << "]";
return os;
}
namespace journal {
struct EntryPosition {
- std::string tag;
- uint64_t tid;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
- EntryPosition() : tid(0) {}
- EntryPosition(const std::string& _tag, uint64_t _tid)
- : tag(_tag), tid(_tid) {}
+ EntryPosition() : tag_tid(0), entry_tid(0) {}
+ EntryPosition(uint64_t _tag_tid, uint64_t _entry_tid)
+ : tag_tid(_tag_tid), entry_tid(_entry_tid) {}
inline bool operator==(const EntryPosition& rhs) const {
- return (tag == rhs.tag && tid == rhs.tid);
+ return (tag_tid == rhs.tag_tid && entry_tid == rhs.entry_tid);
}
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& iter);
void dump(Formatter *f) const;
+ inline bool operator<(const EntryPosition &rhs) const {
+ if (tag_tid != rhs.tag_tid) {
+ return tag_tid < rhs.tag_tid;
+ }
+ return entry_tid < rhs.entry_tid;
+ }
+
static void generate_test_instances(std::list<EntryPosition *> &o);
};
const EntryPositions &_entry_positions)
: object_number(_object_number), entry_positions(_entry_positions) {}
- bool operator<(const ObjectSetPosition& rhs) const;
- inline bool operator<=(const ObjectSetPosition& rhs) const {
- return (*this == rhs || *this < rhs);
- }
- inline bool operator==(const ObjectSetPosition &rhs) const {
- return (entry_positions == rhs.entry_positions);
- }
-
void encode(bufferlist& bl) const;
void decode(bufferlist::iterator& iter);
void dump(Formatter *f) const;
+ inline bool operator==(const ObjectSetPosition &rhs) const {
+ return (object_number == rhs.object_number &&
+ entry_positions == rhs.entry_positions);
+ }
+
static void generate_test_instances(std::list<ObjectSetPosition *> &o);
};
namespace {
-const uint32_t HEADER_FIXED_SIZE = 17; /// preamble, version, tid
+const uint32_t HEADER_FIXED_SIZE = 25; /// preamble, version, entry tid, tag id
} // anonymous namespace
bufferlist data_bl;
::encode(preamble, data_bl);
::encode(static_cast<uint8_t>(1), data_bl);
- ::encode(m_tid, data_bl);
+ ::encode(m_entry_tid, data_bl);
+ ::encode(m_tag_tid, data_bl);
assert(HEADER_FIXED_SIZE == data_bl.length());
- ::encode(m_tag, data_bl);
::encode(m_data, data_bl);
uint32_t crc = data_bl.crc32c(0);
throw buffer::malformed_input("unknown version: " + stringify(version));
}
- ::decode(m_tid, iter);
- ::decode(m_tag, iter);
+ ::decode(m_entry_tid, iter);
+ ::decode(m_tag_tid, iter);
::decode(m_data, iter);
uint32_t end_offset = iter.get_off();
}
void Entry::dump(Formatter *f) const {
- f->dump_string("tag", m_tag);
- f->dump_unsigned("tid", m_tid);
+ f->dump_unsigned("tag_tid", m_tag_tid);
+ f->dump_unsigned("entry_tid", m_entry_tid);
std::stringstream data;
m_data.hexdump(data);
}
iter.advance(HEADER_FIXED_SIZE - sizeof(bl_preamble));
- if (iter.get_remaining() < sizeof(uint32_t)) {
- *bytes_needed = sizeof(uint32_t) - iter.get_remaining();
- return false;
- }
- uint32_t tag_size;
- ::decode(tag_size, iter);
-
- if (iter.get_remaining() < tag_size) {
- *bytes_needed = tag_size - iter.get_remaining();
- return false;
- }
- iter.advance(tag_size);
-
if (iter.get_remaining() < sizeof(uint32_t)) {
*bytes_needed = sizeof(uint32_t) - iter.get_remaining();
return false;
}
void Entry::generate_test_instances(std::list<Entry *> &o) {
- o.push_back(new Entry("tag1", 123, bufferlist()));
+ o.push_back(new Entry(1, 123, bufferlist()));
bufferlist bl;
bl.append("data");
- o.push_back(new Entry("tag2", 123, bl));
+ o.push_back(new Entry(2, 123, bl));
}
bool Entry::operator==(const Entry& rhs) const {
- return (m_tag == rhs.m_tag && m_tid == rhs.m_tid &&
+ return (m_tag_tid == rhs.m_tag_tid && m_entry_tid == rhs.m_entry_tid &&
const_cast<bufferlist&>(m_data).contents_equal(
const_cast<bufferlist&>(rhs.m_data)));
}
std::ostream &operator<<(std::ostream &os, const Entry &entry) {
- os << "Entry[tag=" << entry.get_tag() << ", tid=" << entry.get_tid() << ", "
+ os << "Entry[tag_tid=" << entry.get_tag_tid() << ", "
+ << "entry_tid=" << entry.get_entry_tid() << ", "
<< "data size=" << entry.get_data().length() << "]";
return os;
}
class Entry {
public:
- Entry() : m_tid() {}
- Entry(const std::string &tag, uint64_t tid, const bufferlist &data)
- : m_tag(tag), m_tid(tid), m_data(data)
+ Entry() : m_tag_tid(0), m_entry_tid() {}
+ Entry(uint64_t tag_tid, uint64_t entry_tid, const bufferlist &data)
+ : m_tag_tid(tag_tid), m_entry_tid(entry_tid), m_data(data)
{
}
- inline const std::string &get_tag() const {
- return m_tag;
+ inline uint64_t get_tag_tid() const {
+ return m_tag_tid;
}
- inline uint64_t get_tid() const {
- return m_tid;
+ inline uint64_t get_entry_tid() const {
+ return m_entry_tid;
}
inline const bufferlist &get_data() const {
return m_data;
private:
static const uint64_t preamble = 0x3141592653589793;
- std::string m_tag;
- uint64_t m_tid;
+ uint64_t m_tag_tid;
+ uint64_t m_entry_tid;
bufferlist m_data;
};
namespace journal {
-FutureImpl::FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid,
+FutureImpl::FutureImpl(Finisher &finisher, uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid)
- : RefCountedObject(NULL, 0), m_finisher(finisher), m_tag(tag), m_tid(tid),
- m_commit_tid(commit_tid),
+ : RefCountedObject(NULL, 0), m_finisher(finisher), m_tag_tid(tag_tid),
+ m_entry_tid(entry_tid), m_commit_tid(commit_tid),
m_lock(utils::unique_lock_name("FutureImpl::m_lock", this)), m_safe(false),
m_consistent(false), m_return_value(0), m_flush_state(FLUSH_STATE_NONE),
m_consistent_ack(this) {
}
std::ostream &operator<<(std::ostream &os, const FutureImpl &future) {
- os << "Future[tag=" << future.m_tag << ", tid=" << future.m_tid << "]";
+ os << "Future[tag_tid=" << future.m_tag_tid << ", "
+ << "entry_tid=" << future.m_entry_tid << ", "
+ << "commit_tid=" << future.m_commit_tid << "]";
return os;
}
};
typedef boost::intrusive_ptr<FlushHandler> FlushHandlerPtr;
- FutureImpl(Finisher &finisher, const std::string &tag, uint64_t tid,
+ FutureImpl(Finisher &finisher, uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid);
void init(const FutureImplPtr &prev_future);
- inline const std::string &get_tag() const {
- return m_tag;
+ inline uint64_t get_tag_tid() const {
+ return m_tag_tid;
}
- inline uint64_t get_tid() const {
- return m_tid;
+ inline uint64_t get_entry_tid() const {
+ return m_entry_tid;
}
inline uint64_t get_commit_tid() const {
return m_commit_tid;
};
Finisher &m_finisher;
- std::string m_tag;
- uint64_t m_tid;
+ uint64_t m_tag_tid;
+ uint64_t m_entry_tid;
uint64_t m_commit_tid;
mutable Mutex m_lock;
#include "common/Finisher.h"
#include "common/Timer.h"
#include "cls/journal/cls_journal_client.h"
+#include <functional>
#include <set>
#define dout_subsys ceph_subsys_journaler
using namespace cls::journal;
+namespace {
+
+// does not compare object number
+inline bool entry_positions_less_equal(const ObjectSetPosition &lhs,
+ const ObjectSetPosition &rhs) {
+ if (lhs.entry_positions == rhs.entry_positions) {
+ return true;
+ }
+
+ if (lhs.entry_positions.size() < rhs.entry_positions.size()) {
+ return true;
+ } else if (rhs.entry_positions.size() > rhs.entry_positions.size()) {
+ return false;
+ }
+
+ std::map<uint64_t, uint64_t> rhs_tids;
+ for (EntryPositions::const_iterator it = rhs.entry_positions.begin();
+ it != rhs.entry_positions.end(); ++it) {
+ rhs_tids[it->tag_tid] = it->entry_tid;
+ }
+
+ for (EntryPositions::const_iterator it = lhs.entry_positions.begin();
+ it != lhs.entry_positions.end(); ++it) {
+ const EntryPosition &entry_position = *it;
+ if (entry_position.entry_tid < rhs_tids[entry_position.tag_tid]) {
+ return true;
+ }
+ }
+ return false;
+}
+
+} // anonymous namespace
+
JournalMetadata::JournalMetadata(librados::IoCtx &ioctx,
const std::string &oid,
const std::string &client_id,
Mutex::Locker locker(m_lock);
ldout(m_cct, 20) << __func__ << ": current=" << m_client.commit_position
<< ", new=" << commit_position << dendl;
- if (commit_position <= m_client.commit_position ||
- commit_position <= m_commit_position) {
+ if (entry_positions_less_equal(commit_position, m_client.commit_position) ||
+ entry_positions_less_equal(commit_position, m_commit_position)) {
stale_ctx = on_safe;
} else {
stale_ctx = m_commit_position_ctx;
}
}
-void JournalMetadata::reserve_tid(const std::string &tag, uint64_t tid) {
+void JournalMetadata::reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid) {
Mutex::Locker locker(m_lock);
- uint64_t &allocated_tid = m_allocated_tids[tag];
- if (allocated_tid <= tid) {
- allocated_tid = tid + 1;
+ uint64_t &allocated_entry_tid = m_allocated_entry_tids[tag_tid];
+ if (allocated_entry_tid <= entry_tid) {
+ allocated_entry_tid = entry_tid + 1;
}
}
-bool JournalMetadata::get_last_allocated_tid(const std::string &tag,
- uint64_t *tid) const {
+bool JournalMetadata::get_last_allocated_entry_tid(uint64_t tag_tid,
+ uint64_t *entry_tid) const {
Mutex::Locker locker(m_lock);
- AllocatedTids::const_iterator it = m_allocated_tids.find(tag);
- if (it == m_allocated_tids.end()) {
+ AllocatedEntryTids::const_iterator it = m_allocated_entry_tids.find(tag_tid);
+ if (it == m_allocated_entry_tids.end()) {
return false;
}
assert(it->second > 0);
- *tid = it->second - 1;
+ *entry_tid = it->second - 1;
return true;
}
}
uint64_t JournalMetadata::allocate_commit_tid(uint64_t object_num,
- const std::string &tag,
- uint64_t tid) {
+ uint64_t tag_tid,
+ uint64_t entry_tid) {
Mutex::Locker locker(m_lock);
uint64_t commit_tid = ++m_commit_tid;
- m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag, tid);
+ m_pending_commit_tids[commit_tid] = CommitEntry(object_num, tag_tid,
+ entry_tid);
ldout(m_cct, 20) << "allocated commit tid: commit_tid=" << commit_tid << " ["
<< "object_num=" << object_num << ", "
- << "tag=" << tag << ", tid=" << tid << "]" << dendl;
+ << "tag_tid=" << tag_tid << ", entry_tid=" << entry_tid << "]"
+ << dendl;
return commit_tid;
}
object_set_position->object_number = commit_entry.object_num;
if (!object_set_position->entry_positions.empty() &&
- object_set_position->entry_positions.front().tag == commit_entry.tag) {
+ object_set_position->entry_positions.front().tag_tid ==
+ commit_entry.tag_tid) {
object_set_position->entry_positions.front() = EntryPosition(
- commit_entry.tag, commit_entry.tid);
+ commit_entry.tag_tid, commit_entry.entry_tid);
} else {
object_set_position->entry_positions.push_front(EntryPosition(
- commit_entry.tag, commit_entry.tid));
+ commit_entry.tag_tid, commit_entry.entry_tid));
}
m_pending_commit_tids.erase(it);
update_commit_position = true;
if (update_commit_position) {
// prune the position to have unique tags in commit-order
- std::set<std::string> in_use_tags;
+ std::set<uint64_t> in_use_tag_tids;
EntryPositions::iterator it = object_set_position->entry_positions.begin();
while (it != object_set_position->entry_positions.end()) {
- if (!in_use_tags.insert(it->tag).second) {
+ if (!in_use_tag_tids.insert(it->tag_tid).second) {
it = object_set_position->entry_positions.erase(it);
} else {
++it;
*registered_clients = m_registered_clients;
}
- inline uint64_t allocate_tid(const std::string &tag) {
+ inline uint64_t allocate_entry_tid(uint64_t tag_tid) {
Mutex::Locker locker(m_lock);
- return m_allocated_tids[tag]++;
+ return m_allocated_entry_tids[tag_tid]++;
}
- void reserve_tid(const std::string &tag, uint64_t tid);
- bool get_last_allocated_tid(const std::string &tag, uint64_t *tid) const;
+ void reserve_entry_tid(uint64_t tag_tid, uint64_t entry_tid);
+ bool get_last_allocated_entry_tid(uint64_t tag_tid, uint64_t *entry_tid) const;
- uint64_t allocate_commit_tid(uint64_t object_num, const std::string &tag,
- uint64_t tid);
+ uint64_t allocate_commit_tid(uint64_t object_num, uint64_t tag_tid,
+ uint64_t entry_tid);
bool committed(uint64_t commit_tid, ObjectSetPosition *object_set_position);
void notify_update();
void async_notify_update();
private:
- typedef std::map<std::string, uint64_t> AllocatedTids;
+ typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
typedef std::list<Listener*> Listeners;
struct CommitEntry {
uint64_t object_num;
- std::string tag;
- uint64_t tid;
+ uint64_t tag_tid;
+ uint64_t entry_tid;
bool committed;
- CommitEntry() : object_num(0), tid(0), committed(false) {
+ CommitEntry() : object_num(0), tag_tid(0), entry_tid(0), committed(false) {
}
- CommitEntry(uint64_t _object_num, const std::string &_tag, uint64_t _tid)
- : object_num(_object_num), tag(_tag), tid(_tid), committed(false) {
+ CommitEntry(uint64_t _object_num, uint64_t _tag_tid, uint64_t _entry_tid)
+ : object_num(_object_num), tag_tid(_tag_tid), entry_tid(_entry_tid),
+ committed(false) {
}
};
typedef std::map<uint64_t, CommitEntry> CommitTids;
RegisteredClients m_registered_clients;
Client m_client;
- AllocatedTids m_allocated_tids;
+ AllocatedEntryTids m_allocated_entry_tids;
size_t m_update_notifications;
Cond m_update_cond;
m_journal_metadata(journal_metadata), m_replay_handler(replay_handler),
m_lock("JournalPlayer::m_lock"), m_state(STATE_INIT), m_splay_offset(0),
m_watch_enabled(false), m_watch_scheduled(false), m_watch_interval(0),
- m_commit_object(0) {
+ m_commit_object(0), m_commit_tag_tid(0) {
m_replay_handler->get();
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
uint8_t splay_width = m_journal_metadata->get_splay_width();
m_splay_offset = commit_position.object_number % splay_width;
m_commit_object = commit_position.object_number;
- m_commit_tag = commit_position.entry_positions.front().tag;
+ m_commit_tag_tid = commit_position.entry_positions.front().tag_tid;
for (EntryPositions::const_iterator it =
commit_position.entry_positions.begin();
it != commit_position.entry_positions.end(); ++it) {
const EntryPosition &entry_position = *it;
- m_commit_tids[entry_position.tag] = entry_position.tid;
+ m_commit_tids[entry_position.tag_tid] = entry_position.entry_tid;
}
}
}
object_player->front(entry);
object_player->pop_front();
- uint64_t last_tid;
- if (m_journal_metadata->get_last_allocated_tid(entry->get_tag(), &last_tid) &&
- entry->get_tid() != last_tid + 1) {
+ uint64_t last_entry_tid;
+ if (m_journal_metadata->get_last_allocated_entry_tid(
+ entry->get_tag_tid(), &last_entry_tid) &&
+ entry->get_entry_tid() != last_entry_tid + 1) {
lderr(m_cct) << "missing prior journal entry: " << *entry << dendl;
m_state = STATE_ERROR;
if (!object_player->empty()) {
Entry peek_entry;
object_player->front(&peek_entry);
- if (peek_entry.get_tag() == entry->get_tag() ||
- (m_journal_metadata->get_last_allocated_tid(peek_entry.get_tag(),
- &last_tid) &&
- last_tid + 1 != peek_entry.get_tid())) {
+ if (peek_entry.get_tag_tid() == entry->get_tag_tid() ||
+ (m_journal_metadata->get_last_allocated_entry_tid(
+ peek_entry.get_tag_tid(), &last_entry_tid) &&
+ last_entry_tid + 1 != peek_entry.get_entry_tid())) {
advance_splay_object();
}
} else {
remove_empty_object_player(object_player);
}
- m_journal_metadata->reserve_tid(entry->get_tag(), entry->get_tid());
+ m_journal_metadata->reserve_entry_tid(entry->get_tag_tid(),
+ entry->get_entry_tid());
*commit_tid = m_journal_metadata->allocate_commit_tid(
- object_player->get_object_number(), entry->get_tag(), entry->get_tid());
+ object_player->get_object_number(), entry->get_tag_tid(),
+ entry->get_entry_tid());
return true;
}
Entry entry;
while (!m_commit_tids.empty() && !object_player->empty()) {
object_player->front(&entry);
- if (entry.get_tid() > m_commit_tids[entry.get_tag()]) {
+ if (entry.get_entry_tid() > m_commit_tids[entry.get_tag_tid()]) {
ldout(m_cct, 10) << "located next uncommitted entry: " << entry
<< dendl;
break;
}
ldout(m_cct, 20) << "skipping committed entry: " << entry << dendl;
- m_journal_metadata->reserve_tid(entry.get_tag(), entry.get_tid());
+ m_journal_metadata->reserve_entry_tid(entry.get_tag_tid(),
+ entry.get_entry_tid());
object_player->pop_front();
}
} else {
Entry entry;
object_player->front(&entry);
- if (entry.get_tag() == m_commit_tag) {
+ if (entry.get_tag_tid() == m_commit_tag_tid) {
advance_splay_object();
}
}
private:
typedef std::set<uint8_t> PrefetchSplayOffsets;
- typedef std::map<std::string, uint64_t> AllocatedTids;
+ typedef std::map<uint64_t, uint64_t> AllocatedEntryTids;
typedef std::map<uint64_t, ObjectPlayerPtr> ObjectPlayers;
typedef std::map<uint8_t, ObjectPlayers> SplayedObjectPlayers;
PrefetchSplayOffsets m_prefetch_splay_offsets;
SplayedObjectPlayers m_object_players;
uint64_t m_commit_object;
- std::string m_commit_tag;
- AllocatedTids m_commit_tids;
+ uint64_t m_commit_tag_tid;
+ AllocatedEntryTids m_commit_tids;
void advance_splay_object();
m_journal_metadata->remove_listener(&m_listener);
}
-Future JournalRecorder::append(const std::string &tag,
+Future JournalRecorder::append(uint64_t tag_tid,
const bufferlist &payload_bl) {
Mutex::Locker locker(m_lock);
- uint64_t tid = m_journal_metadata->allocate_tid(tag);
+ uint64_t entry_tid = m_journal_metadata->allocate_entry_tid(tag_tid);
uint8_t splay_width = m_journal_metadata->get_splay_width();
- uint8_t splay_offset = tid % splay_width;
+ uint8_t splay_offset = entry_tid % splay_width;
ObjectRecorderPtr object_ptr = get_object(splay_offset);
uint64_t commit_tid = m_journal_metadata->allocate_commit_tid(
- object_ptr->get_object_number(), tag, tid);
+ object_ptr->get_object_number(), tag_tid, entry_tid);
FutureImplPtr future(new FutureImpl(m_journal_metadata->get_finisher(),
- tag, tid, commit_tid));
+ tag_tid, entry_tid, commit_tid));
future->init(m_prev_future);
m_prev_future = future;
bufferlist entry_bl;
- ::encode(Entry(future->get_tag(), future->get_tid(), payload_bl), entry_bl);
+ ::encode(Entry(future->get_tag_tid(), future->get_entry_tid(), payload_bl),
+ entry_bl);
AppendBuffers append_buffers;
append_buffers.push_back(std::make_pair(future, entry_bl));
double flush_age);
~JournalRecorder();
- Future append(const std::string &tag, const bufferlist &bl);
+ Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
ObjectRecorderPtr get_object(uint8_t splay_offset);
}
bool Journaler::try_pop_front(ReplayEntry *replay_entry,
- std::string* tag) {
+ uint64_t *tag_tid) {
assert(m_player != NULL);
Entry entry;
}
*replay_entry = ReplayEntry(entry.get_data(), commit_tid);
- if (tag != NULL) {
- *tag = entry.get_tag();
+ if (tag_tid != nullptr) {
+ *tag_tid = entry.get_tag_tid();
}
return true;
}
m_recorder = NULL;
}
-Future Journaler::append(const std::string &tag, const bufferlist &payload_bl) {
- return m_recorder->append(tag, payload_bl);
+Future Journaler::append(uint64_t tag_tid, const bufferlist &payload_bl) {
+ return m_recorder->append(tag_tid, payload_bl);
}
void Journaler::flush(Context *on_safe) {
void start_replay(ReplayHandler *replay_handler);
void start_live_replay(ReplayHandler *replay_handler, double interval);
- bool try_pop_front(ReplayEntry *replay_entry, std::string* tag = NULL);
+ bool try_pop_front(ReplayEntry *replay_entry, uint64_t *tag_tid = nullptr);
void stop_replay();
void start_append(int flush_interval, uint64_t flush_bytes, double flush_age);
- Future append(const std::string &tag, const bufferlist &bl);
+ Future append(uint64_t tag_tid, const bufferlist &bl);
void flush(Context *on_safe);
void stop_append(Context *on_safe);
::decode(entry, iter);
ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
- EntryKey entry_key(std::make_pair(entry.get_tag(), entry.get_tid()));
+ EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
+ entry.get_entry_tid()));
if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
m_entry_keys[entry_key] = m_entries.insert(m_entries.end(), entry);
} else {
}
private:
- typedef std::pair<std::string, uint64_t> EntryKey;
+ typedef std::pair<uint64_t, uint64_t> EntryKey;
typedef boost::unordered_map<EntryKey, Entries::iterator> EntryKeys;
struct C_Fetch : public Context {
tid = ++m_event_tid;
assert(tid != 0);
- future = m_journaler->append("", bl);
+ // TODO: use allocated tag_id
+ future = m_journaler->append(0, bl);
m_events[tid] = Event(future, aio_comp, requests, offset, length);
}
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
- future = m_journaler->append("", bl);
+
+ // TODO: use allocated tag_id
+ future = m_journaler->append(0, bl);
}
on_safe = create_async_context_callback(m_image_ctx, on_safe);
{
Mutex::Locker locker(m_lock);
assert(m_state == STATE_READY);
- future = m_journaler->append("", bl);
+
+ // TODO: use allocated tag_id
+ future = m_journaler->append(0, bl);
}
future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
cls::journal::EntryPositions entry_positions;
entry_positions = {
- cls::journal::EntryPosition("tag1", 120),
- cls::journal::EntryPosition("tag2", 121)};
+ cls::journal::EntryPosition(234, 120),
+ cls::journal::EntryPosition(235, 121)};
cls::journal::ObjectSetPosition object_set_position(
1, entry_positions);
cls::journal::EntryPositions entry_positions;
entry_positions = {
- cls::journal::EntryPosition("tag1", 120),
- cls::journal::EntryPosition("tag1", 121),
- cls::journal::EntryPosition("tag2", 121)};
+ cls::journal::EntryPosition(234, 120),
+ cls::journal::EntryPosition(234, 121),
+ cls::journal::EntryPosition(235, 121)};
cls::journal::ObjectSetPosition object_set_position(
1, entry_positions);
TEST_F(TestEntry, DefaultConstructor) {
journal::Entry entry;
- ASSERT_EQ(0U, entry.get_tid());
- ASSERT_EQ("", entry.get_tag());
+ ASSERT_EQ(0U, entry.get_entry_tid());
+ ASSERT_EQ(0U, entry.get_tag_tid());
bufferlist data(entry.get_data());
bufferlist expected_data;
TEST_F(TestEntry, Constructor) {
bufferlist data;
data.append("data");
- journal::Entry entry("tag", 123, data);
+ journal::Entry entry(234, 123, data);
data.clear();
data = entry.get_data();
bufferlist expected_data;
expected_data.append("data");
- ASSERT_EQ(123U, entry.get_tid());
- ASSERT_EQ("tag", entry.get_tag());
+ ASSERT_EQ(123U, entry.get_entry_tid());
+ ASSERT_EQ(234U, entry.get_tag_tid());
ASSERT_TRUE(data.contents_equal(expected_data));
}
TEST_F(TestEntry, IsReadable) {
bufferlist data;
data.append("data");
- journal::Entry entry("tag", 123, data);
+ journal::Entry entry(234, 123, data);
bufferlist full_bl;
::encode(entry, full_bl);
TEST_F(TestEntry, IsReadableBadPreamble) {
bufferlist data;
data.append("data");
- journal::Entry entry("tag", 123, data);
+ journal::Entry entry(234, 123, data);
uint64_t stray_bytes = 0x1122334455667788;
bufferlist full_bl;
TEST_F(TestEntry, IsReadableBadCRC) {
bufferlist data;
data.append("data");
- journal::Entry entry("tag", 123, data);
+ journal::Entry entry(234, 123, data);
bufferlist full_bl;
::encode(entry, full_bl);
m_finisher->start();
}
- journal::FutureImplPtr create_future(const std::string &tag, uint64_t tid,
+ journal::FutureImplPtr create_future(uint64_t tag_tid, uint64_t entry_tid,
uint64_t commit_tid,
const journal::FutureImplPtr &prev =
journal::FutureImplPtr()) {
journal::FutureImplPtr future(new journal::FutureImpl(*m_finisher,
- tag, tid,
+ tag_tid, entry_tid,
commit_tid));
future->init(prev);
return future;
};
TEST_F(TestFutureImpl, Getters) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
- ASSERT_EQ("tag", future->get_tag());
- ASSERT_EQ(123U, future->get_tid());
+ journal::FutureImplPtr future = create_future(234, 123, 456);
+ ASSERT_EQ(234U, future->get_tag_tid());
+ ASSERT_EQ(123U, future->get_entry_tid());
ASSERT_EQ(456U, future->get_commit_tid());
}
TEST_F(TestFutureImpl, Attach) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
ASSERT_FALSE(future->attach(&m_flush_handler));
ASSERT_EQ(1U, m_flush_handler.refs);
}
TEST_F(TestFutureImpl, AttachWithPendingFlush) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
future->flush(NULL);
ASSERT_TRUE(future->attach(&m_flush_handler));
}
TEST_F(TestFutureImpl, Detach) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
ASSERT_FALSE(future->attach(&m_flush_handler));
future->detach();
ASSERT_EQ(0U, m_flush_handler.refs);
}
TEST_F(TestFutureImpl, DetachImplicit) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
ASSERT_FALSE(future->attach(&m_flush_handler));
future.reset();
ASSERT_EQ(0U, m_flush_handler.refs);
}
TEST_F(TestFutureImpl, Flush) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
ASSERT_FALSE(future->attach(&m_flush_handler));
C_SaferCond cond;
}
TEST_F(TestFutureImpl, FlushWithoutContext) {
- journal::FutureImplPtr future = create_future("tag", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
ASSERT_FALSE(future->attach(&m_flush_handler));
future->flush(NULL);
}
TEST_F(TestFutureImpl, FlushChain) {
- journal::FutureImplPtr future1 = create_future("tag1", 123, 456);
- journal::FutureImplPtr future2 = create_future("tag1", 124, 457, future1);
- journal::FutureImplPtr future3 = create_future("tag2", 1, 458, future2);
+ journal::FutureImplPtr future1 = create_future(234, 123, 456);
+ journal::FutureImplPtr future2 = create_future(234, 124, 457, future1);
+ journal::FutureImplPtr future3 = create_future(235, 1, 458, future2);
ASSERT_FALSE(future1->attach(&m_flush_handler));
ASSERT_FALSE(future2->attach(&m_flush_handler));
ASSERT_FALSE(future3->attach(&m_flush_handler));
}
TEST_F(TestFutureImpl, FlushInProgress) {
- journal::FutureImplPtr future1 = create_future("tag1", 123, 456);
- journal::FutureImplPtr future2 = create_future("tag1", 124, 457, future1);
+ journal::FutureImplPtr future1 = create_future(234, 123, 456);
+ journal::FutureImplPtr future2 = create_future(234, 124, 457, future1);
ASSERT_FALSE(future1->attach(&m_flush_handler));
ASSERT_FALSE(future2->attach(&m_flush_handler));
}
TEST_F(TestFutureImpl, FlushAlreadyComplete) {
- journal::FutureImplPtr future = create_future("tag1", 123, 456);
+ journal::FutureImplPtr future = create_future(234, 123, 456);
future->safe(-EIO);
C_SaferCond cond;
}
TEST_F(TestFutureImpl, Wait) {
- journal::FutureImplPtr future = create_future("tag", 1, 456);
+ journal::FutureImplPtr future = create_future(234, 1, 456);
C_SaferCond cond;
future->wait(&cond);
}
TEST_F(TestFutureImpl, WaitAlreadyComplete) {
- journal::FutureImplPtr future = create_future("tag", 1, 456);
+ journal::FutureImplPtr future = create_future(234, 1, 456);
future->safe(-EEXIST);
C_SaferCond cond;
}
TEST_F(TestFutureImpl, SafePreservesError) {
- journal::FutureImplPtr future1 = create_future("tag1", 123, 456);
- journal::FutureImplPtr future2 = create_future("tag1", 124, 457, future1);
+ journal::FutureImplPtr future1 = create_future(234, 123, 456);
+ journal::FutureImplPtr future2 = create_future(234, 124, 457, future1);
future1->safe(-EIO);
future2->safe(-EEXIST);
}
TEST_F(TestFutureImpl, ConsistentPreservesError) {
- journal::FutureImplPtr future1 = create_future("tag1", 123, 456);
- journal::FutureImplPtr future2 = create_future("tag1", 124, 457, future1);
+ journal::FutureImplPtr future1 = create_future(234, 123, 456);
+ journal::FutureImplPtr future2 = create_future(234, 124, 457, future1);
future2->safe(-EEXIST);
future1->safe(-EIO);
journal::JournalMetadata::EntryPositions entry_positions;
entry_positions = {
- cls::journal::EntryPosition("tag1", 122)};
+ cls::journal::EntryPosition(123, 122)};
commit_position = journal::JournalMetadata::ObjectSetPosition(1, entry_positions);
C_SaferCond cond;
virtual void get() {}
virtual void put() {}
- virtual bool filter_entry(const std::string &tag) {
- return false;
- }
-
virtual void handle_entries_available() {
Mutex::Locker locker(lock);
entries_available = true;
return RadosTestFixture::client_commit(oid, "client", position);
}
- journal::Entry create_entry(const std::string &tag, uint64_t tid) {
+ journal::Entry create_entry(uint64_t tag_tid, uint64_t entry_tid) {
bufferlist payload_bl;
payload_bl.append("playload");
- return journal::Entry(tag, tid, payload_bl);
+ return journal::Entry(tag_tid, entry_tid, payload_bl);
}
journal::JournalMetadataPtr create_metadata(const std::string &oid) {
}
int write_entry(const std::string &oid, uint64_t object_num,
- const std::string &tag, uint64_t tid) {
+ uint64_t tag_tid, uint64_t entry_tid) {
bufferlist bl;
- ::encode(create_entry(tag, tid), bl);
+ ::encode(create_entry(tag_tid, entry_tid), bl);
return append(oid + "." + stringify(object_num), bl);
}
journal::JournalPlayer::EntryPositions positions;
positions = {
- cls::journal::EntryPosition("tag1", 122) };
+ cls::journal::EntryPosition(234, 122) };
cls::journal::ObjectSetPosition commit_position(0, positions);
ASSERT_EQ(0, create(oid));
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 125));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 125));
player->prefetch();
Entries expected_entries;
expected_entries = {
- create_entry("tag1", 123),
- create_entry("tag1", 124),
- create_entry("tag1", 125)};
+ create_entry(234, 123),
+ create_entry(234, 124),
+ create_entry(234, 125)};
ASSERT_EQ(expected_entries, entries);
uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
ASSERT_EQ(125U, last_tid);
}
journal::JournalPlayer::EntryPositions positions;
positions = {
- cls::journal::EntryPosition("tag1", 125) };
+ cls::journal::EntryPosition(234, 125) };
cls::journal::ObjectSetPosition commit_position(0, positions);
ASSERT_EQ(0, create(oid));
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 125));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 125));
player->prefetch();
ASSERT_TRUE(wait_for_complete(player));
uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
ASSERT_EQ(125U, last_tid);
}
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
player->prefetch();
Entries expected_entries;
expected_entries = {
- create_entry("tag1", 122),
- create_entry("tag1", 123)};
+ create_entry(234, 122),
+ create_entry(234, 123)};
ASSERT_EQ(expected_entries, entries);
}
journal::JournalPlayer::EntryPositions positions;
positions = {
- cls::journal::EntryPosition("tag1", 122),
- cls::journal::EntryPosition("tag2", 1)};
+ cls::journal::EntryPosition(234, 122),
+ cls::journal::EntryPosition(345, 1)};
cls::journal::ObjectSetPosition commit_position(0, positions);
ASSERT_EQ(0, create(oid));
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 120));
- ASSERT_EQ(0, write_entry(oid, 0, "tag2", 0));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 121));
- ASSERT_EQ(0, write_entry(oid, 1, "tag2", 1));
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
- ASSERT_EQ(0, write_entry(oid, 0, "tag2", 2));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
+ ASSERT_EQ(0, write_entry(oid, 1, 345, 1));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
+ ASSERT_EQ(0, write_entry(oid, 0, 345, 2));
player->prefetch();
ASSERT_TRUE(wait_for_complete(player));
uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
ASSERT_EQ(124U, last_tid);
- ASSERT_TRUE(metadata->get_last_allocated_tid("tag2", &last_tid));
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(345, &last_tid));
ASSERT_EQ(2U, last_tid);
}
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 120));
- ASSERT_EQ(0, write_entry(oid, 0, "tag2", 0));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 121));
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 120));
+ ASSERT_EQ(0, write_entry(oid, 0, 345, 0));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 121));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
player->prefetch();
Entries entries;
journal::JournalPlayer::EntryPositions positions;
positions = {
- cls::journal::EntryPosition("tag1", 122)};
+ cls::journal::EntryPosition(234, 122)};
cls::journal::ObjectSetPosition commit_position(0, positions);
ASSERT_EQ(0, create(oid));
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
player->prefetch_and_watch(0.25);
Entries entries;
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
ASSERT_TRUE(wait_for_entries(player, 1, &entries));
Entries expected_entries;
- expected_entries = {create_entry("tag1", 123)};
+ expected_entries = {create_entry(234, 123)};
ASSERT_EQ(expected_entries, entries);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 124));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 124));
ASSERT_TRUE(wait_for_entries(player, 1, &entries));
- expected_entries = {create_entry("tag1", 124)};
+ expected_entries = {create_entry(234, 124)};
ASSERT_EQ(expected_entries, entries);
}
journal::JournalPlayer *player = create_player(oid, metadata);
- ASSERT_EQ(0, write_entry(oid, 0, "tag1", 122));
- ASSERT_EQ(0, write_entry(oid, 1, "tag1", 123));
- ASSERT_EQ(0, write_entry(oid, 5, "tag1", 124));
- ASSERT_EQ(0, write_entry(oid, 6, "tag1", 125));
- ASSERT_EQ(0, write_entry(oid, 7, "tag1", 126));
+ ASSERT_EQ(0, write_entry(oid, 0, 234, 122));
+ ASSERT_EQ(0, write_entry(oid, 1, 234, 123));
+ ASSERT_EQ(0, write_entry(oid, 5, 234, 124));
+ ASSERT_EQ(0, write_entry(oid, 6, 234, 125));
+ ASSERT_EQ(0, write_entry(oid, 7, 234, 126));
player->prefetch();
Entries expected_entries;
expected_entries = {
- create_entry("tag1", 122),
- create_entry("tag1", 123),
- create_entry("tag1", 124),
- create_entry("tag1", 125),
- create_entry("tag1", 126)};
+ create_entry(234, 122),
+ create_entry(234, 123),
+ create_entry(234, 124),
+ create_entry(234, 125),
+ create_entry(234, 126)};
ASSERT_EQ(expected_entries, entries);
uint64_t last_tid;
- ASSERT_TRUE(metadata->get_last_allocated_tid("tag1", &last_tid));
+ ASSERT_TRUE(metadata->get_last_allocated_entry_tid(234, &last_tid));
ASSERT_EQ(126U, last_tid);
}
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
- journal::Future future1 = recorder->append("tag1", create_payload("payload"));
+ journal::Future future1 = recorder->append(123, create_payload("payload"));
C_SaferCond cond;
future1.flush(&cond);
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
- recorder->append("tag1", create_payload(std::string(1 << 12, '1')));
- journal::Future future2 = recorder->append("tag1", create_payload(std::string(1, '2')));
+ recorder->append(123, create_payload(std::string(1 << 12, '1')));
+ journal::Future future2 = recorder->append(123, create_payload(std::string(1, '2')));
C_SaferCond cond;
future2.flush(&cond);
journal::JournalRecorder *recorder1 = create_recorder(oid, metadata);
journal::JournalRecorder *recorder2 = create_recorder(oid, metadata);
- recorder1->append("tag1", create_payload(std::string(1, '1')));
- recorder2->append("tag2", create_payload(std::string(1 << 12, '2')));
+ recorder1->append(123, create_payload(std::string(1, '1')));
+ recorder2->append(234, create_payload(std::string(1 << 12, '2')));
- journal::Future future = recorder2->append("tag1", create_payload(std::string(1, '3')));
+ journal::Future future = recorder2->append(123, create_payload(std::string(1, '3')));
C_SaferCond cond;
future.flush(&cond);
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
- journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
- journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
+ journal::Future future1 = recorder->append(123, create_payload("payload1"));
+ journal::Future future2 = recorder->append(123, create_payload("payload2"));
C_SaferCond cond;
future2.flush(&cond);
journal::JournalRecorder *recorder = create_recorder(oid, metadata);
- journal::Future future1 = recorder->append("tag1", create_payload("payload1"));
- journal::Future future2 = recorder->append("tag1", create_payload("payload2"));
+ journal::Future future1 = recorder->append(123, create_payload("payload1"));
+ journal::Future future2 = recorder->append(123, create_payload("payload2"));
C_SaferCond cond1;
recorder->flush(&cond1);
const std::string &oid, uint64_t object_num,
const std::string &payload, uint64_t *commit_tid) {
int r = append(oid + "." + stringify(object_num), create_payload(payload));
- uint64_t tid = metadata->allocate_commit_tid(object_num, "tag", 123);
+ uint64_t tid = metadata->allocate_commit_tid(object_num, 234, 123);
if (commit_tid != NULL) {
*commit_tid = tid;
}
TEST_F(TestObjectPlayer, Fetch) {
std::string oid = get_temp_oid();
- journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
- journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+ journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
bufferlist bl;
::encode(entry1, bl);
TEST_F(TestObjectPlayer, FetchLarge) {
std::string oid = get_temp_oid();
- journal::Entry entry1("tag1", 123,
+ journal::Entry entry1(234, 123,
create_payload(std::string(8192 - 33, '1')));
- journal::Entry entry2("tag1", 124, create_payload(""));
+ journal::Entry entry2(234, 124, create_payload(""));
bufferlist bl;
::encode(entry1, bl);
TEST_F(TestObjectPlayer, FetchDeDup) {
std::string oid = get_temp_oid();
- journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
- journal::Entry entry2("tag1", 123, create_payload(std::string(24, '2')));
+ journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 123, create_payload(std::string(24, '2')));
bufferlist bl;
::encode(entry1, bl);
TEST_F(TestObjectPlayer, FetchCorrupt) {
std::string oid = get_temp_oid();
- journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
- journal::Entry entry2("tag1", 124, create_payload(std::string(24, '2')));
+ journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, create_payload(std::string(24, '2')));
bufferlist bl;
::encode(entry1, bl);
TEST_F(TestObjectPlayer, FetchAppend) {
std::string oid = get_temp_oid();
- journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
- journal::Entry entry2("tag1", 124, create_payload(std::string(24, '2')));
+ journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, create_payload(std::string(24, '2')));
bufferlist bl;
::encode(entry1, bl);
TEST_F(TestObjectPlayer, PopEntry) {
std::string oid = get_temp_oid();
- journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
- journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+ journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
bufferlist bl;
::encode(entry1, bl);
C_SaferCond cond1;
object->watch(&cond1, 0.1);
- journal::Entry entry1("tag1", 123, create_payload(std::string(24, '1')));
- journal::Entry entry2("tag1", 124, create_payload(std::string(24, '1')));
+ journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
bufferlist bl;
::encode(entry1, bl);
m_flush_age = i;
}
- journal::AppendBuffer create_append_buffer(const std::string &tag,
- uint64_t tid,
+ journal::AppendBuffer create_append_buffer(uint64_t tag_tid, uint64_t entry_tid,
const std::string &payload) {
journal::FutureImplPtr future(new journal::FutureImpl(*m_finisher,
- tag, tid, 456));
+ tag_tid, entry_tid, 456));
future->init(journal::FutureImplPtr());
bufferlist bl;
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
ASSERT_FALSE(object->append(append_buffers));
ASSERT_EQ(1U, object->get_pending_appends());
- journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
ASSERT_FALSE(object->append(append_buffers));
set_flush_interval(2);
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
ASSERT_FALSE(object->append(append_buffers));
ASSERT_EQ(1U, object->get_pending_appends());
- journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
ASSERT_FALSE(object->append(append_buffers));
set_flush_bytes(10);
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
ASSERT_FALSE(object->append(append_buffers));
ASSERT_EQ(1U, object->get_pending_appends());
- journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
ASSERT_FALSE(object->append(append_buffers));
set_flush_age(0.1);
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
ASSERT_FALSE(object->append(append_buffers));
- journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
"payload");
append_buffers = {append_buffer2};
ASSERT_FALSE(object->append(append_buffers));
journal::ObjectRecorderPtr object = create_object(oid, 12);
std::string payload(2048, '1');
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
ASSERT_FALSE(object->append(append_buffers));
- journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
append_buffers = {append_buffer2};
ASSERT_TRUE(object->append(append_buffers));
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1};
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer};
journal::ObjectRecorderPtr object = create_object(oid, 24);
- journal::AppendBuffer append_buffer = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer = create_append_buffer(234, 123,
"payload");
journal::AppendBuffers append_buffers;
journal::ObjectRecorderPtr object2 = create_object(oid, 12);
std::string payload(2048, '1');
- journal::AppendBuffer append_buffer1 = create_append_buffer("tag", 123,
+ journal::AppendBuffer append_buffer1 = create_append_buffer(234, 123,
payload);
- journal::AppendBuffer append_buffer2 = create_append_buffer("tag", 124,
+ journal::AppendBuffer append_buffer2 = create_append_buffer(234, 124,
payload);
journal::AppendBuffers append_buffers;
append_buffers = {append_buffer1, append_buffer2};
ASSERT_EQ(0, cond.wait());
ASSERT_EQ(0U, object1->get_pending_appends());
- journal::AppendBuffer append_buffer3 = create_append_buffer("bar", 123,
+ journal::AppendBuffer append_buffer3 = create_append_buffer(456, 123,
payload);
append_buffers = {append_buffer3};
MOCK_METHOD3(start_append, void(int flush_interval, uint64_t flush_bytes,
double flush_age));
- MOCK_METHOD2(append, MockFutureProxy(const std::string &tag,
+ MOCK_METHOD2(append, MockFutureProxy(uint64_t tag_id,
const bufferlist &bl));
MOCK_METHOD1(flush, void(Context *on_safe));
MOCK_METHOD1(stop_append, void(Context *on_safe));
flush_age);
}
- MockFutureProxy append(const std::string &tag, const bufferlist &bl) {
- return MockJournaler::get_instance().append(tag, bl);
+ MockFutureProxy append(uint64_t tag_id, const bufferlist &bl) {
+ return MockJournaler::get_instance().append(tag_id, bl);
}
void flush(Context *on_safe) {
int r = 0;
while (true) {
::journal::ReplayEntry replay_entry;
- std::string tag;
- if (!m_journaler.try_pop_front(&replay_entry, &tag)) {
+ uint64_t tag_id;
+ if (!m_journaler.try_pop_front(&replay_entry, &tag_id)) {
break;
}
- r = process_entry(replay_entry, tag);
+ r = process_entry(replay_entry, tag_id);
if (r < 0) {
break;
}
}
virtual int process_entry(::journal::ReplayEntry replay_entry,
- std::string& tag) = 0;
+ uint64_t tag_id) = 0;
void handle_replay_complete(int r) {
m_journaler.stop_replay();
};
int process_entry(::journal::ReplayEntry replay_entry,
- std::string& tag) {
+ uint64_t tag_id) {
m_s.total++;
if (m_verbose) {
- std::cout << "Entry: tag=" << tag << ", commit_tid="
+ std::cout << "Entry: tag_id=" << tag_id << ", commit_tid="
<< replay_entry.get_commit_tid() << std::endl;
}
bufferlist data = replay_entry.get_data();
}
struct ExportEntry {
- std::string tag;
+ uint64_t tag_id;
uint64_t commit_tid;
int type;
bufferlist entry;
- ExportEntry() : tag(), commit_tid(0), type(0), entry() {}
+ ExportEntry() : tag_id(0), commit_tid(0), type(0), entry() {}
- ExportEntry(const std::string& tag, uint64_t commit_tid, int type,
+ ExportEntry(uint64_t tag_id, uint64_t commit_tid, int type,
const bufferlist& entry)
- : tag(tag), commit_tid(commit_tid), type(type), entry(entry) {
+ : tag_id(tag_id), commit_tid(commit_tid), type(type), entry(entry) {
}
void dump(Formatter *f) const {
- ::encode_json("tag", tag, f);
+ ::encode_json("tag_id", tag_id, f);
::encode_json("commit_tid", commit_tid, f);
::encode_json("type", type, f);
::encode_json("entry", entry, f);
}
void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("tag", tag, obj);
+ JSONDecoder::decode_json("tag_id", tag_id, obj);
JSONDecoder::decode_json("commit_tid", commit_tid, obj);
JSONDecoder::decode_json("type", type, obj);
JSONDecoder::decode_json("entry", entry, obj);
};
int process_entry(::journal::ReplayEntry replay_entry,
- std::string& tag) {
+ uint64_t tag_id) {
m_s.total++;
int type = -1;
bufferlist entry = replay_entry.get_data();
} else {
type = event_entry.get_event_type();
}
- ExportEntry export_entry(tag, replay_entry.get_commit_tid(), type, entry);
+ ExportEntry export_entry(tag_id, replay_entry.get_commit_tid(), type,
+ entry);
JSONFormatter f;
::encode_json("event_entry", export_entry, &f);
std::ostringstream oss;
librbd::journal::EventEntry event_entry;
r = inspect_entry(e.entry, event_entry, m_verbose);
if (r < 0) {
- std::cerr << "rbd: corrupted entry " << n << ": tag=" << e.tag
+ std::cerr << "rbd: corrupted entry " << n << ": tag_tid=" << e.tag_id
<< ", commit_tid=" << e.commit_tid << std::endl;
if (m_no_error) {
r1 = r;
break;
}
}
- m_journaler.append(e.tag, e.entry);
+ m_journaler.append(e.tag_id, e.entry);
error_count--;
}