assert(m_in_flight_aio == 0);
assert(m_aio_modify_unsafe_contexts.empty());
assert(m_aio_modify_safe_contexts.empty());
- assert(m_op_contexts.empty());
+ assert(m_op_events.empty());
}
template <typename I>
return;
}
- Mutex::Locker locker(m_lock);
RWLock::RLocker owner_lock(m_image_ctx.owner_lock);
boost::apply_visitor(EventVisitor(this, on_ready, on_safe),
event_entry.event);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
+ OpTids cancel_op_tids;
on_finish = util::create_async_context_callback(
m_image_ctx, on_finish);
+
{
+ RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
Mutex::Locker locker(m_lock);
- assert(m_flush_ctx == nullptr);
- m_flush_ctx = on_finish;
+ // safely commit any remaining AIO modify operations
if (m_in_flight_aio != 0) {
flush_aio();
}
- if (!m_op_contexts.empty() || m_in_flight_aio != 0) {
- return;
+ for (auto &op_event_pair : m_op_events) {
+ cancel_op_tids.push_back(op_event_pair.first);
+ }
+
+ assert(m_flush_ctx == nullptr);
+ if (!m_op_events.empty() || m_in_flight_aio != 0) {
+ std::swap(m_flush_ctx, on_finish);
}
}
- on_finish->complete(0);
+
+ for (auto op_tid : cancel_op_tids) {
+ handle_op_complete(op_tid, -ERESTART);
+ }
+ if (on_finish != nullptr) {
+ on_finish->complete(0);
+ }
}
template <typename I>
AioImageRequest<I>::aio_discard(&m_image_ctx, aio_comp, event.offset,
event.length);
if (flush_required) {
+ Mutex::Locker locker(m_lock);
flush_aio();
}
}
AioImageRequest<I>::aio_write(&m_image_ctx, aio_comp, event.offset,
event.length, data.c_str(), 0);
if (flush_required) {
+ Mutex::Locker locker(m_lock);
flush_aio();
}
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": AIO flush event" << dendl;
- AioCompletion *aio_comp = create_aio_flush_completion(on_ready, on_safe);
+ AioCompletion *aio_comp;
+ {
+ Mutex::Locker locker(m_lock);
+ aio_comp = create_aio_flush_completion(on_ready, on_safe);
+ }
AioImageRequest<I>::aio_flush(&m_image_ctx, aio_comp);
}
void Replay<I>::handle_event(const journal::OpFinishEvent &event,
Context *on_ready, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
- ldout(cct, 20) << this << " " << __func__ << ": Op finish event" << dendl;
+ ldout(cct, 20) << this << " " << __func__ << ": Op finish event: "
+ << "op_tid=" << event.op_tid << dendl;
+
+ Context *on_op_finish_event = nullptr;
+ {
+ Mutex::Locker locker(m_lock);
+ auto op_it = m_op_events.find(event.op_tid);
+ if (op_it == m_op_events.end()) {
+ ldout(cct, 10) << "unable to locate associated op: assuming previously "
+ << "committed." << dendl;
+ on_ready->complete(0);
+ m_image_ctx.op_work_queue->queue(on_safe, 0);
+ return;
+ }
+
+ OpEvent &op_event = op_it->second;
+ assert(op_event.on_finish_safe == nullptr);
+ op_event.on_finish_ready = on_ready;
+ op_event.on_finish_safe = on_safe;
+ std::swap(on_op_finish_event, op_event.on_op_finish_event);
+ }
+
+ if (event.r < 0) {
+ // TODO handle snap create / resize
+
+ // journal recorded failure of op -- no-op the operation
+ delete on_op_finish_event;
+ handle_op_complete(event.op_tid, 0);
+ return;
+ }
+
+ // apply the op now -- each op is responsible for filtering the
+ // recorded result to know if the op completed successfully
+ on_op_finish_event->complete(0);
}
template <typename I>
ldout(cct, 20) << this << " " << __func__ << ": Snap create event" << dendl;
// TODO not-ready until state machine lets us know
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_finish);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+
+ m_image_ctx.operations->snap_create(event.snap_name.c_str(), on_op_complete);
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap remove event" << dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->snap_remove(event.snap_name.c_str(), on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->snap_remove(event.snap_name.c_str(),
+ on_op_complete);
+ });
+
on_ready->complete(0);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap rename event" << dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->snap_rename(event.snap_id, event.snap_name.c_str(),
- on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->snap_rename(event.snap_id,
+ event.snap_name.c_str(),
+ on_op_complete);
+ });
+
on_ready->complete(0);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Snap protect event" << dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->snap_protect(event.snap_name.c_str(), on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->snap_protect(event.snap_name.c_str(),
+ on_op_complete);
+ });
+
on_ready->complete(0);
}
ldout(cct, 20) << this << " " << __func__ << ": Snap unprotect event"
<< dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(), on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->snap_unprotect(event.snap_name.c_str(),
+ on_op_complete);
+ });
+
on_ready->complete(0);
}
ldout(cct, 20) << this << " " << __func__ << ": Snap rollback start event"
<< dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
- no_op_progress_callback, on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->snap_rollback(event.snap_name.c_str(),
+ no_op_progress_callback,
+ on_op_complete);
+ });
+
on_ready->complete(0);
}
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Rename event" << dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->rename(event.image_name.c_str(), on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->rename(event.image_name.c_str(), on_op_complete);
+ });
+
on_ready->complete(0);
}
ldout(cct, 20) << this << " " << __func__ << ": Resize start event" << dendl;
// TODO not-ready until state machine lets us know
- Context *on_finish = create_op_context_callback(on_safe);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+
m_image_ctx.operations->resize(event.size, no_op_progress_callback,
- on_finish);
+ on_op_complete);
}
template <typename I>
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": Flatten start event" << dendl;
- Context *on_finish = create_op_context_callback(on_safe);
- m_image_ctx.operations->flatten(no_op_progress_callback, on_finish);
+ Mutex::Locker locker(m_lock);
+ OpEvent *op_event;
+ Context *on_op_complete = create_op_context_callback(event.op_tid, on_safe,
+ &op_event);
+ op_event->on_op_finish_event = new FunctionContext(
+ [this, event, on_op_complete](int r) {
+ m_image_ctx.operations->flatten(no_op_progress_callback, on_op_complete);
+ });
+
on_ready->complete(0);
}
m_in_flight_aio -= on_safe_ctxs.size();
std::swap(on_aio_ready, m_on_aio_ready);
- if (m_op_contexts.empty() && m_in_flight_aio == 0) {
+ if (m_op_events.empty() && m_in_flight_aio == 0) {
on_flush = m_flush_ctx;
}
}
template <typename I>
-Context *Replay<I>::create_op_context_callback(Context *on_safe) {
+Context *Replay<I>::create_op_context_callback(uint64_t op_tid,
+ Context *on_safe,
+ OpEvent **op_event) {
assert(m_lock.is_locked());
- C_OpOnFinish *on_finish;
- {
- on_finish = new C_OpOnFinish(this);
- m_op_contexts[on_finish] = on_safe;
- }
- return on_finish;
+ *op_event = &m_op_events[op_tid];
+ (*op_event)->on_start_safe = on_safe;
+ return new C_OpOnComplete(this, op_tid);
}
template <typename I>
-void Replay<I>::handle_op_context_callback(Context *on_op_finish, int r) {
- Context *on_safe = nullptr;
+void Replay<I>::handle_op_complete(uint64_t op_tid, int r) {
+ CephContext *cct = m_image_ctx.cct;
+ ldout(cct, 20) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
+ << "r=" << r << dendl;
+
+ OpEvent op_event;
Context *on_flush = nullptr;
{
Mutex::Locker locker(m_lock);
- auto it = m_op_contexts.find(on_op_finish);
- assert(it != m_op_contexts.end());
+ auto op_it = m_op_events.find(op_tid);
+ assert(op_it != m_op_events.end());
+
+ op_event = std::move(op_it->second);
+ m_op_events.erase(op_it);
- on_safe = it->second;
- m_op_contexts.erase(it);
- if (m_op_contexts.empty() && m_in_flight_aio == 0) {
+ // TODO handle paused snap create / resize
+
+ if (m_op_events.empty() && m_in_flight_aio == 0) {
on_flush = m_flush_ctx;
}
}
- on_safe->complete(r);
+ assert((op_event.on_finish_ready != nullptr &&
+ op_event.on_finish_safe != nullptr) || r == -ERESTART);
+
+ // skipped upon error -- so clean up if non-null
+ delete op_event.on_op_finish_event;
+
+ if (op_event.on_finish_ready != nullptr) {
+ op_event.on_finish_ready->complete(0);
+ }
+
+ op_event.on_start_safe->complete(r);
+ if (op_event.on_finish_safe != nullptr) {
+ op_event.on_finish_safe->complete(r);
+ }
if (on_flush != nullptr) {
on_flush->complete(0);
}
AioCompletion *Replay<I>::create_aio_modify_completion(Context *on_ready,
Context *on_safe,
bool *flush_required) {
+ Mutex::Locker locker(m_lock);
CephContext *cct = m_image_ctx.cct;
- assert(m_lock.is_locked());
assert(m_on_aio_ready == nullptr);
++m_in_flight_aio;
template <typename I>
AioCompletion *Replay<I>::create_aio_flush_completion(Context *on_ready,
Context *on_safe) {
+ assert(m_lock.is_locked());
+
// associate all prior write/discard ops to this flush request
AioCompletion *aio_comp = AioCompletion::create<Context>(
new C_AioFlushComplete(this, on_safe,