From: Jason Dillaman Date: Fri, 10 Jul 2015 01:25:33 +0000 (-0400) Subject: librbd: initial interface with journal library X-Git-Tag: v10.0.1~52^2~21 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=03636c93eeafa049a80b6b88cf3140b77d52b021;p=ceph.git librbd: initial interface with journal library Rough draft of journal library integration within librbd. Non-cached IO paths are now recorded to the journal. Incoming IO ops are blocked if the exclusive lock isn't held or if the journal hasn't been replayed. Signed-off-by: Jason Dillaman --- diff --git a/src/librbd/AioCompletion.cc b/src/librbd/AioCompletion.cc index f3822379f096..982a03fc7f79 100644 --- a/src/librbd/AioCompletion.cc +++ b/src/librbd/AioCompletion.cc @@ -6,9 +6,11 @@ #include "common/ceph_context.h" #include "common/dout.h" #include "common/errno.h" +#include "common/perf_counters.h" #include "common/WorkQueue.h" #include "librbd/AioObjectRequest.h" +#include "librbd/ImageCtx.h" #include "librbd/internal.h" #include "librbd/AioCompletion.h" @@ -99,6 +101,21 @@ namespace librbd { tracepoint(librbd, aio_complete_exit); } + void AioCompletion::init_time(ImageCtx *i, aio_type_t t) { + if (ictx == NULL) { + ictx = i; + aio_type = t; + start_time = ceph_clock_now(ictx->cct); + } + } + + void AioCompletion::start_op(ImageCtx *i, aio_type_t t) { + init_time(i, t); + if (!async_op.started()) { + async_op.start_op(*ictx); + } + } + void AioCompletion::fail(CephContext *cct, int r) { lderr(cct) << "AioCompletion::fail() " << this << ": " << cpp_strerror(r) diff --git a/src/librbd/AioCompletion.h b/src/librbd/AioCompletion.h index ba24c300bb1c..94ed682dd2e8 100644 --- a/src/librbd/AioCompletion.h +++ b/src/librbd/AioCompletion.h @@ -5,18 +5,16 @@ #include "common/Cond.h" #include "common/Mutex.h" -#include "common/ceph_context.h" -#include "common/perf_counters.h" #include "include/Context.h" #include "include/utime.h" #include "include/rbd/librbd.hpp" #include 
"librbd/AsyncOperation.h" -#include "librbd/ImageCtx.h" -#include "librbd/internal.h" #include "osdc/Striper.h" +class CephContext; + namespace librbd { class AioObjectRead; @@ -88,20 +86,8 @@ namespace librbd { void finish_adding_requests(CephContext *cct); - void init_time(ImageCtx *i, aio_type_t t) { - if (ictx == NULL) { - ictx = i; - aio_type = t; - start_time = ceph_clock_now(ictx->cct); - } - } - void start_op(ImageCtx *i, aio_type_t t) { - init_time(i, t); - if (!async_op.started()) { - async_op.start_op(*ictx); - } - } - + void init_time(ImageCtx *i, aio_type_t t); + void start_op(ImageCtx *i, aio_type_t t); void fail(CephContext *cct, int r); void complete(CephContext *cct); diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc index 56fbc453ef31..dc6d87f24fd5 100644 --- a/src/librbd/AioImageRequest.cc +++ b/src/librbd/AioImageRequest.cc @@ -7,6 +7,8 @@ #include "librbd/ImageCtx.h" #include "librbd/ImageWatcher.h" #include "librbd/internal.h" +#include "librbd/Journal.h" +#include "librbd/JournalTypes.h" #include "include/rados/librados.hpp" #include "osdc/Striper.h" @@ -147,7 +149,11 @@ void AbstractAioImageWrite::send_request() { RWLock::RLocker md_locker(m_image_ctx.md_lock); + bool journaling = false; + uint64_t journal_tid = 0; + uint64_t clip_len = m_len; + ObjectExtents object_extents; ::SnapContext snapc; { // prevent image size from changing between computing clip and recording @@ -165,20 +171,33 @@ void AbstractAioImageWrite::send_request() { } snapc = m_image_ctx.snapc; - m_aio_comp->start_op(&m_image_ctx, AIO_TYPE_WRITE); // TODO use correct enum + m_aio_comp->start_op(&m_image_ctx, get_aio_type()); + + // map to object extents + if (clip_len > 0) { + Striper::file_to_extents(cct, m_image_ctx.format_string, + &m_image_ctx.layout, m_off, clip_len, 0, + object_extents); + } + + journaling = (m_image_ctx.journal != NULL); } assert(!m_image_ctx.image_watcher->is_lock_supported() || m_image_ctx.image_watcher->is_lock_owner()); 
- // map to object extents - ObjectExtents extents; - if (clip_len > 0) { - Striper::file_to_extents(cct, m_image_ctx.format_string, - &m_image_ctx.layout, m_off, clip_len, 0, extents); + AioObjectRequests requests; + send_object_requests(object_extents, snapc, (journaling ? &requests : NULL)); + + if (journaling) { + // in-flight ops are flushed prior to closing the journal + assert(m_image_ctx.journal != NULL); + journal_tid = append_journal_event(requests, m_synchronous); } - send_object_requests(extents, snapc); + if (m_image_ctx.object_cacher != NULL) { + send_cache_requests(object_extents, snapc, journal_tid); + } update_stats(clip_len); m_aio_comp->finish_adding_requests(cct); @@ -186,86 +205,131 @@ void AbstractAioImageWrite::send_request() { } void AbstractAioImageWrite::send_object_requests( - const ObjectExtents &object_extents, const ::SnapContext &snapc) { + const ObjectExtents &object_extents, const ::SnapContext &snapc, + AioObjectRequests *aio_object_requests) { CephContext *cct = m_image_ctx.cct; + for (ObjectExtents::const_iterator p = object_extents.begin(); p != object_extents.end(); ++p) { ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length << " from " << p->buffer_extents << dendl; - send_object_request(*p, snapc); + C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); + AioObjectRequest *request = send_object_request(*p, snapc, req_comp); + + // if journaling, stash the request for later; otherwise send + if (request != NULL) { + if (aio_object_requests != NULL) { + aio_object_requests->push_back(request); + } else { + request->send(); + } + } } } -void AioImageWrite::send_object_request(const ObjectExtent &object_extent, - const ::SnapContext &snapc) { - CephContext *cct = m_image_ctx.cct; - - // assemble extent - bufferlist bl; +void AioImageWrite::assemble_extent(const ObjectExtent &object_extent, + bufferlist *bl) { for (Extents::const_iterator q = object_extent.buffer_extents.begin(); q != 
object_extent.buffer_extents.end(); ++q) { - bl.append(m_buf + q->first, q->second); + bl->append(m_buf + q->first, q->second);; } +} - C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); - if (m_image_ctx.object_cacher) { +uint64_t AioImageWrite::append_journal_event( + const AioObjectRequests &requests, bool synchronous) { + bufferlist bl; + bl.append(m_buf, m_len); + + journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl)); + return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests, + synchronous); +} + +void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + uint64_t journal_tid) { + CephContext *cct = m_image_ctx.cct; + + for (ObjectExtents::const_iterator p = object_extents.begin(); + p != object_extents.end(); ++p) { + const ObjectExtent &object_extent = *p; + + bufferlist bl; + assemble_extent(object_extent, &bl); + + // TODO pass journal_tid to object cacher + C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length, object_extent.offset, req_comp, m_op_flags); - } else { - AioObjectWrite *req = new AioObjectWrite(&m_image_ctx, - object_extent.oid.name, - object_extent.objectno, - object_extent.offset, bl, - snapc, req_comp); - - req->set_op_flags(m_op_flags); - req->send(); } } +AioObjectRequest *AioImageWrite::send_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + Context *on_finish) { + if (m_image_ctx.object_cacher != NULL) { + return NULL; + } + + bufferlist bl; + assemble_extent(object_extent, &bl); + AioObjectWrite *req = new AioObjectWrite(&m_image_ctx, + object_extent.oid.name, + object_extent.objectno, + object_extent.offset, bl, + snapc, on_finish); + req->set_op_flags(m_op_flags); + return req; +} void AioImageWrite::update_stats(size_t length) { m_image_ctx.perfcounter->inc(l_librbd_wr); m_image_ctx.perfcounter->inc(l_librbd_wr_bytes, 
length); } -void AioImageDiscard::send_object_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc) { - // discard from the cache first to ensure writeback won't recreate - if (m_image_ctx.object_cacher != NULL) { - Mutex::Locker cache_locker(m_image_ctx.cache_lock); - m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set, - object_extents); - } +uint64_t AioImageDiscard::append_journal_event( + const AioObjectRequests &requests, bool synchronous) { + journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len)); + return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests, + synchronous); +} + +void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + uint64_t journal_tid) { + // TODO need to have cache flag pending discard for writeback or need + // to delay cache update until after journal commits + Mutex::Locker cache_locker(m_image_ctx.cache_lock); - AbstractAioImageWrite::send_object_requests(object_extents, snapc); + // TODO pass journal_tid to object cacher + m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set, + object_extents); } -void AioImageDiscard::send_object_request(const ObjectExtent &object_extent, - const ::SnapContext &snapc) { +AioObjectRequest *AioImageDiscard::send_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + Context *on_finish) { CephContext *cct = m_image_ctx.cct; - C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); - AioObjectRequest *req; if (object_extent.length == m_image_ctx.layout.fl_object_size) { req = new AioObjectRemove(&m_image_ctx, object_extent.oid.name, - object_extent.objectno, snapc, req_comp); + object_extent.objectno, snapc, on_finish); } else if (object_extent.offset + object_extent.length == m_image_ctx.layout.fl_object_size) { req = new AioObjectTruncate(&m_image_ctx, object_extent.oid.name, object_extent.objectno, object_extent.offset, - snapc, req_comp); 
+ snapc, on_finish); } else { if(cct->_conf->rbd_skip_partial_discard) { - delete req_comp; - return; + delete on_finish; + return NULL; } req = new AioObjectZero(&m_image_ctx, object_extent.oid.name, object_extent.objectno, object_extent.offset, - object_extent.length, snapc, req_comp); + object_extent.length, snapc, on_finish); } - req->send(); + return req; } void AioImageDiscard::update_stats(size_t length) { @@ -276,6 +340,16 @@ void AioImageDiscard::update_stats(size_t length) { void AioImageFlush::send_request() { CephContext *cct = m_image_ctx.cct; + { + // journal the flush event + RWLock::RLocker snap_locker(m_image_ctx.snap_lock); + if (m_image_ctx.journal != NULL) { + m_image_ctx.journal->append_event( + m_aio_comp, journal::EventEntry(journal::AioFlushEvent()), + AioObjectRequests(), true); + } + } + // TODO race condition between registering op and submitting to cache // (might not be flushed -- backport needed) C_AioRequest *flush_ctx = new C_AioRequest(cct, m_aio_comp); diff --git a/src/librbd/AioImageRequest.h b/src/librbd/AioImageRequest.h index db65839b5dd6..ab8053a03f29 100644 --- a/src/librbd/AioImageRequest.h +++ b/src/librbd/AioImageRequest.h @@ -8,12 +8,14 @@ #include "include/buffer.h" #include "common/snap_types.h" #include "osd/osd_types.h" +#include "librbd/AioCompletion.h" +#include #include #include namespace librbd { -class AioCompletion; +class AioObjectRequest; class ImageCtx; class AioImageRequest { @@ -40,6 +42,8 @@ public: void send(); protected: + typedef std::list AioObjectRequests; + ImageCtx &m_image_ctx; AioCompletion *m_aio_comp; @@ -84,25 +88,43 @@ public: return true; } + inline void flag_synchronous() { + m_synchronous = true; + } + protected: typedef std::vector ObjectExtents; + const uint64_t m_off; + const size_t m_len; + AbstractAioImageWrite(ImageCtx &image_ctx, AioCompletion *aio_comp, uint64_t off, size_t len) - : AioImageRequest(image_ctx, aio_comp), m_off(off), m_len(len) { + : AioImageRequest(image_ctx, 
aio_comp), m_off(off), m_len(len), + m_synchronous(false) { } + virtual aio_type_t get_aio_type() const = 0; + virtual void send_request(); - virtual void send_object_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc); - virtual void send_object_request(const ObjectExtent &object_extent, - const ::SnapContext &snapc) = 0; + virtual void send_cache_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + uint64_t journal_tid) = 0; + + void send_object_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + AioObjectRequests *aio_object_requests); + virtual AioObjectRequest *send_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + Context *on_finish) = 0; + + virtual uint64_t append_journal_event(const AioObjectRequests &requests, + bool synchronous) = 0; virtual void update_stats(size_t length) = 0; private: - uint64_t m_off; - size_t m_len; + bool m_synchronous; }; class AioImageWrite : public AbstractAioImageWrite { @@ -114,12 +136,25 @@ public: } protected: + virtual aio_type_t get_aio_type() const { + return AIO_TYPE_WRITE; + } virtual const char *get_request_type() const { return "aio_write"; } - virtual void send_object_request(const ObjectExtent &object_extent, - const ::SnapContext &snapc); + void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl); + + virtual void send_cache_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + uint64_t journal_tid); + + virtual AioObjectRequest *send_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + Context *on_finish); + + virtual uint64_t append_journal_event(const AioObjectRequests &requests, + bool synchronous); virtual void update_stats(size_t length); private: const char *m_buf; @@ -134,14 +169,23 @@ public: } protected: + virtual aio_type_t get_aio_type() const { + return AIO_TYPE_DISCARD; + } virtual const char *get_request_type() const { return 
"aio_discard"; } - virtual void send_object_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc); - virtual void send_object_request(const ObjectExtent &object_extent, - const ::SnapContext &snapc); + virtual void send_cache_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + uint64_t journal_tid); + + virtual AioObjectRequest *send_object_request( + const ObjectExtent &object_extent, const ::SnapContext &snapc, + Context *on_finish); + + virtual uint64_t append_journal_event(const AioObjectRequests &requests, + bool synchronous); virtual void update_stats(size_t length); }; diff --git a/src/librbd/AioImageRequestWQ.cc b/src/librbd/AioImageRequestWQ.cc index 6d5fffde5a4e..6b58b0b728b0 100644 --- a/src/librbd/AioImageRequestWQ.cc +++ b/src/librbd/AioImageRequestWQ.cc @@ -5,7 +5,6 @@ #include "librbd/AioCompletion.h" #include "librbd/AioImageRequest.h" #include "librbd/ImageCtx.h" -#include "librbd/ImageWatcher.h" #include "librbd/internal.h" #define dout_subsys ceph_subsys_rbd @@ -14,6 +13,17 @@ namespace librbd { +AioImageRequestWQ::AioImageRequestWQ(ImageCtx *image_ctx, const string &name, + time_t ti, ThreadPool *tp) + : ThreadPool::PointerWQ(name, ti, 0, tp), + m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"), + m_write_blockers(0), m_in_progress_writes(0), m_queued_writes(0), + m_lock_listener(this), m_blocking_writes(false) { + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 5) << this << " " << ": ictx=" << image_ctx << dendl; +} + ssize_t AioImageRequestWQ::read(uint64_t off, size_t len, char *buf, int op_flags) { CephContext *cct = m_image_ctx.cct; @@ -86,8 +96,7 @@ void AioImageRequestWQ::aio_read(AioCompletion *c, uint64_t off, size_t len, RWLock::RLocker owner_locker(m_image_ctx.owner_lock); if (m_image_ctx.non_blocking_aio) { - queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags), - false); + queue(new AioImageRead(m_image_ctx, c, off, len, buf, pbl, op_flags)); } else { 
AioImageRequest::aio_read(&m_image_ctx, c, off, len, buf, pbl, op_flags); } @@ -102,10 +111,9 @@ void AioImageRequestWQ::aio_write(AioCompletion *c, uint64_t off, size_t len, << "len=" << len << ", flags=" << op_flags << dendl; RWLock::RLocker owner_locker(m_image_ctx.owner_lock); - bool lock_required = is_lock_required(); - if (m_image_ctx.non_blocking_aio || lock_required) { - queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags), - lock_required); + if (m_image_ctx.non_blocking_aio || is_journal_required() || + writes_blocked()) { + queue(new AioImageWrite(m_image_ctx, c, off, len, buf, op_flags)); } else { AioImageRequest::aio_write(&m_image_ctx, c, off, len, buf, op_flags); } @@ -120,10 +128,9 @@ void AioImageRequestWQ::aio_discard(AioCompletion *c, uint64_t off, << dendl; RWLock::RLocker owner_locker(m_image_ctx.owner_lock); - bool lock_required = is_lock_required(); - if (m_image_ctx.non_blocking_aio || lock_required) { - queue(new AioImageDiscard(m_image_ctx, c, off, len), - lock_required); + if (m_image_ctx.non_blocking_aio || is_journal_required() || + writes_blocked()) { + queue(new AioImageDiscard(m_image_ctx, c, off, len)); } else { AioImageRequest::aio_discard(&m_image_ctx, c, off, len); } @@ -136,33 +143,51 @@ void AioImageRequestWQ::aio_flush(AioCompletion *c) { << "completion=" << c << dendl; RWLock::RLocker owner_locker(m_image_ctx.owner_lock); - if (m_image_ctx.non_blocking_aio || !writes_empty()) { - queue(new AioImageFlush(m_image_ctx, c), false); + if (m_image_ctx.non_blocking_aio || is_journal_required() || + writes_blocked() || !writes_empty()) { + queue(new AioImageFlush(m_image_ctx, c)); } else { AioImageRequest::aio_flush(&m_image_ctx, c); } } -void AioImageRequestWQ::suspend_writes() { +void AioImageRequestWQ::block_writes() { CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl; Mutex::Locker locker(m_lock); - m_writes_suspended = true; - while (m_in_progress_writes > 0) { - 
m_cond.Wait(m_lock); + ++m_write_blockers; + ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", " + << "num=" << m_write_blockers << dendl; + if (m_write_blockers == 1) { + while (m_in_progress_writes > 0) { + m_cond.Wait(m_lock); + } } } -void AioImageRequestWQ::resume_writes() { +void AioImageRequestWQ::unblock_writes() { CephContext *cct = m_image_ctx.cct; - ldout(cct, 5) << __func__ << ": " << &m_image_ctx << dendl; + bool wake_up = false; { Mutex::Locker locker(m_lock); - m_writes_suspended = false; + assert(m_write_blockers > 0); + --m_write_blockers; + + ldout(cct, 5) << __func__ << ": " << &m_image_ctx << ", " + << "num=" << m_write_blockers << dendl; + if (m_write_blockers == 0) { + wake_up = true; + } } - signal(); + + if (wake_up) { + signal(); + } +} + +void AioImageRequestWQ::register_lock_listener() { + m_image_ctx.image_watcher->register_listener(&m_lock_listener); } void *AioImageRequestWQ::_void_dequeue() { @@ -174,7 +199,7 @@ void *AioImageRequestWQ::_void_dequeue() { { if (peek_item->is_write_op()) { Mutex::Locker locker(m_lock); - if (m_writes_suspended) { + if (m_write_blockers > 0) { return NULL; } ++m_in_progress_writes; @@ -201,9 +226,7 @@ void AioImageRequestWQ::process(AioImageRequest *req) { Mutex::Locker locker(m_lock); if (req->is_write_op()) { assert(m_queued_writes > 0); - if (--m_queued_writes == 0) { - m_image_ctx.image_watcher->clear_aio_ops_pending(); - } + --m_queued_writes; assert(m_in_progress_writes > 0); if (--m_in_progress_writes == 0) { @@ -214,19 +237,25 @@ void AioImageRequestWQ::process(AioImageRequest *req) { delete req; } -bool AioImageRequestWQ::is_lock_required() { +bool AioImageRequestWQ::is_journal_required() const { + RWLock::RLocker snap_locker(m_image_ctx.snap_lock); + return (m_image_ctx.journal != NULL); +} + +bool AioImageRequestWQ::is_lock_required() const { assert(m_image_ctx.owner_lock.is_locked()); if (m_image_ctx.image_watcher == NULL) { return false; } + return 
(m_image_ctx.image_watcher->is_lock_supported() && !m_image_ctx.image_watcher->is_lock_owner()); } -void AioImageRequestWQ::queue(AioImageRequest *req, bool lock_required) { +void AioImageRequestWQ::queue(AioImageRequest *req) { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", " - << "req=" << req << ", lock_req=" << lock_required << dendl; + << "req=" << req << dendl; assert(m_image_ctx.owner_lock.is_locked()); @@ -241,9 +270,38 @@ void AioImageRequestWQ::queue(AioImageRequest *req, bool lock_required) { } ThreadPool::PointerWQ::queue(req); - if (first_write_op) { - m_image_ctx.image_watcher->flag_aio_ops_pending(); - if (lock_required) { + if (is_lock_required() && first_write_op) { + m_image_ctx.image_watcher->request_lock(); + } +} + +void AioImageRequestWQ::handle_releasing_lock() { + assert(m_image_ctx.owner_lock.is_locked()); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << dendl; + + if (!m_blocking_writes) { + m_blocking_writes = true; + block_writes(); + } +} + +void AioImageRequestWQ::handle_lock_updated(bool lock_supported, + bool lock_owner) { + assert(m_image_ctx.owner_lock.is_locked()); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << __func__ << ": ictx=" << &m_image_ctx << ", " + << "lock_support=" << lock_supported << ", " + << "owner=" << lock_owner << dendl; + + if ((!lock_supported || lock_owner) && m_blocking_writes) { + m_blocking_writes = false; + unblock_writes(); + } else if (lock_supported && !lock_owner) { + assert(writes_blocked()); + if (!writes_empty()) { m_image_ctx.image_watcher->request_lock(); } } diff --git a/src/librbd/AioImageRequestWQ.h b/src/librbd/AioImageRequestWQ.h index c57715fc2393..75ec889b5313 100644 --- a/src/librbd/AioImageRequestWQ.h +++ b/src/librbd/AioImageRequestWQ.h @@ -4,8 +4,10 @@ #ifndef CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H #define CEPH_LIBRBD_AIO_IMAGE_REQUEST_WQ_H +#include "include/Context.h" 
#include "common/WorkQueue.h" #include "common/Mutex.h" +#include "librbd/ImageWatcher.h" namespace librbd { @@ -16,11 +18,7 @@ class ImageCtx; class AioImageRequestWQ : protected ThreadPool::PointerWQ { public: AioImageRequestWQ(ImageCtx *image_ctx, const string &name, time_t ti, - ThreadPool *tp) - : ThreadPool::PointerWQ(name, ti, 0, tp), - m_image_ctx(*image_ctx), m_lock("AioImageRequestWQ::m_lock"), - m_writes_suspended(false), m_in_progress_writes(0), m_queued_writes(0) { - } + ThreadPool *tp); ssize_t read(uint64_t off, size_t len, char *buf, int op_flags); ssize_t write(uint64_t off, size_t len, const char *buf, int op_flags); @@ -40,28 +38,54 @@ public: return (m_queued_writes == 0); } - inline bool writes_suspended() const { + inline bool writes_blocked() const { Mutex::Locker locker(m_lock); - return m_writes_suspended; + return (m_write_blockers > 0); } - void suspend_writes(); - void resume_writes(); + void block_writes(); + void unblock_writes(); + + void register_lock_listener(); protected: virtual void *_void_dequeue(); virtual void process(AioImageRequest *req); private: + struct LockListener : public ImageWatcher::Listener { + AioImageRequestWQ *aio_work_queue; + LockListener(AioImageRequestWQ *_aio_work_queue) + : aio_work_queue(_aio_work_queue) { + } + + virtual bool handle_requested_lock() { + return true; + } + virtual void handle_releasing_lock() { + aio_work_queue->handle_releasing_lock(); + } + virtual void handle_lock_updated(bool lock_supported, bool lock_owner) { + aio_work_queue->handle_lock_updated(lock_supported, lock_owner); + } + }; + ImageCtx &m_image_ctx; mutable Mutex m_lock; Cond m_cond; - bool m_writes_suspended; + uint32_t m_write_blockers; uint32_t m_in_progress_writes; uint32_t m_queued_writes; - bool is_lock_required(); - void queue(AioImageRequest *req, bool lock_required); + LockListener m_lock_listener; + bool m_blocking_writes; + + bool is_journal_required() const; + bool is_lock_required() const; + void 
queue(AioImageRequest *req); + + void handle_releasing_lock(); + void handle_lock_updated(bool lock_supported, bool lock_owner); }; } // namespace librbd diff --git a/src/librbd/CopyupRequest.cc b/src/librbd/CopyupRequest.cc index eff12ff645aa..5c3973a2b97f 100644 --- a/src/librbd/CopyupRequest.cc +++ b/src/librbd/CopyupRequest.cc @@ -13,6 +13,7 @@ #include "librbd/CopyupRequest.h" #include "librbd/ImageCtx.h" #include "librbd/ImageWatcher.h" +#include "librbd/internal.h" #include "librbd/ObjectMap.h" #include diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index d93e149b24a8..eb815283ef02 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -17,6 +17,7 @@ #include "librbd/internal.h" #include "librbd/ImageCtx.h" #include "librbd/ImageWatcher.h" +#include "librbd/Journal.h" #include "librbd/LibrbdAdminSocketHook.h" #include "librbd/ObjectMap.h" @@ -67,6 +68,7 @@ public: exclusive_locked(false), name(image_name), image_watcher(NULL), + journal(NULL), refresh_seq(0), last_refresh(0), owner_lock(unique_lock_name("librbd::ImageCtx::owner_lock", this)), @@ -110,6 +112,7 @@ public: } ImageCtx::~ImageCtx() { + assert(journal == NULL); if (perfcounter) { perf_stop(); } @@ -755,6 +758,7 @@ public: int ImageCtx::register_watch() { assert(image_watcher == NULL); image_watcher = new ImageWatcher(*this); + aio_work_queue->register_lock_listener(); return image_watcher->register_watch(); } @@ -938,4 +942,24 @@ public: ASSIGN_OPTION(request_timed_out_seconds); ASSIGN_OPTION(enable_alloc_hint); } + + void ImageCtx::open_journal() { + assert(journal == NULL); + journal = new Journal(*this); + } + + int ImageCtx::close_journal(bool force) { + assert(journal != NULL); + int r = journal->close(); + if (r < 0) { + lderr(cct) << "failed to flush journal: " << cpp_strerror(r) << dendl; + if (!force) { + return r; + } + } + + delete journal; + journal = NULL; + return r; + } } diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index 
9e21e45493c3..2d41e0114507 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -44,6 +44,7 @@ namespace librbd { class CopyupRequest; class LibrbdAdminSocketHook; class ImageWatcher; + class Journal; struct ImageCtx { CephContext *cct; @@ -69,6 +70,7 @@ namespace librbd { std::string snap_name; IoCtx data_ctx, md_ctx; ImageWatcher *image_watcher; + Journal *journal; int refresh_seq; ///< sequence for refresh requests int last_refresh; ///< last completed refresh @@ -243,6 +245,9 @@ namespace librbd { void cancel_async_requests(); void apply_metadata_confs(); + + void open_journal(); + int close_journal(bool force); }; } diff --git a/src/librbd/ImageWatcher.cc b/src/librbd/ImageWatcher.cc index 026432cd5f59..69825a1df914 100644 --- a/src/librbd/ImageWatcher.cc +++ b/src/librbd/ImageWatcher.cc @@ -2,7 +2,6 @@ // vim: ts=8 sw=2 smarttab #include "librbd/ImageWatcher.h" #include "librbd/AioCompletion.h" -#include "librbd/AioImageRequestWQ.h" #include "librbd/ImageCtx.h" #include "librbd/internal.h" #include "librbd/ObjectMap.h" @@ -35,8 +34,10 @@ ImageWatcher::ImageWatcher(ImageCtx &image_ctx) : m_image_ctx(image_ctx), m_watch_lock(unique_lock_name("librbd::ImageWatcher::m_watch_lock", this)), m_watch_ctx(*this), m_watch_handle(0), - m_watch_state(WATCH_STATE_UNREGISTERED), m_aio_ops_pending(false), + m_watch_state(WATCH_STATE_UNREGISTERED), m_lock_supported(false), m_lock_owner_state(LOCK_OWNER_STATE_NOT_LOCKED), + m_listeners_lock(unique_lock_name("librbd::ImageWatcher::m_listeners_lock", this)), + m_listeners_in_use(false), m_task_finisher(new TaskFinisher(*m_image_ctx.cct)), m_async_request_lock(unique_lock_name("librbd::ImageWatcher::m_async_request_lock", this)), m_owner_client_id_lock(unique_lock_name("librbd::ImageWatcher::m_owner_client_id_lock", this)) @@ -74,6 +75,20 @@ bool ImageWatcher::is_lock_owner() const { m_lock_owner_state == LOCK_OWNER_STATE_RELEASING); } +void ImageWatcher::register_listener(Listener *listener) { + Mutex::Locker 
listeners_locker(m_listeners_lock); + m_listeners.push_back(listener); +} + +void ImageWatcher::unregister_listener(Listener *listener) { + // TODO CoW listener list + Mutex::Locker listeners_locker(m_listeners_lock); + while (m_listeners_in_use) { + m_listeners_cond.Wait(m_listeners_lock); + } + m_listeners.remove(listener); +} + int ImageWatcher::register_watch() { ldout(m_image_ctx.cct, 10) << this << " registering image watcher" << dendl; @@ -110,19 +125,41 @@ int ImageWatcher::unregister_watch() { return r; } -void ImageWatcher::refresh() { +int ImageWatcher::refresh() { assert(m_image_ctx.owner_lock.is_locked()); - if (is_lock_supported() && !is_lock_owner()) { - m_image_ctx.aio_work_queue->suspend_writes(); - } else if (!is_lock_supported()) { - m_image_ctx.aio_work_queue->resume_writes(); + bool lock_support_changed = false; + { + RWLock::WLocker watch_locker(m_watch_lock); + if (m_lock_supported != is_lock_supported()) { + m_lock_supported = is_lock_supported(); + lock_support_changed = true; + } + } + + int r = 0; + if (lock_support_changed) { + if (is_lock_supported() && !is_lock_owner()) { + // image opened, exclusive lock dynamically enabled, or now HEAD + notify_listeners_releasing_lock(); + } else if (!is_lock_supported() && is_lock_owner()) { + // exclusive lock dynamically disabled or now snapshot + m_image_ctx.owner_lock.put_read(); + { + RWLock::WLocker owner_locker(m_image_ctx.owner_lock); + r = release_lock(); + } + m_image_ctx.owner_lock.get_read(); + } + notify_listeners_updated_lock(); } + return r; } int ImageWatcher::try_lock() { assert(m_image_ctx.owner_lock.is_wlocked()); assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED); + assert(is_lock_supported()); while (true) { int r = lock(); @@ -188,10 +225,6 @@ int ImageWatcher::try_lock() { } void ImageWatcher::request_lock() { - { - RWLock::WLocker watch_locker(m_watch_lock); - m_aio_ops_pending = true; - } schedule_request_lock(false); } @@ -275,6 +308,9 @@ int 
ImageWatcher::get_lock_owner_info(entity_name_t *locker, std::string *cookie } int ImageWatcher::lock() { + assert(m_image_ctx.owner_lock.is_wlocked()); + assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED); + int r = rados::cls::lock::lock(&m_image_ctx.md_ctx, m_image_ctx.header_oid, RBD_LOCK_NAME, LOCK_EXCLUSIVE, encode_lock_cookie(), WATCHER_LOCK_TAG, "", @@ -302,39 +338,16 @@ int ImageWatcher::lock() { m_image_ctx.object_map.refresh(CEPH_NOSNAP); } - bufferlist bl; - ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl); - - m_image_ctx.aio_work_queue->resume_writes(); - // send the notification when we aren't holding locks FunctionContext *ctx = new FunctionContext( - boost::bind(&IoCtx::notify2, &m_image_ctx.md_ctx, m_image_ctx.header_oid, - bl, NOTIFY_TIMEOUT, reinterpret_cast(0))); + boost::bind(&ImageWatcher::notify_acquired_lock, this)); m_task_finisher->queue(TASK_CODE_ACQUIRED_LOCK, ctx); return 0; } -void ImageWatcher::prepare_unlock() { - assert(m_image_ctx.owner_lock.is_wlocked()); - if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) { - m_lock_owner_state = LOCK_OWNER_STATE_RELEASING; - } -} - -void ImageWatcher::cancel_unlock() { - assert(m_image_ctx.owner_lock.is_wlocked()); - if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) { - m_lock_owner_state = LOCK_OWNER_STATE_LOCKED; - } -} - int ImageWatcher::unlock() { assert(m_image_ctx.owner_lock.is_wlocked()); - if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { - return 0; - } ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock" << dendl; m_lock_owner_state = LOCK_OWNER_STATE_NOT_LOCKED; @@ -350,63 +363,77 @@ int ImageWatcher::unlock() m_image_ctx.object_map.unlock(); } - if (is_lock_supported()) { - m_image_ctx.aio_work_queue->suspend_writes(); + { + Mutex::Locker l(m_owner_client_id_lock); + set_owner_client_id(ClientId()); } - Mutex::Locker l(m_owner_client_id_lock); - set_owner_client_id(ClientId()); - FunctionContext *ctx = new FunctionContext( 
boost::bind(&ImageWatcher::notify_released_lock, this)); m_task_finisher->queue(TASK_CODE_RELEASED_LOCK, ctx); return 0; } -bool ImageWatcher::release_lock() +int ImageWatcher::release_lock() { assert(m_image_ctx.owner_lock.is_wlocked()); - ldout(m_image_ctx.cct, 10) << this << " releasing exclusive lock by request" - << dendl; - if (!is_lock_owner()) { - return false; + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 10) << this << " releasing exclusive lock by request" << dendl; + if (m_lock_owner_state != LOCK_OWNER_STATE_LOCKED) { + return 0; } - prepare_unlock(); + + m_lock_owner_state = LOCK_OWNER_STATE_RELEASING; m_image_ctx.owner_lock.put_write(); + // ensure all maint operations are canceled m_image_ctx.cancel_async_requests(); m_image_ctx.flush_async_operations(); - m_image_ctx.aio_work_queue->suspend_writes(); + int r; { RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + + // alert listeners that all incoming IO needs to be stopped since the + // lock is being released + notify_listeners_releasing_lock(); + RWLock::WLocker md_locker(m_image_ctx.md_lock); - librbd::_flush(&m_image_ctx); + r = librbd::_flush(&m_image_ctx); + if (r < 0) { + lderr(cct) << this << " failed to flush: " << cpp_strerror(r) << dendl; + goto err_cancel_unlock; + } } m_image_ctx.owner_lock.get_write(); - if (!is_lock_owner()) { - return false; - } + assert(m_lock_owner_state == LOCK_OWNER_STATE_RELEASING); + r = unlock(); - unlock(); - return true; -} + // notify listeners of the change w/ owner read locked + m_image_ctx.owner_lock.put_write(); + { + RWLock::RLocker owner_lock(m_image_ctx.owner_lock); + if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { + notify_listeners_updated_lock(); + } + } + m_image_ctx.owner_lock.get_write(); -void ImageWatcher::flag_aio_ops_pending() { - RWLock::WLocker watch_locker(m_watch_lock); - if (!m_aio_ops_pending) { - ldout(m_image_ctx.cct, 20) << this << " pending AIO ops" << dendl; - m_aio_ops_pending = true; + if (r < 0) { + 
lderr(cct) << this << " failed to unlock: " << cpp_strerror(r) << dendl; + return r; } -} -void ImageWatcher::clear_aio_ops_pending() { - RWLock::WLocker watch_locker(m_watch_lock); - if (m_aio_ops_pending) { - ldout(m_image_ctx.cct, 20) << this << " no pending AIO ops" << dendl; - m_aio_ops_pending = false; + return 0; + +err_cancel_unlock: + m_image_ctx.owner_lock.get_write(); + if (m_lock_owner_state == LOCK_OWNER_STATE_RELEASING) { + m_lock_owner_state = LOCK_OWNER_STATE_LOCKED; } + return r; } void ImageWatcher::assert_header_locked(librados::ObjectWriteOperation *op) { @@ -537,6 +564,21 @@ int ImageWatcher::notify_rebuild_object_map(uint64_t request_id, return notify_async_request(async_request_id, bl, prog_ctx); } +void ImageWatcher::notify_lock_state() { + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) { + // re-send the acquired lock notification so that peers know they can now + // request the lock + ldout(m_image_ctx.cct, 10) << this << " notify lock state" << dendl; + + bufferlist bl; + ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl); + + m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, + NULL); + } +} + void ImageWatcher::notify_header_update(librados::IoCtx &io_ctx, const std::string &oid) { @@ -592,6 +634,17 @@ ClientId ImageWatcher::get_client_id() { return ClientId(m_image_ctx.md_ctx.get_instance_id(), m_watch_handle); } +void ImageWatcher::notify_acquired_lock() { + ldout(m_image_ctx.cct, 10) << this << " notify acquired lock" << dendl; + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + notify_listeners_updated_lock(); + + bufferlist bl; + ::encode(NotifyMessage(AcquiredLockPayload(get_client_id())), bl); + m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL); +} + void ImageWatcher::notify_release_lock() { RWLock::WLocker owner_locker(m_image_ctx.owner_lock); release_lock(); @@ -609,7 +662,7 @@ void 
ImageWatcher::schedule_request_lock(bool use_timer, int timer_delay) { assert(m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED); RWLock::RLocker watch_locker(m_watch_lock); - if (m_watch_state == WATCH_STATE_REGISTERED && m_aio_ops_pending) { + if (m_watch_state == WATCH_STATE_REGISTERED) { ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl; FunctionContext *ctx = new FunctionContext( @@ -806,40 +859,48 @@ void ImageWatcher::handle_payload(const AcquiredLockPayload &payload, bufferlist *out) { ldout(m_image_ctx.cct, 10) << this << " image exclusively locked announcement" << dendl; + + bool cancel_async_requests = true; if (payload.client_id.is_valid()) { Mutex::Locker l(m_owner_client_id_lock); if (payload.client_id == m_owner_client_id) { - // we already know that the remote client is the owner - return; + cancel_async_requests = false; } set_owner_client_id(payload.client_id); } RWLock::RLocker owner_locker(m_image_ctx.owner_lock); if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { - schedule_cancel_async_requests(); - schedule_request_lock(false); + if (cancel_async_requests) { + schedule_cancel_async_requests(); + } + notify_listeners_updated_lock(); } } void ImageWatcher::handle_payload(const ReleasedLockPayload &payload, bufferlist *out) { ldout(m_image_ctx.cct, 10) << this << " exclusive lock released" << dendl; + + bool cancel_async_requests = true; if (payload.client_id.is_valid()) { Mutex::Locker l(m_owner_client_id_lock); if (payload.client_id != m_owner_client_id) { ldout(m_image_ctx.cct, 10) << this << " unexpected owner: " << payload.client_id << " != " << m_owner_client_id << dendl; - return; + cancel_async_requests = false; + } else { + set_owner_client_id(ClientId()); } - set_owner_client_id(ClientId()); } RWLock::RLocker owner_locker(m_image_ctx.owner_lock); if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { - schedule_cancel_async_requests(); - schedule_request_lock(false); + if (cancel_async_requests) { + 
schedule_cancel_async_requests(); + } + notify_listeners_updated_lock(); } } @@ -862,11 +923,25 @@ void ImageWatcher::handle_payload(const RequestLockPayload &payload, } } - ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock" - << dendl; - FunctionContext *ctx = new FunctionContext( - boost::bind(&ImageWatcher::notify_release_lock, this)); - m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx); + bool release_permitted = true; + { + Mutex::Locker listeners_locker(m_listeners_lock); + for (Listeners::iterator it = m_listeners.begin(); + it != m_listeners.end(); ++it) { + if (!(*it)->handle_requested_lock()) { + release_permitted = false; + break; + } + } + } + + if (release_permitted) { + ldout(m_image_ctx.cct, 10) << this << " queuing release of exclusive lock" + << dendl; + FunctionContext *ctx = new FunctionContext( + boost::bind(&ImageWatcher::notify_release_lock, this)); + m_task_finisher->queue(TASK_CODE_RELEASING_LOCK, ctx); + } } } @@ -1116,7 +1191,7 @@ void ImageWatcher::reregister_watch() { } if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) { - schedule_request_lock(false); + notify_listeners_updated_lock(); } } @@ -1135,4 +1210,52 @@ void ImageWatcher::RemoteContext::finish(int r) { m_image_watcher.schedule_async_complete(m_async_request_id, r); } +void ImageWatcher::notify_listeners_releasing_lock() { + assert(m_image_ctx.owner_lock.is_locked()); + + Listeners listeners; + { + Mutex::Locker listeners_locker(m_listeners_lock); + m_listeners_in_use = true; + listeners = m_listeners; + } + + for (Listeners::iterator it = listeners.begin(); + it != listeners.end(); ++it) { + (*it)->handle_releasing_lock(); + } + + Mutex::Locker listeners_locker(m_listeners_lock); + m_listeners_in_use = false; + m_listeners_cond.Signal(); +} + +void ImageWatcher::notify_listeners_updated_lock() { + assert(m_image_ctx.owner_lock.is_locked()); + + Listeners listeners; + { + Mutex::Locker listeners_locker(m_listeners_lock); + m_listeners_in_use = 
true; + listeners = m_listeners; + } + + bool lock_supported; + { + RWLock::RLocker watch_locker(m_watch_lock); + lock_supported = m_lock_supported; + } + + assert(lock_supported || m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED); + for (Listeners::iterator it = listeners.begin(); + it != listeners.end(); ++it) { + (*it)->handle_lock_updated(lock_supported, + m_lock_owner_state == LOCK_OWNER_STATE_LOCKED); + } + + Mutex::Locker listeners_locker(m_listeners_lock); + m_listeners_in_use = false; + m_listeners_cond.Signal(); +} + } diff --git a/src/librbd/ImageWatcher.h b/src/librbd/ImageWatcher.h index d6fda2b85c78..c2c0ce395a75 100644 --- a/src/librbd/ImageWatcher.h +++ b/src/librbd/ImageWatcher.h @@ -3,6 +3,7 @@ #ifndef CEPH_LIBRBD_IMAGE_WATCHER_H #define CEPH_LIBRBD_IMAGE_WATCHER_H +#include "common/Cond.h" #include "common/Mutex.h" #include "common/RWLock.h" #include "include/Context.h" @@ -25,6 +26,13 @@ template class TaskFinisher; class ImageWatcher { public: + struct Listener { + virtual ~Listener() {} + + virtual bool handle_requested_lock() = 0; + virtual void handle_releasing_lock() = 0; + virtual void handle_lock_updated(bool lock_supported, bool lock_owner) = 0; + }; ImageWatcher(ImageCtx& image_ctx); ~ImageWatcher(); @@ -33,19 +41,17 @@ public: bool is_lock_supported(const RWLock &snap_lock) const; bool is_lock_owner() const; + void register_listener(Listener *listener); + void unregister_listener(Listener *listener); + int register_watch(); int unregister_watch(); - void refresh(); + int refresh(); int try_lock(); void request_lock(); - void prepare_unlock(); - void cancel_unlock(); - int unlock(); - - void flag_aio_ops_pending(); - void clear_aio_ops_pending(); + int release_lock(); void assert_header_locked(librados::ObjectWriteOperation *op); @@ -59,6 +65,7 @@ public: int notify_rebuild_object_map(uint64_t request_id, ProgressContext &prog_ctx); + void notify_lock_state(); static void notify_header_update(librados::IoCtx &io_ctx, const std::string 
&oid); @@ -87,6 +94,7 @@ private: TASK_CODE_ASYNC_PROGRESS }; + typedef std::list Listeners; typedef std::pair AsyncRequest; class Task { @@ -194,10 +202,16 @@ private: WatchCtx m_watch_ctx; uint64_t m_watch_handle; WatchState m_watch_state; - bool m_aio_ops_pending; + + bool m_lock_supported; LockOwnerState m_lock_owner_state; + Mutex m_listeners_lock; + Cond m_listeners_cond; + Listeners m_listeners; + bool m_listeners_in_use; + TaskFinisher *m_task_finisher; RWLock m_async_request_lock; @@ -213,7 +227,7 @@ private: int get_lock_owner_info(entity_name_t *locker, std::string *cookie, std::string *address, uint64_t *handle); int lock(); - bool release_lock(); + int unlock(); bool try_request_lock(); void schedule_cancel_async_requests(); @@ -222,6 +236,7 @@ private: void set_owner_client_id(const WatchNotify::ClientId &client_id); WatchNotify::ClientId get_client_id(); + void notify_acquired_lock(); void notify_release_lock(); void notify_released_lock(); @@ -281,6 +296,9 @@ private: void acknowledge_notify(uint64_t notify_id, uint64_t handle, bufferlist &out); void reregister_watch(); + + void notify_listeners_releasing_lock(); + void notify_listeners_updated_lock(); }; } // namespace librbd diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc new file mode 100644 index 000000000000..e7ec7e37b438 --- /dev/null +++ b/src/librbd/Journal.cc @@ -0,0 +1,419 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "librbd/Journal.h" +#include "librbd/AioCompletion.h" +#include "librbd/AioImageRequestWQ.h" +#include "librbd/AioObjectRequest.h" +#include "librbd/ImageCtx.h" +#include "librbd/JournalTypes.h" +#include "journal/Journaler.h" +#include "journal/ReplayEntry.h" +#include "common/errno.h" + +#define dout_subsys ceph_subsys_rbd +#undef dout_prefix +#define dout_prefix *_dout << "librbd::Journal: " + +namespace librbd { + +namespace { + +const std::string CLIENT_DESCRIPTION = "master image"; + +} // 
anonymous namespace + +Journal::Journal(ImageCtx &image_ctx) + : m_image_ctx(image_ctx), m_journaler(NULL), + m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED), + m_lock_listener(this), m_replay_handler(this), m_close_pending(false), + m_next_tid(0), m_blocking_writes(false) { + + ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl; + + m_image_ctx.image_watcher->register_listener(&m_lock_listener); + + Mutex::Locker locker(m_lock); + block_writes(); +} + +Journal::~Journal() { + assert(m_journaler == NULL); + + m_image_ctx.image_watcher->unregister_listener(&m_lock_listener); + + Mutex::Locker locker(m_lock); + unblock_writes(); +} + +bool Journal::is_journal_supported(ImageCtx &image_ctx) { + assert(image_ctx.snap_lock.is_locked()); + return ((image_ctx.features & RBD_FEATURE_JOURNALING) && + !image_ctx.read_only && image_ctx.snap_id == CEPH_NOSNAP); +} + +int Journal::create(librados::IoCtx &io_ctx, const std::string &image_id) { + CephContext *cct = reinterpret_cast(io_ctx.cct()); + ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; + + // TODO configurable commit flush interval + ::journal::Journaler journaler(io_ctx, image_id, "", 5); + + // TODO order / splay width via config / image metadata / data pool + int r = journaler.create(24, 4, io_ctx.get_id()); + if (r < 0) { + lderr(cct) << "failed to create journal: " << cpp_strerror(r) << dendl; + return r; + } + + r = journaler.register_client(CLIENT_DESCRIPTION); + if (r < 0) { + lderr(cct) << "failed to register client: " << cpp_strerror(r) << dendl; + return r; + } + return 0; +} + +int Journal::remove(librados::IoCtx &io_ctx, const std::string &image_id) { + CephContext *cct = reinterpret_cast(io_ctx.cct()); + ldout(cct, 5) << __func__ << ": image=" << image_id << dendl; + + return 0; +} + +bool Journal::is_journal_ready() const { + Mutex::Locker locker(m_lock); + return (m_state == STATE_RECORDING); +} + +void Journal::open() { + Mutex::Locker locker(m_lock); + if 
(m_journaler != NULL) { + return; + } + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + create_journaler(); +} + +int Journal::close() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": state=" << m_state << dendl; + + Mutex::Locker locker(m_lock); + if (m_state == STATE_UNINITIALIZED) { + return 0; + } + + int r; + bool done = false; + while (!done) { + switch (m_state) { + case STATE_UNINITIALIZED: + done = true; + break; + case STATE_INITIALIZING: + case STATE_REPLAYING: + m_close_pending = true; + wait_for_state_transition(); + break; + case STATE_RECORDING: + r = stop_recording(); + if (r < 0) { + return r; + } + done = true; + break; + default: + assert(false); + } + } + + destroy_journaler(); + return 0; +} + +uint64_t Journal::append_event(AioCompletion *aio_comp, + const journal::EventEntry &event_entry, + const AioObjectRequests &requests, + bool flush_entry) { + assert(m_image_ctx.owner_lock.is_locked()); + assert(m_state == STATE_RECORDING); + + bufferlist bl; + ::encode(event_entry, bl); + + ::journal::Future future = m_journaler->append("", bl); + uint64_t tid; + { + Mutex::Locker locker(m_lock); + tid = m_next_tid++; + m_events[tid] = Event(future, aio_comp, requests); + } + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": " + << "event=" << event_entry.get_event_type() << ", " + << "new_reqs=" << requests.size() << ", " + << "flush=" << flush_entry << ", tid=" << tid << dendl; + + Context *on_safe = new C_EventSafe(this, tid); + if (flush_entry) { + future.flush(on_safe); + } else { + future.wait(on_safe); + } + return tid; +} + +void Journal::create_journaler() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + assert(m_lock.is_locked()); + assert(m_state == STATE_UNINITIALIZED); + + // TODO allow alternate pool for journal objects and commit flush interval + m_close_pending = 
false; + m_journaler = new ::journal::Journaler(m_image_ctx.md_ctx, m_image_ctx.id, "", + 5); + + m_journaler->init(new C_InitJournal(this)); + transition_state(STATE_INITIALIZING); +} + +void Journal::destroy_journaler() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + assert(m_lock.is_locked()); + + m_close_pending = false; + delete m_journaler; + m_journaler = NULL; + transition_state(STATE_UNINITIALIZED); +} + +void Journal::handle_initialized(int r) { + CephContext *cct = m_image_ctx.cct; + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": r=" << r << dendl; + Mutex::Locker locker(m_lock); + + // TODO: failed to open journal -- retry? + destroy_journaler(); + create_journaler(); + return; + } + + ldout(cct, 20) << this << " " << __func__ << dendl; + Mutex::Locker locker(m_lock); + if (m_close_pending) { + destroy_journaler(); + return; + } + + transition_state(STATE_REPLAYING); + m_journaler->start_replay(&m_replay_handler); +} + +void Journal::handle_replay_ready() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + Mutex::Locker locker(m_lock); + while (true) { + if (m_close_pending) { + m_journaler->stop_replay(); + destroy_journaler(); + return; + } + + ::journal::ReplayEntry replay_entry; + if (!m_journaler->try_pop_front(&replay_entry)) { + return; + } + + m_lock.Unlock(); + // TODO process the payload + m_lock.Lock(); + } +} + +void Journal::handle_replay_complete(int r) { + CephContext *cct = m_image_ctx.cct; + + { + Mutex::Locker locker(m_lock); + if (r < 0) { + lderr(cct) << this << " " << __func__ << ": r=" << r << dendl; + + // TODO: failed to replay journal -- retry? 
+ destroy_journaler(); + create_journaler(); + return; + } + + ldout(cct, 20) << this << " " << __func__ << dendl; + m_journaler->stop_replay(); + + if (m_close_pending) { + destroy_journaler(); + return; + } + + // TODO configurable flush interval, flush bytes, and flush age + m_journaler->start_append(0, 0, 0); + transition_state(STATE_RECORDING); + + unblock_writes(); + } + + // kick peers to let them know they can re-request the lock now + m_image_ctx.image_watcher->notify_lock_state(); +} + +void Journal::handle_event_safe(int r, uint64_t tid) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", " + << "tid=" << tid << dendl; + + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + + AioCompletion *aio_comp; + AioObjectRequests aio_object_requests; + Contexts on_safe_contexts; + { + Mutex::Locker locker(m_lock); + Events::iterator it = m_events.find(tid); + assert(it != m_events.end()); + + Event &event = it->second; + aio_comp = event.aio_comp; + aio_object_requests.swap(event.aio_object_requests); + on_safe_contexts.swap(event.on_safe_contexts); + m_events.erase(it); + } + + ldout(cct, 20) << "completing tid=" << tid << dendl; + + assert(m_image_ctx.image_watcher->is_lock_owner()); + + if (r < 0) { + // don't send aio requests if the journal fails -- bubble error up + aio_comp->fail(cct, r); + } else { + // send any waiting aio requests now that journal entry is safe + for (AioObjectRequests::iterator it = aio_object_requests.begin(); + it != aio_object_requests.end(); ++it) { + (*it)->send(); + } + } + + // alert the cache about the journal event status + for (Contexts::iterator it = on_safe_contexts.begin(); + it != on_safe_contexts.end(); ++it) { + (*it)->complete(r); + } +} + +bool Journal::handle_requested_lock() { + Mutex::Locker locker(m_lock); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": " << "state=" << m_state + << dendl; + + // prevent peers from 
taking our lock while we are replaying + return (m_state != STATE_INITIALIZING && m_state != STATE_REPLAYING); +} + +void Journal::handle_releasing_lock() { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << dendl; + + Mutex::Locker locker(m_lock); + if (m_state == STATE_INITIALIZING || m_state == STATE_REPLAYING) { + // wait for replay to successfully interrupt + m_close_pending = true; + wait_for_state_transition(); + } + + if (m_state == STATE_UNINITIALIZED || m_state == STATE_RECORDING) { + // prevent new write ops but allow pending ops to flush to the journal + block_writes(); + } +} + +void Journal::handle_lock_updated(bool lock_owner) { + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": " + << "owner=" << lock_owner << dendl; + + Mutex::Locker locker(m_lock); + if (lock_owner && m_state == STATE_UNINITIALIZED) { + create_journaler(); + } else if (!lock_owner && m_state != STATE_UNINITIALIZED) { + assert(m_state == STATE_RECORDING); + assert(m_events.empty()); + int r = stop_recording(); + if (r < 0) { + // TODO handle failed journal writes + assert(false); + } + } +} + +int Journal::stop_recording() { + C_SaferCond cond; + + m_journaler->stop_append(&cond); + + m_lock.Unlock(); + int r = cond.wait(); + m_lock.Lock(); + + destroy_journaler(); + if (r < 0) { + lderr(m_image_ctx.cct) << "failed to flush journal: " << cpp_strerror(r) + << dendl; + return r; + } + return 0; +} + +void Journal::block_writes() { + assert(m_lock.is_locked()); + if (!m_blocking_writes) { + m_blocking_writes = true; + m_image_ctx.aio_work_queue->block_writes(); + } +} + +void Journal::unblock_writes() { + assert(m_lock.is_locked()); + if (m_blocking_writes) { + m_blocking_writes = false; + m_image_ctx.aio_work_queue->unblock_writes(); + } +} + +void Journal::transition_state(State state) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": new state=" << state << dendl; + 
assert(m_lock.is_locked()); + m_state = state; + m_cond.Signal(); +} + +void Journal::wait_for_state_transition() { + assert(m_lock.is_locked()); + State state = m_state; + while (m_state == state) { + m_cond.Wait(m_lock); + } +} + +} // namespace librbd diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h new file mode 100644 index 000000000000..35c173894439 --- /dev/null +++ b/src/librbd/Journal.h @@ -0,0 +1,182 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#ifndef CEPH_LIBRBD_JOURNAL_H +#define CEPH_LIBRBD_JOURNAL_H + +#include "include/int_types.h" +#include "include/Context.h" +#include "include/unordered_map.h" +#include "include/rados/librados.hpp" +#include "common/Mutex.h" +#include "common/Cond.h" +#include "journal/Future.h" +#include "journal/ReplayHandler.h" +#include "librbd/ImageWatcher.h" +#include +#include + +class Context; +namespace journal { +class Journaler; +} + +namespace librbd { + +class AioCompletion; +class AioObjectRequest; +class ImageCtx; +namespace journal { +class EventEntry; +} + +class Journal { +public: + typedef std::list AioObjectRequests; + + Journal(ImageCtx &image_ctx); + ~Journal(); + + static bool is_journal_supported(ImageCtx &image_ctx); + static int create(librados::IoCtx &io_ctx, const std::string &image_id); + static int remove(librados::IoCtx &io_ctx, const std::string &image_id); + + bool is_journal_ready() const; + + void open(); + int close(); + + uint64_t append_event(AioCompletion *aio_comp, + const journal::EventEntry &event_entry, + const AioObjectRequests &requests, + bool flush_entry); + +private: + typedef std::list Contexts; + + enum State { + STATE_UNINITIALIZED, + STATE_INITIALIZING, + STATE_REPLAYING, + STATE_RECORDING, + }; + + struct Event { + ::journal::Future future; + AioCompletion *aio_comp; + AioObjectRequests aio_object_requests; + Contexts on_safe_contexts; + + Event() : aio_comp(NULL) { + } + Event(const ::journal::Future 
&_future, AioCompletion *_aio_comp, + const AioObjectRequests &_requests) + : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) { + } + }; + typedef ceph::unordered_map Events; + + struct LockListener : public ImageWatcher::Listener { + Journal *journal; + LockListener(Journal *_journal) : journal(_journal) { + } + + virtual bool handle_requested_lock() { + return journal->handle_requested_lock(); + } + virtual void handle_releasing_lock() { + journal->handle_releasing_lock(); + } + virtual void handle_lock_updated(bool lock_supported, bool lock_owner) { + journal->handle_lock_updated(lock_owner); + } + }; + + struct C_InitJournal : public Context { + Journal *journal; + + C_InitJournal(Journal *_journal) : journal(_journal) { + } + + virtual void finish(int r) { + journal->handle_initialized(r); + } + }; + + struct C_EventSafe : public Context { + Journal *journal; + uint64_t tid; + + C_EventSafe(Journal *_journal, uint64_t _tid) + : journal(_journal), tid(_tid) { + } + + virtual void finish(int r) { + journal->handle_event_safe(r, tid); + } + }; + + struct ReplayHandler : public ::journal::ReplayHandler { + Journal *journal; + ReplayHandler(Journal *_journal) : journal(_journal) { + } + + virtual void get() { + // TODO + } + virtual void put() { + // TODO + } + + virtual void handle_entries_available() { + journal->handle_replay_ready(); + } + virtual void handle_complete(int r) { + journal->handle_replay_complete(r); + } + }; + + ImageCtx &m_image_ctx; + + ::journal::Journaler *m_journaler; + + mutable Mutex m_lock; + Cond m_cond; + State m_state; + + LockListener m_lock_listener; + + ReplayHandler m_replay_handler; + bool m_close_pending; + + uint64_t m_next_tid; + Events m_events; + + bool m_blocking_writes; + + void create_journaler(); + void destroy_journaler(); + + void handle_initialized(int r); + + void handle_replay_ready(); + void handle_replay_complete(int r); + + void handle_event_safe(int r, uint64_t tid); + + bool 
handle_requested_lock(); + void handle_releasing_lock(); + void handle_lock_updated(bool lock_owner); + + int stop_recording(); + + void block_writes(); + void unblock_writes(); + + void transition_state(State state); + void wait_for_state_transition(); +}; + +} // namespace librbd + +#endif // CEPH_LIBRBD_JOURNAL_H diff --git a/src/librbd/Makefile.am b/src/librbd/Makefile.am index 84fed189ffc1..863531f78638 100644 --- a/src/librbd/Makefile.am +++ b/src/librbd/Makefile.am @@ -23,6 +23,7 @@ librbd_internal_la_SOURCES = \ librbd/ImageCtx.cc \ librbd/ImageWatcher.cc \ librbd/internal.cc \ + librbd/Journal.cc \ librbd/LibrbdAdminSocketHook.cc \ librbd/LibrbdWriteback.cc \ librbd/ObjectMap.cc \ @@ -36,11 +37,12 @@ noinst_LTLIBRARIES += librbd_api.la librbd_la_SOURCES = \ librbd/librbd.cc librbd_la_LIBADD = \ - librbd_internal.la $(LIBRBD_TYPES) \ + librbd_internal.la $(LIBRBD_TYPES) libjournal.la \ $(LIBRADOS) $(LIBCOMMON) $(LIBOSDC) \ librados_internal.la \ libcls_rbd_client.la \ libcls_lock_client.la \ + libcls_journal_client.la \ $(PTHREAD_LIBS) $(EXTRALIBS) librbd_la_LDFLAGS = ${AM_LDFLAGS} -version-info 1:0:0 @@ -66,6 +68,7 @@ noinst_HEADERS += \ librbd/ImageCtx.h \ librbd/ImageWatcher.h \ librbd/internal.h \ + librbd/Journal.h \ librbd/JournalTypes.h \ librbd/LibrbdAdminSocketHook.h \ librbd/LibrbdWriteback.h \ diff --git a/src/librbd/ObjectMap.cc b/src/librbd/ObjectMap.cc index d94780711d53..e1f2e17796ff 100644 --- a/src/librbd/ObjectMap.cc +++ b/src/librbd/ObjectMap.cc @@ -21,6 +21,10 @@ ObjectMap::ObjectMap(ImageCtx &image_ctx) { } +int ObjectMap::remove(librados::IoCtx &io_ctx, const std::string &image_id) { + return io_ctx.remove(object_map_name(image_id, CEPH_NOSNAP)); +} + std::string ObjectMap::object_map_name(const std::string &image_id, uint64_t snap_id) { std::string oid(RBD_OBJECT_MAP_PREFIX + image_id); diff --git a/src/librbd/ObjectMap.h b/src/librbd/ObjectMap.h index 797307f2820d..1737e1246684 100644 --- a/src/librbd/ObjectMap.h +++ 
b/src/librbd/ObjectMap.h @@ -21,6 +21,7 @@ public: ObjectMap(ImageCtx &image_ctx); + static int remove(librados::IoCtx &io_ctx, const std::string &image_id); static std::string object_map_name(const std::string &image_id, uint64_t snap_id); diff --git a/src/librbd/internal.cc b/src/librbd/internal.cc index 763cee170dad..3a88b6ab78be 100644 --- a/src/librbd/internal.cc +++ b/src/librbd/internal.cc @@ -29,6 +29,7 @@ #include "librbd/ImageCtx.h" #include "librbd/ImageWatcher.h" #include "librbd/internal.h" +#include "librbd/Journal.h" #include "librbd/ObjectMap.h" #include "librbd/parent_types.h" #include "librbd/RebuildObjectMapRequest.h" @@ -1301,13 +1302,37 @@ reprotect_and_return_err: OBJECT_NONEXISTENT); r = io_ctx.operate(ObjectMap::object_map_name(id, CEPH_NOSNAP), &op); if (r < 0) { + lderr(cct) << "error creating initial object map: " + << cpp_strerror(r) << dendl; goto err_remove_header; } } + if ((features & RBD_FEATURE_JOURNALING) != 0) { + if ((features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) { + lderr(cct) << "cannot use journaling without exclusive lock" << dendl; + goto err_remove_object_map; + } + + r = Journal::create(io_ctx, id); + if (r < 0) { + lderr(cct) << "error creating journal: " << cpp_strerror(r) << dendl; + goto err_remove_object_map; + } + } + ldout(cct, 2) << "done." 
<< dendl; return 0; + err_remove_object_map: + if ((features & RBD_FEATURE_OBJECT_MAP) != 0) { + remove_r = ObjectMap::remove(io_ctx, id); + if (remove_r < 0) { + lderr(cct) << "error cleaning up object map after creation failed: " + << cpp_strerror(remove_r) << dendl; + } + } + err_remove_header: remove_r = io_ctx.remove(header_oid); if (remove_r < 0) { @@ -1816,6 +1841,13 @@ reprotect_and_return_err: return -EINVAL; } + RWLock::RLocker owner_locker(ictx->owner_lock); + RWLock::WLocker md_locker(ictx->md_lock); + r = _flush(ictx); + if (r < 0) { + return r; + } + if ((features & RBD_FEATURES_MUTABLE) != features) { lderr(cct) << "cannot update immutable features" << dendl; return -EINVAL; @@ -1824,13 +1856,17 @@ reprotect_and_return_err: return -EINVAL; } - RWLock::RLocker l(ictx->snap_lock); - uint64_t new_features = ictx->features | features; - if (!enabled) { + RWLock::WLocker snap_locker(ictx->snap_lock); + uint64_t new_features; + if (enabled) { + features &= ~ictx->features; + new_features = ictx->features | features; + } else { + features &= ictx->features; new_features = ictx->features & ~features; } - if (ictx->features == new_features) { + if (features == 0) { return 0; } @@ -1861,6 +1897,13 @@ reprotect_and_return_err: return -EINVAL; } features_mask |= RBD_FEATURE_EXCLUSIVE_LOCK; + + r = Journal::create(ictx->md_ctx, ictx->id); + if (r < 0) { + lderr(cct) << "error creating image journal: " << cpp_strerror(r) + << dendl; + return r; + } } if (enable_flags != 0) { @@ -1894,6 +1937,14 @@ reprotect_and_return_err: if ((features & RBD_FEATURE_FAST_DIFF) != 0) { disable_flags = RBD_FLAG_FAST_DIFF_INVALID; } + if ((features & RBD_FEATURE_JOURNALING) != 0) { + r = Journal::remove(ictx->md_ctx, ictx->id); + if (r < 0) { + lderr(cct) << "error removing image journal: " << cpp_strerror(r) + << dendl; + return r; + } + } } ldout(cct, 10) << "update_features: features=" << new_features << ", mask=" @@ -1903,6 +1954,7 @@ reprotect_and_return_err: if (r < 0) { 
lderr(cct) << "failed to update features: " << cpp_strerror(r) << dendl; + return r; } if (((ictx->features & RBD_FEATURE_OBJECT_MAP) == 0) && ((features & RBD_FEATURE_OBJECT_MAP) != 0)) { @@ -2186,9 +2238,16 @@ reprotect_and_return_err: } } if (!old_format) { - r = io_ctx.remove(ObjectMap::object_map_name(id, CEPH_NOSNAP)); + r = Journal::remove(io_ctx, id); + if (r < 0 && r != -ENOENT) { + lderr(cct) << "error removing image journal" << dendl; + return r; + } + + r = ObjectMap::remove(io_ctx, id); if (r < 0 && r != -ENOENT) { lderr(cct) << "error removing image object map" << dendl; + return r; } ldout(cct, 2) << "removing id object..." << dendl; @@ -2705,6 +2764,17 @@ reprotect_and_return_err: ictx->object_map.refresh(ictx->snap_id); ictx->data_ctx.selfmanaged_snap_set_write_ctx(ictx->snapc.seq, ictx->snaps); + + // dynamically enable/disable journaling support + if ((ictx->features & RBD_FEATURE_JOURNALING) != 0 && + ictx->image_watcher != NULL && ictx->journal == NULL && + ictx->snap_name.empty()) { + ictx->open_journal(); + } else if ((ictx->features & RBD_FEATURE_JOURNALING) == 0 && + ictx->journal != NULL) { + // TODO journal needs to be disabled via proxied request to avoid race + // between deleting journal and appending journal events + } } // release snap_lock and cache_lock if (ictx->image_watcher != NULL) { @@ -2997,44 +3067,57 @@ reprotect_and_return_err: // snapshot and the user is trying to fix that ictx_check(ictx); - bool unlocking = false; - { - RWLock::WLocker l(ictx->owner_lock); - if (ictx->image_watcher != NULL && ictx->image_watcher->is_lock_owner() && - snap_name != NULL && strlen(snap_name) != 0) { - // stop incoming requests since we will release the lock - ictx->image_watcher->prepare_unlock(); - unlocking = true; + int r; + bool snapshot_mode = (snap_name != NULL && strlen(snap_name) != 0); + if (snapshot_mode) { + { + RWLock::WLocker owner_locker(ictx->owner_lock); + if (ictx->image_watcher != NULL && + 
ictx->image_watcher->is_lock_owner()) { + r = ictx->image_watcher->release_lock(); + if (r < 0) { + return r; + } + } } - } - ictx->cancel_async_requests(); - ictx->flush_async_operations(); - if (ictx->object_cacher) { - // complete pending writes before we're set to a snapshot and - // get -EROFS for writes - RWLock::RLocker owner_locker(ictx->owner_lock); - RWLock::WLocker md_locker(ictx->md_lock); - ictx->flush_cache(); + ictx->cancel_async_requests(); + ictx->flush_async_operations(); + + if (ictx->object_cacher) { + RWLock::RLocker owner_locker(ictx->owner_lock); + r = _flush(ictx); + if (r < 0) { + return r; + } + } + + { + RWLock::WLocker snap_locker(ictx->snap_lock); + if (ictx->journal != NULL) { + r = ictx->close_journal(false); + if (r < 0) { + return r; + } + } + } } - int r = _snap_set(ictx, snap_name); + + r = _snap_set(ictx, snap_name); if (r < 0) { - RWLock::WLocker l(ictx->owner_lock); - if (unlocking) { - ictx->image_watcher->cancel_unlock(); - } return r; } - RWLock::WLocker l(ictx->owner_lock); - if (ictx->image_watcher != NULL) { - if (unlocking) { - r = ictx->image_watcher->unlock(); - if (r < 0) { - lderr(ictx->cct) << "error unlocking image: " << cpp_strerror(r) - << dendl; - } + { + RWLock::WLocker snap_locker(ictx->snap_lock); + if ((ictx->features & RBD_FEATURE_JOURNALING) != 0 && + ictx->journal == NULL && !snapshot_mode) { + ictx->open_journal(); } + } + + RWLock::RLocker owner_locker(ictx->owner_lock); + if (ictx->image_watcher != NULL) { ictx->image_watcher->refresh(); } return r; @@ -3086,31 +3169,42 @@ reprotect_and_return_err: { ldout(ictx->cct, 20) << "close_image " << ictx << dendl; + // finish all incoming IO operations + ictx->aio_work_queue->drain(); + + int r = 0; { - RWLock::WLocker l(ictx->owner_lock); + // release the lock (and flush all in-flight IO) + RWLock::WLocker owner_locker(ictx->owner_lock); if (ictx->image_watcher != NULL && ictx->image_watcher->is_lock_owner()) { - // stop incoming requests - 
ictx->image_watcher->prepare_unlock(); + r = ictx->image_watcher->release_lock(); + if (r < 0) { + lderr(ictx->cct) << "error releasing image lock: " << cpp_strerror(r) + << dendl; + } } } - assert(!ictx->aio_work_queue->writes_suspended() || + assert(!ictx->aio_work_queue->writes_blocked() || ictx->aio_work_queue->writes_empty()); - ictx->aio_work_queue->drain(); + ictx->cancel_async_requests(); ictx->flush_async_operations(); ictx->readahead.wait_for_pending(); - int r; + int flush_r; if (ictx->object_cacher) { - r = ictx->shutdown_cache(); // implicitly flushes + flush_r = ictx->shutdown_cache(); // implicitly flushes } else { RWLock::RLocker owner_locker(ictx->owner_lock); - r = _flush(ictx); + flush_r = _flush(ictx); } - if (r < 0) { - lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(r) + if (flush_r< 0) { + lderr(ictx->cct) << "error flushing IO: " << cpp_strerror(flush_r) << dendl; + if (r == 0) { + r = flush_r; + } } ictx->op_work_queue->drain(); @@ -3120,6 +3214,13 @@ reprotect_and_return_err: ictx->copyup_finisher->stop(); } + if (ictx->journal != NULL) { + int close_r = ictx->close_journal(true); + if (close_r < 0 && r == 0) { + r = close_r; + } + } + if (ictx->parent) { int close_r = close_image(ictx->parent); if (r == 0 && close_r < 0) { @@ -3129,19 +3230,6 @@ reprotect_and_return_err: } if (ictx->image_watcher) { - { - RWLock::WLocker l(ictx->owner_lock); - if (ictx->image_watcher->is_lock_owner()) { - int unlock_r = ictx->image_watcher->unlock(); - if (unlock_r < 0) { - lderr(ictx->cct) << "error unlocking image: " - << cpp_strerror(unlock_r) << dendl; - if (r == 0) { - r = unlock_r; - } - } - } - } ictx->unregister_watch(); } diff --git a/src/test/Makefile-client.am b/src/test/Makefile-client.am index 9c7471117f53..cc96eaa8e430 100644 --- a/src/test/Makefile-client.am +++ b/src/test/Makefile-client.am @@ -355,6 +355,7 @@ unittest_librbd_CXXFLAGS = $(UNITTEST_CXXFLAGS) -DTEST_LIBRBD_INTERNALS unittest_librbd_LDADD = \ librbd_test.la 
librbd_api.la librbd_internal.la $(LIBRBD_TYPES) \ libcls_rbd_client.la libcls_lock_client.la \ + libjournal.la libcls_journal_client.la \ librados_test_stub.la librados_internal.la \ $(LIBOSDC) $(UNITTEST_LDADD) \ $(CEPH_GLOBAL) $(RADOS_TEST_LDADD) @@ -367,6 +368,7 @@ ceph_test_librbd_CXXFLAGS = $(UNITTEST_CXXFLAGS) -DTEST_LIBRBD_INTERNALS ceph_test_librbd_LDADD = \ librbd_test.la librbd_api.la librbd_internal.la $(LIBRBD_TYPES) \ libcls_rbd_client.la libcls_lock_client.la \ + libjournal.la libcls_journal_client.la \ librados_api.la $(LIBRADOS_DEPS) $(UNITTEST_LDADD) \ $(CEPH_GLOBAL) $(RADOS_TEST_LDADD) bin_DEBUGPROGRAMS += ceph_test_librbd