namespace librbd {
+namespace {
+
+// Deferred-discard completion: holds the discarded object extents and, once
+// the journal event identified by 'tid' is safe, evicts those extents from
+// the in-memory object cache.  This prevents the cache discard from becoming
+// visible before the journal entry that records it has been committed.
+struct C_DiscardJournalCommit : public Context {
+ typedef std::vector<ObjectExtent> ObjectExtents;
+
+ ImageCtx &image_ctx;
+ AioCompletion *aio_comp;
+ ObjectExtents object_extents;
+
+ C_DiscardJournalCommit(ImageCtx &_image_ctx, AioCompletion *_aio_comp,
+ const ObjectExtents &_object_extents, uint64_t tid)
+ : image_ctx(_image_ctx), aio_comp(_aio_comp),
+ object_extents(_object_extents) {
+ CephContext *cct = image_ctx.cct;
+ ldout(cct, 20) << this << " C_DiscardJournalCommit: "
+ << "delaying cache discard until journal tid " << tid << " "
+ << "safe" << dendl;
+
+ // register the deferred cache discard as a pending sub-request so the
+ // AioCompletion cannot fire before the discard has been applied
+ aio_comp->add_request();
+ }
+
+ virtual void finish(int r) {
+ CephContext *cct = image_ctx.cct;
+ ldout(cct, 20) << this << " C_DiscardJournalCommit: "
+ << "journal committed: discarding from cache" << dendl;
+
+ // NOTE(review): the cache extents are discarded even when r < 0; the
+ // result code is only propagated to the completion below -- confirm
+ // this is the intended error behavior
+ Mutex::Locker cache_locker(image_ctx.cache_lock);
+ image_ctx.object_cacher->discard_set(image_ctx.object_set, object_extents);
+ aio_comp->complete_request(cct, r);
+ }
+};
+
+} // anonymous namespace
+
void AioImageRequest::aio_read(
ImageCtx *ictx, AioCompletion *c,
const std::vector<std::pair<uint64_t,uint64_t> > &extents,
}
if (m_image_ctx.object_cacher != NULL) {
- send_cache_requests(object_extents, snapc, journal_tid);
+ send_cache_requests(object_extents, journal_tid);
}
update_stats(clip_len);
ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
<< " from " << p->buffer_extents << dendl;
C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
- AioObjectRequest *request = send_object_request(*p, snapc, req_comp);
+ AioObjectRequest *request = create_object_request(*p, snapc, req_comp);
// if journaling, stash the request for later; otherwise send
if (request != NULL) {
journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl));
return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
- synchronous);
+ m_off, m_len, synchronous);
}
+// Route each object extent of the image write through the object cacher,
+// tagging every cache write with the originating journal event tid so that
+// writeback can later be ordered against (and credited to) journal commit.
void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc,
 uint64_t journal_tid) {
 CephContext *cct = m_image_ctx.cct;
-
 for (ObjectExtents::const_iterator p = object_extents.begin();
 p != object_extents.end(); ++p) {
 const ObjectExtent &object_extent = *p;
 bufferlist bl;
 assemble_extent(object_extent, &bl);
- // TODO pass journal_tid to object cacher
 C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
+ // journal_tid now flows into the cache write (replacing the old TODO)
 m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
- object_extent.offset, req_comp, m_op_flags);
+ object_extent.offset, req_comp, m_op_flags,
+ journal_tid);
 }
 }
