ObjectPlayer::ObjectPlayer(librados::IoCtx &ioctx,
const std::string &object_oid_prefix,
uint64_t object_num, SafeTimer &timer,
- Mutex &timer_lock, uint8_t order)
+ Mutex &timer_lock, uint8_t order,
+ uint64_t max_fetch_bytes)
: RefCountedObject(NULL, 0), m_object_num(object_num),
m_oid(utils::get_object_name(object_oid_prefix, m_object_num)),
m_cct(NULL), m_timer(timer), m_timer_lock(timer_lock), m_order(order),
+ m_max_fetch_bytes(max_fetch_bytes > 0 ? max_fetch_bytes : 2 << order),
m_watch_interval(0), m_watch_task(NULL),
m_lock(utils::unique_lock_name("ObjectPlayer::m_lock", this)),
- m_fetch_in_progress(false), m_read_off(0) {
+ m_fetch_in_progress(false) {
m_ioctx.dup(ioctx);
m_cct = reinterpret_cast<CephContext*>(m_ioctx.cct());
}
ldout(m_cct, 10) << __func__ << ": " << m_oid << dendl;
Mutex::Locker locker(m_lock);
+ assert(!m_fetch_in_progress);
m_fetch_in_progress = true;
C_Fetch *context = new C_Fetch(this, on_finish);
librados::ObjectReadOperation op;
- op.read(m_read_off, 2 << m_order, &context->read_bl, NULL);
+ op.read(m_read_off, m_max_fetch_bytes, &context->read_bl, NULL);
op.set_op_flags2(CEPH_OSD_OP_FLAG_FADVISE_DONTNEED);
librados::AioCompletion *rados_completion =
void ObjectPlayer::pop_front() {
Mutex::Locker locker(m_lock);
assert(!m_entries.empty());
+
+ auto &entry = m_entries.front();
+ m_entry_keys.erase({entry.get_tag_tid(), entry.get_entry_tid()});
m_entries.pop_front();
}
-int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl) {
+int ObjectPlayer::handle_fetch_complete(int r, const bufferlist &bl,
+ bool *refetch) {
ldout(m_cct, 10) << __func__ << ": " << m_oid << ", r=" << r << ", len="
<< bl.length() << dendl;
- m_fetch_in_progress = false;
+ *refetch = false;
if (r < 0) {
return r;
}
}
Mutex::Locker locker(m_lock);
+ assert(m_fetch_in_progress);
+ m_read_off += bl.length();
m_read_bl.append(bl);
+ bool full_fetch = (m_max_fetch_bytes == 2U << m_order);
+ bool partial_entry = false;
bool invalid = false;
uint32_t invalid_start_off = 0;
- bufferlist::iterator iter(&m_read_bl, m_read_off);
+ clear_invalid_range(m_read_bl_off, m_read_bl.length());
+ bufferlist::iterator iter(&m_read_bl, 0);
while (!iter.end()) {
uint32_t bytes_needed;
+ uint32_t bl_off = iter.get_off();
if (!Entry::is_readable(iter, &bytes_needed)) {
if (bytes_needed != 0) {
- invalid_start_off = iter.get_off();
+ invalid_start_off = m_read_bl_off + bl_off;
invalid = true;
- lderr(m_cct) << ": partial record at offset " << iter.get_off()
- << dendl;
+ partial_entry = true;
+ if (full_fetch) {
+ lderr(m_cct) << ": partial record at offset " << invalid_start_off
+ << dendl;
+ } else {
+ ldout(m_cct, 20) << ": partial record detected, will re-fetch"
+ << dendl;
+ }
break;
}
if (!invalid) {
- invalid_start_off = iter.get_off();
+ invalid_start_off = m_read_bl_off + bl_off;
invalid = true;
lderr(m_cct) << ": detected corrupt journal entry at offset "
<< invalid_start_off << dendl;
continue;
}
+ Entry entry;
+ ::decode(entry, iter);
+ ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
+
+ uint32_t entry_len = iter.get_off() - bl_off;
if (invalid) {
- uint32_t invalid_end_off = iter.get_off();
+ // new corrupt region detected
+ uint32_t invalid_end_off = m_read_bl_off + bl_off;
lderr(m_cct) << ": corruption range [" << invalid_start_off
<< ", " << invalid_end_off << ")" << dendl;
- m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+ m_invalid_ranges.insert(invalid_start_off,
+ invalid_end_off - invalid_start_off);
invalid = false;
}
- Entry entry;
- ::decode(entry, iter);
- ldout(m_cct, 20) << ": " << entry << " decoded" << dendl;
-
EntryKey entry_key(std::make_pair(entry.get_tag_tid(),
entry.get_entry_tid()));
if (m_entry_keys.find(entry_key) == m_entry_keys.end()) {
ldout(m_cct, 10) << ": " << entry << " is duplicate, replacing" << dendl;
*m_entry_keys[entry_key] = entry;
}
+
+ // prune decoded / corrupted journal entries from front of bl
+ bufferlist sub_bl;
+ sub_bl.substr_of(m_read_bl, iter.get_off(),
+ m_read_bl.length() - iter.get_off());
+ sub_bl.swap(m_read_bl);
+ iter = bufferlist::iterator(&m_read_bl, 0);
+
+ // advance the decoded entry offset
+ m_read_bl_off += entry_len;
}
- m_read_off = m_read_bl.length();
if (invalid) {
- uint32_t invalid_end_off = m_read_bl.length();
- lderr(m_cct) << ": corruption range [" << invalid_start_off
- << ", " << invalid_end_off << ")" << dendl;
- m_invalid_ranges.insert(invalid_start_off, invalid_end_off);
+ uint32_t invalid_end_off = m_read_bl_off + m_read_bl.length();
+ if (!partial_entry) {
+ lderr(m_cct) << ": corruption range [" << invalid_start_off
+ << ", " << invalid_end_off << ")" << dendl;
+ }
+ m_invalid_ranges.insert(invalid_start_off,
+ invalid_end_off - invalid_start_off);
}
- if (!m_invalid_ranges.empty()) {
- r = -EBADMSG;
+ if (!m_invalid_ranges.empty() && !partial_entry) {
+ return -EBADMSG;
+ } else if (partial_entry && (full_fetch || m_entries.empty())) {
+ *refetch = true;
+ return -EAGAIN;
+ }
+
+ return 0;
+}
+
+void ObjectPlayer::clear_invalid_range(uint32_t off, uint32_t len) {
+ // possibly remove previously partial record region
+ InvalidRanges decode_range;
+ decode_range.insert(off, len);
+ InvalidRanges intersect_range;
+ intersect_range.intersection_of(m_invalid_ranges, decode_range);
+ if (!intersect_range.empty()) {
+ ldout(m_cct, 20) << ": clearing invalid range: " << intersect_range
+ << dendl;
+ m_invalid_ranges.subtract(intersect_range);
}
- return r;
}
void ObjectPlayer::schedule_watch() {
}
void ObjectPlayer::C_Fetch::finish(int r) {
- r = object_player->handle_fetch_complete(r, read_bl);
- object_player.reset();
+ bool refetch = false;
+ r = object_player->handle_fetch_complete(r, read_bl, &refetch);
+ {
+ Mutex::Locker locker(object_player->m_lock);
+ object_player->m_fetch_in_progress = false;
+ }
+
+ if (refetch) {
+ object_player->fetch(on_finish);
+ return;
+ }
+
+ object_player.reset();
on_finish->complete(r);
}
#include "test/librados/test.h"
#include "test/journal/RadosTestFixture.h"
+template <typename T>
class TestObjectPlayer : public RadosTestFixture {
public:
+ static const uint32_t max_fetch_bytes = T::max_fetch_bytes;
+
journal::ObjectPlayerPtr create_object(const std::string &oid,
uint8_t order) {
journal::ObjectPlayerPtr object(new journal::ObjectPlayer(
- m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order));
+ m_ioctx, oid + ".", 0, *m_timer, m_timer_lock, order,
+ max_fetch_bytes));
return object;
}
+ int fetch(journal::ObjectPlayerPtr object_player) {
+ while (true) {
+ C_SaferCond ctx;
+ object_player->clear_refetch_required();
+ object_player->fetch(&ctx);
+ int r = ctx.wait();
+ if (r < 0 || !object_player->refetch_required()) {
+ return r;
+ }
+ }
+ return 0;
+ }
+
+ int watch_and_wait_for_entries(journal::ObjectPlayerPtr object_player,
+ journal::ObjectPlayer::Entries *entries,
+ size_t count) {
+ for (size_t i = 0; i < 50; ++i) {
+ object_player->get_entries(entries);
+ if (entries->size() == count) {
+ break;
+ }
+
+ C_SaferCond ctx;
+ object_player->watch(&ctx, 0.1);
+
+ int r = ctx.wait();
+ if (r < 0) {
+ return r;
+ }
+ }
+ return 0;
+ }
+
std::string get_object_name(const std::string &oid) {
return oid + ".0";
}
};
-TEST_F(TestObjectPlayer, Fetch) {
- std::string oid = get_temp_oid();
+template <uint32_t _max_fetch_bytes>
+struct TestObjectPlayerParams {
+ static const uint32_t max_fetch_bytes = _max_fetch_bytes;
+};
+
+typedef ::testing::Types<TestObjectPlayerParams<0>,
+ TestObjectPlayerParams<10> > TestObjectPlayerTypes;
+TYPED_TEST_CASE(TestObjectPlayer, TestObjectPlayerTypes);
+
+TYPED_TEST(TestObjectPlayer, Fetch) {
+ std::string oid = this->get_temp_oid();
- journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
- journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
bufferlist bl;
::encode(entry1, bl);
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = create_object(oid, 14);
-
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_LE(0, cond.wait());
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
ASSERT_EQ(expected_entries, entries);
}
-TEST_F(TestObjectPlayer, FetchLarge) {
- std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchLarge) {
+ std::string oid = this->get_temp_oid();
journal::Entry entry1(234, 123,
- create_payload(std::string(8192 - 33, '1')));
- journal::Entry entry2(234, 124, create_payload(""));
+ this->create_payload(std::string(8192 - 32, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(""));
bufferlist bl;
::encode(entry1, bl);
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
-
- journal::ObjectPlayerPtr object = create_object(oid, 12);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_LE(0, cond.wait());
+ journal::ObjectPlayerPtr object = this->create_object(oid, 12);
+ ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
- ASSERT_EQ(1U, entries.size());
+ ASSERT_EQ(2U, entries.size());
- journal::ObjectPlayer::Entries expected_entries = {entry1};
+ journal::ObjectPlayer::Entries expected_entries = {entry1, entry2};
ASSERT_EQ(expected_entries, entries);
}
-TEST_F(TestObjectPlayer, FetchDeDup) {
- std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchDeDup) {
+ std::string oid = this->get_temp_oid();
- journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
- journal::Entry entry2(234, 123, create_payload(std::string(24, '2')));
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 123, this->create_payload(std::string(24, '2')));
bufferlist bl;
::encode(entry1, bl);
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
-
- journal::ObjectPlayerPtr object = create_object(oid, 14);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_LE(0, cond.wait());
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
ASSERT_EQ(expected_entries, entries);
}
-TEST_F(TestObjectPlayer, FetchEmpty) {
- std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchEmpty) {
+ std::string oid = this->get_temp_oid();
bufferlist bl;
- ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = create_object(oid, 14);
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_EQ(-ENOENT, cond.wait());
+ ASSERT_EQ(-ENOENT, this->fetch(object));
ASSERT_TRUE(object->empty());
}
-TEST_F(TestObjectPlayer, FetchError) {
- std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchCorrupt) {
+ std::string oid = this->get_temp_oid();
- journal::ObjectPlayerPtr object = create_object(oid, 14);
-
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_EQ(-ENOENT, cond.wait());
- ASSERT_TRUE(object->empty());
-}
-
-TEST_F(TestObjectPlayer, FetchCorrupt) {
- std::string oid = get_temp_oid();
-
- journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
- journal::Entry entry2(234, 124, create_payload(std::string(24, '2')));
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2')));
bufferlist bl;
::encode(entry1, bl);
- ::encode(create_payload("corruption"), bl);
+ ::encode(this->create_payload("corruption"), bl);
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
-
- journal::ObjectPlayerPtr object = create_object(oid, 14);
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_EQ(-EBADMSG, cond.wait());
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ ASSERT_EQ(-EBADMSG, this->fetch(object));
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
ASSERT_EQ(expected_entries, entries);
}
-TEST_F(TestObjectPlayer, FetchAppend) {
- std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, FetchAppend) {
+ std::string oid = this->get_temp_oid();
- journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
- journal::Entry entry2(234, 124, create_payload(std::string(24, '2')));
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '2')));
bufferlist bl;
::encode(entry1, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = create_object(oid, 14);
-
- C_SaferCond cond1;
- object->fetch(&cond1);
- ASSERT_LE(0, cond1.wait());
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
bl.clear();
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
-
- C_SaferCond cond2;
- object->fetch(&cond2);
- ASSERT_LE(0, cond2.wait());
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
+ ASSERT_LE(0, this->fetch(object));
object->get_entries(&entries);
ASSERT_EQ(2U, entries.size());
ASSERT_EQ(expected_entries, entries);
}
-TEST_F(TestObjectPlayer, PopEntry) {
- std::string oid = get_temp_oid();
+TYPED_TEST(TestObjectPlayer, PopEntry) {
+ std::string oid = this->get_temp_oid();
- journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
- journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
bufferlist bl;
::encode(entry1, bl);
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
- journal::ObjectPlayerPtr object = create_object(oid, 14);
-
- C_SaferCond cond;
- object->fetch(&cond);
- ASSERT_LE(0, cond.wait());
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
+ ASSERT_LE(0, this->fetch(object));
journal::ObjectPlayer::Entries entries;
object->get_entries(&entries);
ASSERT_TRUE(object->empty());
}
-TEST_F(TestObjectPlayer, Watch) {
- std::string oid = get_temp_oid();
- journal::ObjectPlayerPtr object = create_object(oid, 14);
+TYPED_TEST(TestObjectPlayer, Watch) {
+ std::string oid = this->get_temp_oid();
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
C_SaferCond cond1;
object->watch(&cond1, 0.1);
- journal::Entry entry1(234, 123, create_payload(std::string(24, '1')));
- journal::Entry entry2(234, 124, create_payload(std::string(24, '1')));
+ journal::Entry entry1(234, 123, this->create_payload(std::string(24, '1')));
+ journal::Entry entry2(234, 124, this->create_payload(std::string(24, '1')));
bufferlist bl;
::encode(entry1, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
ASSERT_LE(0, cond1.wait());
journal::ObjectPlayer::Entries entries;
- object->get_entries(&entries);
+ ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 1U));
ASSERT_EQ(1U, entries.size());
journal::ObjectPlayer::Entries expected_entries;
bl.clear();
::encode(entry2, bl);
- ASSERT_EQ(0, append(get_object_name(oid), bl));
+ ASSERT_EQ(0, this->append(this->get_object_name(oid), bl));
ASSERT_LE(0, cond2.wait());
- object->get_entries(&entries);
+ ASSERT_EQ(0, this->watch_and_wait_for_entries(object, &entries, 2U));
ASSERT_EQ(2U, entries.size());
expected_entries = {entry1, entry2};
ASSERT_EQ(expected_entries, entries);
}
-TEST_F(TestObjectPlayer, Unwatch) {
- std::string oid = get_temp_oid();
- journal::ObjectPlayerPtr object = create_object(oid, 14);
+TYPED_TEST(TestObjectPlayer, Unwatch) {
+ std::string oid = this->get_temp_oid();
+ journal::ObjectPlayerPtr object = this->create_object(oid, 14);
C_SaferCond watch_ctx;
object->watch(&watch_ctx, 600);