#include "common/ceph_context.h"
#include "common/dout.h"
#include "common/errno.h"
+#include "common/perf_counters.h"
#include "common/WorkQueue.h"
#include "librbd/AioObjectRequest.h"
+#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/AioCompletion.h"
tracepoint(librbd, aio_complete_exit);
}
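+ // record the image context, op type, and start time only on the first call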
+ void AioCompletion::init_time(ImageCtx *i, aio_type_t t) {
+ if (ictx == NULL) {
+ ictx = i;
+ aio_type = t;
+ start_time = ceph_clock_now(ictx->cct);
+ }
+ }
+
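+ // begin tracking the asynchronous op exactly once per completion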
+ void AioCompletion::start_op(ImageCtx *i, aio_type_t t) {
+ init_time(i, t);
+ if (!async_op.started()) {
+ async_op.start_op(*ictx);
+ }
+ }
+
void AioCompletion::fail(CephContext *cct, int r)
{
lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r)
#include "common/Cond.h"
#include "common/Mutex.h"
-#include "common/ceph_context.h"
-#include "common/perf_counters.h"
#include "include/Context.h"
#include "include/utime.h"
#include "include/rbd/librbd.hpp"
#include "librbd/AsyncOperation.h"
-#include "librbd/ImageCtx.h"
-#include "librbd/internal.h"
#include "osdc/Striper.h"
+class CephContext;
+
namespace librbd {
class AioObjectRead;
void finish_adding_requests(CephContext *cct);
- void init_time(ImageCtx *i, aio_type_t t) {
- if (ictx == NULL) {
- ictx = i;
- aio_type = t;
- start_time = ceph_clock_now(ictx->cct);
- }
- }
- void start_op(ImageCtx *i, aio_type_t t) {
- init_time(i, t);
- if (!async_op.started()) {
- async_op.start_op(*ictx);
- }
- }
-
+ void init_time(ImageCtx *i, aio_type_t t);
+ void start_op(ImageCtx *i, aio_type_t t);
void fail(CephContext *cct, int r);
void complete(CephContext *cct);
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
+#include "librbd/Journal.h"
+#include "librbd/JournalTypes.h"
#include "include/rados/librados.hpp"
#include "osdc/Striper.h"
RWLock::RLocker md_locker(m_image_ctx.md_lock);
+ bool journaling = false;
+ uint64_t journal_tid = 0;
+
uint64_t clip_len = m_len;
+ ObjectExtents object_extents;
::SnapContext snapc;
{
// prevent image size from changing between computing clip and recording
}
snapc = m_image_ctx.snapc;
- m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_WRITE); // TODO use correct enum
+ m_aio_comp->start_op(&m_image_ctx, get_aio_type());
+
+ // map to object extents
+ if (clip_len > 0) {
+ Striper::file_to_extents(cct, m_image_ctx.format_string,
+ &m_image_ctx.layout, m_off, clip_len, 0,
+ object_extents);
+ }
+
+ journaling = (m_image_ctx.journal != NULL);
}
assert(!m_image_ctx.image_watcher->is_lock_supported() ||
m_image_ctx.image_watcher->is_lock_owner());
- // map to object extents
- ObjectExtents extents;
- if (clip_len > 0) {
- Striper::file_to_extents(cct, m_image_ctx.format_string,
- &m_image_ctx.layout, m_off, clip_len, 0, extents);
+ AioObjectRequests requests;
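+ // when journaling, collect the object requests instead of sending them now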
+ send_object_requests(object_extents, snapc, (journaling ? &requests : NULL));
+
+ if (journaling) {
+ // in-flight ops are flushed prior to closing the journal
+ assert(m_image_ctx.journal != NULL);
+ journal_tid = append_journal_event(requests, m_synchronous);
}
- send_object_requests(extents, snapc);
+ if (m_image_ctx.object_cacher != NULL) {
+ send_cache_requests(object_extents, snapc, journal_tid);
+ }
update_stats(clip_len);
m_aio_comp->finish_adding_requests(cct);
}
void AbstractAioImageWrite::send_object_requests(
- const ObjectExtents &object_extents, const ::SnapContext &snapc) {
+ const ObjectExtents &object_extents, const ::SnapContext &snapc,
+ AioObjectRequests *aio_object_requests) {
CephContext *cct = m_image_ctx.cct;
+
for (ObjectExtents::const_iterator p = object_extents.begin();
p != object_extents.end(); ++p) {
ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
<< " from " << p->buffer_extents << dendl;
- send_object_request(*p, snapc);
+ C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ AioObjectRequest *request = send_object_request(*p, snapc, req_comp);
+
+ // if journaling, stash the request for later; otherwise send
+ if (request != NULL) {
+ if (aio_object_requests != NULL) {
+ aio_object_requests->push_back(request);
+ } else {
+ request->send();
+ }
+ }
}
}
-void AioImageWrite::send_object_request(const ObjectExtent &object_extent,
- const ::SnapContext &snapc) {
- CephContext *cct = m_image_ctx.cct;
-
- // assemble extent
- bufferlist bl;
+void AioImageWrite::assemble_extent(const ObjectExtent &object_extent,
+ bufferlist *bl) {
for (Extents::const_iterator q = object_extent.buffer_extents.begin();
q != object_extent.buffer_extents.end(); ++q) {
- bl.append(m_buf + q->first, q->second);
+ bl->append(m_buf + q->first, q->second);
}
+}
- C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
- if (m_image_ctx.object_cacher) {
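+// encode the full write payload into an AioWriteEvent and append it to the
+// image journal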
+uint64_t AioImageWrite::append_journal_event(
+ const AioObjectRequests &requests, bool synchronous) {
+ bufferlist bl;
+ bl.append(m_buf, m_len);
+
+ journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl));
+ return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
+ synchronous);
+}
+
+void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ uint64_t journal_tid) {
+ CephContext *cct = m_image_ctx.cct;
+
+ for (ObjectExtents::const_iterator p = object_extents.begin();
+ p != object_extents.end(); ++p) {
+ const ObjectExtent &object_extent = *p;
+
+ bufferlist bl;
+ assemble_extent(object_extent, &bl);
+
+ // TODO pass journal_tid to object cacher
+ C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
object_extent.offset, req_comp, m_op_flags);
- } else {
- AioObjectWrite *req = new AioObjectWrite(&m_image_ctx,
- object_extent.oid.name,
- object_extent.objectno,
- object_extent.offset, bl,
- snapc, req_comp);
-
- req->set_op_flags(m_op_flags);
- req->send();
}
}
+AioObjectRequest *AioImageWrite::send_object_request(
+ const ObjectExtent &object_extent, const ::SnapContext &snapc,
+ Context *on_finish) {
+ if (m_image_ctx.object_cacher != NULL) {
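+ // cache-enabled: the write is routed through send_cache_requests instead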
+ return NULL;
+ }
+
+ bufferlist bl;
+ assemble_extent(object_extent, &bl);
+ AioObjectWrite *req = new AioObjectWrite(&m_image_ctx,
+ object_extent.oid.name,
+ object_extent.objectno,
+ object_extent.offset, bl,
+ snapc, on_finish);
+ req->set_op_flags(m_op_flags);
+ return req;
+}
void AioImageWrite::update_stats(size_t length) {
m_image_ctx.perfcounter->inc(l_librbd_wr);
m_image_ctx.perfcounter->inc(l_librbd_wr_bytes, length);
}
-void AioImageDiscard::send_object_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc) {
- // discard from the cache first to ensure writeback won't recreate
- if (m_image_ctx.object_cacher != NULL) {
- Mutex::Locker cache_locker(m_image_ctx.cache_lock);
- m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
- object_extents);
- }
+uint64_t AioImageDiscard::append_journal_event(
+ const AioObjectRequests &requests, bool synchronous) {
+ journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len));
+ return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
+ synchronous);
+}
+
+void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ uint64_t journal_tid) {
+ // TODO the cache needs to flag the discard as pending for writeback, or
+ // the cache update must be delayed until after the journal commits
+ Mutex::Locker cache_locker(m_image_ctx.cache_lock);
- AbstractAioImageWrite::send_object_requests(object_extents, snapc);
+ // TODO pass journal_tid to object cacher
+ m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
+ object_extents);
}
-void AioImageDiscard::send_object_request(const ObjectExtent &object_extent,
- const ::SnapContext &snapc) {
+AioObjectRequest *AioImageDiscard::send_object_request(
+ const ObjectExtent &object_extent, const ::SnapContext &snapc,
+ Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
- C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
-
AioObjectRequest *req;
if (object_extent.length == m_image_ctx.layout.fl_object_size) {
req = new AioObjectRemove(&m_image_ctx, object_extent.oid.name,
- object_extent.objectno, snapc, req_comp);
+ object_extent.objectno, snapc, on_finish);
} else if (object_extent.offset + object_extent.length ==
m_image_ctx.layout.fl_object_size) {
req = new AioObjectTruncate(&m_image_ctx, object_extent.oid.name,
object_extent.objectno, object_extent.offset,
- snapc, req_comp);
+ snapc, on_finish);
} else {
if(cct->_conf->rbd_skip_partial_discard) {
- delete req_comp;
- return;
+ delete on_finish;
+ return NULL;
}
req = new AioObjectZero(&m_image_ctx, object_extent.oid.name,
object_extent.objectno, object_extent.offset,
- object_extent.length, snapc, req_comp);
+ object_extent.length, snapc, on_finish);
}
- req->send();
+ return req;
}
void AioImageDiscard::update_stats(size_t length) {
void AioImageFlush::send_request() {
CephContext *cct = m_image_ctx.cct;
+ {
+ // journal the flush event
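+ // (flush_entry=true so the event is flushed to the journal immediately)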
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ if (m_image_ctx.journal != NULL) {
+ m_image_ctx.journal->append_event(
+ m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
+ AioObjectRequests(), true);
+ }
+ }
+
// TODO race condition between registering op and submitting to cache
// (might not be flushed -- backport needed)
C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp);
#include "include/buffer.h"
#include "common/snap_types.h"
#include "osd/osd_types.h"
+#include "librbd/AioCompletion.h"
+#include <list>
#include <utility>
#include <vector>
namespace librbd {
-class AioCompletion;
+class AioObjectRequest;
class ImageCtx;
class AioImageRequest {
void send();
protected:
+ typedef std::list<AioObjectRequest *> AioObjectRequests;
+
ImageCtx &m_image_ctx;
AioCompletion *m_aio_comp;
return true;
}
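+ // force this request's journal event to be flushed before the op completes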
+ inline void flag_synchronous() {
+ m_synchronous = true;
+ }
+
protected:
typedef std::vector<ObjectExtent> ObjectExtents;
+ const uint64_t m_off;
+ const size_t m_len;
+
AbstractAioImageWrite(ImageCtx &image_ctx, AioCompletion *aio_comp,
uint64_t off, size_t len)
- : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len) {
+ : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len),
+ m_synchronous(false) {
}
+ virtual aio_type_t get_aio_type() const = 0;
+
virtual void send_request();
- virtual void send_object_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc);
- virtual void send_object_request(const ObjectExtent &object_extent,
- const ::SnapContext &snapc) = 0;
+ virtual void send_cache_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ uint64_t journal_tid) = 0;
+
+ void send_object_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ AioObjectRequests *aio_object_requests);
+ virtual AioObjectRequest *send_object_request(
+ const ObjectExtent &object_extent, const ::SnapContext &snapc,
+ Context *on_finish) = 0;
+
+ virtual uint64_t append_journal_event(const AioObjectRequests &requests,
+ bool synchronous) = 0;
virtual void update_stats(size_t length) = 0;
private:
- uint64_t m_off;
- size_t m_len;
+ bool m_synchronous;
};
class AioImageWrite : public AbstractAioImageWrite {
}
protected:
+ virtual aio_type_t get_aio_type() const {
+ return AIO_TYPE_WRITE;
+ }
virtual const char *get_request_type() const {
return "aio_write";
}
- virtual void send_object_request(const ObjectExtent &object_extent,
- const ::SnapContext &snapc);
+ void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl);
+
+ virtual void send_cache_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ uint64_t journal_tid);
+
+ virtual AioObjectRequest *send_object_request(
+ const ObjectExtent &object_extent, const ::SnapContext &snapc,
+ Context *on_finish);
+
+ virtual uint64_t append_journal_event(const AioObjectRequests &requests,
+ bool synchronous);
virtual void update_stats(size_t length);
private:
const char *m_buf;
}
protected:
+ virtual aio_type_t get_aio_type() const {
+ return AIO_TYPE_DISCARD;
+ }
virtual const char *get_request_type() const {
return "aio_discard";
}
- virtual void send_object_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc);
- virtual void send_object_request(const ObjectExtent &object_extent,
- const ::SnapContext &snapc);
+ virtual void send_cache_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ uint64_t journal_tid);
+
+ virtual AioObjectRequest *send_object_request(
+ const ObjectExtent &object_extent, const ::SnapContext &snapc,
+ Context *on_finish);
+
+ virtual uint64_t append_journal_event(const AioObjectRequests &requests,
+ bool synchronous);
virtual void update_stats(size_t length);
};
#include "librbd/AioCompletion.h"
#include "librbd/AioImageRequest.h"
#include "librbd/ImageCtx.h"
-#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
#define dout_subsys ceph_subsys_rbd
namespace librbd {
+AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name,
+ time_t ti, ThreadPool *tp)
+ : ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
+ m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
+ m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0),
+ m_lock_listener(this), m_blocking_writes(false) {
+
+ CephContext *cct = m_image_ctx.cct;
ldout(cct, 5) << this << ": ictx=" << image_ctx << dendl;
+}
+
ssize_t AioImageRequestWQ::read(uint64_t off, size_t len, char *buf,
int op_flags) {
CephContext *cct = m_image_ctx.cct;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_image_ctx.non_blocking_aio) {
- queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags),
- false);
+ queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags));
} else {
AioImageRequest::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags);
}
<< "len=" << len << ", flags=" << op_flags << dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- bool lock_required = is_lock_required();
- if (m_image_ctx.non_blocking_aio || lock_required) {
- queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags),
- lock_required);
+ if (m_image_ctx.non_blocking_aio || is_journal_required() ||
+ writes_blocked()) {
+ queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags));
} else {
AioImageRequest::aio_write(&m_image_ctx, c, off, len, buf, op_flags);
}
<< dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- bool lock_required = is_lock_required();
- if (m_image_ctx.non_blocking_aio || lock_required) {
- queue(new AioImageDiscard(m_image_ctx, c, off, len),
- lock_required);
+ if (m_image_ctx.non_blocking_aio || is_journal_required() ||
+ writes_blocked()) {
+ queue(new AioImageDiscard(m_image_ctx, c, off, len));
} else {
AioImageRequest::aio_discard(&m_image_ctx, c, off, len);
}
<< "completion=" << c << dendl;
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
- if (m_image_ctx.non_blocking_aio || !writes_empty()) {
- queue(new AioImageFlush(m_image_ctx, c), false);
+ if (m_image_ctx.non_blocking_aio || is_journal_required() ||
+ writes_blocked() || !writes_empty()) {
+ queue(new AioImageFlush(m_image_ctx, c));
} else {
AioImageRequest::aio_flush(&m_image_ctx, c);
}
}
-void AioImageRequestWQ::suspend_writes() {
+void AioImageRequestWQ::block_writes() {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl;
Mutex::Locker locker(m_lock);
- m_writes_suspended = true;
- while (m_in_progress_writes > 0) {
- m_cond.Wait(m_lock);
+ ++m_write_blockers;
+ ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
+ << "num=" << m_write_blockers << dendl;
+ if (m_write_blockers == 1) {
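+ // first blocker: wait for all in-flight writes to drain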
+ while (m_in_progress_writes > 0) {
+ m_cond.Wait(m_lock);
+ }
}
}
-void AioImageRequestWQ::resume_writes() {
+void AioImageRequestWQ::unblock_writes() {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl;
+ bool wake_up = false;
{
Mutex::Locker locker(m_lock);
- m_writes_suspended = false;
+ assert(m_write_blockers > 0);
+ --m_write_blockers;
+
+ ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", "
+ << "num=" << m_write_blockers << dendl;
+ if (m_write_blockers == 0) {
+ wake_up = true;
+ }
}
- signal();
+
+ if (wake_up) {
+ signal();
+ }
+}
+
+void AioImageRequestWQ::register_lock_listener() {
+ m_image_ctx.image_watcher->register_listener(&m_lock_listener);
}
void *AioImageRequestWQ::_void_dequeue() {
{
if (peek_item->is_write_op()) {
Mutex::Locker locker(m_lock);
- if (m_writes_suspended) {
+ if (m_write_blockers > 0) {
return NULL;
}
++m_in_progress_writes;
Mutex::Locker locker(m_lock);
if (req->is_write_op()) {
assert(m_queued_writes > 0);
- if (--m_queued_writes == 0) {
- m_image_ctx.image_watcher->clear_aio_ops_pending();
- }
+ --m_queued_writes;
assert(m_in_progress_writes > 0);
if (--m_in_progress_writes == 0) {
delete req;
}
-bool AioImageRequestWQ::is_lock_required() {
+bool AioImageRequestWQ::is_journal_required() const {
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ return (m_image_ctx.journal != NULL);
+}
+
+bool AioImageRequestWQ::is_lock_required() const {
assert(m_image_ctx.owner_lock.is_locked());
if (m_image_ctx.image_watcher == NULL) {
return false;
}
+
return (m_image_ctx.image_watcher->is_lock_supported() &&
!m_image_ctx.image_watcher->is_lock_owner());
}
-void AioImageRequestWQ::queue(AioImageRequest *req, bool lock_required) {
+void AioImageRequestWQ::queue(AioImageRequest *req) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
- << "req=" << req << ", lock_req=" << lock_required << dendl;
+ << "req=" << req << dendl;
assert(m_image_ctx.owner_lock.is_locked());
}
ThreadPool::PointerWQ<AioImageRequest>::queue(req);
- if (first_write_op) {
- m_image_ctx.image_watcher->flag_aio_ops_pending();
- if (lock_required) {
+ if (is_lock_required() && first_write_op) {
+ m_image_ctx.image_watcher->request_lock();
+ }
+}
+
+void AioImageRequestWQ::handle_releasing_lock() {
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << dendl;
+
+ if (!m_blocking_writes) {
+ m_blocking_writes = true;
+ block_writes();
+ }
+}
+
+void AioImageRequestWQ::handle_lock_updated(bool lock_supported,
+ bool lock_owner) {
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", "
+ << "lock_support=" << lock_supported << ", "
+ << "owner=" << lock_owner << dendl;
+
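+ // resume writes if the lock is no longer required (or now owned); otherwise
+ // keep writes blocked and re-request the lock if IO is still queued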
+ if ((!lock_supported || lock_owner) && m_blocking_writes) {
+ m_blocking_writes = false;
+ unblock_writes();
+ } else if (lock_supported && !lock_owner) {
+ assert(writes_blocked());
+ if (!writes_empty()) {
m_image_ctx.image_watcher->request_lock();
}
}
#ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
#define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H
+#include "include/Context.h"
#include "common/WorkQueue.h"
#include "common/Mutex.h"
+#include "librbd/ImageWatcher.h"
namespace librbd {
class AioImageRequestWQ : protected ThreadPool::PointerWQ<AioImageRequest> {
public:
AioImageRequestWQ(ImageCtx *image_ctx, const string &name, time_t ti,
- ThreadPool *tp)
- : ThreadPool::PointerWQ<AioImageRequest>(name, ti, 0, tp),
- m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"),
- m_writes_suspended(false), m_in_progress_writes(0), m_queued_writes(0) {
- }
+ ThreadPool *tp);
ssize_t read(uint64_t off, size_t len, char *buf, int op_flags);
ssize_t write(uint64_t off, size_t len, const char *buf, int op_flags);
return (m_queued_writes == 0);
}
- inline bool writes_suspended() const {
+ inline bool writes_blocked() const {
Mutex::Locker locker(m_lock);
- return m_writes_suspended;
+ return (m_write_blockers > 0);
}
- void suspend_writes();
- void resume_writes();
+ void block_writes();
+ void unblock_writes();
+
+ void register_lock_listener();
protected:
virtual void *_void_dequeue();
virtual void process(AioImageRequest *req);
private:
+ struct LockListener : public ImageWatcher::Listener {
+ AioImageRequestWQ *aio_work_queue;
+ LockListener(AioImageRequestWQ *_aio_work_queue)
+ : aio_work_queue(_aio_work_queue) {
+ }
+
+ virtual bool handle_requested_lock() {
+ return true;
+ }
+ virtual void handle_releasing_lock() {
+ aio_work_queue->handle_releasing_lock();
+ }
+ virtual void handle_lock_updated(bool lock_supported, bool lock_owner) {
+ aio_work_queue->handle_lock_updated(lock_supported, lock_owner);
+ }
+ };
+
ImageCtx &m_image_ctx;
mutable Mutex m_lock;
Cond m_cond;
- bool m_writes_suspended;
+ uint32_t m_write_blockers;
uint32_t m_in_progress_writes;
uint32_t m_queued_writes;
- bool is_lock_required();
- void queue(AioImageRequest *req, bool lock_required);
+ LockListener m_lock_listener;
+ bool m_blocking_writes;
+
+ bool is_journal_required() const;
+ bool is_lock_required() const;
+ void queue(AioImageRequest *req);
+
+ void handle_releasing_lock();
+ void handle_lock_updated(bool lock_supported, bool lock_owner);
};
} // namespace librbd
#include "librbd/CopyupRequest.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
+#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
#include <boost/bind.hpp>
#include "librbd/internal.h"
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
+#include "librbd/Journal.h"
#include "librbd/LibrbdAdminSocketHook.h"
#include "librbd/ObjectMap.h"
exclusive_locked(false),
name(image_name),
image_watcher(NULL),
+ journal(NULL),
refresh_seq(0),
last_refresh(0),
owner_lock(unique_lock_name("librbd::ImageCtx::owner_lock", this)),
}
ImageCtx::~ImageCtx() {
+ assert(journal == NULL);
if (perfcounter) {
perf_stop();
}
int ImageCtx::register_watch() {
assert(image_watcher == NULL);
image_watcher = new ImageWatcher(*this);
+ aio_work_queue->register_lock_listener();
return image_watcher->register_watch();
}
ASSIGN_OPTION(request_timed_out_seconds);
ASSIGN_OPTION(enable_alloc_hint);
}
+
+ void ImageCtx::open_journal() {
+ assert(journal == NULL);
+ journal = new Journal(*this);
+ }
+
+ int ImageCtx::close_journal(bool force) {
+ assert(journal != NULL);
+ int r = journal->close();
+ if (r < 0) {
+ lderr(cct) << "failed to flush journal: " << cpp_strerror(r) << dendl;
+ if (!force) {
+ return r;
+ }
+ }
+
+ delete journal;
+ journal = NULL;
+ return r;
+ }
}
class CopyupRequest;
class LibrbdAdminSocketHook;
class ImageWatcher;
+ class Journal;
struct ImageCtx {
CephContext *cct;
std::string snap_name;
IoCtx data_ctx, md_ctx;
ImageWatcher *image_watcher;
+ Journal *journal;
int refresh_seq; ///< sequence for refresh requests
int last_refresh; ///< last completed refresh
void cancel_async_requests();
void apply_metadata_confs();
+
+ void open_journal();
+ int close_journal(bool force);
};
}
// vim: ts=8 sw=2 smarttab
#include "librbd/ImageWatcher.h"
#include "librbd/AioCompletion.h"
-#include "librbd/AioImageRequestWQ.h"
#include "librbd/ImageCtx.h"
#include "librbd/internal.h"
#include "librbd/ObjectMap.h"
: m_image_ctx(image_ctx),
m_watch_lock(unique_lock_name("librbd::ImageWatcher::m_watch_lock", this)),
m_watch_ctx(*this), m_watch_handle(0),
- m_watch_state(WATCH_STATE_UNREGISTERED), m_aio_ops_pending(false),
+ m_watch_state(WATCH_STATE_UNREGISTERED), m_lock_supported(false),
m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED),
+ m_listeners_lock(unique_lock_name("librbd::ImageWatcher::m_listeners_lock", this)),
+ m_listeners_in_use(false),
m_task_finisher(new TaskFinisher<Task>(*m_image_ctx.cct)),
m_async_request_lock(unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)),
m_owner_client_id_lock(unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this))
m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
}
+void ImageWatcher::register_listener(Listener *listener) {
+ Mutex::Locker listeners_locker(m_listeners_lock);
+ m_listeners.push_back(listener);
+}
+
+void ImageWatcher::unregister_listener(Listener *listener) {
+ // TODO CoW listener list
+ Mutex::Locker listeners_locker(m_listeners_lock);
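+ // wait until no notification is iterating the listener list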
+ while (m_listeners_in_use) {
+ m_listeners_cond.Wait(m_listeners_lock);
+ }
+ m_listeners.remove(listener);
+}
+
int ImageWatcher::register_watch() {
ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl;
return r;
}
-void ImageWatcher::refresh() {
+int ImageWatcher::refresh() {
assert(m_image_ctx.owner_lock.is_locked());
- if (is_lock_supported() && !is_lock_owner()) {
- m_image_ctx.aio_work_queue->suspend_writes();
- } else if (!is_lock_supported()) {
- m_image_ctx.aio_work_queue->resume_writes();
+ bool lock_support_changed = false;
+ {
+ RWLock::WLocker watch_locker(m_watch_lock);
+ if (m_lock_supported != is_lock_supported()) {
+ m_lock_supported = is_lock_supported();
+ lock_support_changed = true;
+ }
+ }
+
+ int r = 0;
+ if (lock_support_changed) {
+ if (is_lock_supported() && !is_lock_owner()) {
+ // image opened, exclusive lock dynamically enabled, or now HEAD
+ notify_listeners_releasing_lock();
+ } else if (!is_lock_supported() && is_lock_owner()) {
+ // exclusive lock dynamically disabled or now snapshot
+ m_image_ctx.owner_lock.put_read();
+ {
+ RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
+ r = release_lock();
+ }
+ m_image_ctx.owner_lock.get_read();
+ }
+ notify_listeners_updated_lock();
}
+ return r;
}
int ImageWatcher::try_lock() {
assert(m_image_ctx.owner_lock.is_wlocked());
assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+ assert(is_lock_supported());
while (true) {
int r = lock();
}
void ImageWatcher::request_lock() {
- {
- RWLock::WLocker watch_locker(m_watch_lock);
- m_aio_ops_pending = true;
- }
schedule_request_lock(false);
}
}
int ImageWatcher::lock() {
+ assert(m_image_ctx.owner_lock.is_wlocked());
+ assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+
int r = rados::cls::lock::lock(&m_image_ctx.md_ctx, m_image_ctx.header_oid,
RBD_LOCK_NAME, LOCK_EXCLUSIVE,
encode_lock_cookie(), WATCHER_LOCK_TAG, "",
m_image_ctx.object_map.refresh(CEPH_NOSNAP);
}
- bufferlist bl;
- ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
-
- m_image_ctx.aio_work_queue->resume_writes();
-
// send the notification when we aren't holding locks
FunctionContext *ctx = new FunctionContext(
- boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid,
- bl, NOTIFY_TIMEOUT, reinterpret_cast<bufferlist *>(0)));
+ boost::bind(&ImageWatcher::notify_acquired_lock, this));
m_task_finisher->queue(TASK_CODE_ACQUIRED_LOCK, ctx);
return 0;
}
-void ImageWatcher::prepare_unlock() {
- assert(m_image_ctx.owner_lock.is_wlocked());
- if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
- m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
- }
-}
-
-void ImageWatcher::cancel_unlock() {
- assert(m_image_ctx.owner_lock.is_wlocked());
- if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) {
- m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
- }
-}
-
int ImageWatcher::unlock()
{
assert(m_image_ctx.owner_lock.is_wlocked());
- if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- return 0;
- }
ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock" << dendl;
m_lock_owner_state = LOCK_OWNER_STATE_NOT_LOCKED;
m_image_ctx.object_map.unlock();
}
- if (is_lock_supported()) {
- m_image_ctx.aio_work_queue->suspend_writes();
+ {
+ Mutex::Locker l(m_owner_client_id_lock);
+ set_owner_client_id(ClientId());
}
- Mutex::Locker l(m_owner_client_id_lock);
- set_owner_client_id(ClientId());
-
FunctionContext *ctx = new FunctionContext(
boost::bind(&ImageWatcher::notify_released_lock, this));
m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx);
return 0;
}
-bool ImageWatcher::release_lock()
+int ImageWatcher::release_lock()
{
assert(m_image_ctx.owner_lock.is_wlocked());
- ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock by request"
- << dendl;
- if (!is_lock_owner()) {
- return false;
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 10) << this << " releasing exclusive lock by request" << dendl;
+ if (m_lock_owner_state != LOCK_OWNER_STATE_LOCKED) {
+ return 0;
}
- prepare_unlock();
+
+ m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
m_image_ctx.owner_lock.put_write();
+ // ensure all maintenance operations are canceled
m_image_ctx.cancel_async_requests();
m_image_ctx.flush_async_operations();
- m_image_ctx.aio_work_queue->suspend_writes();
+ int r;
{
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+
+ // alert listeners that all incoming IO needs to be stopped since the
+ // lock is being released
+ notify_listeners_releasing_lock();
+
RWLock::WLocker md_locker(m_image_ctx.md_lock);
- librbd::_flush(&m_image_ctx);
+ r = librbd::_flush(&m_image_ctx);
+ if (r < 0) {
+ lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl;
+ goto err_cancel_unlock;
+ }
}
m_image_ctx.owner_lock.get_write();
- if (!is_lock_owner()) {
- return false;
- }
+ assert(m_lock_owner_state == LOCK_OWNER_STATE_RELEASING);
+ r = unlock();
- unlock();
- return true;
-}
+ // notify listeners of the change w/ owner read locked
+ m_image_ctx.owner_lock.put_write();
+ {
+ RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
+ if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
+ notify_listeners_updated_lock();
+ }
+ }
+ m_image_ctx.owner_lock.get_write();
-void ImageWatcher::flag_aio_ops_pending() {
- RWLock::WLocker watch_locker(m_watch_lock);
- if (!m_aio_ops_pending) {
- ldout(m_image_ctx.cct, 20) << this << " pending AIO ops" << dendl;
- m_aio_ops_pending = true;
+ if (r < 0) {
+ lderr(cct) << this << " failed to unlock: " << cpp_strerror(r) << dendl;
+ return r;
}
-}
-void ImageWatcher::clear_aio_ops_pending() {
- RWLock::WLocker watch_locker(m_watch_lock);
- if (m_aio_ops_pending) {
- ldout(m_image_ctx.cct, 20) << this << " no pending AIO ops" << dendl;
- m_aio_ops_pending = false;
+ return 0;
+
+err_cancel_unlock:
+ m_image_ctx.owner_lock.get_write();
+ if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) {
+ m_lock_owner_state = LOCK_OWNER_STATE_LOCKED;
}
+ return r;
}
void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) {
return notify_async_request(async_request_id, bl, prog_ctx);
}
+void ImageWatcher::notify_lock_state() {
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
+ // re-send the acquired lock notification so that peers know they can now
+ // request the lock
+ ldout(m_image_ctx.cct, 10) << this << " notify lock state" << dendl;
+
+ bufferlist bl;
+ ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
+
+ m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT,
+ NULL);
+ }
+}
+
void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx,
const std::string &oid)
{
return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle);
}
+void ImageWatcher::notify_acquired_lock() {
+ ldout(m_image_ctx.cct, 10) << this << " notify acquired lock" << dendl;
+
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ notify_listeners_updated_lock();
+
+ bufferlist bl;
+ ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl);
+ m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+}
+
void ImageWatcher::notify_release_lock() {
RWLock::WLocker owner_locker(m_image_ctx.owner_lock);
release_lock();
assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
RWLock::RLocker watch_locker(m_watch_lock);
- if (m_watch_state == WATCH_STATE_REGISTERED && m_aio_ops_pending) {
+ if (m_watch_state == WATCH_STATE_REGISTERED) {
ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
FunctionContext *ctx = new FunctionContext(
bufferlist *out) {
ldout(m_image_ctx.cct, 10) << this << " image exclusively locked announcement"
<< dendl;
+
+ bool cancel_async_requests = true;
if (payload.client_id.is_valid()) {
Mutex::Locker l(m_owner_client_id_lock);
if (payload.client_id == m_owner_client_id) {
- // we already know that the remote client is the owner
- return;
+ cancel_async_requests = false;
}
set_owner_client_id(payload.client_id);
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- schedule_cancel_async_requests();
- schedule_request_lock(false);
+ if (cancel_async_requests) {
+ schedule_cancel_async_requests();
+ }
+ notify_listeners_updated_lock();
}
}
void ImageWatcher::handle_payload(const ReleasedLockPayload &payload,
bufferlist *out) {
ldout(m_image_ctx.cct, 10) << this << " exclusive lock released" << dendl;
+
+ bool cancel_async_requests = true;
if (payload.client_id.is_valid()) {
Mutex::Locker l(m_owner_client_id_lock);
if (payload.client_id != m_owner_client_id) {
ldout(m_image_ctx.cct, 10) << this << " unexpected owner: "
<< payload.client_id << " != "
<< m_owner_client_id << dendl;
- return;
+ cancel_async_requests = false;
+ } else {
+ set_owner_client_id(ClientId());
}
- set_owner_client_id(ClientId());
}
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- schedule_cancel_async_requests();
- schedule_request_lock(false);
+ if (cancel_async_requests) {
+ schedule_cancel_async_requests();
+ }
+ notify_listeners_updated_lock();
}
}
}
}
- ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock"
- << dendl;
- FunctionContext *ctx = new FunctionContext(
- boost::bind(&ImageWatcher::notify_release_lock, this));
- m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
+ bool release_permitted = true;
+ {
+ Mutex::Locker listeners_locker(m_listeners_lock);
+ for (Listeners::iterator it = m_listeners.begin();
+ it != m_listeners.end(); ++it) {
+ if (!(*it)->handle_requested_lock()) {
+ release_permitted = false;
+ break;
+ }
+ }
+ }
+
+ if (release_permitted) {
+ ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock"
+ << dendl;
+ FunctionContext *ctx = new FunctionContext(
+ boost::bind(&ImageWatcher::notify_release_lock, this));
+ m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx);
+ }
}
}
}
if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
- schedule_request_lock(false);
+ notify_listeners_updated_lock();
}
}
m_image_watcher.schedule_async_complete(m_async_request_id, r);
}
+void ImageWatcher::notify_listeners_releasing_lock() {
+ assert(m_image_ctx.owner_lock.is_locked());
+
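+ // copy the listener list so callbacks run without m_listeners_lock held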
+ Listeners listeners;
+ {
+ Mutex::Locker listeners_locker(m_listeners_lock);
+ m_listeners_in_use = true;
+ listeners = m_listeners;
+ }
+
+ for (Listeners::iterator it = listeners.begin();
+ it != listeners.end(); ++it) {
+ (*it)->handle_releasing_lock();
+ }
+
+ Mutex::Locker listeners_locker(m_listeners_lock);
+ m_listeners_in_use = false;
+ m_listeners_cond.Signal();
+}
+
+void ImageWatcher::notify_listeners_updated_lock() {
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ Listeners listeners;
+ {
+ Mutex::Locker listeners_locker(m_listeners_lock);
+ m_listeners_in_use = true;
+ listeners = m_listeners;
+ }
+
+ bool lock_supported;
+ {
+ RWLock::RLocker watch_locker(m_watch_lock);
+ lock_supported = m_lock_supported;
+ }
+
+ assert(lock_supported || m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED);
+ for (Listeners::iterator it = listeners.begin();
+ it != listeners.end(); ++it) {
+ (*it)->handle_lock_updated(lock_supported,
+ m_lock_owner_state == LOCK_OWNER_STATE_LOCKED);
+ }
+
+ Mutex::Locker listeners_locker(m_listeners_lock);
+ m_listeners_in_use = false;
+ m_listeners_cond.Signal();
+}
+
}
#ifndef CEPH_LIBRBD_IMAGE_WATCHER_H
#define CEPH_LIBRBD_IMAGE_WATCHER_H
+#include "common/Cond.h"
#include "common/Mutex.h"
#include "common/RWLock.h"
#include "include/Context.h"
class ImageWatcher {
public:
+ struct Listener {
+ virtual ~Listener() {}
+
+ virtual bool handle_requested_lock() = 0;
+ virtual void handle_releasing_lock() = 0;
+ virtual void handle_lock_updated(bool lock_supported, bool lock_owner) = 0;
+ };
ImageWatcher(ImageCtx& image_ctx);
~ImageWatcher();
bool is_lock_supported(const RWLock &snap_lock) const;
bool is_lock_owner() const;
+ void register_listener(Listener *listener);
+ void unregister_listener(Listener *listener);
+
int register_watch();
int unregister_watch();
- void refresh();
+ int refresh();
int try_lock();
void request_lock();
- void prepare_unlock();
- void cancel_unlock();
- int unlock();
-
- void flag_aio_ops_pending();
- void clear_aio_ops_pending();
+ int release_lock();
void assert_header_locked(librados::ObjectWriteOperation *op);
int notify_rebuild_object_map(uint64_t request_id,
ProgressContext &prog_ctx);
+ void notify_lock_state();
static void notify_header_update(librados::IoCtx &io_ctx,
const std::string &oid);
TASK_CODE_ASYNC_PROGRESS
};
+ typedef std::list<Listener *> Listeners;
typedef std::pair<Context *, ProgressContext *> AsyncRequest;
class Task {
WatchCtx m_watch_ctx;
uint64_t m_watch_handle;
WatchState m_watch_state;
- bool m_aio_ops_pending;
+
+ bool m_lock_supported;
LockOwnerState m_lock_owner_state;
+ Mutex m_listeners_lock;
+ Cond m_listeners_cond;
+ Listeners m_listeners;
+ bool m_listeners_in_use;
+
TaskFinisher<Task> *m_task_finisher;
RWLock m_async_request_lock;
int get_lock_owner_info(entity_name_t *locker, std::string *cookie,
std::string *address, uint64_t *handle);
int lock();
- bool release_lock();
+ int unlock();
bool try_request_lock();
void schedule_cancel_async_requests();
void set_owner_client_id(const WatchNotify::ClientId &client_id);
WatchNotify::ClientId get_client_id();
+ void notify_acquired_lock();
void notify_release_lock();
void notify_released_lock();
void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out);
void reregister_watch();
+
+ void notify_listeners_releasing_lock();
+ void notify_listeners_updated_lock();
};
} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "librbd/Journal.h"
+#include "librbd/AioCompletion.h"
+#include "librbd/AioImageRequestWQ.h"
+#include "librbd/AioObjectRequest.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/JournalTypes.h"
+#include "journal/Journaler.h"
+#include "journal/ReplayEntry.h"
+#include "common/errno.h"
+
+#define dout_subsys ceph_subsys_rbd
+#undef dout_prefix
+#define dout_prefix *_dout << "librbd::Journal: "
+
+namespace librbd {
+
+namespace {
+
+const std::string CLIENT_DESCRIPTION = "master image";
+
+} // anonymous namespace
+
+Journal::Journal(ImageCtx &image_ctx)
+ : m_image_ctx(image_ctx), m_journaler(NULL),
+ m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
+ m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
+ m_next_tid(0), m_blocking_writes(false) {
+
+ ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
+
+ m_image_ctx.image_watcher->register_listener(&m_lock_listener);
+
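+ // writes stay blocked until the journal is ready to record entries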
+ Mutex::Locker locker(m_lock);
+ block_writes();
+}
+
+Journal::~Journal() {
+ assert(m_journaler == NULL);
+
+ m_image_ctx.image_watcher->unregister_listener(&m_lock_listener);
+
+ Mutex::Locker locker(m_lock);
+ unblock_writes();
+}
+
+bool Journal::is_journal_supported(ImageCtx &image_ctx) {
+ assert(image_ctx.snap_lock.is_locked());
+ return ((image_ctx.features & RBD_FEATURE_JOURNALING) &&
+ !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP);
+}
+
+int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) {
+ CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
+ ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
+
+ // TODO configurable commit flush interval
+ ::journal::Journaler journaler(io_ctx, image_id, "", 5);
+
+ // TODO order / splay width via config / image metadata / data pool
+ int r = journaler.create(24, 4, io_ctx.get_id());
+ if (r < 0) {
+ lderr(cct) << "failed to create journal: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+
+ r = journaler.register_client(CLIENT_DESCRIPTION);
+ if (r < 0) {
+ lderr(cct) << "failed to register client: " << cpp_strerror(r) << dendl;
+ return r;
+ }
+ return 0;
+}
+
+int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
+ CephContext *cct = reinterpret_cast<CephContext *>(io_ctx.cct());
+ ldout(cct, 5) << __func__ << ": image=" << image_id << dendl;
+
+ return 0;
+}
+
+bool Journal::is_journal_ready() const {
+ Mutex::Locker locker(m_lock);
+ return (m_state == STATE_RECORDING);
+}
+
+void Journal::open() {
+ Mutex::Locker locker(m_lock);
+ if (m_journaler != NULL) {
+ return;
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+ create_journaler();
+}
+
+int Journal::close() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": state=" << m_state << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (m_state == STATE_UNINITIALIZED) {
+ return 0;
+ }
+
+ int r;
+ bool done = false;
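+ // step the journal toward a closable state, waiting out init/replay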
+ while (!done) {
+ switch (m_state) {
+ case STATE_UNINITIALIZED:
+ done = true;
+ break;
+ case STATE_INITIALIZING:
+ case STATE_REPLAYING:
+ m_close_pending = true;
+ wait_for_state_transition();
+ break;
+ case STATE_RECORDING:
+ r = stop_recording();
+ if (r < 0) {
+ return r;
+ }
+ done = true;
+ break;
+ default:
+ assert(false);
+ }
+ }
+
+ destroy_journaler();
+ return 0;
+}
+
+uint64_t Journal::append_event(AioCompletion *aio_comp,
+ const journal::EventEntry &event_entry,
+ const AioObjectRequests &requests,
+ bool flush_entry) {
+ assert(m_image_ctx.owner_lock.is_locked());
+ assert(m_state == STATE_RECORDING);
+
+ bufferlist bl;
+ ::encode(event_entry, bl);
+
+ ::journal::Future future = m_journaler->append("", bl);
+ uint64_t tid;
+ {
+ Mutex::Locker locker(m_lock);
+ tid = m_next_tid++;
+ m_events[tid] = Event(future, aio_comp, requests);
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "event=" << event_entry.get_event_type() << ", "
+ << "new_reqs=" << requests.size() << ", "
+ << "flush=" << flush_entry << ", tid=" << tid << dendl;
+
+ Context *on_safe = new C_EventSafe(this, tid);
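+ // flush_entry forces the entry to disk now; otherwise wait for it to commit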
+ if (flush_entry) {
+ future.flush(on_safe);
+ } else {
+ future.wait(on_safe);
+ }
+ return tid;
+}
+
+void Journal::create_journaler() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ assert(m_lock.is_locked());
+ assert(m_state == STATE_UNINITIALIZED);
+
+ // TODO allow alternate pool for journal objects and commit flush interval
+ m_close_pending = false;
+ m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "",
+ 5);
+
+ m_journaler->init(new C_InitJournal(this));
+ transition_state(STATE_INITIALIZING);
+}
+
+void Journal::destroy_journaler() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ assert(m_lock.is_locked());
+
+ m_close_pending = false;
+ delete m_journaler;
+ m_journaler = NULL;
+ transition_state(STATE_UNINITIALIZED);
+}
+
+void Journal::handle_initialized(int r) {
+ CephContext *cct = m_image_ctx.cct;
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
+ Mutex::Locker locker(m_lock);
+
+ // TODO: failed to open journal -- retry?
+ destroy_journaler();
+ create_journaler();
+ return;
+ }
+
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+ Mutex::Locker locker(m_lock);
+ if (m_close_pending) {
+ destroy_journaler();
+ return;
+ }
+
+ transition_state(STATE_REPLAYING);
+ m_journaler->start_replay(&m_replay_handler);
+}
+
+void Journal::handle_replay_ready() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ while (true) {
+ if (m_close_pending) {
+ m_journaler->stop_replay();
+ destroy_journaler();
+ return;
+ }
+
+ ::journal::ReplayEntry replay_entry;
+ if (!m_journaler->try_pop_front(&replay_entry)) {
+ return;
+ }
+
+ m_lock.Unlock();
+ // TODO process the payload
+ m_lock.Lock();
+ }
+}
+
+void Journal::handle_replay_complete(int r) {
+ CephContext *cct = m_image_ctx.cct;
+
+ {
+ Mutex::Locker locker(m_lock);
+ if (r < 0) {
+ lderr(cct) << this << " " << __func__ << ": r=" << r << dendl;
+
+ // TODO: failed to replay journal -- retry?
+ destroy_journaler();
+ create_journaler();
+ return;
+ }
+
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+ m_journaler->stop_replay();
+
+ if (m_close_pending) {
+ destroy_journaler();
+ return;
+ }
+
+ // TODO configurable flush interval, flush bytes, and flush age
+ m_journaler->start_append(0, 0, 0);
+ transition_state(STATE_RECORDING);
+
+ unblock_writes();
+ }
+
+ // kick peers to let them know they can re-request the lock now
+ m_image_ctx.image_watcher->notify_lock_state();
+}
+
+void Journal::handle_event_safe(int r, uint64_t tid) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
+ << "tid=" << tid << dendl;
+
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+
+ AioCompletion *aio_comp;
+ AioObjectRequests aio_object_requests;
+ Contexts on_safe_contexts;
+ {
+ Mutex::Locker locker(m_lock);
+ Events::iterator it = m_events.find(tid);
+ assert(it != m_events.end());
+
+ Event &event = it->second;
+ aio_comp = event.aio_comp;
+ aio_object_requests.swap(event.aio_object_requests);
+ on_safe_contexts.swap(event.on_safe_contexts);
+ m_events.erase(it);
+ }
+
+ ldout(cct, 20) << "completing tid=" << tid << dendl;
+
+ assert(m_image_ctx.image_watcher->is_lock_owner());
+
+ if (r < 0) {
+ // don't send aio requests if the journal fails -- bubble error up
+ aio_comp->fail(cct, r);
+ } else {
+ // send any waiting aio requests now that journal entry is safe
+ for (AioObjectRequests::iterator it = aio_object_requests.begin();
+ it != aio_object_requests.end(); ++it) {
+ (*it)->send();
+ }
+ }
+
+ // alert the cache about the journal event status
+ for (Contexts::iterator it = on_safe_contexts.begin();
+ it != on_safe_contexts.end(); ++it) {
+ (*it)->complete(r);
+ }
+}
+
+bool Journal::handle_requested_lock() {
+ Mutex::Locker locker(m_lock);
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": " << "state=" << m_state
+ << dendl;
+
+ // prevent peers from taking our lock while we are replaying
+ return (m_state != STATE_INITIALIZING && m_state != STATE_REPLAYING);
+}
+
+void Journal::handle_releasing_lock() {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (m_state == STATE_INITIALIZING || m_state == STATE_REPLAYING) {
+ // wait for replay to successfully interrupt
+ m_close_pending = true;
+ wait_for_state_transition();
+ }
+
+ if (m_state == STATE_UNINITIALIZED || m_state == STATE_RECORDING) {
+ // prevent new write ops but allow pending ops to flush to the journal
+ block_writes();
+ }
+}
+
+void Journal::handle_lock_updated(bool lock_owner) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "owner=" << lock_owner << dendl;
+
+ Mutex::Locker locker(m_lock);
+ if (lock_owner && m_state == STATE_UNINITIALIZED) {
+ create_journaler();
+ } else if (!lock_owner && m_state != STATE_UNINITIALIZED) {
+ assert(m_state == STATE_RECORDING);
+ assert(m_events.empty());
+ int r = stop_recording();
+ if (r < 0) {
+ // TODO handle failed journal writes
+ assert(false);
+ }
+ }
+}
+
+int Journal::stop_recording() {
+ C_SaferCond cond;
+
+ m_journaler->stop_append(&cond);
+
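+ // drop m_lock while blocked on the final flush so callbacks can take it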
+ m_lock.Unlock();
+ int r = cond.wait();
+ m_lock.Lock();
+
+ destroy_journaler();
+ if (r < 0) {
+ lderr(m_image_ctx.cct) << "failed to flush journal: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+ return 0;
+}
+
+void Journal::block_writes() {
+ assert(m_lock.is_locked());
+ if (!m_blocking_writes) {
+ m_blocking_writes = true;
+ m_image_ctx.aio_work_queue->block_writes();
+ }
+}
+
+void Journal::unblock_writes() {
+ assert(m_lock.is_locked());
+ if (m_blocking_writes) {
+ m_blocking_writes = false;
+ m_image_ctx.aio_work_queue->unblock_writes();
+ }
+}
+
+void Journal::transition_state(State state) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl;
+ assert(m_lock.is_locked());
+ m_state = state;
+ m_cond.Signal();
+}
+
+void Journal::wait_for_state_transition() {
+ assert(m_lock.is_locked());
+ State state = m_state;
+ while (m_state == state) {
+ m_cond.Wait(m_lock);
+ }
+}
+
+} // namespace librbd
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#ifndef CEPH_LIBRBD_JOURNAL_H
+#define CEPH_LIBRBD_JOURNAL_H
+
+#include "include/int_types.h"
+#include "include/Context.h"
+#include "include/unordered_map.h"
+#include "include/rados/librados.hpp"
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "journal/Future.h"
+#include "journal/ReplayHandler.h"
+#include "librbd/ImageWatcher.h"
+#include <list>
+#include <string>
+
+class Context;
+namespace journal {
+class Journaler;
+}
+
+namespace librbd {
+
+class AioCompletion;
+class AioObjectRequest;
+class ImageCtx;
+namespace journal {
+class EventEntry;
+}
+
+class Journal {
+public:
+ typedef std::list<AioObjectRequest *> AioObjectRequests;
+
+ Journal(ImageCtx &image_ctx);
+ ~Journal();
+
+ static bool is_journal_supported(ImageCtx &image_ctx);
+ static int create(librados::IoCtx &io_ctx, const std::string &image_id);
+ static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
+
+ bool is_journal_ready() const;
+
+ void open();
+ int close();
+
+ uint64_t append_event(AioCompletion *aio_comp,
+ const journal::EventEntry &event_entry,
+ const AioObjectRequests &requests,
+ bool flush_entry);
+
+private:
+ typedef std::list<Context *> Contexts;
+
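+ // journal lifecycle: UNINITIALIZED -> INITIALIZING -> REPLAYING -> RECORDING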
+ enum State {
+ STATE_UNINITIALIZED,
+ STATE_INITIALIZING,
+ STATE_REPLAYING,
+ STATE_RECORDING,
+ };
+
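+ // an in-flight journal entry plus the object requests gated behind it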
+ struct Event {
+ ::journal::Future future;
+ AioCompletion *aio_comp;
+ AioObjectRequests aio_object_requests;
+ Contexts on_safe_contexts;
+
+ Event() : aio_comp(NULL) {
+ }
+ Event(const ::journal::Future &_future, AioCompletion *_aio_comp,
+ const AioObjectRequests &_requests)
+ : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) {
+ }
+ };
+ typedef ceph::unordered_map<uint64_t, Event> Events;
+
+ struct LockListener : public ImageWatcher::Listener {
+ Journal *journal;
+ LockListener(Journal *_journal) : journal(_journal) {
+ }
+
+ virtual bool handle_requested_lock() {
+ return journal->handle_requested_lock();
+ }
+ virtual void handle_releasing_lock() {
+ journal->handle_releasing_lock();
+ }
+ virtual void handle_lock_updated(bool lock_supported, bool lock_owner) {
+ journal->handle_lock_updated(lock_owner);
+ }
+ };
+
+ struct C_InitJournal : public Context {
+ Journal *journal;
+
+ C_InitJournal(Journal *_journal) : journal(_journal) {
+ }
+
+ virtual void finish(int r) {
+ journal->handle_initialized(r);
+ }
+ };
+
+ struct C_EventSafe : public Context {
+ Journal *journal;
+ uint64_t tid;
+
+ C_EventSafe(Journal *_journal, uint64_t _tid)
+ : journal(_journal), tid(_tid) {
+ }
+
+ virtual void finish(int r) {
+ journal->handle_event_safe(r, tid);
+ }
+ };
+
+ struct ReplayHandler : public ::journal::ReplayHandler {
+ Journal *journal;
+ ReplayHandler(Journal *_journal) : journal(_journal) {
+ }
+
+ virtual void get() {
+ // TODO
+ }
+ virtual void put() {
+ // TODO
+ }
+
+ virtual void handle_entries_available() {
+ journal->handle_replay_ready();
+ }
+ virtual void handle_complete(int r) {
+ journal->handle_replay_complete(r);
+ }
+ };
+
+ ImageCtx &m_image_ctx;
+
+ ::journal::Journaler *m_journaler;
+
+ mutable Mutex m_lock;
+ Cond m_cond;
+ State m_state;
+
+ LockListener m_lock_listener;
+
+ ReplayHandler m_replay_handler;
+ bool m_close_pending;
+
+ uint64_t m_next_tid;
+ Events m_events;
+
+ bool m_blocking_writes;
+
+ void create_journaler();
+ void destroy_journaler();
+
+ void handle_initialized(int r);
+
+ void handle_replay_ready();
+ void handle_replay_complete(int r);
+
+ void handle_event_safe(int r, uint64_t tid);
+
+ bool handle_requested_lock();
+ void handle_releasing_lock();
+ void handle_lock_updated(bool lock_owner);
+
+ int stop_recording();
+
+ void block_writes();
+ void unblock_writes();
+
+ void transition_state(State state);
+ void wait_for_state_transition();
+};
+
+} // namespace librbd
+
+#endif // CEPH_LIBRBD_JOURNAL_H
librbd/ImageCtx.cc \
librbd/ImageWatcher.cc \
librbd/internal.cc \
+ librbd/Journal.cc \
librbd/LibrbdAdminSocketHook.cc \
librbd/LibrbdWriteback.cc \
librbd/ObjectMap.cc \
librbd_la_SOURCES = \
librbd/librbd.cc
librbd_la_LIBADD = \
- librbd_internal.la $(LIBRBD_TYPES) \
+ librbd_internal.la $(LIBRBD_TYPES) libjournal.la \
$(LIBRADOS) $(LIBCOMMON) $(LIBOSDC) \
librados_internal.la \
libcls_rbd_client.la \
libcls_lock_client.la \
+ libcls_journal_client.la \
$(PTHREAD_LIBS) $(EXTRALIBS)
librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0
librbd/ImageCtx.h \
librbd/ImageWatcher.h \
librbd/internal.h \
+ librbd/Journal.h \
librbd/JournalTypes.h \
librbd/LibrbdAdminSocketHook.h \
librbd/LibrbdWriteback.h \
{
}
+int ObjectMap::remove(librados::IoCtx &io_ctx, const std::string &image_id) {
+ return io_ctx.remove(object_map_name(image_id, CEPH_NOSNAP));
+}
+
std::string ObjectMap::object_map_name(const std::string &image_id,
uint64_t snap_id) {
std::string oid(RBD_OBJECT_MAP_PREFIX + image_id);
ObjectMap(ImageCtx &image_ctx);
+ static int remove(librados::IoCtx &io_ctx, const std::string &image_id);
static std::string object_map_name(const std::string &image_id,
uint64_t snap_id);
#include "librbd/ImageCtx.h"
#include "librbd/ImageWatcher.h"
#include "librbd/internal.h"
+#include "librbd/Journal.h"
#include "librbd/ObjectMap.h"
#include "librbd/parent_types.h"
#include "librbd/RebuildObjectMapRequest.h"
OBJECT_NONEXISTENT);
r = io_ctx.operate(ObjectMap::object_map_name(id, CEPH_NOSNAP), &op);
if (r < 0) {
+ lderr(cct) << "error creating initial object map: "
+ << cpp_strerror(r) << dendl;
goto err_remove_header;
}
}
+ if ((features & RBD_FEATURE_JOURNALING) != 0) {
+ if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
+ lderr(cct) << "cannot use journaling without exclusive lock" << dendl;
+ goto err_remove_object_map;
+ }
+
+ r = Journal::create(io_ctx, id);
+ if (r < 0) {
+ lderr(cct) << "error creating journal: " << cpp_strerror(r) << dendl;
+ goto err_remove_object_map;
+ }
+ }
+
ldout(cct, 2) << "done." << dendl;
return 0;
+ err_remove_object_map:
+ if ((features & RBD_FEATURE_OBJECT_MAP) != 0) {
+ remove_r = ObjectMap::remove(io_ctx, id);
+ if (remove_r < 0) {
+ lderr(cct) << "error cleaning up object map after creation failed: "
+ << cpp_strerror(remove_r) << dendl;
+ }
+ }
+
err_remove_header:
remove_r = io_ctx.remove(header_oid);
if (remove_r < 0) {
return -EINVAL;
}
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ RWLock::WLocker md_locker(ictx->md_lock);
+ r = _flush(ictx);
+ if (r < 0) {
+ return r;
+ }
+
if ((features & RBD_FEATURES_MUTABLE) != features) {
lderr(cct) << "cannot update immutable features" << dendl;
return -EINVAL;
return -EINVAL;
}
- RWLock::RLocker l(ictx->snap_lock);
- uint64_t new_features = ictx->features | features;
- if (!enabled) {
+ RWLock::WLocker snap_locker(ictx->snap_lock);
+ uint64_t new_features;
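+ // mask out features that are already in the requested state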
+ if (enabled) {
+ features &= ~ictx->features;
+ new_features = ictx->features | features;
+ } else {
+ features &= ictx->features;
new_features = ictx->features & ~features;
}
- if (ictx->features == new_features) {
+ if (features == 0) {
return 0;
}
return -EINVAL;
}
features_mask |= RBD_FEATURE_EXCLUSIVE_LOCK;
+
+ r = Journal::create(ictx->md_ctx, ictx->id);
+ if (r < 0) {
+ lderr(cct) << "error creating image journal: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
}
if (enable_flags != 0) {
if ((features & RBD_FEATURE_FAST_DIFF) != 0) {
disable_flags = RBD_FLAG_FAST_DIFF_INVALID;
}
+ if ((features & RBD_FEATURE_JOURNALING) != 0) {
+ r = Journal::remove(ictx->md_ctx, ictx->id);
+ if (r < 0) {
+ lderr(cct) << "error removing image journal: " << cpp_strerror(r)
+ << dendl;
+ return r;
+ }
+ }
}
ldout(cct, 10) << "update_features: features=" << new_features << ", mask="
if (r < 0) {
lderr(cct) << "failed to update features: " << cpp_strerror(r)
<< dendl;
+ return r;
}
if (((ictx->features & RBD_FEATURE_OBJECT_MAP) == 0) &&
((features & RBD_FEATURE_OBJECT_MAP) != 0)) {
}
}
if (!old_format) {
- r = io_ctx.remove(ObjectMap::object_map_name(id, CEPH_NOSNAP));
+ r = Journal::remove(io_ctx, id);
+ if (r < 0 && r != -ENOENT) {
+ lderr(cct) << "error removing image journal" << dendl;
+ return r;
+ }
+
+ r = ObjectMap::remove(io_ctx, id);
if (r < 0 && r != -ENOENT) {
lderr(cct) << "error removing image object map" << dendl;
+ return r;
}
ldout(cct, 2) << "removing id object..." << dendl;
ictx->object_map.refresh(ictx->snap_id);
ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps);
+
+ // dynamically enable/disable journaling support
+ if ((ictx->features & RBD_FEATURE_JOURNALING) != 0 &&
+ ictx->image_watcher != NULL && ictx->journal == NULL &&
+ ictx->snap_name.empty()) {
+ ictx->open_journal();
+ } else if ((ictx->features & RBD_FEATURE_JOURNALING) == 0 &&
+ ictx->journal != NULL) {
+ // TODO journal needs to be disabled via proxied request to avoid race
+ // between deleting journal and appending journal events
+ }
} // release snap_lock and cache_lock
if (ictx->image_watcher != NULL) {
// snapshot and the user is trying to fix that
ictx_check(ictx);
- bool unlocking = false;
- {
- RWLock::WLocker l(ictx->owner_lock);
- if (ictx->image_watcher != NULL && ictx->image_watcher->is_lock_owner() &&
- snap_name != NULL && strlen(snap_name) != 0) {
- // stop incoming requests since we will release the lock
- ictx->image_watcher->prepare_unlock();
- unlocking = true;
+ int r;
+ bool snapshot_mode = (snap_name != NULL && strlen(snap_name) != 0);
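+ // switching to a snapshot: release the lock, flush IO, and close the journal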
+ if (snapshot_mode) {
+ {
+ RWLock::WLocker owner_locker(ictx->owner_lock);
+ if (ictx->image_watcher != NULL &&
+ ictx->image_watcher->is_lock_owner()) {
+ r = ictx->image_watcher->release_lock();
+ if (r < 0) {
+ return r;
+ }
+ }
}
- }
- ictx->cancel_async_requests();
- ictx->flush_async_operations();
- if (ictx->object_cacher) {
- // complete pending writes before we're set to a snapshot and
- // get -EROFS for writes
- RWLock::RLocker owner_locker(ictx->owner_lock);
- RWLock::WLocker md_locker(ictx->md_lock);
- ictx->flush_cache();
+ ictx->cancel_async_requests();
+ ictx->flush_async_operations();
+
+ if (ictx->object_cacher) {
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ r = _flush(ictx);
+ if (r < 0) {
+ return r;
+ }
+ }
+
+ {
+ RWLock::WLocker snap_locker(ictx->snap_lock);
+ if (ictx->journal != NULL) {
+ r = ictx->close_journal(false);
+ if (r < 0) {
+ return r;
+ }
+ }
+ }
}
- int r = _snap_set(ictx, snap_name);
+
+ r = _snap_set(ictx, snap_name);
if (r < 0) {
- RWLock::WLocker l(ictx->owner_lock);
- if (unlocking) {
- ictx->image_watcher->cancel_unlock();
- }
return r;
}
- RWLock::WLocker l(ictx->owner_lock);
- if (ictx->image_watcher != NULL) {
- if (unlocking) {
- r = ictx->image_watcher->unlock();
- if (r < 0) {
- lderr(ictx->cct) << "error unlocking image: " << cpp_strerror(r)
- << dendl;
- }
+ {
+ RWLock::WLocker snap_locker(ictx->snap_lock);
+ if ((ictx->features & RBD_FEATURE_JOURNALING) != 0 &&
+ ictx->journal == NULL && !snapshot_mode) {
+ ictx->open_journal();
}
+ }
+
+ RWLock::RLocker owner_locker(ictx->owner_lock);
+ if (ictx->image_watcher != NULL) {
ictx->image_watcher->refresh();
}
return r;
{
ldout(ictx->cct, 20) << "close_image " << ictx << dendl;
+ // finish all incoming IO operations
+ ictx->aio_work_queue->drain();
+
+ int r = 0;
{
- RWLock::WLocker l(ictx->owner_lock);
+ // release the lock (and flush all in-flight IO)
+ RWLock::WLocker owner_locker(ictx->owner_lock);
if (ictx->image_watcher != NULL && ictx->image_watcher->is_lock_owner()) {
- // stop incoming requests
- ictx->image_watcher->prepare_unlock();
+ r = ictx->image_watcher->release_lock();
+ if (r < 0) {
+ lderr(ictx->cct) << "error releasing image lock: " << cpp_strerror(r)
+ << dendl;
+ }
}
}
- assert(!ictx->aio_work_queue->writes_suspended() ||
+ assert(!ictx->aio_work_queue->writes_blocked() ||
ictx->aio_work_queue->writes_empty());
- ictx->aio_work_queue->drain();
+
ictx->cancel_async_requests();
ictx->flush_async_operations();
ictx->readahead.wait_for_pending();
- int r;
+ int flush_r;
if (ictx->object_cacher) {
- r = ictx->shutdown_cache(); // implicitly flushes
+ flush_r = ictx->shutdown_cache(); // implicitly flushes
} else {
RWLock::RLocker owner_locker(ictx->owner_lock);
- r = _flush(ictx);
+ flush_r = _flush(ictx);
}
- if (r < 0) {
- lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(r)
+ if (flush_r< 0) {
+ lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r)
<< dendl;
+ if (r == 0) {
+ r = flush_r;
+ }
}
ictx->op_work_queue->drain();
ictx->copyup_finisher->stop();
}
+ if (ictx->journal != NULL) {
+ int close_r = ictx->close_journal(true);
+ if (close_r < 0 && r == 0) {
+ r = close_r;
+ }
+ }
+
if (ictx->parent) {
int close_r = close_image(ictx->parent);
if (r == 0 && close_r < 0) {
}
if (ictx->image_watcher) {
- {
- RWLock::WLocker l(ictx->owner_lock);
- if (ictx->image_watcher->is_lock_owner()) {
- int unlock_r = ictx->image_watcher->unlock();
- if (unlock_r < 0) {
- lderr(ictx->cct) << "error unlocking image: "
- << cpp_strerror(unlock_r) << dendl;
- if (r == 0) {
- r = unlock_r;
- }
- }
- }
- }
ictx->unregister_watch();
}
unittest_librbd_LDADD = \
librbd_test.la librbd_api.la librbd_internal.la $(LIBRBD_TYPES) \
libcls_rbd_client.la libcls_lock_client.la \
+ libjournal.la libcls_journal_client.la \
librados_test_stub.la librados_internal.la \
$(LIBOSDC) $(UNITTEST_LDADD) \
$(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
ceph_test_librbd_LDADD = \
librbd_test.la librbd_api.la librbd_internal.la $(LIBRBD_TYPES) \
libcls_rbd_client.la libcls_lock_client.la \
+ libjournal.la libcls_journal_client.la \
librados_api.la $(LIBRADOS_DEPS) $(UNITTEST_LDADD) \
$(CEPH_GLOBAL) $(RADOS_TEST_LDADD)
bin_DEBUGPROGRAMS += ceph_test_librbd