// inform the journal that the op has successfully committed
if (journal_tid != 0) {
assert(ictx->journal != NULL);
- ictx->journal->commit_event(journal_tid, rval);
+ ictx->journal->commit_io_event(journal_tid, rval);
}
// note: possible for image to be closed after op marked finished
bl.append(m_buf, m_len);
journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl));
- uint64_t tid = m_image_ctx.journal->append_event(m_aio_comp, event_entry,
- requests, m_off, m_len,
- synchronous);
+ uint64_t tid = m_image_ctx.journal->append_io_event(m_aio_comp, event_entry,
+ requests, m_off, m_len,
+ synchronous);
if (m_image_ctx.object_cacher == NULL) {
m_aio_comp->associate_journal_event(tid);
}
uint64_t AioImageDiscard::append_journal_event(
const AioObjectRequests &requests, bool synchronous) {
journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len));
- uint64_t tid = m_image_ctx.journal->append_event(m_aio_comp, event_entry,
- requests, m_off, m_len,
- synchronous);
+ uint64_t tid = m_image_ctx.journal->append_io_event(m_aio_comp, event_entry,
+ requests, m_off, m_len,
+ synchronous);
m_aio_comp->associate_journal_event(tid);
return tid;
}
RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
if (m_image_ctx.journal != NULL &&
!m_image_ctx.journal->is_journal_replaying()) {
- uint64_t journal_tid = m_image_ctx.journal->append_event(
+ uint64_t journal_tid = m_image_ctx.journal->append_io_event(
m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
AioObjectRequests(), 0, 0, false);
#include "journal/Journaler.h"
#include "journal/ReplayEntry.h"
#include "common/errno.h"
+#include <boost/utility/enable_if.hpp>
+#include <boost/type_traits/is_base_of.hpp>
#define dout_subsys ceph_subsys_rbd
#undef dout_prefix
}
};
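+// visitor that stores the assigned journal tid within an op event entry;
+// applying it to a non-op (IO) event is a programming error and asserts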
+struct SetOpRequestTid : public boost::static_visitor<void> {
+ uint64_t tid;
+
+ SetOpRequestTid(uint64_t _tid) : tid(_tid) {
+ }
+
+ template <typename Event>
+ typename boost::enable_if<boost::is_base_of<journal::OpEventBase, Event>,
+ void>::type
+ operator()(Event &event) const {
+ event.tid = tid;
+ }
+
+ template <typename Event>
+ typename boost::disable_if<boost::is_base_of<journal::OpEventBase, Event>,
+ void>::type
+ operator()(Event &event) const {
+ assert(false);
+ }
+};
+
} // anonymous namespace
Journal::Journal(ImageCtx &image_ctx)
m_image_ctx.op_work_queue->drain();
assert(m_journaler == NULL);
assert(m_journal_replay == NULL);
+ assert(m_wait_for_state_contexts.empty());
m_image_ctx.image_watcher->unregister_listener(&m_lock_listener);
return (m_state == STATE_REPLAYING);
}
-bool Journal::wait_for_journal_ready() {
+void Journal::wait_for_journal_ready(Context *on_ready) {
+ Mutex::Locker locker(m_lock);
+ schedule_wait_for_ready(on_ready);
+}
+
+void Journal::wait_for_journal_ready() {
Mutex::Locker locker(m_lock);
- while (m_state != STATE_UNINITIALIZED && m_state != STATE_RECORDING) {
+ while (m_state != STATE_RECORDING) {
wait_for_state_transition();
}
- return (m_state == STATE_RECORDING);
}
void Journal::open() {
return 0;
}
-uint64_t Journal::append_event(AioCompletion *aio_comp,
- const journal::EventEntry &event_entry,
- const AioObjectRequests &requests,
- uint64_t offset, size_t length,
- bool flush_entry) {
+uint64_t Journal::append_io_event(AioCompletion *aio_comp,
+ const journal::EventEntry &event_entry,
+ const AioObjectRequests &requests,
+ uint64_t offset, size_t length,
+ bool flush_entry) {
assert(m_image_ctx.owner_lock.is_locked());
bufferlist bl;
return tid;
}
-void Journal::commit_event(uint64_t tid, int r) {
+void Journal::commit_io_event(uint64_t tid, int r) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
"r=" << r << dendl;
complete_event(it, r);
}
-void Journal::commit_event_extent(uint64_t tid, uint64_t offset,
- uint64_t length, int r) {
+void Journal::commit_io_event_extent(uint64_t tid, uint64_t offset,
+ uint64_t length, int r) {
assert(length > 0);
CephContext *cct = m_image_ctx.cct;
complete_event(it, event.ret_val);
}
+uint64_t Journal::append_op_event(journal::EventEntry &event_entry) {
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ uint64_t tid;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_RECORDING);
+
+ Mutex::Locker event_locker(m_event_lock);
+ tid = ++m_event_tid;
+ assert(tid != 0);
+
+ // inject the generated tid into the provided event entry
+ boost::apply_visitor(SetOpRequestTid(tid), event_entry.event);
+
+ bufferlist bl;
+ ::encode(event_entry, bl);
+ m_journaler->append("", bl);
+ }
+
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": "
+ << "event=" << event_entry.get_event_type() << ", "
+ << "tid=" << tid << dendl;
+ return tid;
+}
+
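+// append an OpFinishEvent to record the result of a completed op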
+void Journal::commit_op_event(uint64_t tid, int r) {
+ journal::EventEntry event_entry((journal::OpFinishEvent(tid, r)));
+
+ bufferlist bl;
+ ::encode(event_entry, bl);
+
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_state == STATE_RECORDING);
+
+ m_journaler->append("", bl);
+ }
+}
+
void Journal::flush_event(uint64_t tid, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
assert(m_lock.is_locked());
m_state = state;
m_cond.Signal();
+
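+ // complete any contexts queued to wait for this state transition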
+ Contexts wait_for_state_contexts;
+ wait_for_state_contexts.swap(m_wait_for_state_contexts);
+ for (Contexts::iterator it = wait_for_state_contexts.begin();
+ it != wait_for_state_contexts.end(); ++it) {
+ (*it)->complete(0);
+ }
}
void Journal::wait_for_state_transition() {
}
}
+void Journal::schedule_wait_for_ready(Context *on_ready) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": on_ready=" << on_ready << dendl;
+
+ assert(m_lock.is_locked());
+ m_wait_for_state_contexts.push_back(new C_WaitForReady(this, on_ready));
+}
+
+void Journal::handle_wait_for_ready(Context *on_ready) {
+ assert(m_lock.is_locked());
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": on_ready=" << on_ready << ", "
+ << "state=" << m_state << dendl;
+
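+ // dispatch the callback once the journal is recording; otherwise
+ // re-register to wait for the next state transition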
+ if (m_state == STATE_RECORDING) {
+ m_image_ctx.op_work_queue->queue(on_ready, 0);
+ } else {
+ schedule_wait_for_ready(on_ready);
+ }
+}
+
} // namespace librbd
bool is_journal_ready() const;
bool is_journal_replaying() const;
- bool wait_for_journal_ready();
+ void wait_for_journal_ready(Context *on_ready);
+ void wait_for_journal_ready();
void open();
int close();
- uint64_t append_event(AioCompletion *aio_comp,
- const journal::EventEntry &event_entry,
- const AioObjectRequests &requests,
- uint64_t offset, size_t length,
- bool flush_entry);
+ uint64_t append_io_event(AioCompletion *aio_comp,
+ const journal::EventEntry &event_entry,
+ const AioObjectRequests &requests,
+ uint64_t offset, size_t length,
+ bool flush_entry);
- void commit_event(uint64_t tid, int r);
- void commit_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
- int r);
+ void commit_io_event(uint64_t tid, int r);
+ void commit_io_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
+ int r);
+ uint64_t append_op_event(journal::EventEntry &event_entry);
+ void commit_op_event(uint64_t tid, int r);
void flush_event(uint64_t tid, Context *on_safe);
void wait_event(uint64_t tid, Context *on_safe);
}
};
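+ // completion that re-checks journal readiness after a state transition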
+ struct C_WaitForReady : public Context {
+ Journal *journal;
+ Context *on_ready;
+
+ C_WaitForReady(Journal *_journal, Context *_on_ready)
+ : journal(_journal), on_ready(_on_ready) {
+ }
+
+ virtual void finish(int r) {
+ journal->handle_wait_for_ready(on_ready);
+ }
+ };
+
struct ReplayHandler : public ::journal::ReplayHandler {
Journal *journal;
ReplayHandler(Journal *_journal) : journal(_journal) {
Cond m_cond;
State m_state;
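+ // contexts completed on the next state transition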
+ Contexts m_wait_for_state_contexts;
LockListener m_lock_listener;
ReplayHandler m_replay_handler;
void flush_journal();
void transition_state(State state);
void wait_for_state_transition();
+ void schedule_wait_for_ready(Context *on_ready);
+ void handle_wait_for_ready(Context *on_ready);
};
} // namespace librbd
virtual void complete(int r) {
if (request_sent || r < 0) {
- commit_event_extent(r);
+ commit_io_event_extent(r);
req_comp->complete(r);
delete this;
} else {
virtual void finish(int r) {
}
- void commit_event_extent(int r) {
+ void commit_io_event_extent(int r) {
CephContext *cct = image_ctx->cct;
ldout(cct, 20) << this << " C_WriteJournalCommit: "
<< "write committed: updating journal commit position"
bl.length(), file_extents);
for (Extents::iterator it = file_extents.begin();
it != file_extents.end(); ++it) {
- image_ctx->journal->commit_event_extent(journal_tid, it->first,
- it->second, r);
+ image_ctx->journal->commit_io_event_extent(journal_tid, it->first,
+ it->second, r);
}
}
len, file_extents);
for (Extents::iterator it = file_extents.begin();
it != file_extents.end(); ++it) {
- m_ictx->journal->commit_event_extent(journal_tid, it->first, it->second,
- 0);
+ m_ictx->journal->commit_io_event_extent(journal_tid, it->first,
+ it->second, 0);
}
}
if (r < 0)
return r;
- bool fast_diff_enabled = false;
+ bool proxy_op = false;
{
RWLock::RLocker snap_locker(ictx->snap_lock);
if (ictx->get_snap_id(snap_name) == CEPH_NOSNAP) {
return -ENOENT;
}
- fast_diff_enabled = ((ictx->features & RBD_FEATURE_FAST_DIFF) != 0);
+ proxy_op = ((ictx->features & RBD_FEATURE_FAST_DIFF) != 0 ||
+ (ictx->features & RBD_FEATURE_JOURNALING) != 0);
}
- if (fast_diff_enabled) {
+ if (proxy_op) {
r = invoke_async_request(ictx, "snap_remove", true,
boost::bind(&snap_remove_helper, ictx, _1,
snap_name),
return -EROFS;
}
+ if (ictx->journal != NULL) {
+ ictx->journal->wait_for_journal_ready();
+ }
+
ictx->snap_lock.get_read();
new_size = ictx->get_image_size(snap_id);
ictx->snap_lock.put_read();
virtual void send_op();
virtual bool should_complete(int r);
+ virtual journal::Event create_event() const {
+ return journal::FlattenEvent(0);
+ }
+
private:
/**
* Flatten goes through the following state machine to copyup objects
virtual void send_op();
virtual bool should_complete(int r);
+ virtual journal::Event create_event() const {
+ return journal::RenameEvent(0, m_dest_name);
+ }
+
private:
std::string m_dest_name;
// vim: ts=8 sw=2 smarttab
#include "librbd/operation/Request.h"
+#include "librbd/ImageCtx.h"
+#include "librbd/Journal.h"
namespace librbd {
namespace operation {
Request::Request(ImageCtx &image_ctx, Context *on_finish)
- : AsyncRequest(image_ctx, on_finish) {
+ : AsyncRequest(image_ctx, on_finish), m_tid(0) {
}
void Request::send() {
- // TODO: record op start in journal
+ assert(m_image_ctx.owner_lock.is_locked());
+
+ {
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ if (m_image_ctx.journal != NULL &&
+ !m_image_ctx.journal->is_journal_replaying()) {
+ // journal might still be initializing or replaying -- wait until ready
+ if (!m_image_ctx.journal->is_journal_ready()) {
+ m_image_ctx.journal->wait_for_journal_ready(
+ new C_WaitForJournalReady(this));
+ return;
+ }
+
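+ // record the op start in the journal before issuing the request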
+ journal::EventEntry event_entry(create_event());
+ m_tid = m_image_ctx.journal->append_op_event(event_entry);
+ }
+ }
+
send_op();
}
void Request::finish(int r) {
- // TODO: record op finish in journal
+ AsyncRequest::finish(r);
+
+ RWLock::RLocker snap_locker(m_image_ctx.snap_lock);
+ if (m_tid != 0 &&
+ m_image_ctx.journal != NULL &&
+ !m_image_ctx.journal->is_journal_replaying()) {
+ // ops will be canceled / completed before closing journal
+ assert(m_image_ctx.journal->is_journal_ready());
+
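+ // record the op result in the journal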
+ m_image_ctx.journal->commit_op_event(m_tid, r);
+ }
+}
+
+void Request::handle_journal_ready() {
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ send();
}
} // namespace operation
#define CEPH_LIBRBD_OPERATION_REQUEST_H
#include "librbd/AsyncRequest.h"
+#include "include/Context.h"
+#include "librbd/JournalTypes.h"
namespace librbd {
namespace operation {
virtual void finish(int r);
virtual void send_op() = 0;
+ virtual journal::Event create_event() const = 0;
+
+private:
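+ // completion used to restart the request once the journal is ready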
+ struct C_WaitForJournalReady : public Context {
+ Request *request;
+
+ C_WaitForJournalReady(Request *_request) : request(_request) {
+ }
+
+ virtual void finish(int r) {
+ request->handle_journal_ready();
+ }
+ };
+
+ uint64_t m_tid;
+
+ void handle_journal_ready();
};
} // namespace operation
return false;
}
-void ResizeRequest::send_op() {
+void ResizeRequest::send() {
assert(m_image_ctx.owner_lock.is_locked());
{
compute_parent_overlap();
}
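+ // Request::send journals the resize op event (when enabled) before send_op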
+ Request::send();
+}
+
+void ResizeRequest::send_op() {
+ assert(m_image_ctx.owner_lock.is_locked());
+
CephContext *cct = m_image_ctx.cct;
if (is_canceled()) {
complete(-ERESTART);
return m_new_size;
}
+ virtual void send();
+
protected:
virtual void send_op();
virtual bool should_complete(int r);
+ virtual journal::Event create_event() const {
+ return journal::ResizeEvent(0, m_new_size);
+ }
+
private:
/**
* Resize goes through the following state machine to resize the image
return r;
}
+ virtual journal::Event create_event() const {
+ return journal::SnapCreateEvent(0, m_snap_name);
+ }
+
private:
std::string m_snap_name;
State m_state;
virtual void send_op();
virtual bool should_complete(int r);
+ virtual journal::Event create_event() const {
+ return journal::SnapProtectEvent(0, m_snap_name);
+ }
+
private:
std::string m_snap_name;
State m_state;
virtual void send_op();
virtual bool should_complete(int r);
+ virtual journal::Event create_event() const {
+ return journal::SnapRemoveEvent(0, m_snap_name);
+ }
+
private:
std::string m_snap_name;
uint64_t m_snap_id;
SnapshotRenameRequest(ImageCtx &image_ctx, Context *on_finish,
uint64_t snap_id, const std::string &snap_name);
+ virtual journal::Event create_event() const {
+ return journal::SnapRenameEvent(0, m_snap_id, m_snap_name);
+ }
+
protected:
virtual void send_op();
virtual bool should_complete(int r);
virtual void send_op();
virtual bool should_complete(int r);
+ virtual journal::Event create_event() const {
+ return journal::SnapRollbackEvent(0, m_snap_name);
+ }
+
private:
std::string m_snap_name;
uint64_t m_snap_id;
return 0;
}
+ virtual journal::Event create_event() const {
+ return journal::SnapUnprotectEvent(0, m_snap_name);
+ }
+
private:
std::string m_snap_name;
State m_state;