-AioObjectRequest *AioImageWrite::send_object_request(
+// Issue object write requests directly only when the object cacher is
+// disabled; with caching enabled the data was already queued in the cache
+// and the object requests will be created later during writeback.
+void AioImageWrite::send_object_requests(
+ const ObjectExtents &object_extents, const ::SnapContext &snapc,
+ AioObjectRequests *aio_object_requests) {
+ // cache handles creating object requests during writeback
+ if (m_image_ctx.object_cacher == NULL) {
+ AbstractAioImageWrite::send_object_requests(object_extents, snapc,
+ aio_object_requests);
+ }
+}
+
+AioObjectRequest *AioImageWrite::create_object_request(
const ObjectExtent &object_extent, const ::SnapContext &snapc,
Context *on_finish) {
- if (m_image_ctx.object_cacher != NULL) {
- return NULL;
- }
+ assert(m_image_ctx.object_cacher == NULL);
bufferlist bl;
assemble_extent(object_extent, &bl);
const AioObjectRequests &requests, bool synchronous) {
journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len));
return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
- synchronous);
+ m_off, m_len, synchronous);
}
+// Apply a discard to the object cacher.  If the op was journaled the cache
+// eviction is deferred until the journal event commits; otherwise the
+// cached extents are dropped immediately.
void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc,
 uint64_t journal_tid) {
- // TODO need to have cache flag pending discard for writeback or need
- // to delay cache update until after journal commits
- Mutex::Locker cache_locker(m_image_ctx.cache_lock);
-
- // TODO pass journal_tid to object cacher
- m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
- object_extents);
+ if (journal_tid == 0) {
+ // not journaled: safe to evict the cached data right away
+ Mutex::Locker cache_locker(m_image_ctx.cache_lock);
+ m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
+ object_extents);
+ } else {
+ // cannot discard from cache until journal has committed
+ assert(m_image_ctx.journal != NULL);
+ m_image_ctx.journal->wait_event(
+ journal_tid, new C_DiscardJournalCommit(m_image_ctx, m_aio_comp,
+ object_extents, journal_tid));
+ }
 }
