CephContext *cct = m_image_ctx.cct;
m_lock.Lock();
+ assert(m_state == STATE_REPLAYING ||
+ m_state == STATE_FLUSHING_RESTART ||
+ m_state == STATE_FLUSHING_REPLAY);
+
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << dendl;
if (r < 0) {
lderr(cct) << "failed to commit journal event to disk: " << cpp_strerror(r)
} else {
// only commit the entry if written successfully
m_journaler->committed(replay_entry);
- m_lock.Unlock();
}
+ m_lock.Unlock();
}
template <typename I>
}
virtual void finish(int r) override {
+ CephContext *cct = image_ctx.cct;
+ if (r < 0) {
+ lderr(cct) << "ExecuteOp: " << __func__ << ": r=" << r << dendl;
+ on_op_complete->complete(r);
+ return;
+ }
+
+ ldout(cct, 20) << "ExecuteOp: " << __func__ << dendl;
RWLock::RLocker owner_locker(image_ctx.owner_lock);
execute(event);
}
}
virtual void finish(int r) override {
+ CephContext *cct = image_ctx.cct;
+
+ if (r < 0) {
+ lderr(cct) << "C_RefreshIfRequired: " << __func__ << ": r=" << r << dendl;
+ image_ctx.op_work_queue->queue(on_finish, r);
+ return;
+ }
+
if (image_ctx.state->is_refresh_required()) {
+ ldout(cct, 20) << "C_RefreshIfRequired: " << __func__ << ": "
+ << "refresh required" << dendl;
image_ctx.state->refresh(on_finish);
return;
}
ldout(cct, 20) << this << " " << __func__ << dendl;
AioCompletion *flush_comp = nullptr;
- OpTids cancel_op_tids;
- Contexts op_finish_events;
on_finish = util::create_async_context_callback(
m_image_ctx, on_finish);
// OpFinishEvent or waiting for ready)
if (op_event.on_start_ready == nullptr &&
op_event.on_op_finish_event != nullptr) {
- cancel_op_tids.push_back(op_event_pair.first);
+ Context *on_op_finish_event = nullptr;
+ std::swap(on_op_finish_event, op_event.on_op_finish_event);
+ m_image_ctx.op_work_queue->queue(on_op_finish_event, -ERESTART);
}
} else if (op_event.on_op_finish_event != nullptr) {
// start ops waiting for OpFinishEvent
RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
AioImageRequest<I>::aio_flush(&m_image_ctx, flush_comp);
}
- for (auto op_tid : cancel_op_tids) {
- handle_op_complete(op_tid, -ERESTART);
- }
if (on_finish != nullptr) {
on_finish->complete(0);
}
op_event.on_finish_safe != nullptr) || shutting_down);
}
- // skipped upon error -- so clean up if non-null
- delete op_event.on_op_finish_event;
- if (r == -ERESTART) {
- delete op_event.on_op_complete;
+ if (op_event.on_op_finish_event != nullptr) {
+ op_event.on_op_finish_event->complete(r);
}
if (op_event.on_finish_ready != nullptr) {
when_replay_op_ready(mock_journal_replay, 123, &on_resume);
ASSERT_EQ(0, on_snap_create_ready.wait());
- ASSERT_EQ(0, when_shut_down(mock_journal_replay, true));
- ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait());
+ C_SaferCond on_shut_down;
+ mock_journal_replay.shut_down(true, &on_shut_down);
+
+ ASSERT_EQ(-ERESTART, on_resume.wait());
+ on_snap_create_finish->complete(-ERESTART);
ASSERT_EQ(-ERESTART, on_snap_create_safe.wait());
+
+ ASSERT_EQ(-ERESTART, on_snap_remove_safe.wait());
+ ASSERT_EQ(0, on_shut_down.wait());
}
TEST_F(TestMockJournalReplay, UnknownOpFinishEvent) {