: ThreadPool::PointerWQ<AioImageRequest<> >(name, ti, 0, tp),
m_image_ctx(*image_ctx),
m_lock(util::unique_lock_name("AioImageRequestWQ::m_lock", this)),
- m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
- m_in_flight_ops(0), m_refresh_in_progress(false),
+ m_write_blockers(0), m_in_progress_writes(0), m_queued_reads(0),
+ m_queued_writes(0), m_in_flight_ops(0), m_refresh_in_progress(false),
m_shutdown(false), m_on_shutdown(nullptr) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << " " << ": ictx=" << image_ctx << dendl;
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
+
+ // if journaling is enabled -- we need to replay the journal because
+ // it might contain an uncommitted write
+ bool lock_required;
+ {
+ RWLock::RLocker locker(m_lock);
+ lock_required = m_require_lock_on_read;
+ }
+
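+ // a queued read is dispatched only after the exclusive lock has been
+ // acquired (and any journal replay completed) -- see _void_dequeue()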
+ if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty() ||
+ lock_required) {
queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
} else {
AioImageRequest<>::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio || is_journal_required() ||
- writes_blocked()) {
+ if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
} else {
AioImageRequest<>::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio || is_journal_required() ||
- writes_blocked()) {
+ if (m_image_ctx.non_blocking_aio || writes_blocked()) {
queue(new AioImageDiscard(m_image_ctx, c, off, len));
} else {
AioImageRequest<>::aio_discard(&m_image_ctx, c, off, len);
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio || is_journal_required() ||
- writes_blocked() || !writes_empty()) {
+ if (m_image_ctx.non_blocking_aio || writes_blocked() || !writes_empty()) {
queue(new AioImageFlush(m_image_ctx, c));
} else {
AioImageRequest<>::aio_flush(&m_image_ctx, c);
m_image_ctx.flush(on_shutdown);
}
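+
+// Reports whether blocked IO is waiting on the exclusive lock: queued writes,
+// or queued reads while reads require the lock. ExclusiveLock uses this to
+// decide whether to re-request the lock right after releasing it.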
+bool AioImageRequestWQ::is_lock_request_needed() const {
+ RWLock::RLocker locker(m_lock);
+ return (m_queued_writes.read() > 0 ||
+ (m_require_lock_on_read && m_queued_reads.read() > 0));
+}
+
void AioImageRequestWQ::block_writes() {
C_SaferCond cond_ctx;
block_writes(&cond_ctx);
cond_ctx.wait();
}
+void AioImageRequestWQ::set_require_lock_on_read() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << dendl;
+
+ RWLock::WLocker locker(m_lock);
+ m_require_lock_on_read = true;
+}
+
+void AioImageRequestWQ::clear_require_lock_on_read() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << dendl;
+
+ {
+ RWLock::WLocker locker(m_lock);
+ if (!m_require_lock_on_read) {
+ return;
+ }
+
+ m_require_lock_on_read = false;
+ }
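+ // wake the thread pool: reads held back in _void_dequeue() can now proceed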
+ signal();
+}
+
void *AioImageRequestWQ::_void_dequeue() {
AioImageRequest<> *peek_item = front();
if (peek_item == NULL || m_refresh_in_progress) {
return NULL;
}
- if (peek_item->is_write_op()) {
+ {
RWLock::RLocker locker(m_lock);
- if (m_write_blockers > 0) {
- return NULL;
+ if (peek_item->is_write_op()) {
+ if (m_write_blockers > 0) {
+ return NULL;
+ }
+ m_in_progress_writes.inc();
+ } else if (m_require_lock_on_read) {
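+ // reads stay queued until the exclusive lock is acquired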
+ return nullptr;
}
- m_in_progress_writes.inc();
}
AioImageRequest<> *item = reinterpret_cast<AioImageRequest<> *>(
ThreadPool::PointerWQ<AioImageRequest<> >::_void_dequeue());
assert(peek_item == item);
return item;
}
!m_write_blocker_contexts.empty()) {
writes_blocked = true;
}
+ } else {
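+ // the finished op was a read -- balance the inc() done in queue()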
+ assert(m_queued_reads.read() > 0);
+ m_queued_reads.dec();
}
}
m_image_ctx.flush(m_on_shutdown);
}
-bool AioImageRequestWQ::is_journal_required() const {
- // TODO eliminate once journal startup state is integrated
- RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
- return (m_image_ctx.journal != NULL);
-}
-
bool AioImageRequestWQ::is_lock_required() const {
assert(m_image_ctx.owner_lock.is_locked());
if (m_image_ctx.exclusive_lock == NULL) {
bool write_op = req->is_write_op();
if (write_op) {
m_queued_writes.inc();
+ } else {
+ m_queued_reads.inc();
}
ThreadPool::PointerWQ<AioImageRequest<> >::queue(req);
- if (write_op && is_lock_required()) {
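+ // request the exclusive lock up front if this queued IO cannot be
+ // processed without it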
+ if ((write_op && is_lock_required()) ||
+ (!write_op && m_require_lock_on_read)) {
m_image_ctx.exclusive_lock->request_lock(nullptr);
}
}
using typename ThreadPool::PointerWQ<AioImageRequest<ImageCtx> >::drain;
using typename ThreadPool::PointerWQ<AioImageRequest<ImageCtx> >::empty;
- inline bool writes_empty() const {
- RWLock::RLocker locker(m_lock);
- return (m_queued_writes.read() == 0);
- }
+ void shut_down(Context *on_shutdown);
+
+ bool is_lock_request_needed() const;
inline bool writes_blocked() const {
RWLock::RLocker locker(m_lock);
return (m_write_blockers > 0);
}
- void shut_down(Context *on_shutdown);
-
void block_writes();
void block_writes(Context *on_blocked);
void unblock_writes();
+ void set_require_lock_on_read();
+ void clear_require_lock_on_read();
+
protected:
virtual void *_void_dequeue();
virtual void process(AioImageRequest<ImageCtx> *req);
mutable RWLock m_lock;
Contexts m_write_blocker_contexts;
uint32_t m_write_blockers;
+ bool m_require_lock_on_read = false;
atomic_t m_in_progress_writes;
+ atomic_t m_queued_reads;
atomic_t m_queued_writes;
atomic_t m_in_flight_ops;
bool m_shutdown;
Context *m_on_shutdown;
+ inline bool writes_empty() const {
+ RWLock::RLocker locker(m_lock);
+ return (m_queued_writes.read() == 0);
+ }
+
int start_in_flight_op(AioCompletion *c);
void finish_in_flight_op();
- bool is_journal_required() const;
bool is_lock_required() const;
void queue(AioImageRequest<ImageCtx> *req);
};
template <typename I>
-void ExclusiveLock<I>::init(Context *on_init) {
+void ExclusiveLock<I>::init(uint64_t features, Context *on_init) {
assert(m_image_ctx.owner_lock.is_locked());
ldout(m_image_ctx.cct, 10) << this << " " << __func__ << dendl;
}
m_image_ctx.aio_work_queue->block_writes(new C_InitComplete(this, on_init));
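+ // a journaled image may hold uncommitted events that must be replayed
+ // before reads are served, so reads also wait for the exclusive lock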
+ if ((features & RBD_FEATURE_JOURNALING) != 0) {
+ m_image_ctx.aio_work_queue->set_require_lock_on_read();
+ }
}
template <typename I>
if (next_state == STATE_LOCKED) {
m_image_ctx.image_watcher->notify_acquired_lock();
+ m_image_ctx.aio_work_queue->clear_require_lock_on_read();
m_image_ctx.aio_work_queue->unblock_writes();
}
template <typename I>
void ExclusiveLock<I>::handle_release_lock(int r) {
- bool pending_writes = false;
+ bool lock_request_needed = false;
{
Mutex::Locker locker(m_lock);
ldout(m_image_ctx.cct, 10) << this << " " << __func__ << ": r=" << r
                           << dendl;
if (r >= 0) {
m_lock.Unlock();
m_image_ctx.image_watcher->notify_released_lock();
- pending_writes = !m_image_ctx.aio_work_queue->writes_empty();
+ lock_request_needed = m_image_ctx.aio_work_queue->is_lock_request_needed();
m_lock.Lock();
m_watch_handle = 0;
complete_active_action(r < 0 ? STATE_LOCKED : STATE_UNLOCKED, r);
}
- if (r >= 0 && pending_writes) {
- // if we have pending writes -- re-request the lock
+ if (r >= 0 && lock_request_needed) {
+ // if we have blocked IO -- re-request the lock
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
request_lock(nullptr);
}
assert(m_lock.is_locked());
if (m_state == STATE_UNLOCKED) {
m_state = STATE_SHUTTING_DOWN;
+ m_image_ctx.aio_work_queue->clear_require_lock_on_read();
m_image_ctx.aio_work_queue->unblock_writes();
m_image_ctx.image_watcher->flush(util::create_context_callback<
ExclusiveLock<I>, &ExclusiveLock<I>::complete_shutdown>(this));
lderr(cct) << "failed to shut down exclusive lock: " << cpp_strerror(r)
<< dendl;
} else {
+ m_image_ctx.aio_work_queue->clear_require_lock_on_read();
m_image_ctx.aio_work_queue->unblock_writes();
}
bool is_lock_owner() const;
bool accept_requests() const;
- void init(Context *on_init);
+ void init(uint64_t features, Context *on_init);
void shut_down(Context *on_shutdown);
void try_lock(Context *on_tried_lock);
{
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
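+ // after the lock is released, reads on a journaled image must once again
+ // wait for the lock (and journal replay) before being processed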
+ if (m_image_ctx.test_features(RBD_FEATURE_JOURNALING)) {
+ m_image_ctx.aio_work_queue->set_require_lock_on_read();
+ }
m_image_ctx.aio_work_queue->block_writes(ctx);
}
}
#include "common/WorkQueue.h"
#include "cls/lock/cls_lock_client.h"
#include "cls/rbd/cls_rbd_client.h"
+#include "librbd/AioImageRequestWQ.h"
#include "librbd/ExclusiveLock.h"
#include "librbd/ImageCtx.h"
#include "librbd/Journal.h"
klass, &klass::handle_v2_init_exclusive_lock>(this);
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- m_exclusive_lock->init(ctx);
+ m_exclusive_lock->init(m_features, ctx);
return nullptr;
}
m_image_ctx.journal != nullptr ||
m_image_ctx.exclusive_lock == nullptr ||
!m_image_ctx.exclusive_lock->is_lock_owner()) {
+
+ // journaling was dynamically enabled but we don't own the exclusive lock --
+ // require the lock on reads until the journal can be opened and replayed
+ if ((m_features & RBD_FEATURE_JOURNALING) != 0 &&
+ m_image_ctx.exclusive_lock != nullptr &&
+ m_image_ctx.journal == nullptr) {
+ m_image_ctx.aio_work_queue->set_require_lock_on_read();
+ }
return send_v2_finalize_refresh_parent();
}
std::swap(m_exclusive_lock, m_image_ctx.exclusive_lock);
}
if (!m_image_ctx.test_features(RBD_FEATURE_JOURNALING,
- m_image_ctx.snap_lock) ||
- m_journal != nullptr) {
+ m_image_ctx.snap_lock)) {
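+ // journaling was disabled -- reads no longer require the exclusive lock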
+ if (m_image_ctx.journal != nullptr) {
+ m_image_ctx.aio_work_queue->clear_require_lock_on_read();
+ }
+ std::swap(m_journal, m_image_ctx.journal);
+ } else if (m_journal != nullptr) {
std::swap(m_journal, m_image_ctx.journal);
}
if (!m_image_ctx.test_features(RBD_FEATURE_OBJECT_MAP,
klass, &klass::handle_init_exclusive_lock>(this);
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- m_exclusive_lock->init(ctx);
+ m_exclusive_lock->init(m_image_ctx.features, ctx);
}
template <typename I>
public:
typedef ReleaseRequest<MockImageCtx> MockReleaseRequest;
+ void expect_test_features(MockImageCtx &mock_image_ctx, uint64_t features,
+ bool enabled) {
+ EXPECT_CALL(mock_image_ctx, test_features(features))
+ .WillOnce(Return(enabled));
+ }
+
+ void expect_set_require_lock_on_read(MockImageCtx &mock_image_ctx) {
+ EXPECT_CALL(*mock_image_ctx.aio_work_queue, set_require_lock_on_read());
+ }
+
void expect_block_writes(MockImageCtx &mock_image_ctx, int r) {
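+ // blocking writes during lock release now also flips reads into
+ // require-lock mode on journaled images, so expect that call as well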
+ expect_test_features(mock_image_ctx, RBD_FEATURE_JOURNALING,
+ ((mock_image_ctx.features & RBD_FEATURE_JOURNALING) != 0));
+ if ((mock_image_ctx.features & RBD_FEATURE_JOURNALING) != 0) {
+ expect_set_require_lock_on_read(mock_image_ctx);
+ }
EXPECT_CALL(*mock_image_ctx.aio_work_queue, block_writes(_))
.WillOnce(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue));
}
using ::testing::DoDefault;
using ::testing::InSequence;
using ::testing::Return;
+using ::testing::WithArg;
class TestMockImageRefreshRequest : public TestMockFixture {
public:
typedef RefreshRequest<MockImageCtx> MockRefreshRequest;
typedef RefreshParentRequest<MockImageCtx> MockRefreshParentRequest;
+ void expect_set_require_lock_on_read(MockImageCtx &mock_image_ctx) {
+ EXPECT_CALL(*mock_image_ctx.aio_work_queue, set_require_lock_on_read());
+ }
+
+ void expect_clear_require_lock_on_read(MockImageCtx &mock_image_ctx) {
+ EXPECT_CALL(*mock_image_ctx.aio_work_queue, clear_require_lock_on_read());
+ }
+
void expect_v1_read_header(MockImageCtx &mock_image_ctx, int r) {
auto &expect = EXPECT_CALL(get_mock_io_ctx(mock_image_ctx.md_ctx),
read(mock_image_ctx.header_oid, _, _, _));
int r) {
EXPECT_CALL(mock_image_ctx, create_exclusive_lock())
.WillOnce(Return(&mock_exclusive_lock));
- EXPECT_CALL(mock_exclusive_lock, init(_))
- .WillOnce(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue));
+ EXPECT_CALL(mock_exclusive_lock, init(mock_image_ctx.features, _))
+ .WillOnce(WithArg<1>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)));
}
void expect_shut_down_exclusive_lock(MockImageCtx &mock_image_ctx,
expect_get_mutable_metadata(mock_image_ctx, 0);
expect_get_flags(mock_image_ctx, 0);
expect_refresh_parent_is_required(mock_refresh_parent_request, false);
+ expect_set_require_lock_on_read(mock_image_ctx);
C_SaferCond ctx;
MockRefreshRequest *req = new MockRefreshRequest(mock_image_ctx, &ctx);
expect_get_mutable_metadata(mock_image_ctx, 0);
expect_get_flags(mock_image_ctx, 0);
expect_refresh_parent_is_required(mock_refresh_parent_request, false);
+ expect_clear_require_lock_on_read(mock_image_ctx);
expect_close_journal(mock_image_ctx, *mock_journal, 0);
C_SaferCond ctx;
MOCK_METHOD1(block_writes, void(Context *));
MOCK_METHOD0(unblock_writes, void());
- MOCK_CONST_METHOD0(writes_empty, bool());
+ MOCK_METHOD0(set_require_lock_on_read, void());
+ MOCK_METHOD0(clear_require_lock_on_read, void());
+
+ MOCK_CONST_METHOD0(is_lock_request_needed, bool());
};
} // namespace librbd
MOCK_METHOD1(assert_header_locked, void(librados::ObjectWriteOperation *));
- MOCK_METHOD1(init, void(Context*));
+ MOCK_METHOD2(init, void(uint64_t features, Context*));
MOCK_METHOD1(shut_down, void(Context*));
};
ASSERT_PASSED(validate_object_map, image3);
}
+TEST_F(TestLibRBD, ExclusiveLockReadTransition)
+{
+ REQUIRE_FEATURE(RBD_FEATURE_JOURNALING);
+
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+ librbd::RBD rbd;
+ std::string name = get_temp_image_name();
+
+ uint64_t size = 1 << 18;
+ int order = 12;
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+
+ librbd::Image image1;
+ ASSERT_EQ(0, rbd.open(ioctx, image1, name.c_str(), NULL));
+
+ bool lock_owner;
+ ASSERT_EQ(0, image1.is_exclusive_lock_owner(&lock_owner));
+ ASSERT_FALSE(lock_owner);
+
+ // journaling should force read ops to acquire the lock
+ bufferlist read_bl;
+ ASSERT_EQ(0, image1.read(0, 0, read_bl));
+
+ ASSERT_EQ(0, image1.is_exclusive_lock_owner(&lock_owner));
+ ASSERT_TRUE(lock_owner);
+
+ librbd::Image image2;
+ ASSERT_EQ(0, rbd.open(ioctx, image2, name.c_str(), NULL));
+
+ std::list<librbd::RBD::AioCompletion *> comps;
+ std::list<bufferlist> read_bls;
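+ // alternate reads between the two clients so the exclusive lock must
+ // transition back and forth while reads are in flight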
+ for (size_t object_no = 0; object_no < (size >> 12); ++object_no) {
+ librbd::RBD::AioCompletion *comp = new librbd::RBD::AioCompletion(NULL,
+ NULL);
+ comps.push_back(comp);
+ read_bls.emplace_back();
+ if (object_no % 2 == 0) {
+ ASSERT_EQ(0, image1.aio_read(object_no << order, 1 << order, read_bls.back(), comp));
+ } else {
+ ASSERT_EQ(0, image2.aio_read(object_no << order, 1 << order, read_bls.back(), comp));
+ }
+ }
+
+ while (!comps.empty()) {
+ librbd::RBD::AioCompletion *comp = comps.front();
+ comps.pop_front();
+ ASSERT_EQ(0, comp->wait_for_complete());
+ ASSERT_EQ(1, comp->is_complete());
+ }
+}
+
TEST_F(TestLibRBD, CacheMayCopyOnWrite) {
REQUIRE_FEATURE(RBD_FEATURE_LAYERING);
.WillRepeatedly(Return(1234567890));
}
+ void expect_set_require_lock_on_read(MockImageCtx &mock_image_ctx) {
+ EXPECT_CALL(*mock_image_ctx.aio_work_queue, set_require_lock_on_read());
+ }
+
+ void expect_clear_require_lock_on_read(MockImageCtx &mock_image_ctx) {
+ EXPECT_CALL(*mock_image_ctx.aio_work_queue, clear_require_lock_on_read());
+ }
+
void expect_block_writes(MockImageCtx &mock_image_ctx) {
EXPECT_CALL(*mock_image_ctx.aio_work_queue, block_writes(_))
.WillOnce(CompleteContext(0, mock_image_ctx.image_ctx->op_work_queue));
+ if ((mock_image_ctx.features & RBD_FEATURE_JOURNALING) != 0) {
+ expect_set_require_lock_on_read(mock_image_ctx);
+ }
}
void expect_unblock_writes(MockImageCtx &mock_image_ctx) {
+ expect_clear_require_lock_on_read(mock_image_ctx);
EXPECT_CALL(*mock_image_ctx.aio_work_queue, unblock_writes());
}
expect_unblock_writes(mock_image_ctx);
}
expect_notify_released_lock(mock_image_ctx);
- expect_writes_empty(mock_image_ctx);
+ expect_is_lock_request_needed(mock_image_ctx, false);
}
}
.Times(1);
}
- void expect_writes_empty(MockImageCtx &mock_image_ctx) {
- EXPECT_CALL(*mock_image_ctx.aio_work_queue, writes_empty())
- .WillRepeatedly(Return(true));
+ void expect_is_lock_request_needed(MockImageCtx &mock_image_ctx, bool ret) {
+ EXPECT_CALL(*mock_image_ctx.aio_work_queue, is_lock_request_needed())
+ .WillRepeatedly(Return(ret));
}
void expect_flush_notifies(MockImageCtx &mock_image_ctx) {
C_SaferCond ctx;
{
RWLock::WLocker owner_locker(mock_image_ctx.owner_lock);
- exclusive_lock.init(&ctx);
+ exclusive_lock.init(mock_image_ctx.features, &ctx);
}
return ctx.wait();
}
EXPECT_CALL(release, send())
.WillOnce(Notify(&wait_for_send_ctx2));
expect_notify_released_lock(mock_image_ctx);
- expect_writes_empty(mock_image_ctx);
+ expect_is_lock_request_needed(mock_image_ctx, false);
C_SaferCond try_request_ctx1;
{