-AioObjectRequest *AioImageDiscard::send_object_request(
+AioObjectRequest *AioImageDiscard::create_object_request(
const ObjectExtent &object_extent, const ::SnapContext &snapc,
Context *on_finish) {
CephContext *cct = m_image_ctx.cct;
if (m_image_ctx.journal != NULL) {
m_image_ctx.journal->append_event(
m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
- AioObjectRequests(), true);
+ AioObjectRequests(), 0, 0, true);
}
}
virtual void send_request();
virtual void send_cache_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc,
uint64_t journal_tid) = 0;
- void send_object_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc,
- AioObjectRequests *aio_object_requests);
- virtual AioObjectRequest *send_object_request(
+ virtual void send_object_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ AioObjectRequests *aio_object_requests);
+ virtual AioObjectRequest *create_object_request(
const ObjectExtent &object_extent, const ::SnapContext &snapc,
Context *on_finish) = 0;
void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl);
virtual void send_cache_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc,
uint64_t journal_tid);
- virtual AioObjectRequest *send_object_request(
+ virtual void send_object_requests(const ObjectExtents &object_extents,
+ const ::SnapContext &snapc,
+ AioObjectRequests *aio_object_requests);
+ virtual AioObjectRequest *create_object_request(
const ObjectExtent &object_extent, const ::SnapContext &snapc,
Context *on_finish);
}
virtual void send_cache_requests(const ObjectExtents &object_extents,
- const ::SnapContext &snapc,
uint64_t journal_tid);
- virtual AioObjectRequest *send_object_request(
+ virtual AioObjectRequest *create_object_request(
const ObjectExtent &object_extent, const ::SnapContext &snapc,
Context *on_finish);
void ImageCtx::write_to_cache(object_t o, const bufferlist& bl, size_t len,
uint64_t off, Context *onfinish,
- int fadvise_flags) {
+ int fadvise_flags, uint64_t journal_tid) {
snap_lock.get_read();
ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
utime_t(),
fadvise_flags,
- 0);
+ journal_tid);
snap_lock.put_read();
ObjectExtent extent(o, 0, off, len, 0);
extent.oloc.pool = data_ctx.get_id();
size_t len, uint64_t off, Context *onfinish,
int fadvise_flags);
void write_to_cache(object_t o, const bufferlist& bl, size_t len,
- uint64_t off, Context *onfinish, int fadvise_flags);
+ uint64_t off, Context *onfinish, int fadvise_flags,
+ uint64_t journal_tid);
void user_flushed();
void flush_cache_aio(Context *onfinish);
int flush_cache();
: m_image_ctx(image_ctx), m_journaler(NULL),
m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
- m_next_tid(0), m_blocking_writes(false) {
+ m_event_tid(0), m_blocking_writes(false) {
ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
uint64_t Journal::append_event(AioCompletion *aio_comp,
const journal::EventEntry &event_entry,
const AioObjectRequests &requests,
+ uint64_t offset, size_t length,
bool flush_entry) {
assert(m_image_ctx.owner_lock.is_locked());
assert(m_state == STATE_RECORDING);
uint64_t tid;
{
Mutex::Locker locker(m_lock);
- tid = m_next_tid++;
- m_events[tid] = Event(future, aio_comp, requests);
+ tid = ++m_event_tid;
+ assert(tid != 0);
+
+ m_events[tid] = Event(future, aio_comp, requests, offset, length);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": "
<< "event=" << event_entry.get_event_type() << ", "
<< "new_reqs=" << requests.size() << ", "
+ << "offset=" << offset << ", "
+ << "length=" << length << ", "
<< "flush=" << flush_entry << ", tid=" << tid << dendl;
Context *on_safe = new C_EventSafe(this, tid);
return tid;
}
+// Complete the journal event 'tid' with result 'r' in one shot.  A tid that
+// is unknown (already completed, or never recorded) is silently ignored.
+void Journal::commit_event(uint64_t tid, int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+ "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ Events::iterator it = m_events.find(tid);
+ if (it == m_events.end()) {
+ // event already committed (or unknown) -- nothing to do
+ return;
+ }
+ complete_event(it, r);
+}
+
+// Record that the image extent [offset, offset+length) belonging to journal
+// event 'tid' has been committed (e.g. via cache writeback).  The extent is
+// subtracted from the event's pending interval set; only when no pending
+// extents remain is the event completed.  The first negative result seen is
+// sticky and becomes the event's final return value.
+void Journal::commit_event_extent(uint64_t tid, uint64_t offset,
+ uint64_t length, int r) {
+ assert(length > 0);
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+ << "offset=" << offset << ", "
+ << "length=" << length << ", "
+ << "r=" << r << dendl;
+
+ Mutex::Locker locker(m_lock);
+ Events::iterator it = m_events.find(tid);
+ if (it == m_events.end()) {
+ // event already committed (or unknown) -- nothing to do
+ return;
+ }
+
+ Event &event = it->second;
+ if (event.ret_val == 0 && r < 0) {
+ // preserve the first failure for the final completion
+ event.ret_val = r;
+ }
+
+ // remove only the portion of the reported extent that is still pending
+ ExtentInterval extent;
+ extent.insert(offset, length);
+
+ ExtentInterval intersect;
+ intersect.intersection_of(extent, event.pending_extents);
+
+ event.pending_extents.subtract(intersect);
+ if (!event.pending_extents.empty()) {
+ ldout(cct, 20) << "pending extents: " << event.pending_extents << dendl;
+ return;
+ }
+ complete_event(it, event.ret_val);
+}
+
+// Request that the journal entry for 'tid' be flushed to disk, invoking
+// 'on_safe' once it is safe.  The event's future is captured under m_lock
+// but flushed after the lock is released.
+void Journal::flush_event(uint64_t tid, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+ << "on_safe=" << on_safe << dendl;
+
+ ::journal::Future future;
+ {
+ Mutex::Locker locker(m_lock);
+ future = wait_event(m_lock, tid, on_safe);
+ }
+
+ // an invalid future means the event was already safe (on_safe was queued)
+ if (future.is_valid()) {
+ future.flush(NULL);
+ }
+}
+
+// Invoke 'on_safe' once the journal entry for 'tid' is safe, without
+// forcing a flush (contrast with flush_event above).
+void Journal::wait_event(uint64_t tid, Context *on_safe) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+ << "on_safe=" << on_safe << dendl;
+
+ Mutex::Locker locker(m_lock);
+ wait_event(m_lock, tid, on_safe);
+}
+
+// Internal helper: with m_lock held, attach 'on_safe' to the event for
+// 'tid' and return its future.  If the event is already safe (or unknown),
+// 'on_safe' is queued for immediate async completion and an invalid future
+// is returned.
+// NOTE(review): the 'lock' parameter is unused -- the assert checks m_lock
+// directly; presumably callers always pass m_lock.  Confirm and consider
+// asserting on 'lock' instead.
+::journal::Future Journal::wait_event(Mutex &lock, uint64_t tid,
+ Context *on_safe) {
+ assert(m_lock.is_locked());
+ CephContext *cct = m_image_ctx.cct;
+
+ Events::iterator it = m_events.find(tid);
+ if (it == m_events.end() || it->second.safe) {
+ // journal entry already safe
+ ldout(cct, 20) << "journal entry already safe" << dendl;
+ m_image_ctx.op_work_queue->queue(on_safe, 0);
+ return ::journal::Future();
+ }
+
+ Event &event = it->second;
+ event.on_safe_contexts.push_back(on_safe);
+ return event.future;
+}
+
void Journal::create_journaler() {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
transition_state(STATE_UNINITIALIZED);
}
+// Finalize a fully-committed event and drop it from the in-flight map.
+// Caller must hold m_lock and pass a valid iterator into m_events.
+void Journal::complete_event(Events::iterator it, int r) {
+ assert(m_lock.is_locked());
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
+ << "r=" << r << dendl;
+
+ // TODO
+
+ m_events.erase(it);
+}
+
void Journal::handle_initialized(int r) {
CephContext *cct = m_image_ctx.cct;
if (r < 0) {
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
<< "tid=" << tid << dendl;
- RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-
+ // TODO: ensure this callback never sees a failure
AioCompletion *aio_comp;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
aio_comp = event.aio_comp;
aio_object_requests.swap(event.aio_object_requests);
on_safe_contexts.swap(event.on_safe_contexts);
- m_events.erase(it);
+
+ if (event.pending_extents.empty()) {
+ m_events.erase(it);
+ } else {
+ event.safe = true;
+ }
}
ldout(cct, 20) << "completing tid=" << tid << dendl;
- assert(m_image_ctx.image_watcher->is_lock_owner());
-
if (r < 0) {
// don't send aio requests if the journal fails -- bubble error up
aio_comp->fail(cct, r);
} else {
// send any waiting aio requests now that journal entry is safe
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ assert(m_image_ctx.image_watcher->is_lock_owner());
+
for (AioObjectRequests::iterator it = aio_object_requests.begin();
it != aio_object_requests.end(); ++it) {
(*it)->send();
#include "include/int_types.h"
#include "include/Context.h"
+#include "include/interval_set.h"
#include "include/unordered_map.h"
#include "include/rados/librados.hpp"
#include "common/Mutex.h"
#include "journal/Future.h"
#include "journal/ReplayHandler.h"
#include "librbd/ImageWatcher.h"
+#include <algorithm>
#include <list>
#include <string>
uint64_t append_event(AioCompletion *aio_comp,
const journal::EventEntry &event_entry,
const AioObjectRequests &requests,
+ uint64_t offset, size_t length,
bool flush_entry);
+ void commit_event(uint64_t tid, int r);
+ void commit_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
+ int r);
+
+ void flush_event(uint64_t tid, Context *on_safe);
+ void wait_event(uint64_t tid, Context *on_safe);
+
private:
typedef std::list<Context *> Contexts;
+ typedef interval_set<uint64_t> ExtentInterval;
enum State {
STATE_UNINITIALIZED,
AioCompletion *aio_comp;
AioObjectRequests aio_object_requests;
Contexts on_safe_contexts;
+ ExtentInterval pending_extents;
+ bool safe;
+ int ret_val;
Event() : aio_comp(NULL) {
}
Event(const ::journal::Future &_future, AioCompletion *_aio_comp,
- const AioObjectRequests &_requests)
- : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) {
+ const AioObjectRequests &_requests, uint64_t offset, size_t length)
+ : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests),
+ safe(false), ret_val(0) {
+ if (length > 0) {
+ pending_extents.insert(offset, length);
+ }
}
};
typedef ceph::unordered_map<uint64_t, Event> Events;
ReplayHandler m_replay_handler;
bool m_close_pending;
- uint64_t m_next_tid;
+ uint64_t m_event_tid;
Events m_events;
bool m_blocking_writes;
+ ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
+
void create_journaler();
void destroy_journaler();
+ void complete_event(Events::iterator it, int r);
+
void handle_initialized(int r);
void handle_replay_ready();
#include "librbd/LibrbdWriteback.h"
#include "librbd/AioCompletion.h"
#include "librbd/ObjectMap.h"
+#include "librbd/Journal.h"
#include "include/assert.h"
LibrbdWriteback *m_wb_handler;
};
+ // Two-phase writeback context for a journaled cache write.  First
+ // invocation of complete() (journal entry safe) sends the actual object
+ // write; the second invocation (object write finished) credits the
+ // written extents back to the journal event and completes the original
+ // writeback request.  An error on either phase short-circuits straight
+ // to the commit/complete path.
+ struct C_WriteJournalCommit : public Context {
+ typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+
+ ImageCtx *image_ctx;
+ std::string oid;
+ uint64_t object_no;
+ uint64_t off;
+ bufferlist bl;
+ SnapContext snapc;
+ Context *req_comp;
+ uint64_t journal_tid;
+ bool request_sent;
+
+ C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid,
+ uint64_t _object_no, uint64_t _off,
+ const bufferlist &_bl, const SnapContext& _snapc,
+ Context *_req_comp, uint64_t _journal_tid)
+ : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off),
+ bl(_bl), snapc(_snapc), req_comp(_req_comp), journal_tid(_journal_tid),
+ request_sent(false) {
+ CephContext *cct = image_ctx->cct;
+ ldout(cct, 20) << this << " C_WriteJournalCommit: "
+ << "delaying write until journal tid "
+ << journal_tid << " safe" << dendl;
+ }
+
+ // overridden so the context can be re-armed between the two phases;
+ // it deletes itself only on the final (or error) path
+ virtual void complete(int r) {
+ if (request_sent || r < 0) {
+ commit_event_extent(r);
+ req_comp->complete(r);
+ delete this;
+ } else {
+ send_request();
+ }
+ }
+
+ // intentionally empty: all work happens in complete()
+ virtual void finish(int r) {
+ }
+
+ void commit_event_extent(int r) {
+ CephContext *cct = image_ctx->cct;
+ ldout(cct, 20) << this << " C_WriteJournalCommit: "
+ << "write committed: updating journal commit position"
+ << dendl;
+
+ // all IO operations are flushed prior to closing the journal
+ assert(image_ctx->journal != NULL);
+
+ // map the object-level write back to image (file) extents so the
+ // journal can retire the matching portions of the event
+ Extents file_extents;
+ Striper::extent_to_file(cct, &image_ctx->layout, object_no, off,
+ bl.length(), file_extents);
+ for (Extents::iterator it = file_extents.begin();
+ it != file_extents.end(); ++it) {
+ image_ctx->journal->commit_event_extent(journal_tid, it->first,
+ it->second, r);
+ }
+ }
+
+ void send_request() {
+ CephContext *cct = image_ctx->cct;
+ ldout(cct, 20) << this << " C_WriteJournalCommit: "
+ << "journal committed: sending write request" << dendl;
+
+ RWLock::RLocker owner_locker(image_ctx->owner_lock);
+ assert(image_ctx->image_watcher->is_lock_owner());
+
+ // mark phase one done before dispatch; this context is reused as the
+ // write's completion callback
+ request_sent = true;
+ AioObjectWrite *req = new AioObjectWrite(image_ctx, oid, object_no, off,
+ bl, snapc, this);
+ req->send();
+ }
+ };
+
LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
: m_finisher(new Finisher(ictx->cct)), m_tid(0), m_lock(lock), m_ictx(ictx)
{
m_writes[oid.name].push(result);
ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;
C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, this);
- AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off,
- bl, snapc, req_comp);
- req->send();
+
+ // all IO operations are flushed prior to closing the journal
+ assert(journal_tid == 0 || m_ictx->journal != NULL);
+ if (journal_tid != 0) {
+ m_ictx->journal->flush_event(
+ journal_tid, new C_WriteJournalCommit(m_ictx, oid.name, object_no, off,
+ bl, snapc, req_comp,
+ journal_tid));
+ } else {
+ AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off,
+ bl, snapc, req_comp);
+ req->send();
+ }
return ++m_tid;
}
+// Called when a cached dirty extent is overwritten before writeback: the
+// journal will never see a writeback for the old data, so credit the
+// corresponding image extents to the original journal event now (with a
+// success result) so the event can still be retired.
void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
 uint64_t len, ceph_tid_t journal_tid) {
- assert(journal_tid != 0);
+ typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+
+ assert(m_ictx->owner_lock.is_locked());
+ uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
+
+ // all IO operations are flushed prior to closing the journal
+ assert(journal_tid != 0 && m_ictx->journal != NULL);
- // TODO inform the journal that we no longer expect to receive writebacks
- // for the specified extent
+ // map the object-level range back to image (file) extents before
+ // reporting them committed to the journal
+ Extents file_extents;
+ Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
+ len, file_extents);
+ for (Extents::iterator it = file_extents.begin();
+ it != file_extents.end(); ++it) {
+ m_ictx->journal->commit_event_extent(journal_tid, it->first, it->second,
+ 0);
+ }
 }
void LibrbdWriteback::get_client_lock() {