#include "include/encoding.h"
#include "include/stringify.h"
#include "common/errno.h"
+#include "common/Timer.h"
#include <sstream>
#include <boost/bind.hpp>
#include <boost/function.hpp>
static const uint64_t NOTIFY_TIMEOUT = 5000;
static const uint8_t NOTIFY_VERSION = 1;
+static const double RETRY_DELAY_SECONDS = 1.0;
enum {
NOTIFY_OP_ACQUIRED_LOCK = 0,
: m_image_ctx(image_ctx), m_watch_ctx(*this), m_handle(0),
m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
m_finisher(new Finisher(image_ctx.cct)),
+ m_timer_lock("librbd::ImageWatcher::m_timer_lock"),
+ m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
m_watch_lock("librbd::ImageWatcher::m_watch_lock"), m_watch_error(0),
m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
- m_retrying_aio_requests(false)
+ m_retrying_aio_requests(false), m_retry_aio_context(NULL)
{
m_finisher->start();
+ m_timer->init();
}
ImageWatcher::~ImageWatcher()
{
+ Mutex::Locker l(m_timer_lock);
+ m_timer->shutdown();
m_finisher->stop();
delete m_finisher;
}
}
bool ImageWatcher::try_request_lock() {
+ RWLock::WLocker l(m_image_ctx.owner_lock);
+ if (is_lock_owner()) {
+ return true;
+ }
+
int r = try_lock();
if (r < 0) {
ldout(m_image_ctx.cct, 5) << "failed to acquire exclusive lock:"
<< cpp_strerror(r) << dendl;
- cancel_aio_requests(-EROFS);
- return true;
+ return false;
}
if (is_lock_owner()) {
}
void ImageWatcher::finalize_request_lock() {
- {
- RWLock::WLocker l(m_image_ctx.owner_lock);
- try_request_lock();
+ cancel_retry_aio_requests();
+
+ if (try_request_lock()) {
+ retry_aio_requests();
+ } else {
+ schedule_retry_aio_requests();
}
- retry_aio_requests();
}
int ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie,
return true;
}
+void ImageWatcher::schedule_retry_aio_requests() {
+ Mutex::Locker l(m_timer_lock);
+ if (m_retry_aio_context == NULL) {
+ m_retry_aio_context = new FunctionContext(boost::bind(
+ &ImageWatcher::finalize_retry_aio_requests, this));
+ m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
+ }
+}
+
+void ImageWatcher::cancel_retry_aio_requests() {
+ Mutex::Locker l(m_timer_lock);
+ if (m_retry_aio_context != NULL) {
+ m_timer->cancel_event(m_retry_aio_context);
+ m_retry_aio_context = NULL;
+ }
+}
+
+void ImageWatcher::finalize_retry_aio_requests() {
+ assert(m_timer_lock.is_locked());
+ m_retry_aio_context = NULL;
+ retry_aio_requests();
+}
+
void ImageWatcher::retry_aio_requests() {
std::vector<AioRequest> lock_request_restarts;
{
m_aio_request_cond.Signal();
}
-void ImageWatcher::cancel_aio_requests(int result) {
- Mutex::Locker l(m_aio_request_lock);
- for (std::vector<AioRequest>::iterator iter = m_aio_requests.begin();
- iter != m_aio_requests.end(); ++iter) {
- AioCompletion *c = iter->second;
- c->get();
- c->lock.Lock();
- c->rval = result;
- c->lock.Unlock();
- c->finish_adding_requests(m_image_ctx.cct);
- c->put();
- }
- m_aio_requests.clear();
- m_aio_request_cond.Signal();
-}
-
int ImageWatcher::decode_response_code(bufferlist &bl) {
int r;
bufferlist::iterator iter = bl.begin();
}
void ImageWatcher::notify_request_lock() {
- bool try_lock_complete;
- {
- // try to lock now that we know we are not in a rados callback
- RWLock::WLocker l(m_image_ctx.owner_lock);
- try_lock_complete = try_request_lock();
- }
- if (try_lock_complete) {
+ cancel_retry_aio_requests();
+
+ if (try_request_lock()) {
retry_aio_requests();
return;
}
} else if (r < 0) {
lderr(m_image_ctx.cct) << "error requesting lock: " << cpp_strerror(r)
<< dendl;
- cancel_aio_requests(-EROFS);
+ schedule_retry_aio_requests();
}
}
if (m_watch_error < 0) {
lderr(m_image_ctx.cct) << "failed to re-register image watch: "
<< cpp_strerror(m_watch_error) << dendl;
- cancel_aio_requests(m_watch_error);
+ schedule_retry_aio_requests();
return;
}
}
}
}
+ Mutex::Locker l(m_timer_lock);
retry_aio_requests();
}
class TestImageWatcher : public TestFixture {
public:
- TestImageWatcher() : m_watch_ctx(NULL), m_aio_completion_restarts(),
+ TestImageWatcher() : m_watch_ctx(NULL), m_aio_completion_restarts(0),
+ m_expected_aio_restarts(0),
m_callback_lock("m_callback_lock")
{
}
static void handle_aio_completion(void *arg1, void *arg2) {
TestImageWatcher *test_image_watcher =
reinterpret_cast<TestImageWatcher *>(arg2);
- Mutex::Locker l(test_image_watcher->m_callback_lock);
+ assert(test_image_watcher->m_callback_lock.is_locked());
test_image_watcher->m_callback_cond.Signal();
}
- int handle_restart_aio(librbd::AioCompletion *aio_completion) {
- {
- Mutex::Locker l(aio_completion->lock);
- aio_completion->complete();
+ int handle_restart_aio(librbd::ImageCtx *ictx,
+ librbd::AioCompletion *aio_completion) {
+ Mutex::Locker l1(m_callback_lock);
+ ++m_aio_completion_restarts;
+
+ RWLock::WLocker l2(ictx->owner_lock);
+ if (!ictx->image_watcher->is_lock_owner() &&
+ m_aio_completion_restarts < m_expected_aio_restarts) {
+ EXPECT_EQ(0, ictx->image_watcher->request_lock(
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
+ aio_completion));
+ } else {
+ {
+ Mutex::Locker l2(aio_completion->lock);
+ aio_completion->complete();
+ }
+
+ m_aio_completions.erase(aio_completion);
+ delete aio_completion;
}
- Mutex::Locker l(m_callback_lock);
- m_aio_completions.erase(aio_completion);
- delete aio_completion;
- ++m_aio_completion_restarts;
m_callback_cond.Signal();
return 0;
}
- bool wait_for_aio_completions(librbd::ImageCtx &ictx, int result) {
+ bool wait_for_aio_completions(librbd::ImageCtx &ictx) {
Mutex::Locker l(m_callback_lock);
- while (!m_aio_completions.empty()) {
- for (std::set<librbd::AioCompletion *>::iterator iter =
- m_aio_completions.begin(); iter != m_aio_completions.end(); ) {
- std::set<librbd::AioCompletion *>::iterator next(iter);
- ++next;
-
- librbd::AioCompletion *aio_completion = *iter;
- if (!aio_completion->building) {
- if (result != aio_completion->rval) {
- EXPECT_EQ(result, aio_completion->rval);
- return false;
- }
- m_aio_completions.erase(iter);
- }
- iter = next;
- }
- if (m_aio_completions.empty()) {
- break;
- }
-
- int r = m_callback_cond.WaitInterval(ictx.cct, m_callback_lock,
- utime_t(10, 0));
+ int r = 0;
+ while (!m_aio_completions.empty() &&
+ m_aio_completion_restarts < m_expected_aio_restarts) {
+ r = m_callback_cond.WaitInterval(ictx.cct, m_callback_lock,
+ utime_t(10, 0));
if (r != 0) {
break;
}
}
- return (m_aio_completions.empty() &&
- (result != 0 || m_aio_completion_restarts > 0));
+ return (r == 0);
}
typedef std::pair<NotifyOp, bufferlist> NotifyOpPayload;
std::set<librbd::AioCompletion *> m_aio_completions;
uint32_t m_aio_completion_restarts;
+ uint32_t m_expected_aio_restarts;
Mutex m_callback_lock;
Cond m_callback_cond;
m_notify_acks = boost::assign::list_of(
std::make_pair(NOTIFY_OP_REQUEST_LOCK, bl));
+ m_expected_aio_restarts = 2;
{
RWLock::WLocker l(ictx->owner_lock);
ASSERT_EQ(0, ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, _1),
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
create_aio_completion(*ictx)));
ASSERT_EQ(0, ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, _1),
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
create_aio_completion(*ictx)));
}
std::make_pair(NOTIFY_OP_ACQUIRED_LOCK, bufferlist()));
ASSERT_EQ(expected_notify_ops, m_notifies);
- ASSERT_TRUE(wait_for_aio_completions(*ictx, 0));
+ ASSERT_TRUE(wait_for_aio_completions(*ictx));
}
TEST_F(TestImageWatcher, RequestLockTimedOut) {
m_notify_acks = boost::assign::list_of(
std::make_pair(NOTIFY_OP_REQUEST_LOCK, bufferlist()));
+ m_expected_aio_restarts = 1;
{
RWLock::WLocker l(ictx->owner_lock);
ASSERT_EQ(0, ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, _1),
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
create_aio_completion(*ictx)));
}
std::make_pair(NOTIFY_OP_REQUEST_LOCK, bufferlist()));
ASSERT_EQ(expected_notify_ops, m_notifies);
- ASSERT_TRUE(wait_for_aio_completions(*ictx, 0));
+ ASSERT_TRUE(wait_for_aio_completions(*ictx));
}
TEST_F(TestImageWatcher, RequestLockTryLockRace) {
m_notify_acks = boost::assign::list_of(
std::make_pair(NOTIFY_OP_REQUEST_LOCK, bl));
+ m_expected_aio_restarts = 1;
{
RWLock::WLocker l(ictx->owner_lock);
ASSERT_EQ(0, ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, _1),
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
create_aio_completion(*ictx)));
}
ENCODE_FINISH(bl);
}
ASSERT_EQ(0, m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL));
- ASSERT_TRUE(wait_for_aio_completions(*ictx, 0));
+ ASSERT_TRUE(wait_for_aio_completions(*ictx));
RWLock::RLocker l(ictx->owner_lock);
ASSERT_FALSE(ictx->image_watcher->is_lock_owner());
}
ASSERT_EQ(0, open_image(m_image_name, &ictx));
ASSERT_EQ(0, lock_image(*ictx, LOCK_SHARED, "manually 1234"));
+ m_expected_aio_restarts = 3;
{
RWLock::WLocker l(ictx->owner_lock);
ASSERT_EQ(0, ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, _1),
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
create_aio_completion(*ictx)));
}
- ASSERT_TRUE(wait_for_aio_completions(*ictx, -EROFS));
+ ASSERT_TRUE(wait_for_aio_completions(*ictx));
}
TEST_F(TestImageWatcher, RequestLockPostTryLockFailed) {
m_notify_acks = boost::assign::list_of(
std::make_pair(NOTIFY_OP_REQUEST_LOCK, bl));
+ m_expected_aio_restarts = 1;
{
RWLock::WLocker l(ictx->owner_lock);
ASSERT_EQ(0, ictx->image_watcher->request_lock(
- boost::bind(&TestImageWatcher::handle_restart_aio, this, _1),
+ boost::bind(&TestImageWatcher::handle_restart_aio, this, ictx, _1),
create_aio_completion(*ictx)));
}
ENCODE_FINISH(bl);
}
ASSERT_EQ(0, m_ioctx.notify2(ictx->header_oid, bl, 5000, NULL));
- ASSERT_TRUE(wait_for_aio_completions(*ictx, -EROFS));
+ ASSERT_TRUE(wait_for_aio_completions(*ictx));
}
TEST_F(TestImageWatcher, NotifyHeaderUpdate) {