void complete(int r) {
if (should_complete(r)) {
r = filter_return_code(r);
- finish(r);
- delete this;
+ finish_and_destroy(r);
}
}
return r;
}
+ // NOTE: temporary until converted to new state machine format
+ virtual void finish_and_destroy(int r) {
+ finish(r);
+ delete this;
+ }
+
virtual void finish(int r) {
finish_request();
m_on_finish->complete(r);
}
template <typename I>
-void Journal<I>::commit_op_event(uint64_t op_tid, int r) {
+void Journal<I>::commit_op_event(uint64_t op_tid, int r, Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << ": op_tid=" << op_tid << ", "
<< "r=" << r << dendl;
op_finish_future.flush(create_async_context_callback(
m_image_ctx, new C_OpEventSafe(this, op_tid, op_start_future,
- op_finish_future)));
+ op_finish_future, on_safe)));
}
template <typename I>
template <typename I>
void Journal<I>::handle_op_event_safe(int r, uint64_t tid,
const Future &op_start_future,
- const Future &op_finish_future) {
+ const Future &op_finish_future,
+ Context *on_safe) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
<< "tid=" << tid << dendl;
m_journaler->committed(op_finish_future);
// reduce the replay window after committing an op event
- m_journaler->flush_commit_position(nullptr);
+ m_journaler->flush_commit_position(on_safe);
}
template <typename I>
void append_op_event(uint64_t op_tid, journal::EventEntry &&event_entry,
Context *on_safe);
- void commit_op_event(uint64_t tid, int r);
+ void commit_op_event(uint64_t tid, int r, Context *on_safe);
void replay_op_ready(uint64_t op_tid, Context *on_resume);
void flush_event(uint64_t tid, Context *on_safe);
uint64_t tid;
Future op_start_future;
Future op_finish_future;
+ Context *on_safe;
C_OpEventSafe(Journal *journal, uint64_t tid, const Future &op_start_future,
- const Future &op_finish_future)
+ const Future &op_finish_future, Context *on_safe)
: journal(journal), tid(tid), op_start_future(op_start_future),
- op_finish_future(op_finish_future) {
+ op_finish_future(op_finish_future), on_safe(on_safe) {
}
virtual void finish(int r) {
- journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future);
+ journal->handle_op_event_safe(r, tid, op_start_future, op_finish_future,
+ on_safe);
}
};
void handle_io_event_safe(int r, uint64_t tid);
void handle_op_event_safe(int r, uint64_t tid, const Future &op_start_future,
- const Future &op_finish_future);
+ const Future &op_finish_future, Context *on_safe);
void stop_recording();
}
template <typename I>
-void Request<I>::finish(int r) {
- // automatically commit the event if we don't need to worry
- // about affecting concurrent IO ops
- if (r < 0 || !can_affect_io()) {
- commit_op_event(r);
+Context *Request<I>::create_context_finisher(int r) {
+ // automatically commit the event if required (delete after commit)
+ if (m_appended_op_event && !m_committed_op_event &&
+ commit_op_event(r)) {
+ return nullptr;
}
+ I &image_ctx = this->m_image_ctx;
+ CephContext *cct = image_ctx.cct;
+ ldout(cct, 10) << this << " " << __func__ << dendl;
+ return util::create_context_callback<Request<I>, &Request<I>::finish>(this);
+}
+
+template <typename I>
+void Request<I>::finish_and_destroy(int r) {
+ I &image_ctx = this->m_image_ctx;
+ CephContext *cct = image_ctx.cct;
+ ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
+ // automatically commit the event if required (delete after commit)
+ if (m_appended_op_event && !m_committed_op_event &&
+ commit_op_event(r)) {
+ return;
+ }
+
+ AsyncRequest<I>::finish_and_destroy(r);
+}
+
+template <typename I>
+void Request<I>::finish(int r) {
+ I &image_ctx = this->m_image_ctx;
+ CephContext *cct = image_ctx.cct;
+ ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
assert(!m_appended_op_event || m_committed_op_event);
AsyncRequest<I>::finish(r);
}
}
template <typename I>
-void Request<I>::commit_op_event(int r) {
+bool Request<I>::commit_op_event(int r) {
I &image_ctx = this->m_image_ctx;
RWLock::RLocker snap_locker(image_ctx.snap_lock);
if (!m_appended_op_event) {
- return;
+ return false;
}
assert(m_op_tid != 0);
// ops will be canceled / completed before closing journal
assert(image_ctx.journal->is_journal_ready());
- image_ctx.journal->commit_op_event(m_op_tid, r);
+ image_ctx.journal->commit_op_event(m_op_tid, r,
+ new C_CommitOpEvent(this, r));
+ return true;
+ }
+ return false;
+}
+
+template <typename I>
+void Request<I>::handle_commit_op_event(int r, int original_ret_val) {
+ I &image_ctx = this->m_image_ctx;
+ CephContext *cct = image_ctx.cct;
+ ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
+ if (r < 0) {
+ lderr(cct) << "failed to commit op event to journal: " << cpp_strerror(r)
+ << dendl;
+ }
+ if (original_ret_val < 0) {
+ r = original_ret_val;
}
+ finish(r);
}
template <typename I>
m_op_tid = image_ctx.journal->allocate_op_tid();
image_ctx.journal->append_op_event(
m_op_tid, journal::EventEntry{create_event(m_op_tid)},
- new C_OpEventSafe(this, on_safe));
+ new C_AppendOpEvent(this, on_safe));
}
template <typename I>
}
bool append_op_event();
- void commit_op_event(int r);
// NOTE: temporary until converted to new state machine format
- Context *create_context_finisher() {
- return util::create_context_callback<
- Request<ImageCtxT>, &Request<ImageCtxT>::finish>(this);
- }
+ Context *create_context_finisher(int r);
+ virtual void finish_and_destroy(int r) override;
private:
- struct C_OpEventSafe : public Context {
+ struct C_AppendOpEvent : public Context {
Request *request;
Context *on_safe;
- C_OpEventSafe(Request *request, Context *on_safe)
+ C_AppendOpEvent(Request *request, Context *on_safe)
: request(request), on_safe(on_safe) {
}
virtual void finish(int r) override {
}
};
+ struct C_CommitOpEvent : public Context {
+ Request *request;
+ int ret_val;
+ C_CommitOpEvent(Request *request, int ret_val)
+ : request(request), ret_val(ret_val) {
+ }
+ virtual void finish(int r) override {
+ request->handle_commit_op_event(r, ret_val);
+ delete request;
+ }
+ };
+
uint64_t m_op_tid = 0;
bool m_appended_op_event = false;
bool m_committed_op_event = false;
void append_op_event(Context *on_safe);
void handle_op_event_safe(int r);
+ bool commit_op_event(int r);
+ void handle_commit_op_event(int r, int original_ret_val);
+
};
} // namespace operation
if (*result < 0) {
lderr(cct) << "failed to block writes: " << cpp_strerror(*result) << dendl;
image_ctx.aio_work_queue->unblock_writes();
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
return send_append_op_event();
lderr(cct) << "failed to commit journal entry: " << cpp_strerror(*result)
<< dendl;
image_ctx.aio_work_queue->unblock_writes();
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
return send_grow_object_map();
if (*result == -ERESTART) {
ldout(cct, 5) << "resize operation interrupted" << dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
} else if (*result < 0) {
lderr(cct) << "failed to trim image: " << cpp_strerror(*result) << dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_invalidate_cache();
if (*result < 0) {
lderr(cct) << "failed to invalidate cache: " << cpp_strerror(*result)
<< dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_post_block_writes();
image_ctx.aio_work_queue->unblock_writes();
if (m_original_size == m_new_size) {
- if (!m_disable_journal) {
- this->commit_op_event(0);
- }
- return this->create_context_finisher();
+ return this->create_context_finisher(0);
} else if (m_new_size < m_original_size) {
send_trim_image();
return nullptr;
image_ctx.owner_lock.put_read();
update_size_and_overlap();
- return this->create_context_finisher();
+ return this->create_context_finisher(0);
}
CephContext *cct = image_ctx.cct;
update_size_and_overlap();
assert(*result == 0);
- return this->create_context_finisher();
+ return this->create_context_finisher(0);
}
template <typename I>
image_ctx.aio_work_queue->unblock_writes();
lderr(cct) << "failed to block writes prior to header update: "
<< cpp_strerror(*result) << dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_update_header();
lderr(cct) << "failed to update image header: " << cpp_strerror(*result)
<< dendl;
image_ctx.aio_work_queue->unblock_writes();
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
- if (!m_disable_journal) {
- this->commit_op_event(0);
- }
return send_shrink_object_map();
}
if (*result < 0) {
lderr(cct) << "failed to block writes: " << cpp_strerror(*result) << dendl;
image_ctx.aio_work_queue->unblock_writes();
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_append_op_event();
image_ctx.aio_work_queue->unblock_writes();
lderr(cct) << "failed to commit journal entry: " << cpp_strerror(*result)
<< dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_allocate_snap_id();
if (*result < 0) {
save_result(result);
- finalize(*result);
+ image_ctx.aio_work_queue->unblock_writes();
lderr(cct) << "failed to allocate snapshot id: " << cpp_strerror(*result)
<< dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_create_snap();
if (image_ctx.object_map == nullptr || m_skip_object_map) {
image_ctx.snap_lock.put_read();
- finalize(0);
- return this->create_context_finisher();
+ image_ctx.aio_work_queue->unblock_writes();
+ return this->create_context_finisher(0);
}
CephContext *cct = image_ctx.cct;
assert(*result == 0);
- finalize(0);
- return this->create_context_finisher();
+ image_ctx.aio_work_queue->unblock_writes();
+ return this->create_context_finisher(0);
}
template <typename I>
assert(m_ret_val < 0);
*result = m_ret_val;
- finalize(m_ret_val);
- return this->create_context_finisher();
-}
-
-template <typename I>
-void SnapshotCreateRequest<I>::finalize(int r) {
- I &image_ctx = this->m_image_ctx;
- CephContext *cct = image_ctx.cct;
- ldout(cct, 5) << this << " " << __func__ << ": r=" << r << dendl;
-
- if (r == 0) {
- this->commit_op_event(0);
- }
image_ctx.aio_work_queue->unblock_writes();
+ return this->create_context_finisher(m_ret_val);
}
template <typename I>
void send_release_snap_id();
Context *handle_release_snap_id(int *result);
- void finalize(int r);
void update_snap_context();
void save_result(int *result) {
if (*result < 0) {
lderr(cct) << "failed to block writes: " << cpp_strerror(*result) << dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_resize_image();
if (*result < 0) {
lderr(cct) << "failed to resize image for rollback: "
<< cpp_strerror(*result) << dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
send_rollback_object_map();
if (*result == -ERESTART) {
ldout(cct, 5) << "snapshot rollback operation interrupted" << dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
} else if (*result < 0) {
lderr(cct) << "failed to rollback objects: " << cpp_strerror(*result)
<< dendl;
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
return send_refresh_object_map();
apply();
if (image_ctx.object_cacher == NULL) {
- return this->create_context_finisher();
+ return this->create_context_finisher(0);
}
CephContext *cct = image_ctx.cct;
lderr(cct) << "failed to invalidate cache: " << cpp_strerror(*result)
<< dendl;
}
- return this->create_context_finisher();
+ return this->create_context_finisher(*result);
}
template <typename I>
append_op_event_mock(op_tid, event_entry, on_safe);
}
- MOCK_METHOD2(commit_op_event, void(uint64_t, int));
+ MOCK_METHOD3(commit_op_event, void(uint64_t, int, Context *));
MOCK_METHOD2(replay_op_ready, void(uint64_t, Context *));
MOCK_METHOD2(add_listener, void(journal::ListenerType,
expect_grow_object_map(mock_image_ctx);
expect_block_writes(mock_image_ctx, 0);
expect_update_header(mock_image_ctx, 0);
- expect_commit_op_event(mock_image_ctx, 0);
expect_unblock_writes(mock_image_ctx);
+ expect_commit_op_event(mock_image_ctx, 0);
ASSERT_EQ(0, when_resize(mock_image_ctx, ictx->size * 2, 0, false));
}
expect_invalidate_cache(mock_image_ctx, 0);
expect_block_writes(mock_image_ctx, 0);
expect_update_header(mock_image_ctx, 0);
- expect_commit_op_event(mock_image_ctx, 0);
expect_shrink_object_map(mock_image_ctx);
expect_unblock_writes(mock_image_ctx);
+ expect_commit_op_event(mock_image_ctx, 0);
ASSERT_EQ(0, when_resize(mock_image_ctx, ictx->size / 2, 0, false));
}
+TEST_F(TestMockOperationResizeRequest, ShrinkError) {
+ librbd::ImageCtx *ictx;
+ ASSERT_EQ(0, open_image(m_image_name, &ictx));
+
+ MockImageCtx mock_image_ctx(*ictx);
+ MockExclusiveLock mock_exclusive_lock;
+ MockJournal mock_journal;
+ MockObjectMap mock_object_map;
+ initialize_features(ictx, mock_image_ctx, mock_exclusive_lock, mock_journal,
+ mock_object_map);
+
+ InSequence seq;
+ expect_block_writes(mock_image_ctx, -EINVAL);
+ expect_unblock_writes(mock_image_ctx);
+ ASSERT_EQ(-EINVAL, when_resize(mock_image_ctx, ictx->size / 2, 0, false));
+}
+
TEST_F(TestMockOperationResizeRequest, PreBlockWritesError) {
librbd::ImageCtx *ictx;
ASSERT_EQ(0, open_image(m_image_name, &ictx));
if (mock_image_ctx.journal != nullptr) {
expect_is_journal_replaying(*mock_image_ctx.journal);
expect_is_journal_ready(*mock_image_ctx.journal);
- EXPECT_CALL(*mock_image_ctx.journal, commit_op_event(1U, r));
+ EXPECT_CALL(*mock_image_ctx.journal, commit_op_event(1U, r, _))
+ .WillOnce(WithArg<2>(CompleteContext(r, mock_image_ctx.image_ctx->op_work_queue)));
}
}