}
}
-void AioImageRequestWQ::handle_releasing_lock() {
- assert(m_image_ctx.owner_lock.is_locked());
-
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << dendl;
-
- if (!m_blocking_writes) {
- m_blocking_writes = true;
- block_writes();
- }
-}
-
-void AioImageRequestWQ::handle_lock_updated(bool lock_supported,
- bool lock_owner) {
+void AioImageRequestWQ::handle_lock_updated(
+ ImageWatcher::LockUpdateState state) {
assert(m_image_ctx.owner_lock.is_locked());
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
- << "lock_support=" << lock_supported << ", "
- << "owner=" << lock_owner << dendl;
+ << "state=" << state << dendl;
- if ((!lock_supported || lock_owner) && m_blocking_writes) {
+ if ((state == ImageWatcher::LOCK_UPDATE_STATE_NOT_SUPPORTED ||
+ state == ImageWatcher::LOCK_UPDATE_STATE_LOCKED) && m_blocking_writes) {
m_blocking_writes = false;
unblock_writes();
- } else if (lock_supported && !lock_owner) {
+ } else if (state == ImageWatcher::LOCK_UPDATE_STATE_RELEASING &&
+ !m_blocking_writes) {
+ m_blocking_writes = true;
+ block_writes();
+ } else if (state == ImageWatcher::LOCK_UPDATE_STATE_UNLOCKED) {
+ assert(m_blocking_writes);
assert(writes_blocked());
- if (!writes_empty()) {
- m_image_ctx.image_watcher->request_lock();
- }
+ } else if (state == ImageWatcher::LOCK_UPDATE_STATE_NOTIFICATION &&
+ !writes_empty()) {
+ m_image_ctx.image_watcher->request_lock();
}
}
virtual bool handle_requested_lock() {
return true;
}
- virtual void handle_releasing_lock() {
- aio_work_queue->handle_releasing_lock();
- }
- virtual void handle_lock_updated(bool lock_supported, bool lock_owner) {
- aio_work_queue->handle_lock_updated(lock_supported, lock_owner);
+ virtual void handle_lock_updated(ImageWatcher::LockUpdateState state) {
+ aio_work_queue->handle_lock_updated(state);
}
};
bool is_lock_required() const;
void queue(AioImageRequest *req);
- void handle_releasing_lock();
- void handle_lock_updated(bool lock_supported, bool lock_owner);
+ void handle_lock_updated(ImageWatcher::LockUpdateState state);
};
} // namespace librbd
int r = 0;
if (lock_support_changed) {
- if (is_lock_supported() && !is_lock_owner()) {
+ if (is_lock_supported()) {
// image opened, exclusive lock dynamically enabled, or now HEAD
- notify_listeners_releasing_lock();
- } else if (!is_lock_supported() && is_lock_owner()) {
- // exclusive lock dynamically disabled or now snapshot
- m_image_ctx.owner_lock.put_read();
- {
- RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
- r = release_lock();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING);
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_UNLOCKED);
+ } else if (!is_lock_supported()) {
+ if (is_lock_owner()) {
+ // exclusive lock dynamically disabled or now snapshot
+ m_image_ctx.owner_lock.put_read();
+ {
+ RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
+ r = release_lock();
+ }
+ m_image_ctx.owner_lock.get_read();
}
- m_image_ctx.owner_lock.get_read();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_NOT_SUPPORTED);
}
- notify_listeners_updated_lock();
}
return r;
}
// alert listeners that all incoming IO needs to be stopped since the
// lock is being released
- notify_listeners_releasing_lock();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_RELEASING);
RWLock::WLocker md_locker(m_image_ctx.md_lock);
r = librbd::_flush(&m_image_ctx);
{
RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- notify_listeners_updated_lock();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_UNLOCKED);
}
}
m_image_ctx.owner_lock.get_write();
ldout(m_image_ctx.cct, 10) << this << " notify acquired lock" << dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- notify_listeners_updated_lock();
+ if (m_lock_owner_state != LOCK_OWNER_STATE_LOCKED) {
+ return;
+ }
+
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_LOCKED);
bufferlist bl;
::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
if (cancel_async_requests) {
schedule_cancel_async_requests();
}
- notify_listeners_updated_lock();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_NOTIFICATION);
}
}
if (cancel_async_requests) {
schedule_cancel_async_requests();
}
- notify_listeners_updated_lock();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_NOTIFICATION);
}
}
}
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- notify_listeners_updated_lock();
+ notify_listeners_updated_lock(LOCK_UPDATE_STATE_UNLOCKED);
}
}
m_image_watcher.schedule_async_complete(m_async_request_id, r);
}
-void ImageWatcher::notify_listeners_releasing_lock() {
- assert(m_image_ctx.owner_lock.is_locked());
-
- Listeners listeners;
- {
- Mutex::Locker listeners_locker(m_listeners_lock);
- m_listeners_in_use = true;
- listeners = m_listeners;
- }
-
- for (Listeners::iterator it = listeners.begin();
- it != listeners.end(); ++it) {
- (*it)->handle_releasing_lock();
- }
-
- Mutex::Locker listeners_locker(m_listeners_lock);
- m_listeners_in_use = false;
- m_listeners_cond.Signal();
-}
-
-void ImageWatcher::notify_listeners_updated_lock() {
+void ImageWatcher::notify_listeners_updated_lock(
+ LockUpdateState lock_update_state) {
assert(m_image_ctx.owner_lock.is_locked());
Listeners listeners;
listeners = m_listeners;
}
- bool lock_supported;
- {
- RWLock::RLocker watch_locker(m_watch_lock);
- lock_supported = m_lock_supported;
- }
-
- assert(lock_supported || m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
for (Listeners::iterator it = listeners.begin();
it != listeners.end(); ++it) {
- (*it)->handle_lock_updated(lock_supported,
- m_lock_owner_state == LOCK_OWNER_STATE_LOCKED);
+ (*it)->handle_lock_updated(lock_update_state);
}
Mutex::Locker listeners_locker(m_listeners_lock);
class ImageWatcher {
public:
+ enum LockUpdateState {
+ LOCK_UPDATE_STATE_NOT_SUPPORTED,
+ LOCK_UPDATE_STATE_LOCKED,
+ LOCK_UPDATE_STATE_RELEASING,
+ LOCK_UPDATE_STATE_UNLOCKED,
+ LOCK_UPDATE_STATE_NOTIFICATION
+ };
+
struct Listener {
virtual ~Listener() {}
virtual bool handle_requested_lock() = 0;
- virtual void handle_releasing_lock() = 0;
- virtual void handle_lock_updated(bool lock_supported, bool lock_owner) = 0;
+ virtual void handle_lock_updated(LockUpdateState lock_update_state) = 0;
};
ImageWatcher(ImageCtx& image_ctx);
void reregister_watch();
- void notify_listeners_releasing_lock();
- void notify_listeners_updated_lock();
+ void notify_listeners_updated_lock(LockUpdateState lock_update_state);
};
} // namespace librbd
: m_image_ctx(image_ctx), m_journaler(NULL),
m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
- m_event_tid(0), m_blocking_writes(false), m_journal_replay(NULL) {
+ m_event_lock("Journal::m_event_lock"), m_event_tid(0),
+ m_blocking_writes(false), m_journal_replay(NULL) {
ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
}
Journal::~Journal() {
+ m_image_ctx.op_work_queue->drain();
assert(m_journaler == NULL);
assert(m_journal_replay == NULL);
!image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
}
-bool Journal::is_journal_replaying() const {
- Mutex::Locker locker(m_lock);
- return (m_state == STATE_REPLAYING);
-}
-
int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) {
CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
return (m_state == STATE_RECORDING);
}
+bool Journal::is_journal_replaying() const {
+ Mutex::Locker locker(m_lock);
+ return (m_state == STATE_REPLAYING);
+}
+
+bool Journal::wait_for_journal_ready() {
+ Mutex::Locker locker(m_lock);
+ while (m_state != STATE_UNINITIALIZED && m_state != STATE_RECORDING) {
+ wait_for_state_transition();
+ }
+ return (m_state == STATE_RECORDING);
+}
+
void Journal::open() {
Mutex::Locker locker(m_lock);
if (m_journaler != NULL) {
m_close_pending = true;
wait_for_state_transition();
break;
+ case STATE_STOPPING_RECORDING:
+ wait_for_state_transition();
+ break;
case STATE_RECORDING:
r = stop_recording();
if (r < 0) {
uint64_t offset, size_t length,
bool flush_entry) {
assert(m_image_ctx.owner_lock.is_locked());
- assert(m_state == STATE_RECORDING);
bufferlist bl;
::encode(event_entry, bl);
- ::journal::Future future = m_journaler->append("", bl);
+ ::journal::Future future;
uint64_t tid;
{
Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_RECORDING);
+
+ future = m_journaler->append("", bl);
+
+ Mutex::Locker event_locker(m_event_lock);
tid = ++m_event_tid;
assert(tid != 0);
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
"r=" << r << dendl;
- Mutex::Locker locker(m_lock);
+ Mutex::Locker event_locker(m_event_lock);
Events::iterator it = m_events.find(tid);
if (it == m_events.end()) {
return;
<< "length=" << length << ", "
<< "r=" << r << dendl;
- Mutex::Locker locker(m_lock);
+ Mutex::Locker event_locker(m_event_lock);
Events::iterator it = m_events.find(tid);
if (it == m_events.end()) {
return;
::journal::Future future;
{
- Mutex::Locker locker(m_lock);
+ Mutex::Locker event_locker(m_event_lock);
future = wait_event(m_lock, tid, on_safe);
}
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
<< "on_safe=" << on_safe << dendl;
- Mutex::Locker locker(m_lock);
+ Mutex::Locker event_locker(m_event_lock);
wait_event(m_lock, tid, on_safe);
}
::journal::Future Journal::wait_event(Mutex &lock, uint64_t tid,
Context *on_safe) {
- assert(m_lock.is_locked());
+ assert(m_event_lock.is_locked());
CephContext *cct = m_image_ctx.cct;
Events::iterator it = m_events.find(tid);
}
void Journal::complete_event(Events::iterator it, int r) {
- assert(m_lock.is_locked());
+ assert(m_event_lock.is_locked());
assert(m_state == STATE_RECORDING);
CephContext *cct = m_image_ctx.cct;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
{
- Mutex::Locker locker(m_lock);
+ Mutex::Locker event_locker(m_event_lock);
Events::iterator it = m_events.find(tid);
assert(it != m_events.end());
ldout(cct, 20) << this << " " << __func__ << ": " << "state=" << m_state
<< dendl;
- // prevent peers from taking our lock while we are replaying
+ // prevent peers from taking our lock while we are replaying since that
+ // will stall forward progress
return (m_state != STATE_INITIALIZING && m_state != STATE_REPLAYING);
}
-void Journal::handle_releasing_lock() {
- CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << dendl;
-
- Mutex::Locker locker(m_lock);
- if (m_state == STATE_INITIALIZING || m_state == STATE_REPLAYING) {
- // wait for replay to successfully interrupt
- m_close_pending = true;
- wait_for_state_transition();
- }
-
- if (m_state == STATE_UNINITIALIZED || m_state == STATE_RECORDING) {
- // prevent new write ops but allow pending ops to flush to the journal
- block_writes();
- }
-}
-
-void Journal::handle_lock_updated(bool lock_owner) {
+void Journal::handle_lock_updated(ImageWatcher::LockUpdateState state) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
- << "owner=" << lock_owner << dendl;
+ << "state=" << state << dendl;
Mutex::Locker locker(m_lock);
- if (lock_owner && m_state == STATE_UNINITIALIZED) {
+ if (state == ImageWatcher::LOCK_UPDATE_STATE_LOCKED &&
+ m_state == STATE_UNINITIALIZED) {
create_journaler();
- } else if (!lock_owner && m_state != STATE_UNINITIALIZED) {
+ } else if (state == ImageWatcher::LOCK_UPDATE_STATE_RELEASING) {
+ if (m_state == STATE_INITIALIZING || m_state == STATE_REPLAYING) {
+ // wait for replay to successfully interrupt
+ m_close_pending = true;
+ wait_for_state_transition();
+ }
+
+ if (m_state == STATE_UNINITIALIZED || m_state == STATE_RECORDING) {
+ // prevent new write ops but allow pending ops to flush to the journal
+ block_writes();
+ }
+ } else if ((state == ImageWatcher::LOCK_UPDATE_STATE_NOT_SUPPORTED ||
+ state == ImageWatcher::LOCK_UPDATE_STATE_UNLOCKED) &&
+ m_state != STATE_UNINITIALIZED &&
+ m_state != STATE_STOPPING_RECORDING) {
assert(m_state == STATE_RECORDING);
- assert(m_events.empty());
+ {
+ Mutex::Locker event_locker(m_event_lock);
+ assert(m_events.empty());
+ }
int r = stop_recording();
if (r < 0) {
assert(m_lock.is_locked());
assert(m_journaler != NULL);
- C_SaferCond cond;
- m_journaler->stop_append(&cond);
+ transition_state(STATE_STOPPING_RECORDING);
+ C_SaferCond cond;
m_lock.Unlock();
+ m_journaler->stop_append(&cond);
int r = cond.wait();
m_lock.Lock();
bool is_journal_ready() const;
bool is_journal_replaying() const;
+ bool wait_for_journal_ready();
+
void open();
int close();
STATE_INITIALIZING,
STATE_REPLAYING,
STATE_RECORDING,
+ STATE_STOPPING_RECORDING
};
struct Event {
virtual bool handle_requested_lock() {
return journal->handle_requested_lock();
}
- virtual void handle_releasing_lock() {
- journal->handle_releasing_lock();
- }
- virtual void handle_lock_updated(bool lock_supported, bool lock_owner) {
- journal->handle_lock_updated(lock_owner);
+ virtual void handle_lock_updated(ImageWatcher::LockUpdateState state) {
+ journal->handle_lock_updated(state);
}
};
ReplayHandler m_replay_handler;
bool m_close_pending;
+ Mutex m_event_lock;
uint64_t m_event_tid;
Events m_events;
void handle_event_safe(int r, uint64_t tid);
bool handle_requested_lock();
- void handle_releasing_lock();
- void handle_lock_updated(bool lock_owner);
+ void handle_lock_updated(ImageWatcher::LockUpdateState state);
int stop_recording();