From: Jason Dillaman <dillaman@redhat.com> Date: Wed, 15 Jul 2015 17:18:16 +0000 (-0400) Subject: librbd: integrate cache with journal X-Git-Tag: v10.0.1~52^2~17 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=00390414699f290275d40f3fa46b58fccaab0ca6;p=ceph.git librbd: integrate cache with journal Cache writeback should be delayed until after journal event has been committed to disk. Signed-off-by: Jason Dillaman <dillaman@redhat.com> --- diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc index dc6d87f24fd5..a902adcaf4f4 100644 --- a/src/librbd/AioImageRequest.cc +++ b/src/librbd/AioImageRequest.cc @@ -18,6 +18,40 @@ namespace librbd { +namespace { + +struct C_DiscardJournalCommit : public Context { + typedef std::vector<ObjectExtent> ObjectExtents; + + ImageCtx &image_ctx; + AioCompletion *aio_comp; + ObjectExtents object_extents; + + C_DiscardJournalCommit(ImageCtx &_image_ctx, AioCompletion *_aio_comp, + const ObjectExtents &_object_extents, uint64_t tid) + : image_ctx(_image_ctx), aio_comp(_aio_comp), + object_extents(_object_extents) { + CephContext *cct = image_ctx.cct; + ldout(cct, 20) << this << " C_DiscardJournalCommit: " + << "delaying cache discard until journal tid " << tid << " " + << "safe" << dendl; + + aio_comp->add_request(); + } + + virtual void finish(int r) { + CephContext *cct = image_ctx.cct; + ldout(cct, 20) << this << " C_DiscardJournalCommit: " + << "journal committed: discarding from cache" << dendl; + + Mutex::Locker cache_locker(image_ctx.cache_lock); + image_ctx.object_cacher->discard_set(image_ctx.object_set, object_extents); + aio_comp->complete_request(cct, r); + } +}; + +} // anonymous namespace + void AioImageRequest::aio_read( ImageCtx *ictx, AioCompletion *c, const std::vector<std::pair<uint64_t,uint64_t> > &extents, @@ -196,7 +230,7 @@ void AbstractAioImageWrite::send_request() { } if (m_image_ctx.object_cacher != NULL) { - send_cache_requests(object_extents, snapc, journal_tid); + send_cache_requests(object_extents, journal_tid); } update_stats(clip_len); 
@@ -214,7 +248,7 @@ void AbstractAioImageWrite::send_object_requests( ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length << " from " << p->buffer_extents << dendl; C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); - AioObjectRequest *request = send_object_request(*p, snapc, req_comp); + AioObjectRequest *request = create_object_request(*p, snapc, req_comp); // if journaling, stash the request for later; otherwise send if (request != NULL) { @@ -242,14 +276,12 @@ uint64_t AioImageWrite::append_journal_event( journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl)); return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests, - synchronous); + m_off, m_len, synchronous); } void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc, uint64_t journal_tid) { CephContext *cct = m_image_ctx.cct; - for (ObjectExtents::const_iterator p = object_extents.begin(); p != object_extents.end(); ++p) { const ObjectExtent &object_extent = *p; @@ -257,19 +289,27 @@ void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents, bufferlist bl; assemble_extent(object_extent, &bl); - // TODO pass journal_tid to object cacher C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp); m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length, - object_extent.offset, req_comp, m_op_flags); + object_extent.offset, req_comp, m_op_flags, + journal_tid); } } -AioObjectRequest *AioImageWrite::send_object_request( +void AioImageWrite::send_object_requests( + const ObjectExtents &object_extents, const ::SnapContext &snapc, + AioObjectRequests *aio_object_requests) { + // cache handles creating object requests during writeback + if (m_image_ctx.object_cacher == NULL) { + AbstractAioImageWrite::send_object_requests(object_extents, snapc, + aio_object_requests); + } +} + +AioObjectRequest *AioImageWrite::create_object_request( const ObjectExtent &object_extent, 
const ::SnapContext &snapc, Context *on_finish) { - if (m_image_ctx.object_cacher != NULL) { - return NULL; - } + assert(m_image_ctx.object_cacher == NULL); bufferlist bl; assemble_extent(object_extent, &bl); @@ -291,22 +331,25 @@ uint64_t AioImageDiscard::append_journal_event( const AioObjectRequests &requests, bool synchronous) { journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len)); return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests, - synchronous); + m_off, m_len, synchronous); } void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc, uint64_t journal_tid) { - // TODO need to have cache flag pending discard for writeback or need - // to delay cache update until after journal commits - Mutex::Locker cache_locker(m_image_ctx.cache_lock); - - // TODO pass journal_tid to object cacher - m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set, - object_extents); + if (journal_tid == 0) { + Mutex::Locker cache_locker(m_image_ctx.cache_lock); + m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set, + object_extents); + } else { + // cannot discard from cache until journal has committed + assert(m_image_ctx.journal != NULL); + m_image_ctx.journal->wait_event( + journal_tid, new C_DiscardJournalCommit(m_image_ctx, m_aio_comp, + object_extents, journal_tid)); + } } -AioObjectRequest *AioImageDiscard::send_object_request( +AioObjectRequest *AioImageDiscard::create_object_request( const ObjectExtent &object_extent, const ::SnapContext &snapc, Context *on_finish) { CephContext *cct = m_image_ctx.cct; @@ -346,7 +389,7 @@ void AioImageFlush::send_request() { if (m_image_ctx.journal != NULL) { m_image_ctx.journal->append_event( m_aio_comp, journal::EventEntry(journal::AioFlushEvent()), - AioObjectRequests(), true); + AioObjectRequests(), 0, 0, true); } } diff --git a/src/librbd/AioImageRequest.h b/src/librbd/AioImageRequest.h index ab8053a03f29..6fdc9f9712df 100644 --- 
a/src/librbd/AioImageRequest.h +++ b/src/librbd/AioImageRequest.h @@ -109,13 +109,12 @@ protected: virtual void send_request(); virtual void send_cache_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc, uint64_t journal_tid) = 0; - void send_object_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc, - AioObjectRequests *aio_object_requests); - virtual AioObjectRequest *send_object_request( + virtual void send_object_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + AioObjectRequests *aio_object_requests); + virtual AioObjectRequest *create_object_request( const ObjectExtent &object_extent, const ::SnapContext &snapc, Context *on_finish) = 0; @@ -146,10 +145,12 @@ protected: void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl); virtual void send_cache_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc, uint64_t journal_tid); - virtual AioObjectRequest *send_object_request( + virtual void send_object_requests(const ObjectExtents &object_extents, + const ::SnapContext &snapc, + AioObjectRequests *aio_object_requests); + virtual AioObjectRequest *create_object_request( const ObjectExtent &object_extent, const ::SnapContext &snapc, Context *on_finish); @@ -177,10 +178,9 @@ protected: } virtual void send_cache_requests(const ObjectExtents &object_extents, - const ::SnapContext &snapc, uint64_t journal_tid); - virtual AioObjectRequest *send_object_request( + virtual AioObjectRequest *create_object_request( const ObjectExtent &object_extent, const ::SnapContext &snapc, Context *on_finish); diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc index 86cd49e88bab..a7ce73a168b6 100644 --- a/src/librbd/ImageCtx.cc +++ b/src/librbd/ImageCtx.cc @@ -625,12 +625,12 @@ public: void ImageCtx::write_to_cache(object_t o, const bufferlist& bl, size_t len, uint64_t off, Context *onfinish, - int fadvise_flags) { + int fadvise_flags, uint64_t journal_tid) { 
snap_lock.get_read(); ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl, utime_t(), fadvise_flags, - 0); + journal_tid); snap_lock.put_read(); ObjectExtent extent(o, 0, off, len, 0); extent.oloc.pool = data_ctx.get_id(); diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h index 2d41e0114507..df3784a770cd 100644 --- a/src/librbd/ImageCtx.h +++ b/src/librbd/ImageCtx.h @@ -226,7 +226,8 @@ namespace librbd { size_t len, uint64_t off, Context *onfinish, int fadvise_flags); void write_to_cache(object_t o, const bufferlist& bl, size_t len, - uint64_t off, Context *onfinish, int fadvise_flags); + uint64_t off, Context *onfinish, int fadvise_flags, + uint64_t journal_tid); void user_flushed(); void flush_cache_aio(Context *onfinish); int flush_cache(); diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc index 0e5f00eba953..1278ef36cf75 100644 --- a/src/librbd/Journal.cc +++ b/src/librbd/Journal.cc @@ -27,7 +27,7 @@ Journal::Journal(ImageCtx &image_ctx) : m_image_ctx(image_ctx), m_journaler(NULL), m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED), m_lock_listener(this), m_replay_handler(this), m_close_pending(false), - m_next_tid(0), m_blocking_writes(false) { + m_event_tid(0), m_blocking_writes(false) { ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl; @@ -156,6 +156,7 @@ int Journal::close() { uint64_t Journal::append_event(AioCompletion *aio_comp, const journal::EventEntry &event_entry, const AioObjectRequests &requests, + uint64_t offset, size_t length, bool flush_entry) { assert(m_image_ctx.owner_lock.is_locked()); assert(m_state == STATE_RECORDING); @@ -167,14 +168,18 @@ uint64_t Journal::append_event(AioCompletion *aio_comp, uint64_t tid; { Mutex::Locker locker(m_lock); - tid = m_next_tid++; - m_events[tid] = Event(future, aio_comp, requests); + tid = ++m_event_tid; + assert(tid != 0); + + m_events[tid] = Event(future, aio_comp, requests, offset, length); } CephContext *cct = m_image_ctx.cct; ldout(cct, 20) 
<< this << " " << __func__ << ": " << "event=" << event_entry.get_event_type() << ", " << "new_reqs=" << requests.size() << ", " + << "offset=" << offset << ", " + << "length=" << length << ", " << "flush=" << flush_entry << ", tid=" << tid << dendl; Context *on_safe = new C_EventSafe(this, tid); @@ -186,6 +191,97 @@ uint64_t Journal::append_event(AioCompletion *aio_comp, return tid; } +void Journal::commit_event(uint64_t tid, int r) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", " + "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + Events::iterator it = m_events.find(tid); + if (it == m_events.end()) { + return; + } + complete_event(it, r); +} + +void Journal::commit_event_extent(uint64_t tid, uint64_t offset, + uint64_t length, int r) { + assert(length > 0); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", " + << "offset=" << offset << ", " + << "length=" << length << ", " + << "r=" << r << dendl; + + Mutex::Locker locker(m_lock); + Events::iterator it = m_events.find(tid); + if (it == m_events.end()) { + return; + } + + Event &event = it->second; + if (event.ret_val == 0 && r < 0) { + event.ret_val = r; + } + + ExtentInterval extent; + extent.insert(offset, length); + + ExtentInterval intersect; + intersect.intersection_of(extent, event.pending_extents); + + event.pending_extents.subtract(intersect); + if (!event.pending_extents.empty()) { + ldout(cct, 20) << "pending extents: " << event.pending_extents << dendl; + return; + } + complete_event(it, event.ret_val); +} + +void Journal::flush_event(uint64_t tid, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", " + << "on_safe=" << on_safe << dendl; + + ::journal::Future future; + { + Mutex::Locker locker(m_lock); + future = wait_event(m_lock, tid, on_safe); + } + + if (future.is_valid()) { + future.flush(NULL); + 
} +} + +void Journal::wait_event(uint64_t tid, Context *on_safe) { + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", " + << "on_safe=" << on_safe << dendl; + + Mutex::Locker locker(m_lock); + wait_event(m_lock, tid, on_safe); +} + +::journal::Future Journal::wait_event(Mutex &lock, uint64_t tid, + Context *on_safe) { + assert(m_lock.is_locked()); + CephContext *cct = m_image_ctx.cct; + + Events::iterator it = m_events.find(tid); + if (it == m_events.end() || it->second.safe) { + // journal entry already safe + ldout(cct, 20) << "journal entry already safe" << dendl; + m_image_ctx.op_work_queue->queue(on_safe, 0); + return ::journal::Future(); + } + + Event &event = it->second; + event.on_safe_contexts.push_back(on_safe); + return event.future; +} + void Journal::create_journaler() { CephContext *cct = m_image_ctx.cct; ldout(cct, 20) << this << " " << __func__ << dendl; @@ -214,6 +310,18 @@ void Journal::destroy_journaler() { transition_state(STATE_UNINITIALIZED); } +void Journal::complete_event(Events::iterator it, int r) { + assert(m_lock.is_locked()); + + CephContext *cct = m_image_ctx.cct; + ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " " + << "r=" << r << dendl; + + // TODO + + m_events.erase(it); +} + void Journal::handle_initialized(int r) { CephContext *cct = m_image_ctx.cct; if (r < 0) { @@ -298,8 +406,7 @@ void Journal::handle_event_safe(int r, uint64_t tid) { ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", " << "tid=" << tid << dendl; - RWLock::RLocker owner_locker(m_image_ctx.owner_lock); - + // TODO: ensure this callback never sees a failure AioCompletion *aio_comp; AioObjectRequests aio_object_requests; Contexts on_safe_contexts; @@ -312,18 +419,24 @@ void Journal::handle_event_safe(int r, uint64_t tid) { aio_comp = event.aio_comp; aio_object_requests.swap(event.aio_object_requests); on_safe_contexts.swap(event.on_safe_contexts); - m_events.erase(it); + + 
if (event.pending_extents.empty()) { + m_events.erase(it); + } else { + event.safe = true; + } } ldout(cct, 20) << "completing tid=" << tid << dendl; - assert(m_image_ctx.image_watcher->is_lock_owner()); - if (r < 0) { // don't send aio requests if the journal fails -- bubble error up aio_comp->fail(cct, r); } else { // send any waiting aio requests now that journal entry is safe + RWLock::RLocker owner_locker(m_image_ctx.owner_lock); + assert(m_image_ctx.image_watcher->is_lock_owner()); + for (AioObjectRequests::iterator it = aio_object_requests.begin(); it != aio_object_requests.end(); ++it) { (*it)->send(); diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h index 35c173894439..4ac2bee97c62 100644 --- a/src/librbd/Journal.h +++ b/src/librbd/Journal.h @@ -6,6 +6,7 @@ #include "include/int_types.h" #include "include/Context.h" +#include "include/interval_set.h" #include "include/unordered_map.h" #include "include/rados/librados.hpp" #include "common/Mutex.h" @@ -13,6 +14,7 @@ #include "journal/Future.h" #include "journal/ReplayHandler.h" #include "librbd/ImageWatcher.h" +#include #include #include @@ -49,10 +51,19 @@ public: uint64_t append_event(AioCompletion *aio_comp, const journal::EventEntry &event_entry, const AioObjectRequests &requests, + uint64_t offset, size_t length, bool flush_entry); + void commit_event(uint64_t tid, int r); + void commit_event_extent(uint64_t tid, uint64_t offset, uint64_t length, + int r); + + void flush_event(uint64_t tid, Context *on_safe); + void wait_event(uint64_t tid, Context *on_safe); + private: typedef std::list Contexts; + typedef interval_set ExtentInterval; enum State { STATE_UNINITIALIZED, @@ -66,12 +77,19 @@ private: AioCompletion *aio_comp; AioObjectRequests aio_object_requests; Contexts on_safe_contexts; + ExtentInterval pending_extents; + bool safe; + int ret_val; Event() : aio_comp(NULL) { } Event(const ::journal::Future &_future, AioCompletion *_aio_comp, - const AioObjectRequests &_requests) - : 
future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) { + const AioObjectRequests &_requests, uint64_t offset, size_t length) + : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests), + safe(false), ret_val(0) { + if (length > 0) { + pending_extents.insert(offset, length); + } } }; typedef ceph::unordered_map Events; @@ -149,14 +167,18 @@ private: ReplayHandler m_replay_handler; bool m_close_pending; - uint64_t m_next_tid; + uint64_t m_event_tid; Events m_events; bool m_blocking_writes; + ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe); + void create_journaler(); void destroy_journaler(); + void complete_event(Events::iterator it, int r); + void handle_initialized(int r); void handle_replay_ready(); diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc index b55eb3ffa203..1e68214273bd 100644 --- a/src/librbd/LibrbdWriteback.cc +++ b/src/librbd/LibrbdWriteback.cc @@ -17,6 +17,7 @@ #include "librbd/LibrbdWriteback.h" #include "librbd/AioCompletion.h" #include "librbd/ObjectMap.h" +#include "librbd/Journal.h" #include "include/assert.h" @@ -91,6 +92,79 @@ namespace librbd { LibrbdWriteback *m_wb_handler; }; + struct C_WriteJournalCommit : public Context { + typedef std::vector > Extents; + + ImageCtx *image_ctx; + std::string oid; + uint64_t object_no; + uint64_t off; + bufferlist bl; + SnapContext snapc; + Context *req_comp; + uint64_t journal_tid; + bool request_sent; + + C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid, + uint64_t _object_no, uint64_t _off, + const bufferlist &_bl, const SnapContext& _snapc, + Context *_req_comp, uint64_t _journal_tid) + : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off), + bl(_bl), snapc(_snapc), req_comp(_req_comp), journal_tid(_journal_tid), + request_sent(false) { + CephContext *cct = image_ctx->cct; + ldout(cct, 20) << this << " C_WriteJournalCommit: " + << "delaying write until journal tid " + << journal_tid << " 
safe" << dendl; + } + + virtual void complete(int r) { + if (request_sent || r < 0) { + commit_event_extent(r); + req_comp->complete(r); + delete this; + } else { + send_request(); + } + } + + virtual void finish(int r) { + } + + void commit_event_extent(int r) { + CephContext *cct = image_ctx->cct; + ldout(cct, 20) << this << " C_WriteJournalCommit: " + << "write committed: updating journal commit position" + << dendl; + + // all IO operations are flushed prior to closing the journal + assert(image_ctx->journal != NULL); + + Extents file_extents; + Striper::extent_to_file(cct, &image_ctx->layout, object_no, off, + bl.length(), file_extents); + for (Extents::iterator it = file_extents.begin(); + it != file_extents.end(); ++it) { + image_ctx->journal->commit_event_extent(journal_tid, it->first, + it->second, r); + } + } + + void send_request() { + CephContext *cct = image_ctx->cct; + ldout(cct, 20) << this << " C_WriteJournalCommit: " + << "journal committed: sending write request" << dendl; + + RWLock::RLocker owner_locker(image_ctx->owner_lock); + assert(image_ctx->image_watcher->is_lock_owner()); + + request_sent = true; + AioObjectWrite *req = new AioObjectWrite(image_ctx, oid, object_no, off, + bl, snapc, this); + req->send(); + } + }; + LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock) : m_finisher(new Finisher(ictx->cct)), m_tid(0), m_lock(lock), m_ictx(ictx) { @@ -169,19 +243,41 @@ namespace librbd { m_writes[oid.name].push(result); ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl; C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, this); - AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off, - bl, snapc, req_comp); - req->send(); + + // all IO operations are flushed prior to closing the journal + assert(journal_tid == 0 || m_ictx->journal != NULL); + if (journal_tid != 0) { + m_ictx->journal->flush_event( + journal_tid, new C_WriteJournalCommit(m_ictx, oid.name, object_no, off, + 
bl, snapc, req_comp, + journal_tid)); + } else { + AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off, + bl, snapc, req_comp); + req->send(); + } return ++m_tid; } void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off, uint64_t len, ceph_tid_t journal_tid) { - assert(journal_tid != 0); + typedef std::vector > Extents; + + assert(m_ictx->owner_lock.is_locked()); + uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix); + + // all IO operations are flushed prior to closing the journal + assert(journal_tid != 0 && m_ictx->journal != NULL); - // TODO inform the journal that we no longer expect to receive writebacks - // for the specified extent + Extents file_extents; + Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off, + len, file_extents); + for (Extents::iterator it = file_extents.begin(); + it != file_extents.end(); ++it) { + m_ictx->journal->commit_event_extent(journal_tid, it->first, it->second, + 0); + } } void LibrbdWriteback::get_client_lock() {