ldout(m_image_ctx.cct, 10) << this << " remote resize request: "
<< payload.async_request_id << " "
<< payload.size << dendl;
- m_image_ctx.operations->resize(payload.size, *prog_ctx, ctx);
+ m_image_ctx.operations->resize(payload.size, *prog_ctx, ctx, 0);
}
::encode(ResponseMessage(r), ack_ctx->out);
<< payload.snap_name << dendl;
m_image_ctx.operations->snap_create(payload.snap_name.c_str(),
- new C_ResponseMessage(ack_ctx));
+ new C_ResponseMessage(ack_ctx), 0);
return false;
}
return true;
future.flush(new C_OpEventSafe(this, op_tid, future, nullptr));
}
+template <typename I>
+void Journal<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {  // notify the replay handler that the op state machine for op_tid is ready to proceed
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
+
+  {
+    Mutex::Locker locker(m_lock);  // m_journal_replay is guarded by m_lock
+    assert(m_journal_replay != nullptr);  // only legal while a journal replay is in progress
+    m_journal_replay->replay_op_ready(op_tid, on_resume);  // delegate to journal::Replay; on_resume fires when replay lets the op continue
+  }
+}
+
template <typename I>
void Journal<I>::flush_event(uint64_t tid, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry,
Context *on_safe);
void commit_op_event(uint64_t tid, int r);
+ void replay_op_ready(uint64_t op_tid, Context *on_resume);
void flush_event(uint64_t tid, Context *on_safe);
void wait_event(uint64_t tid, Context *on_safe);
uint64_t request_id = m_async_request_seq.inc();
r = invoke_async_request("resize", false,
boost::bind(&Operations<I>::resize, this,
- size, boost::ref(prog_ctx), _1),
+ size, boost::ref(prog_ctx), _1, 0),
boost::bind(&ImageWatcher::notify_resize,
m_image_ctx.image_watcher, request_id,
size, boost::ref(prog_ctx)));
template <typename I>
void Operations<I>::resize(uint64_t size, ProgressContext &prog_ctx,
- Context *on_finish) {
+ Context *on_finish, uint64_t journal_op_tid) {
assert(m_image_ctx.owner_lock.is_locked());
assert(m_image_ctx.exclusive_lock == nullptr ||
m_image_ctx.exclusive_lock->is_lock_owner());
}
operation::ResizeRequest<I> *req = new operation::ResizeRequest<I>(
- m_image_ctx, on_finish, size, prog_ctx);
+ m_image_ctx, on_finish, size, prog_ctx, journal_op_tid, false);
req->send();
}
r = invoke_async_request("snap_create", true,
boost::bind(&Operations<I>::snap_create, this,
- snap_name, _1),
+ snap_name, _1, 0),
boost::bind(&ImageWatcher::notify_snap_create,
m_image_ctx.image_watcher, snap_name));
if (r < 0 && r != -EEXIST) {
}
template <typename I>
-void Operations<I>::snap_create(const char *snap_name, Context *on_finish) {
+void Operations<I>::snap_create(const char *snap_name, Context *on_finish,
+ uint64_t journal_op_tid) {
assert(m_image_ctx.owner_lock.is_locked());
assert(m_image_ctx.exclusive_lock == nullptr ||
m_image_ctx.exclusive_lock->is_lock_owner());
<< dendl;
operation::SnapshotCreateRequest<I> *req =
- new operation::SnapshotCreateRequest<I>(m_image_ctx, on_finish, snap_name);
+ new operation::SnapshotCreateRequest<I>(m_image_ctx, on_finish, snap_name,
+ journal_op_tid);
req->send();
}
void rename(const char *dstname, Context *on_finish);
int resize(uint64_t size, ProgressContext& prog_ctx);
- void resize(uint64_t size, ProgressContext &prog_ctx, Context *on_finish);
+ void resize(uint64_t size, ProgressContext &prog_ctx, Context *on_finish,
+ uint64_t journal_op_tid);
int snap_create(const char *snap_name);
- void snap_create(const char *snap_name, Context *on_finish);
+ void snap_create(const char *snap_name, Context *on_finish,
+ uint64_t journal_op_tid);
int snap_rollback(const char *snap_name, ProgressContext& prog_ctx);
void snap_rollback(const char *snap_name, ProgressContext& prog_ctx,
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
+ AioCompletion *flush_comp = nullptr;
OpTids cancel_op_tids;
on_finish = util::create_async_context_callback(
m_image_ctx, on_finish);
{
- RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
Mutex::Locker locker(m_lock);
// safely commit any remaining AIO modify operations
if (m_in_flight_aio != 0) {
- flush_aio();
+ flush_comp = create_aio_flush_completion(nullptr, nullptr);;
}
+ // cancel ops that are waiting to start
for (auto &op_event_pair : m_op_events) {
- cancel_op_tids.push_back(op_event_pair.first);
+ const OpEvent &op_event = op_event_pair.second;
+ if (op_event.on_start_ready == nullptr) {
+ cancel_op_tids.push_back(op_event_pair.first);
+ }
}
assert(m_flush_ctx == nullptr);
}
}
+ // execute the following outside of lock scope
+ if (flush_comp != nullptr) {
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+ AioImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
+ }
for (auto op_tid : cancel_op_tids) {
handle_op_complete(op_tid, -ERESTART);
}
}
}
+template <typename I>
+void Replay<I>::replay_op_ready(uint64_t op_tid, Context *on_resume) {  // un-gate replay event processing for a started op; defer on_resume until its OpFinishEvent arrives
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": op_tid=" << op_tid << dendl;
+
+  auto op_it = m_op_events.find(op_tid);  // NOTE(review): m_op_events accessed without taking Replay's m_lock here — assumes the caller serializes; confirm
+  assert(op_it != m_op_events.end());  // op must have been registered when its start event was replayed
+
+  OpEvent &op_event = op_it->second;
+  assert(op_event.op_in_progress &&
+         op_event.on_op_finish_event == nullptr &&
+         op_event.on_finish_ready == nullptr &&
+         op_event.on_finish_safe == nullptr);  // expected state: op started, OpFinishEvent not yet seen
+
+  // resume processing replay events
+  Context *on_start_ready = nullptr;
+  std::swap(on_start_ready, op_event.on_start_ready);  // take ownership of the gate context; clears on_start_ready so handle_op_complete's assert holds
+  on_start_ready->complete(0);
+
+  // cancel has been requested -- send error to paused state machine
+  if (m_flush_ctx != nullptr) {
+    m_image_ctx.op_work_queue->queue(on_resume, -ERESTART);  // fail asynchronously on the op work queue; -ERESTART aborts the paused op
+    return;
+  }
+
+  // resume the op state machine once the associated OpFinishEvent
+  // is processed
+  op_event.on_op_finish_event = new FunctionContext(
+    [on_resume](int r) {
+      on_resume->complete(r);  // r propagates the recorded result (or cancellation) to the paused op
+    });
+}
+
template <typename I>
void Replay<I>::handle_event(const journal::AioDiscardEvent &event,
Context *on_ready, Context *on_safe) {
AioImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
event.length);
if (flush_required) {
- Mutex::Locker locker(m_lock);
- flush_aio();
+ m_lock.Lock();
+ AioCompletion *flush_comp = create_aio_flush_completion(nullptr, nullptr);
+ m_lock.Unlock();
+
+ AioImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
}
}
AioImageRequest<I>::aio_write(&m_image_ctx, aio_comp, event.offset,
event.length, data.c_str(), 0);
if (flush_required) {
- Mutex::Locker locker(m_lock);
- flush_aio();
+ m_lock.Lock();
+ AioCompletion *flush_comp = create_aio_flush_completion(nullptr, nullptr);
+ m_lock.Unlock();
+
+ AioImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
}
}
ldout(cct, 20) << this << " " << __func__ << ": Op finish event: "
<< "op_tid=" << event.op_tid << dendl;
+ bool op_in_progress;
Context *on_op_finish_event = nullptr;
{
Mutex::Locker locker(m_lock);
assert(op_event.on_finish_safe == nullptr);
op_event.on_finish_ready = on_ready;
op_event.on_finish_safe = on_safe;
+ op_in_progress = op_event.op_in_progress;
std::swap(on_op_finish_event, op_event.on_op_finish_event);
}
if (event.r < 0) {
- // TODO handle snap create / resize
-
- // journal recorded failure of op -- no-op the operation
- delete on_op_finish_event;
- handle_op_complete(event.op_tid, 0);
+ if (op_in_progress) {
+ // bubble the error up to the in-progress op to cancel it
+ on_op_finish_event->complete(event.r);
+ } else {
+ // op hasn't been started -- no-op the event
+ delete on_op_finish_event;
+ handle_op_complete(event.op_tid, 0);
+ }
return;
}
- // apply the op now -- each op is responsible for filtering the
- // recorded result to know if the op completed successfully
+ // journal recorded success -- apply the op now
on_op_finish_event->complete(0);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
- // TODO not-ready until state machine lets us know
+ Mutex::Locker locker(m_lock);
OpEvent *op_event;
Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
&op_event);
- m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_op_complete);
+ m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_op_complete,
+ event.op_tid);
+
+ // do not process more events until the state machine is ready
+ // since it will affect IO
+ op_event->op_in_progress = true;
+ op_event->on_start_ready = util::create_async_context_callback(
+ m_image_ctx, on_ready);
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
- // TODO not-ready until state machine lets us know
+ Mutex::Locker locker(m_lock);
OpEvent *op_event;
Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
&op_event);
m_image_ctx.operations->resize(event.size, no_op_progress_callback,
- on_op_complete);
+ on_op_complete, event.op_tid);
+
+ // do not process more events until the state machine is ready
+ // since it will affect IO
+ op_event->op_in_progress = true;
+ op_event->on_start_ready = util::create_async_context_callback(
+ m_image_ctx, on_ready);
}
template <typename I>
on_safe->complete(0);
}
-template <typename I>
-void Replay<I>::flush_aio() {
- assert(m_lock.is_locked());
-
- AioCompletion *aio_comp = create_aio_flush_completion(nullptr, nullptr);
- AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
-}
-
template <typename I>
void Replay<I>::handle_aio_modify_complete(Context *on_safe, int r) {
Mutex::Locker locker(m_lock);
op_event = std::move(op_it->second);
m_op_events.erase(op_it);
- // TODO handle paused snap create / resize
-
if (m_op_events.empty() && m_in_flight_aio == 0) {
on_flush = m_flush_ctx;
}
}
+ assert(op_event.on_start_ready == nullptr);
assert((op_event.on_finish_ready != nullptr &&
op_event.on_finish_safe != nullptr) || r == -ERESTART);
void process(bufferlist::iterator *it, Context *on_ready, Context *on_safe);
void flush(Context *on_finish);
+ void replay_op_ready(uint64_t op_tid, Context *on_resume);
+
private:
struct OpEvent {
+ bool op_in_progress = false;
Context *on_op_finish_event = nullptr;
+ Context *on_start_ready = nullptr;
Context *on_start_safe = nullptr;
Context *on_finish_ready = nullptr;
Context *on_finish_safe = nullptr;
void handle_event(const UnknownEvent &event, Context *on_ready,
Context *on_safe);
- void flush_aio();
void handle_aio_modify_complete(Context *on_safe, int r);
void handle_aio_flush_complete(Context *on_flush_safe, Contexts &on_safe_ctxs,
int r);
namespace operation {
template <typename I>
-Request<I>::Request(I &image_ctx, Context *on_finish)
-  : AsyncRequest<I>(image_ctx, on_finish) {
+Request<I>::Request(I &image_ctx, Context *on_finish, uint64_t journal_op_tid)  // journal_op_tid: tid of the journal op event being replayed; defaults to 0 outside replay (see header default)
+  : AsyncRequest<I>(image_ctx, on_finish), m_op_tid(journal_op_tid) {
}
template <typename I>
template <typename ImageCtxT = ImageCtx>
class Request : public AsyncRequest<ImageCtxT> {
public:
- Request(ImageCtxT &image_ctx, Context *on_finish);
+ Request(ImageCtxT &image_ctx, Context *on_finish,
+ uint64_t journal_op_tid = 0);
virtual void send();
bool append_op_event(T *request) {
ImageCtxT &image_ctx = this->m_image_ctx;
+ assert(can_affect_io());
RWLock::RLocker owner_locker(image_ctx.owner_lock);
RWLock::RLocker snap_locker(image_ctx.snap_lock);
- if (image_ctx.journal != NULL &&
- !image_ctx.journal->is_journal_replaying()) {
- append_op_event(util::create_context_callback<T, MF>(request));
+ if (image_ctx.journal != NULL) {
+ Context *ctx = util::create_context_callback<T, MF>(request);
+ if (image_ctx.journal->is_journal_replaying()) {
+ assert(m_op_tid != 0);
+ m_appended_op_event = true;
+ image_ctx.journal->replay_op_ready(m_op_tid, ctx);
+ } else {
+ append_op_event(ctx);
+ }
return true;
}
return false;
template <typename I>
ResizeRequest<I>::ResizeRequest(I &image_ctx, Context *on_finish,
-                                uint64_t new_size, ProgressContext &prog_ctx)
-  : Request<I>(image_ctx, on_finish),
+                                uint64_t new_size, ProgressContext &prog_ctx,
+                                uint64_t journal_op_tid, bool disable_journal)  // journal_op_tid: replayed op tid (0 when not replaying); disable_journal: skip journal append/commit for this request
+  : Request<I>(image_ctx, on_finish, journal_op_tid),
    m_original_size(0), m_new_size(new_size), m_prog_ctx(prog_ctx),
-    m_new_parent_overlap(0), m_xlist_item(this)
+    m_new_parent_overlap(0), m_disable_journal(disable_journal),
+    m_xlist_item(this)
{
}
template <typename I>
Context *ResizeRequest<I>::send_append_op_event() {
I &image_ctx = this->m_image_ctx;
- if (!this->template append_op_event<
+ if (m_disable_journal || !this->template append_op_event<
ResizeRequest<I>, &ResizeRequest<I>::handle_append_op_event>(this)) {
- m_shrink_size_visible = true;
return send_grow_object_map();
}
CephContext *cct = image_ctx.cct;
ldout(cct, 5) << this << " " << __func__ << ": r=" << *result << dendl;
- m_shrink_size_visible = true;
if (*result < 0) {
lderr(cct) << "failed to commit journal entry: " << cpp_strerror(*result)
<< dendl;
Context *ResizeRequest<I>::send_grow_object_map() {
I &image_ctx = this->m_image_ctx;
+ {
+ RWLock::WLocker snap_locker(image_ctx.snap_lock);
+ m_shrink_size_visible = true;
+ }
image_ctx.aio_work_queue->unblock_writes();
+
if (m_original_size == m_new_size) {
- this->commit_op_event(0);
+ if (!m_disable_journal) {
+ this->commit_op_event(0);
+ }
return this->create_context_finisher();
} else if (m_new_size < m_original_size) {
send_trim_image();
return this->create_context_finisher();
}
- this->commit_op_event(0);
+ if (!m_disable_journal) {
+ this->commit_op_event(0);
+ }
return send_shrink_object_map();
}
template <typename ImageCtxT = ImageCtx>
class ResizeRequest : public Request<ImageCtxT> {
public:
+ static ResizeRequest *create(ImageCtxT &image_ctx, Context *on_finish,
+ uint64_t new_size, ProgressContext &prog_ctx,
+ uint64_t journal_op_tid, bool disable_journal) {
+ return new ResizeRequest(image_ctx, on_finish, new_size, prog_ctx,
+ journal_op_tid, disable_journal);
+ }
+
ResizeRequest(ImageCtxT &image_ctx, Context *on_finish, uint64_t new_size,
- ProgressContext &prog_ctx);
+ ProgressContext &prog_ctx, uint64_t journal_op_tid,
+ bool disable_journal);
virtual ~ResizeRequest();
inline bool shrinking() const {
ProgressContext &m_prog_ctx;
uint64_t m_new_parent_overlap;
bool m_shrink_size_visible = false;
+ bool m_disable_journal = false;
typename xlist<ResizeRequest<ImageCtxT>*>::item m_xlist_item;
template <typename I>
SnapshotCreateRequest<I>::SnapshotCreateRequest(I &image_ctx,
                                                Context *on_finish,
-                                                const std::string &snap_name)
-  : Request<I>(image_ctx, on_finish), m_snap_name(snap_name), m_ret_val(0),
-    m_snap_id(CEPH_NOSNAP) {
+                                                const std::string &snap_name,
+                                                uint64_t journal_op_tid)  // journal_op_tid: tid of the replayed journal op event (0 when not replaying)
+  : Request<I>(image_ctx, on_finish, journal_op_tid), m_snap_name(snap_name),
+    m_ret_val(0), m_snap_id(CEPH_NOSNAP) {  // m_snap_id starts as CEPH_NOSNAP until the snapshot is allocated
}
template <typename I>
* (if enabled) and bubble the originating error code back to the client.
*/
SnapshotCreateRequest(ImageCtxT &image_ctx, Context *on_finish,
- const std::string &snap_name);
+ const std::string &snap_name, uint64_t journal_op_tid);
protected:
virtual void send_op();