ldout(cct, 21) << "tick" << dendl;
tick_event = timer.add_event_after(
cct->_conf->client_tick_interval,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
// Called back via Timer, which takes client_lock for us
ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
tick();
}
void TokenBucketThrottle::schedule_timer() {
- m_token_ctx = new FunctionContext(
+ m_token_ctx = new LambdaContext(
[this](int r) {
schedule_timer();
});
uint64_t m_burst = 0;
SafeTimer *m_timer;
ceph::mutex *m_timer_lock;
- FunctionContext *m_token_ctx = nullptr;
+ Context *m_token_ctx = nullptr;
std::list<Blocker> m_blockers;
ceph::mutex m_lock;
template <typename T, typename I, void(T::*MF)(int, I*, uint64_t)>
void add_blocker(uint64_t c, T *handler, I *item, uint64_t flag) {
- Context *ctx = new FunctionContext([handler, item, flag](int r) {
+ Context *ctx = new LambdaContext([handler, item, flag](int r) {
(handler->*MF)(r, item, flag);
});
m_blockers.emplace_back(c, ctx);
void start_flush_on_transaction(
ObjectStore::Transaction &t) final {
t.register_on_commit(
- new LambdaContext([this](){
+ new LambdaContext([this](int r){
peering_state.complete_flush();
}));
}
PGPeeringEventRef on_commit) final {
t.register_on_commit(
new LambdaContext(
- [this, on_commit=std::move(on_commit)] {
+ [this, on_commit=std::move(on_commit)](int r){
shard_services.start_operation<LocalPeeringEvent>(
this,
shard_services,
typedef std::shared_ptr<RunOnDelete> RunOnDeleteRef;
template <typename T>
-struct LambdaContext : public Context {
- T t;
+class LambdaContext : public Context {
+public:
LambdaContext(T &&t) : t(std::forward<T>(t)) {}
- void finish(int) override {
- t();
+ void finish(int r) override {
+ t(r);
}
+private:
+ T t;
};
+
template <typename T>
LambdaContext<T> *make_lambda_context(T &&t) {
return new LambdaContext<T>(std::move(t));
typedef C_GatherBase<Context, Context> C_Gather;
typedef C_GatherBuilderBase<Context, C_Gather > C_GatherBuilder;
-class FunctionContext : public Context {
-public:
- FunctionContext(boost::function<void(int)> &&callback)
- : m_callback(std::move(callback))
- {
- }
-
- void finish(int r) override {
- m_callback(r);
- }
-private:
- boost::function<void(int)> m_callback;
-};
-
template <class ContextType>
class ContextFactory {
public:
on_finish = utils::create_async_context_callback(
this, on_finish);
on_finish = new C_ImmutableMetadata(this, on_finish);
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
if (r < 0) {
lderr(m_cct) << __func__ << ": failed to watch journal"
<< cpp_strerror(r) << dendl;
// chain the shut down sequence (reverse order)
on_finish = utils::create_async_context_callback(
this, on_finish);
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
ldout(m_cct, 20) << "shut_down: waiting for ops" << dendl;
m_async_op_tracker.wait_for_ops(on_finish);
});
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
ldout(m_cct, 20) << "shut_down: flushing watch" << dendl;
librados::Rados rados(m_ioctx);
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
ceph_assert(r == 0);
comp->release();
});
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
flush_commit_position(on_finish);
});
if (watch_handle != 0) {
m_async_op_tracker.start_op();
++m_flush_commits_in_progress;
- Context* ctx = new FunctionContext([this, commit_position_ctx](int r) {
+ Context* ctx = new LambdaContext([this, commit_position_ctx](int r) {
Contexts flush_commit_position_ctxs;
m_lock.lock();
ceph_assert(m_flush_commits_in_progress > 0);
m_async_op_tracker.finish_op();
});
ctx = new C_NotifyUpdate(this, ctx);
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
// manually kick of a refresh in case the notification is missed
// and ignore the next notification that we are about to send
m_lock.lock();
refresh(ctx);
});
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
schedule_laggy_clients_disconnect(ctx);
});
ldout(m_cct, 1) << __func__ << ": " << client_id
<< ": scheduling disconnect" << dendl;
- ctx = new FunctionContext([this, client_id, ctx](int r1) {
+ ctx = new LambdaContext([this, client_id, ctx](int r1) {
ldout(m_cct, 10) << __func__ << ": " << client_id
<< ": flagging disconnected" << dendl;
<< *m_active_tag_tid << dendl;
m_async_op_tracker.start_op();
- FunctionContext *ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
handle_watch_assert_active(r);
});
m_journal_metadata->assert_active_tag(*m_active_tag_tid, ctx);
m_state = STATE_INIT;
if (m_max_fetch_bytes >= min_bytes) {
m_async_op_tracker.start_op();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this](int r) {
prefetch();
m_async_op_tracker.finish_op();
}
void JournalRecorder::shut_down(Context *on_safe) {
- on_safe = new FunctionContext(
+ on_safe = new LambdaContext(
[this, on_safe](int r) {
Context *ctx = nullptr;
{
std::lock_guard locker{m_lock};
if (m_in_flight_advance_sets != 0) {
ceph_assert(m_on_object_set_advanced == nullptr);
- m_on_object_set_advanced = new FunctionContext(
+ m_on_object_set_advanced = new LambdaContext(
[on_safe, r](int) {
on_safe->complete(r);
});
m_journal_metadata->remove_listener(&m_metadata_listener);
// chain the shut down sequence (reverse order)
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
m_async_op_tracker.wait_for_ops(on_finish);
});
m_journal_metadata->flush_commit_position(on_finish);
void JournalTrimmer::remove_objects(bool force, Context *on_finish) {
ldout(m_cct, 20) << __func__ << dendl;
- on_finish = new FunctionContext([this, force, on_finish](int r) {
+ on_finish = new LambdaContext([this, force, on_finish](int r) {
std::lock_guard locker{m_lock};
if (m_remove_set_pending) {
std::swap(metadata, m_metadata);
ceph_assert(metadata != nullptr);
- on_finish = new FunctionContext([metadata, on_finish](int r) {
+ on_finish = new LambdaContext([metadata, on_finish](int r) {
metadata->put();
on_finish->complete(0);
});
return;
}
- on_finish = new FunctionContext([trimmer, metadata, on_finish](int r) {
+ on_finish = new LambdaContext([trimmer, metadata, on_finish](int r) {
delete trimmer;
metadata->shut_down(on_finish);
});
void Journaler::remove(bool force, Context *on_finish) {
// chain journal removal (reverse order)
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
librados::AioCompletion *comp = librados::Rados::aio_create_completion(
on_finish, nullptr, utils::rados_ctx_callback);
r = m_header_ioctx.aio_remove(m_header_oid, comp);
comp->release();
});
- on_finish = new FunctionContext([this, force, on_finish](int r) {
+ on_finish = new LambdaContext([this, force, on_finish](int r) {
m_trimmer->remove_objects(force, on_finish);
});
std::swap(player, m_player);
ceph_assert(player != nullptr);
- on_finish = new FunctionContext([player, on_finish](int r) {
+ auto f = [player, on_finish](int r) {
delete player;
on_finish->complete(r);
- });
+ };
+ on_finish = new LambdaContext(std::move(f));
player->shut_down(on_finish);
}
std::swap(recorder, m_recorder);
ceph_assert(recorder != nullptr);
- on_safe = new FunctionContext([recorder, on_safe](int r) {
+ on_safe = new LambdaContext([recorder, on_safe](int r) {
delete recorder;
on_safe->complete(r);
});
ceph_assert(m_watch_task == nullptr);
m_watch_task = m_timer.add_event_after(
m_watch_interval,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
handle_watch_task();
}));
}
if (future.is_valid()) {
// cannot be invoked while the same lock context
- m_op_work_queue->queue(new FunctionContext(
+ m_op_work_queue->queue(new LambdaContext(
[future, on_safe] (int r) mutable {
future.flush(on_safe);
}));
}
// rollback the object map (copy snapshot object map to HEAD)
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_copy_object_map(r);
finish_op_ctx->complete(0);
});
ldout(m_cct, 20) << dendl;
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_refresh_object_map(r);
finish_op_ctx->complete(0);
});
}
m_async_op_tracker.start_op();
- return new FunctionContext([this](int r) {
+ return new LambdaContext([this](int r) {
m_async_op_tracker.finish_op();
});
}
PreAcquireRequest<I> *req = PreAcquireRequest<I>::create(m_image_ctx,
on_finish);
- m_image_ctx.op_work_queue->queue(new FunctionContext([req](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext([req](int r) {
req->send();
}));
}
util::create_context_callback<EL, &EL::handle_post_acquiring_lock>(this),
util::create_context_callback<EL, &EL::handle_post_acquired_lock>(this));
- m_image_ctx.op_work_queue->queue(new FunctionContext([req](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext([req](int r) {
req->send();
}));
}
PreReleaseRequest<I> *req = PreReleaseRequest<I>::create(
m_image_ctx, shutting_down, m_async_op_tracker, on_finish);
- m_image_ctx.op_work_queue->queue(new FunctionContext([req](int r) {
+ m_image_ctx.op_work_queue->queue(new LambdaContext([req](int r) {
req->send();
}));
}
{
std::lock_guard locker{m_lock};
if (!m_in_flight.empty()) {
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, on_finish](int r) {
ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
<< ": completing flush" << dendl;
m_in_flight.insert(handle);
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, handle, watcher](int r) {
handle_notify(handle, watcher);
});
cancel_async_requests();
- FunctionContext *ctx = new FunctionContext([this, on_finish](int r) {
+ auto ctx = new LambdaContext([this, on_finish](int r) {
m_task_finisher->cancel_all(on_finish);
});
Watcher::unregister_watch(ctx);
CephContext *cct = m_image_ctx.cct;
ldout(cct, 10) << this << " " << __func__ << dendl;
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
cancel_async_requests();
on_finish->complete(r);
});
template <typename I>
void ImageWatcher<I>::schedule_async_progress(const AsyncRequestId &request,
uint64_t offset, uint64_t total) {
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
boost::bind(&ImageWatcher<I>::notify_async_progress, this, request, offset,
total));
m_task_finisher->queue(Task(TASK_CODE_ASYNC_PROGRESS, request), ctx);
template <typename I>
void ImageWatcher<I>::schedule_async_complete(const AsyncRequestId &request,
int r) {
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
boost::bind(&ImageWatcher<I>::notify_async_complete, this, request, r));
m_task_finisher->queue(ctx);
}
<< request << " = " << r << dendl;
send_notify(AsyncCompletePayload(request, r),
- new FunctionContext(boost::bind(&ImageWatcher<I>::handle_async_complete,
+ new LambdaContext(boost::bind(&ImageWatcher<I>::handle_async_complete,
this, request, r, _1)));
}
template <typename I>
void ImageWatcher<I>::schedule_cancel_async_requests() {
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
boost::bind(&ImageWatcher<I>::cancel_async_requests, this));
m_task_finisher->queue(TASK_CODE_CANCEL_ASYNC_REQUESTS, ctx);
}
if (this->is_registered(this->m_watch_lock)) {
ldout(m_image_ctx.cct, 15) << this << " requesting exclusive lock" << dendl;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
boost::bind(&ImageWatcher<I>::notify_request_lock, this));
if (use_timer) {
if (timer_delay < 0) {
ldout(m_image_ctx.cct, 20) << "scheduling async request time out: " << id
<< dendl;
- Context *ctx = new FunctionContext(boost::bind(
+ Context *ctx = new LambdaContext(boost::bind(
&ImageWatcher<I>::async_request_timed_out, this, id));
Task task(TASK_CODE_ASYNC_REQUEST, id);
ldout(m_image_ctx.cct, 10) << this << " async request: " << async_request_id
<< dendl;
- Context *on_notify = new FunctionContext([this, async_request_id](int r) {
+ Context *on_notify = new LambdaContext([this, async_request_id](int r) {
if (r < 0) {
// notification failed -- don't expect updates
Context *on_complete = remove_async_request(async_request_id);
}
});
- Context *on_complete = new FunctionContext(
+ Context *on_complete = new LambdaContext(
[this, async_request_id, on_finish](int r) {
m_task_finisher->cancel(Task(TASK_CODE_ASYNC_REQUEST, async_request_id));
on_finish->complete(r);
Journaler *journaler = this->journaler;
Context *on_finish = this->on_finish;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[journaler, on_finish](int r) {
on_finish->complete(r);
delete journaler;
void send_get_client() {
ldout(cct, 20) << __func__ << dendl;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this](int r) {
handle_get_client(r);
});
void send_get_tags() {
ldout(cct, 20) << __func__ << dendl;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this](int r) {
handle_get_tags(r);
});
CephContext *cct = m_image_ctx.cct;
ldout(cct, 20) << this << " " << __func__ << dendl;
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
// remove our handler from object dispatcher chain - preserve error
- auto ctx = new FunctionContext([on_finish, r](int _) {
+ auto ctx = new LambdaContext([on_finish, r](int _) {
on_finish->complete(r);
});
m_image_ctx.io_object_dispatcher->shut_down_object_dispatch(
}
on_safe = create_async_context_callback(m_image_ctx, on_safe);
- on_safe = new FunctionContext([this, on_safe](int r) {
+ on_safe = new LambdaContext([this, on_safe](int r) {
// ensure all committed IO before this op is committed
m_journaler->flush_commit_position(on_safe);
});
ceph_assert(m_journal_replay == nullptr);
on_start = util::create_async_context_callback(m_image_ctx, on_start);
- on_start = new FunctionContext(
+ on_start = new LambdaContext(
[this, journal_replay, on_start](int r) {
handle_start_external_replay(r, journal_replay, on_start);
});
Context *ctx = create_async_context_callback(
m_image_ctx, create_context_callback<
Journal<I>, &Journal<I>::handle_journal_destroyed>(this));
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[this, ctx](int r) {
std::lock_guard locker{m_lock};
m_journaler->shut_down(ctx);
}
}
- Context *ctx = new FunctionContext([this, cct](int r) {
+ Context *ctx = new LambdaContext([this, cct](int r) {
ldout(cct, 20) << this << " handle_replay_complete: "
<< "handle shut down replay" << dendl;
handle_flushing_replay();
}
});
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
// ensure the commit position is flushed to disk
m_journaler->flush_commit_position(ctx);
});
- ctx = new FunctionContext([this, cct, cancel_ops, ctx](int r) {
+ ctx = new LambdaContext([this, cct, cancel_ops, ctx](int r) {
ldout(cct, 20) << this << " handle_replay_complete: "
<< "shut down replay" << dendl;
m_journal_replay->shut_down(cancel_ops, ctx);
// stop replay, shut down, and restart
Context* ctx = create_context_callback<
Journal<I>, &Journal<I>::handle_flushing_restart>(this);
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
// ensure the commit position is flushed to disk
m_journaler->flush_commit_position(ctx);
});
- ctx = new FunctionContext([this, cct, ctx](int r) {
+ ctx = new LambdaContext([this, cct, ctx](int r) {
ldout(cct, 20) << this << " handle_replay_process_safe: "
<< "shut down replay" << dendl;
{
// pull the most recent tags from the journal, decode, and
// update the internal tag state
C_RefreshTags *refresh_ctx = new C_RefreshTags(m_async_journal_op_tracker);
- refresh_ctx->on_finish = new FunctionContext(
+ refresh_ctx->on_finish = new LambdaContext(
[this, refresh_sequence, refresh_ctx](int r) {
handle_refresh_metadata(refresh_sequence, refresh_ctx->tag_tid,
refresh_ctx->tag_data, r);
MetadataListener(Journal<ImageCtxT> *journal) : journal(journal) { }
void handle_update(::journal::JournalMetadata *) override {
- FunctionContext *ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
journal->handle_metadata_updated();
});
journal->m_work_queue->queue(ctx, 0);
m_state = STATE_ACQUIRING;
m_cookie = encode_lock_cookie(watch_handle);
- m_work_queue->queue(new FunctionContext([this](int r) {
+ m_work_queue->queue(new LambdaContext([this](int r) {
pre_acquire_lock_handler(create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_pre_acquire_lock>(this));
}));
m_post_next_state = (r < 0 ? STATE_UNLOCKED : STATE_LOCKED);
- m_work_queue->queue(new FunctionContext([this, r](int ret) {
+ m_work_queue->queue(new LambdaContext([this, r](int ret) {
post_acquire_lock_handler(r, create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_post_acquire_lock>(this));
}));
using managed_lock::ReleaseRequest;
ReleaseRequest<I>* req = ReleaseRequest<I>::create(m_ioctx, m_watcher,
m_work_queue, m_oid, m_cookie,
- new FunctionContext([this, r](int ret) {
+ new LambdaContext([this, r](int ret) {
std::lock_guard locker{m_lock};
ceph_assert(ret == 0);
complete_active_action(STATE_UNLOCKED, r);
auto ctx = create_context_callback<
ManagedLock, &ManagedLock<I>::handle_reacquire_lock>(this);
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
post_reacquire_lock_handler(r, ctx);
});
ldout(m_cct, 10) << dendl;
m_state = STATE_PRE_RELEASING;
- m_work_queue->queue(new FunctionContext([this](int r) {
+ m_work_queue->queue(new LambdaContext([this](int r) {
pre_release_lock_handler(false, create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_pre_release_lock>(this));
}));
m_post_next_state = STATE_LOCKED;
}
- m_work_queue->queue(new FunctionContext([this, r](int ret) {
+ m_work_queue->queue(new LambdaContext([this, r](int ret) {
post_release_lock_handler(false, r, create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_post_release_lock>(this));
}));
ceph_assert(ceph_mutex_is_locked(m_lock));
if (m_state == STATE_UNLOCKED) {
m_state = STATE_SHUTTING_DOWN;
- m_work_queue->queue(new FunctionContext([this](int r) {
+ m_work_queue->queue(new LambdaContext([this](int r) {
shutdown_handler(r, create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_shutdown>(this));
}));
std::lock_guard locker{m_lock};
- m_work_queue->queue(new FunctionContext([this](int r) {
+ m_work_queue->queue(new LambdaContext([this](int r) {
pre_release_lock_handler(true, create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_shutdown_pre_release>(this));
}));
using managed_lock::ReleaseRequest;
ReleaseRequest<I>* req = ReleaseRequest<I>::create(m_ioctx, m_watcher,
m_work_queue, m_oid, cookie,
- new FunctionContext([this, r](int l) {
+ new LambdaContext([this, r](int l) {
int rst = r < 0 ? r : l;
post_release_lock_handler(true, rst, create_context_callback<
ManagedLock<I>, &ManagedLock<I>::handle_shutdown_post_release>(this));
void ManagedLock<I>::wait_for_tracked_ops(int r) {
ldout(m_cct, 10) << "r=" << r << dendl;
- Context *ctx = new FunctionContext([this, r](int ret) {
+ Context *ctx = new LambdaContext([this, r](int ret) {
complete_shutdown(r);
});
ldout(cct, 20) << "in-flight update cell: " << cell << dendl;
Context *on_finish = op.on_finish;
- Context *ctx = new FunctionContext([this, cell, on_finish](int r) {
+ Context *ctx = new LambdaContext([this, cell, on_finish](int r) {
handle_detained_aio_update(cell, r, on_finish);
});
aio_update(CEPH_NOSNAP, op.start_object_no, op.end_object_no, op.new_state,
if (m_image_ctx.old_format) {
// unregister watch before and register back after rename
on_finish = new C_NotifyUpdate<I>(m_image_ctx, on_finish);
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
if (m_image_ctx.old_format) {
m_image_ctx.image_watcher->set_oid(m_image_ctx.header_oid);
}
m_image_ctx.image_watcher->register_watch(on_finish);
});
- on_finish = new FunctionContext([this, dest_name, on_finish](int r) {
+ on_finish = new LambdaContext([this, dest_name, on_finish](int r) {
std::shared_lock owner_locker{m_image_ctx.owner_lock};
operation::RenameRequest<I> *req = new operation::RenameRequest<I>(
m_image_ctx, on_finish, dest_name);
<< dendl;
ceph_assert(m_unregister_watch_ctx == nullptr);
- m_unregister_watch_ctx = new FunctionContext([this, on_finish](int r) {
+ m_unregister_watch_ctx = new LambdaContext([this, on_finish](int r) {
unregister_watch(on_finish);
});
return;
m_watch_blacklisted = true;
}
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
boost::bind(&Watcher::rewatch, this));
m_work_queue->queue(ctx);
}
// chain shut down in reverse order
// shut down the cache
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
m_object_cacher->stop();
on_finish->complete(r);
});
// discard the cache state after changes are committed to disk (and to
// prevent races w/ readahead)
auto ctx = *on_finish;
- *on_finish = new FunctionContext(
+ *on_finish = new LambdaContext(
[this, object_extents, ctx](int r) {
m_cache_lock.lock();
m_object_cacher->discard_set(m_object_set, object_extents);
return;
}
- Context* create_session_ctx = new FunctionContext([this, on_finish](int ret) {
+ Context* create_session_ctx = new LambdaContext([this, on_finish](int ret) {
m_connecting.store(false);
if (on_finish != nullptr) {
on_finish->complete(ret);
* So, we need to check if session is normal again. If session work,
* we need set m_connecting to false. */
if (!m_cache_client->is_session_work()) {
- Context* on_finish = new FunctionContext([this](int ret) {
+ Context* on_finish = new LambdaContext([this](int ret) {
m_connecting.store(false);
});
create_cache_session(on_finish, true);
auto cct = m_image_ctx->cct;
ldout(cct, 20) << dendl;
- Context* register_ctx = new FunctionContext([this, cct, on_finish](int ret) {
+ Context* register_ctx = new LambdaContext([this, cct, on_finish](int ret) {
if (ret < 0) {
lderr(cct) << "Parent cache fail to register client." << dendl;
} else {
on_finish->complete(ret);
});
- Context* connect_ctx = new FunctionContext(
+ Context* connect_ctx = new LambdaContext(
[this, cct, register_ctx](int ret) {
if (ret < 0) {
lderr(cct) << "Parent cache fail to connect RO daeomn." << dendl;
auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish);
*dispatch_result = io::DISPATCH_RESULT_CONTINUE;
- *on_finish = new FunctionContext([this, tid](int r) {
+ *on_finish = new LambdaContext([this, tid](int r) {
handle_in_flight_flush_complete(r, tid);
});
auto ctx = util::create_async_context_callback(*m_image_ctx, *on_finish);
*dispatch_result = io::DISPATCH_RESULT_CONTINUE;
- *on_finish = new FunctionContext(
+ *on_finish = new LambdaContext(
[this, tid, object_no, object_off, object_len](int r) {
handle_in_flight_io_complete(r, tid, object_no, object_off, object_len);
});
++m_current_ops;
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, ono](int r) {
handle_object_copy(ono, r);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_write_object(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_update_object_map(r);
finish_op_ctx->complete(0);
});
int* r) {
ceph_assert(ceph_mutex_is_locked(m_dst_image_ctx->owner_lock));
if (m_dst_image_ctx->exclusive_lock == nullptr) {
- return new FunctionContext([](int r) {});
+ return new LambdaContext([](int r) {});
}
return m_dst_image_ctx->exclusive_lock->start_op(r);
}
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_set_size(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_detach_parent(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_attach_parent(r);
finish_op_ctx->complete(0);
});
Context *SetHeadRequest<I>::start_lock_op(int* r) {
std::shared_lock owner_locker{m_image_ctx->owner_lock};
if (m_image_ctx->exclusive_lock == nullptr) {
- return new FunctionContext([](int r) {});
+ return new LambdaContext([](int r) {});
}
return m_image_ctx->exclusive_lock->start_op(r);
}
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_snap_unprotect(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_snap_remove(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_snap_create(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_snap_protect(r);
finish_op_ctx->complete(0);
});
auto finish_op_ctx = start_lock_op(m_dst_image_ctx->owner_lock, &r);
if (finish_op_ctx != nullptr) {
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_resize_object_map(r);
finish_op_ctx->complete(0);
});
void SnapshotCopyRequest<I>::error(int r) {
ldout(m_cct, 20) << "r=" << r << dendl;
- m_work_queue->queue(new FunctionContext([this, r](int r1) { finish(r); }));
+ m_work_queue->queue(new LambdaContext([this, r](int r1) { finish(r); }));
}
template <typename I>
Context *SnapshotCopyRequest<I>::start_lock_op(ceph::shared_mutex &owner_lock, int* r) {
ceph_assert(ceph_mutex_is_locked(m_dst_image_ctx->owner_lock));
if (m_dst_image_ctx->exclusive_lock == nullptr) {
- return new FunctionContext([](int r) {});
+ return new LambdaContext([](int r) {});
}
return m_dst_image_ctx->exclusive_lock->start_op(r);
}
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_create_snap(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_create_object_map(r);
finish_op_ctx->complete(0);
});
Context *SnapshotCreateRequest<I>::start_lock_op(int* r) {
std::shared_lock owner_locker{m_dst_image_ctx->owner_lock};
if (m_dst_image_ctx->exclusive_lock == nullptr) {
- return new FunctionContext([](int r) {});
+ return new LambdaContext([](int r) {});
}
return m_dst_image_ctx->exclusive_lock->start_op(r);
}
void RemoveRequest<I>::remove_v1_image() {
ldout(m_cct, 20) << dendl;
- Context *ctx = new FunctionContext([this] (int r) {
+ Context *ctx = new LambdaContext([this] (int r) {
r = tmap_rm(m_ioctx, m_image_name);
handle_remove_v1_image(r);
});
// allocate a self-managed snapshot id if this a new pool to force
// self-managed snapshot mode
- auto ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
r = m_io_ctx.selfmanaged_snap_create(&m_snap_id);
handle_create_snapshot(r);
});
void ValidatePoolRequest<I>::remove_snapshot() {
ldout(m_cct, 5) << dendl;
- auto ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
r = m_io_ctx.selfmanaged_snap_remove(m_snap_id);
handle_remove_snapshot(r);
});
}
auto *throttle = m_throttle;
- auto *end_op_ctx = new FunctionContext([throttle](int r) {
+ auto *end_op_ctx = new LambdaContext([throttle](int r) {
throttle->end_op(r);
});
auto gather_ctx = new C_Gather(m_dest->cct, end_op_ctx);
auto object_dispatch_spec = ObjectDispatchSpec::create_flush(
&image_ctx, OBJECT_DISPATCH_LAYER_NONE, m_flush_source, journal_tid,
this->m_trace, ctx);
- ctx = new FunctionContext([object_dispatch_spec](int r) {
+ ctx = new LambdaContext([object_dispatch_spec](int r) {
object_dispatch_spec->send();
});
auto async_op_tracker = object_dispatch_meta.async_op_tracker;
Context* ctx = *on_finish;
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[object_dispatch, async_op_tracker, ctx](int r) {
delete object_dispatch;
delete async_op_tracker;
ctx->complete(r);
});
- ctx = new FunctionContext([object_dispatch, ctx](int r) {
+ ctx = new LambdaContext([object_dispatch, ctx](int r) {
object_dispatch->shut_down(ctx);
});
- *on_finish = new FunctionContext([async_op_tracker, ctx](int r) {
+ *on_finish = new LambdaContext([async_op_tracker, ctx](int r) {
async_op_tracker->wait_for_ops(ctx);
});
}
std::shared_lock image_locker{image_ctx->image_lock};
if (image_ctx->object_map != nullptr &&
!image_ctx->object_map->object_may_exist(this->m_object_no)) {
- image_ctx->op_work_queue->queue(new FunctionContext([this](int r) {
+ image_ctx->op_work_queue->queue(new LambdaContext([this](int r) {
read_parent();
}), 0);
return;
auto offset = it.first;
auto &merged_requests = it.second;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[requests=std::move(merged_requests.requests), latency_stats,
latency_stats_lock, start_time=ceph_clock_now()](int r) {
if (latency_stats) {
auto dispatch_seq = ++m_dispatch_seq;
it->second->set_dispatch_seq(dispatch_seq);
- *on_finish = new FunctionContext(
+ *on_finish = new LambdaContext(
[this, object_no, dispatch_seq, start_time, ctx=*on_finish](int r) {
ctx->complete(r);
object_requests = m_dispatch_queue.front().get();
}
- m_timer_task = new FunctionContext(
+ m_timer_task = new LambdaContext(
[this, object_no=object_requests->get_object_no()](int r) {
ceph_assert(ceph_mutex_is_locked(*m_timer_lock));
auto cct = m_image_ctx->cct;
m_timer_task = nullptr;
m_image_ctx->op_work_queue->queue(
- new FunctionContext(
+ new LambdaContext(
[this, object_no](int r) {
std::lock_guard locker{m_lock};
dispatch_delayed_requests(object_no);
ldout(cct, 20) << dendl;
auto ctx = *on_finish;
- *on_finish = new FunctionContext(
+ *on_finish = new LambdaContext(
[image_ctx=m_image_ctx, ctx, journal_tid=*journal_tid](int r) {
image_ctx->journal->commit_io_event(journal_tid, r);
ctx->complete(r);
// resume the op state machine once the associated OpFinishEvent
// is processed
- op_event.on_op_finish_event = new FunctionContext(
+ op_event.on_op_finish_event = new LambdaContext(
[on_resume](int r) {
on_resume->complete(r);
});
Context *ctx = create_context_callback(
&DisableRequest<I>::handle_remove_snap, client_id);
- ctx = new FunctionContext([this, snap_namespace, snap_name, ctx](int r) {
+ ctx = new LambdaContext([this, snap_namespace, snap_name, ctx](int r) {
m_image_ctx->operations->snap_remove(snap_namespace,
snap_name.c_str(),
ctx);
Context*(DisableRequest<I>::*handle)(int*, const std::string &client_id),
const std::string &client_id) {
- return new FunctionContext([this, handle, client_id](int r) {
+ return new LambdaContext([this, handle, client_id](int r) {
Context *on_finish = (this->*handle)(&r, client_id);
if (on_finish != nullptr) {
on_finish->complete(r);
// Do the split ASAP: enqueue it in the MDSRank waiters which are
// run at the end of dispatching the current request
mds->queue_waiter(new MDSInternalContextWrapper(mds,
- new FunctionContext(callback)));
+ new LambdaContext(std::move(callback))));
} else if (is_new) {
// Set a timer to really do the split: we don't do it immediately
// so that bursts of ops on a directory have a chance to go through
// before we freeze it.
mds->timer.add_event_after(bal_fragment_interval,
- new FunctionContext(callback));
+ new LambdaContext(std::move(callback)));
}
}
dout(20) << __func__ << " enqueued dir " << *dir << dendl;
merge_pending.insert(frag);
mds->timer.add_event_after(bal_fragment_interval,
- new FunctionContext(callback));
+ new LambdaContext(std::move(callback)));
} else {
dout(20) << __func__ << " dir already in queue " << *dir << dendl;
}
{
open_mydir_inode(
new MDSInternalContextWrapper(mds,
- new FunctionContext([this, c](int r) {
+ new LambdaContext([this, c](int r) {
if (r < 0) {
c->complete(r);
return;
// I'm survivor: refresh snap cache
mds->snapclient->sync(
new MDSInternalContextWrapper(mds,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
maybe_finish_slave_resolve();
})
)
open_file_table.prefetch_inodes()) {
open_file_table.wait_for_prefetch(
new MDSInternalContextWrapper(mds,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
ceph_assert(rejoin_gather.count(mds->get_nodeid()));
process_imported_caps();
})
MDSGatherBuilder gather(g_ceph_context,
new MDSInternalContextWrapper(mds,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
if (rejoin_gather.empty())
rejoin_gather_finish();
})
MDSContext *fin = nullptr;
if (shutdown_exporting_strays.empty()) {
fin = new MDSInternalContextWrapper(mds,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
shutdown_export_strays();
})
);
if (header->get_recursive()) {
header->get_origin()->get(CInode::PIN_SCRUBQUEUE);
fin = new MDSInternalContextWrapper(mds,
- new FunctionContext([this, header](int r) {
+ new LambdaContext([this, header](int r) {
recursive_scrub_finish(header);
header->get_origin()->put(CInode::PIN_SCRUBQUEUE);
})
// If the scrub did some repair, then flush the journal at the end of
// the scrub. Otherwise in the case of e.g. rewriting a backtrace
// the on disk state will still look damaged.
- auto scrub_finish = new FunctionContext([this, header, fin](int r){
+ auto scrub_finish = new LambdaContext([this, header, fin](int r){
if (!header->get_repaired()) {
if (fin)
fin->complete(r);
return;
}
- auto flush_finish = new FunctionContext([this, fin](int r){
+ auto flush_finish = new LambdaContext([this, fin](int r){
dout(4) << "Expiring log segments because scrub did some repairs" << dendl;
mds->mdlog->trim_all();
// schedule
tick_event = timer.add_event_after(
g_conf()->mds_tick_interval,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
tick();
}));
if (mds_rank == NULL) {
mds_rank = new MDSRankDispatcher(whoami, mds_lock, clog,
timer, beacon, mdsmap, messenger, monc, &mgrc,
- new FunctionContext([this](int r){respawn();}),
- new FunctionContext([this](int r){suicide();}));
+ new LambdaContext([this](int r){respawn();}),
+ new LambdaContext([this](int r){suicide();}));
dout(10) << __func__ << ": initializing MDS rank "
<< mds_rank->get_nodeid() << dendl;
mds_rank->init();
// previous segments for expiry
mdlog->start_new_segment();
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_flush_mdlog(r);
});
void clear_mdlog() {
dout(20) << __func__ << dendl;
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_clear_mdlog(r);
});
return;
}
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_expire_segments(r);
});
expiry_gather->set_finisher(new MDSInternalContextWrapper(mds, ctx));
void trim_segments() {
dout(20) << __func__ << dendl;
- Context *ctx = new C_OnFinisher(new FunctionContext([this](int _) {
+ Context *ctx = new C_OnFinisher(new LambdaContext([this](int) {
std::lock_guard locker(mds->mds_lock);
trim_expired_segments();
}), mds->finisher);
void write_journal_head() {
dout(20) << __func__ << dendl;
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
std::lock_guard locker(mds->mds_lock);
handle_write_head(r);
});
return;
}
- timer_task = new FunctionContext([this](int _) {
+ timer_task = new LambdaContext([this](int) {
timer_task = nullptr;
complete(-ETIMEDOUT);
});
caps_recalled += count;
if ((throttled || count > 0) && (recall_timeout == 0 || duration < recall_timeout)) {
C_ContextTimeout *ctx = new C_ContextTimeout(
- mds, 1, new FunctionContext([this](int r) {
+ mds, 1, new LambdaContext([this](int r) {
recall_client_state();
}));
ctx->start_timer();
} else {
uint64_t remaining = (recall_timeout == 0 ? 0 : recall_timeout-duration);
C_ContextTimeout *ctx = new C_ContextTimeout(
- mds, remaining, new FunctionContext([this](int r) {
+ mds, remaining, new LambdaContext([this](int r) {
handle_recall_client_state(r);
}));
void flush_journal() {
dout(20) << __func__ << dendl;
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
handle_flush_journal(r);
});
auto [throttled, count] = do_trim();
if (throttled && count > 0) {
- auto timer = new FunctionContext([this](int _) {
+ auto timer = new LambdaContext([this](int) {
trim_cache();
});
mds->timer.add_event_after(1.0, timer);
cluster_degraded(false), stopping(false),
purge_queue(g_ceph_context, whoami_,
mdsmap_->get_metadata_pool(), objecter,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
std::lock_guard l(mds_lock);
handle_write_error(r);
}
auto apply_blacklist = [this, cmd](std::function<void ()> fn){
ceph_assert(ceph_mutex_is_locked_by_me(mds_lock));
- Context *on_blacklist_done = new FunctionContext([this, fn](int r) {
+ Context *on_blacklist_done = new LambdaContext([this, fn](int r) {
objecter->wait_for_latest_osdmap(
new C_OnFinisher(
- new FunctionContext([this, fn](int r) {
+ new LambdaContext([this, fn](int r) {
std::lock_guard l(mds_lock);
auto epoch = objecter->with_osdmap([](const OSDMap &o){
return o.get_epoch();
}
Context *MDSRank::create_async_exec_context(C_ExecAndReply *ctx) {
- return new C_OnFinisher(new FunctionContext([ctx](int _) {
+ return new C_OnFinisher(new LambdaContext([ctx](int) {
ctx->exec();
}), finisher);
}
update_log_config();
}
- finisher->queue(new FunctionContext([this, changed](int r) {
+ finisher->queue(new LambdaContext([this, changed](int) {
std::scoped_lock lock(mds_lock);
if (changed.count("mds_log_pause") && !g_conf()->mds_log_pause) {
dout(20) << __func__ << dendl;
timer.add_event_after(g_conf().get_val<double>("mds_task_status_update_interval"),
- new FunctionContext([this](int _) {
+ new LambdaContext([this](int) {
send_task_status();
}));
}
if (gather.has_subs()) {
gather.set_finisher(
new MDSInternalContextWrapper(mds,
- new FunctionContext(finish_func)));
+ new LambdaContext(std::move(finish_func))));
gather.activate();
} else {
finish_func(0);
if (!load_done) {
wait_for_load(
new MDSInternalContextWrapper(mds,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
_prefetch_inodes();
})
)
if (in_flight.empty()) {
dout(4) << "start work (by drain)" << dendl;
- finisher.queue(new FunctionContext([this](int r) {
+ finisher.queue(new LambdaContext([this](int r) {
std::lock_guard l(lock);
_consume();
}));
if (completion)
waiting_for_recovery.push_back(completion);
- journaler.recover(new FunctionContext([this](int r){
+ journaler.recover(new LambdaContext([this](int r){
if (r == -ENOENT) {
dout(1) << "Purge Queue not found, assuming this is an upgrade and "
"creating it." << dendl;
if (!journaler.is_readable() &&
!journaler.get_error() &&
journaler.get_read_pos() < journaler.get_write_pos()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
+ journaler.wait_for_readable(new LambdaContext([this](int r) {
std::lock_guard l(lock);
_recover();
}));
layout.pool_id = metadata_pool;
journaler.set_writeable();
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
- journaler.write_head(new FunctionContext([this](int r) {
+ journaler.write_head(new LambdaContext([this](int r) {
std::lock_guard l(lock);
if (r) {
_go_readonly(r);
// we should flush in order to allow MDCache to drop its strays rather
// than having them wait for purgequeue to progress.
if (!delayed_flush) {
- delayed_flush = new FunctionContext([this](int r){
+ delayed_flush = new LambdaContext([this](int r){
delayed_flush = nullptr;
journaler.flush();
});
// Because we are the writer and the reader of the journal
// via the same Journaler instance, we never need to reread_head
if (!journaler.have_waiter()) {
- journaler.wait_for_readable(new FunctionContext([this](int r) {
+ journaler.wait_for_readable(new LambdaContext([this](int r) {
std::lock_guard l(lock);
if (r == 0) {
_consume();
ceph_assert(gather.has_subs());
gather.set_finisher(new C_OnFinisher(
- new FunctionContext([this, expire_to](int r){
+ new LambdaContext([this, expire_to](int r){
std::lock_guard l(lock);
_execute_item_complete(expire_to);
// might need to kick off consume.
dout(4) << "maybe start work again (max_purge_files="
<< g_conf()->mds_max_purge_files << dendl;
- finisher.queue(new FunctionContext([this](int r){
+ finisher.queue(new LambdaContext([this](int r){
std::lock_guard l(lock);
_consume();
}));
Context *send_reply;
if (reply) {
int64_t session_id = session->get_client().v;
- send_reply = new FunctionContext([this, session_id, reply](int r) {
+ send_reply = new LambdaContext([this, session_id, reply](int r) {
assert(ceph_mutex_is_locked_by_me(mds->mds_lock));
Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(session_id));
if (!session) {
pv = mds->sessionmap.mark_projected(session);
sseq = mds->sessionmap.set_state(session, Session::STATE_OPENING);
mds->sessionmap.touch_session(session);
- auto fin = new FunctionContext([log_session_status = std::move(log_session_status)](int r){
+ auto fin = new LambdaContext([log_session_status = std::move(log_session_status)](int r){
ceph_assert(r == 0);
log_session_status("ACCEPTED", "");
});
if (gather.has_subs()) {
dout(1) << "reconnect will complete once clients are evicted" << dendl;
- gather.set_finisher(new MDSInternalContextWrapper(mds, new FunctionContext(
+ gather.set_finisher(new MDSInternalContextWrapper(mds, new LambdaContext(
[this](int r){reconnect_gather_finish();})));
gather.activate();
reconnect_evicting = true;
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
- finisher.queue(new FunctionContext([this, active_module, name](int) {
+ finisher.queue(new LambdaContext([this, active_module, name](int) {
int r = active_module->load(this);
if (r != 0) {
derr << "Failed to run module in active mode ('" << name << "')"
auto module = i.second.get();
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
- finisher.queue(new FunctionContext([module, notify_type, notify_id](int r){
+ finisher.queue(new LambdaContext([module, notify_type, notify_id](int r){
module->notify(notify_type, notify_id);
}));
}
// Note intentional use of non-reference lambda binding on
// log_entry: we take a copy because caller's instance is
// probably ephemeral.
- finisher.queue(new FunctionContext([module, log_entry](int r){
+ finisher.queue(new LambdaContext([module, log_entry](int r){
module->notify_clog(log_entry);
}));
}
auto module = i.second.get();
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
- finisher.queue(new FunctionContext([module](int r){
+ finisher.queue(new LambdaContext([module](int r){
module->config_notify();
}));
}
// TODO: enhance MCommand interface so that it returns
// latest cluster map versions on completion, and callers
// can wait for those.
- auto c = new FunctionContext([command_c, self](int command_r){
+ auto c = new LambdaContext([command_c, self](int command_r){
self->py_modules->get_objecter().wait_for_latest_osdmap(
- new FunctionContext([command_c, command_r](int wait_r){
+ new LambdaContext([command_c, command_r](int wait_r){
command_c->complete(command_r);
})
);
return;
tick_event = timer.add_event_after(delay_sec,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
tick();
}));
}
// Send a fresh MMgrConfigure to all clients, so that they can follow
// the new policy for transmitting stats
- finisher.queue(new FunctionContext([this](int r) {
+ finisher.queue(new LambdaContext([this](int r) {
std::lock_guard l(lock);
for (auto &c : daemon_connections) {
if (c->peer_is_osd()) {
}
dout(10) << "passing through " << cmdctx->cmdmap.size() << dendl;
- finisher.queue(new FunctionContext([this, cmdctx, handler_name, prefix](int r_) {
+ finisher.queue(new LambdaContext([this, cmdctx, handler_name, prefix](int r_) {
std::stringstream ss;
// Validate that the module is enabled
<< daemon_connections.size() << " clients" << dendl;
// Send a fresh MMgrConfigure to all clients, so that they can follow
// the new policy for transmitting stats
- finisher.queue(new FunctionContext([this](int r) {
+ finisher.queue(new LambdaContext([this](int r) {
std::lock_guard l(lock);
for (auto &c : daemon_connections) {
_send_configure(c);
finisher.start();
- finisher.queue(new FunctionContext([this, completion](int r){
+ finisher.queue(new LambdaContext([this, completion](int r){
init();
completion->complete(0);
}));
void Mgr::shutdown()
{
- finisher.queue(new FunctionContext([&](int) {
+ finisher.queue(new LambdaContext([&](int) {
{
std::lock_guard l(lock);
// First stop the server so that we're not taking any more incoming
if (!connect_retry_callback) {
connect_retry_callback = timer.add_event_at(
when,
- new FunctionContext([this](int r){
+ new LambdaContext([this](int r){
connect_retry_callback = nullptr;
reconnect();
}));
if (stats_period != 0) {
report_callback = timer.add_event_after(
stats_period,
- new FunctionContext([this](int) {
+ new LambdaContext([this](int) {
_send_stats();
}));
}
timer.add_event_after(
g_conf().get_val<std::chrono::seconds>("mgr_tick_period").count(),
- new FunctionContext([this](int r){
+ new LambdaContext([this](int r){
tick();
}
));
void MgrStandby::shutdown()
{
- finisher.queue(new FunctionContext([&](int) {
+ finisher.queue(new LambdaContext([&](int) {
std::lock_guard l(lock);
dout(4) << "Shutting down" << dendl;
active_mgr.reset(new Mgr(&monc, map, &py_module_registry,
client_messenger.get(), &objecter,
&client, clog, audit_clog));
- active_mgr->background_init(new FunctionContext(
+ active_mgr->background_init(new LambdaContext(
[this](int r){
// Advertise our active-ness ASAP instead of waiting for
// next tick.
// Send all python calls down a Finisher to avoid blocking
// C++ code, and avoid any potential lock cycles.
- finisher.queue(new FunctionContext([this, standby_module, name](int) {
+ finisher.queue(new LambdaContext([this, standby_module, name](int) {
int r = standby_module->load();
if (r != 0) {
derr << "Failed to run module in standby mode ('" << name << "')"
*/
expire_event = mon->timer.add_event_after(
g_conf()->mon_election_timeout + plus,
- new C_MonContext(mon, [this](int) {
+ new C_MonContext{mon, [this](int) {
logic.end_election_period();
- }));
+ }});
}
* know which FS it was part of. Nor does this matter. Sending an empty
* MDSMap is sufficient for getting the MDS to respawn.
*/
- wait_for_finished_proposal(op, new FunctionContext([op, this](int r){
+ wait_for_finished_proposal(op, new LambdaContext([op, this](int r){
if (r >= 0) {
const auto& fsmap = get_fsmap();
MDSMap null_map;
dout(5) << "prepare_beacon pending map now:" << dendl;
print_map(pending);
- wait_for_finished_proposal(op, new FunctionContext([op, this](int r){
+ wait_for_finished_proposal(op, new LambdaContext([op, this](int r){
if (r >= 0)
_updated(op); // success
else if (r == -ECANCELED) {
send_digests();
} else {
cancel_timer();
- wait_for_active_ctx(new C_MonContext(mon, [this](int) {
+ wait_for_active_ctx(new C_MonContext{mon, [this](int) {
send_digests();
- }));
+ }});
}
}
}
timer:
digest_event = mon->timer.add_event_after(
g_conf().get_val<int64_t>("mon_mgr_digest_period"),
- new C_MonContext(mon, [this](int) {
+ new C_MonContext{mon, [this](int) {
send_digests();
- }));
+ }});
}
void MgrMonitor::cancel_timer()
void MonClient::handle_config(MConfig *m)
{
ldout(cct,10) << __func__ << " " << *m << dendl;
- finisher.queue(new FunctionContext([this, m](int r) {
+ finisher.queue(new LambdaContext([this, m](int r) {
cct->_conf.set_mon_vals(cct, m->config, config_cb);
if (config_notify_cb) {
config_notify_cb();
void MonClient::schedule_tick()
{
- auto do_tick = make_lambda_context([this]() { tick(); });
+ auto do_tick = make_lambda_context([this](int) { tick(); });
if (_hunting()) {
const auto hunt_interval = (cct->_conf->mon_client_hunt_interval *
reopen_interval_multiplier);
#undef COMMAND
#undef COMMAND_WITH_FLAG
-
-
-void C_MonContext::finish(int r) {
- if (mon->is_shutdown())
- return;
- FunctionContext::finish(r);
-}
-
Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,
Messenger *m, Messenger *mgr_m, MonMap *map) :
Dispatcher(cct_),
if (changed.count("mon_health_to_clog") ||
changed.count("mon_health_to_clog_interval") ||
changed.count("mon_health_to_clog_tick_interval")) {
- finisher.queue(new C_MonContext(this, [this, changed](int) {
+ finisher.queue(new C_MonContext{this, [this, changed](int) {
std::lock_guard l{lock};
health_to_clog_update_conf(changed);
- }));
+ }});
}
if (changed.count("mon_scrub_interval")) {
int scrub_interval = conf->mon_scrub_interval;
- finisher.queue(new C_MonContext(this, [this, scrub_interval](int) {
+ finisher.queue(new C_MonContext{this, [this, scrub_interval](int) {
std::lock_guard l{lock};
scrub_update_interval(scrub_interval);
- }));
+ }});
}
}
timer.cancel_event(sync_timeout_event);
sync_timeout_event = timer.add_event_after(
g_conf()->mon_sync_timeout,
- new C_MonContext(this, [this](int) {
+ new C_MonContext{this, [this](int) {
sync_timeout();
- }));
+ }});
}
void Monitor::sync_finish(version_t last_committed)
void Monitor::reset_probe_timeout()
{
cancel_probe_timeout();
- probe_timeout_event = new C_MonContext(this, [this](int r) {
+ probe_timeout_event = new C_MonContext{this, [this](int r) {
probe_timeout(r);
- });
+ }};
double t = g_conf()->mon_probe_timeout;
if (timer.add_event_after(t, probe_timeout_event)) {
dout(10) << "reset_probe_timeout " << probe_timeout_event
// Freshen the health status before doing health_to_clog in case
// our just-completed election changed the health
- healthmon()->wait_for_active_ctx(new FunctionContext([this](int r){
+ healthmon()->wait_for_active_ctx(new LambdaContext([this](int r){
dout(20) << "healthmon now active" << dendl;
healthmon()->tick();
if (healthmon()->is_proposing()) {
dout(20) << __func__ << " healthmon proposing, waiting" << dendl;
- healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext(this,
+ healthmon()->wait_for_finished_proposal(nullptr, new C_MonContext{this,
[this](int r){
ceph_assert(ceph_mutex_is_locked_by_me(lock));
do_health_to_clog_interval();
- }));
+ }});
} else {
do_health_to_clog_interval();
health_tick_stop();
health_tick_event = timer.add_event_after(
cct->_conf->mon_health_to_clog_tick_interval,
- new C_MonContext(this, [this](int r) {
+ new C_MonContext{this, [this](int r) {
if (r < 0)
return;
health_tick_start();
- }));
+ }});
}
void Monitor::health_tick_stop()
health_interval_stop();
auto next = health_interval_calc_next_update();
- health_interval_event = new C_MonContext(this, [this](int r) {
+ health_interval_event = new C_MonContext{this, [this](int r) {
if (r < 0)
return;
do_health_to_clog_interval();
- });
+ }};
if (!timer.add_event_at(next, health_interval_event)) {
health_interval_event = nullptr;
}
timecheck_event = timer.add_event_after(
delay,
- new C_MonContext(this, [this](int) {
+ new C_MonContext{this, [this](int) {
timecheck_start_round();
- }));
+ }});
}
void Monitor::timecheck_check_skews()
scrub_event = timer.add_event_after(
cct->_conf->mon_scrub_interval,
- new C_MonContext(this, [this](int) {
+ new C_MonContext{this, [this](int) {
scrub_start();
- }));
+ }});
}
void Monitor::scrub_event_cancel()
scrub_cancel_timeout();
scrub_timeout_event = timer.add_event_after(
g_conf()->mon_scrub_timeout,
- new C_MonContext(this, [this](int) {
+ new C_MonContext{this, [this](int) {
scrub_timeout();
- }));
+ }});
}
/************ TICK ***************/
void Monitor::new_tick()
{
- timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext(this, [this](int) {
+ timer.add_event_after(g_conf()->mon_tick_interval, new C_MonContext{this, [this](int) {
tick();
- }));
+ }});
}
void Monitor::tick()
#define COMPAT_SET_LOC "feature_set"
-class C_MonContext final : public FunctionContext {
- const Monitor *mon;
-public:
- explicit C_MonContext(Monitor *m, boost::function<void(int)>&& callback)
- : FunctionContext(std::move(callback)), mon(m) {}
- void finish(int r) override;
-};
-
class Monitor : public Dispatcher,
public AuthClient,
public AuthServer,
// make sure you add your feature to Monitor::get_supported_features
+/* Callers use:
+ *
+ * new C_MonContext{...}
+ *
+ * instead of
+ *
+ * new C_MonContext(...)
+ *
+ * because of gcc bug [1].
+ *
+ * [1] https://gcc.gnu.org/bugzilla/show_bug.cgi?id=85883
+ */
+template<typename T>
+class C_MonContext : public LambdaContext<T> {
+public:
+ C_MonContext(const Monitor* m, T&& f) :
+ LambdaContext<T>(std::forward<T>(f)),
+ mon(m)
+ {}
+ void finish(int r) override {
+ if (mon->is_shutdown())
+ return;
+ LambdaContext<T>::finish(r);
+ }
+private:
+ const Monitor* mon;
+};
#endif
pending_inc.new_xinfo[target_osd].dead_epoch = m->get_epoch();
wait_for_finished_proposal(
op,
- new FunctionContext(
+ new LambdaContext(
[op, this] (int r) {
if (r >= 0) {
mon->no_reply(op); // ignore on success
collect_timeout_event = mon->timer.add_event_after(
g_conf()->mon_accept_timeout_factor *
g_conf()->mon_lease,
- new C_MonContext(mon, [this](int r) {
+ new C_MonContext{mon, [this](int r) {
if (r == -ECANCELED)
return;
collect_timeout();
- }));
+ }});
}
// set timeout event
accept_timeout_event = mon->timer.add_event_after(
g_conf()->mon_accept_timeout_factor * g_conf()->mon_lease,
- new C_MonContext(mon, [this](int r) {
+ new C_MonContext{mon, [this](int r) {
if (r == -ECANCELED)
return;
accept_timeout();
- }));
+ }});
}
// peon
if (!lease_ack_timeout_event) {
lease_ack_timeout_event = mon->timer.add_event_after(
g_conf()->mon_lease_ack_timeout_factor * g_conf()->mon_lease,
- new C_MonContext(mon, [this](int r) {
+ new C_MonContext{mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_ack_timeout();
- }));
+ }});
}
// set renew event
at += ceph::make_timespan(g_conf()->mon_lease_renew_interval_factor *
g_conf()->mon_lease);
lease_renew_event = mon->timer.add_event_at(
- at, new C_MonContext(mon, [this](int r) {
+ at, new C_MonContext{mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_renew_timeout();
- }));
+ }});
}
void Paxos::warn_on_future_time(utime_t t, entity_name_t from)
mon->timer.cancel_event(lease_timeout_event);
lease_timeout_event = mon->timer.add_event_after(
g_conf()->mon_lease_ack_timeout_factor * g_conf()->mon_lease,
- new C_MonContext(mon, [this](int r) {
+ new C_MonContext{mon, [this](int r) {
if (r == -ECANCELED)
return;
lease_timeout();
- }));
+ }});
}
void Paxos::lease_timeout()
* Callback class used to propose the pending value once the proposal_timer
* fires up.
*/
- auto do_propose = new C_MonContext(mon, [this](int r) {
+ auto do_propose = new C_MonContext{mon, [this](int r) {
proposal_timer = 0;
if (r >= 0) {
propose_pending();
} else {
ceph_abort_msg("bad return value for proposal_timer");
}
- });
+ }};
dout(10) << " setting proposal_timer " << do_propose
<< " with delay of " << delay << dendl;
proposal_timer = mon->timer.add_event_after(delay, do_propose);
if (tick_period <= 0)
return;
- tick_event = new C_MonContext(mon, [this](int r) {
+ tick_event = new C_MonContext{mon, [this](int r) {
if (r < 0)
return;
tick();
- });
+ }};
mon->timer.add_event_after(tick_period, tick_event);
}
// this thread might be required for splitting and merging PGs to
// make progress.
boot_finisher.queue(
- new FunctionContext(
+ new LambdaContext(
[this](int r) {
std::unique_lock l(osd_lock);
if (is_preboot()) {
std::lock_guard l(service.sleep_lock);
if (recovery_sleep > 0 && service.recovery_needs_sleep) {
PGRef pgref(pg);
- auto recovery_requeue_callback = new FunctionContext([this, pgref, queued, reserved_pushes](int r) {
+ auto recovery_requeue_callback = new LambdaContext([this, pgref, queued, reserved_pushes](int r) {
dout(20) << "do_recovery wake up at "
<< ceph_clock_now()
<< ", re-queuing recovery" << dendl;
spg_t pgid = get_pgid();
int state = scrubber.state;
auto scrub_requeue_callback =
- new FunctionContext([osds, pgid, state](int r) {
+ new LambdaContext([osds, pgid, state](int r) {
PGRef pg = osds->osd->lookup_lock_pg(pgid);
if (pg == nullptr) {
lgeneric_dout(osds->osd->cct, 20)
if (osd_delete_sleep > 0 && delete_needs_sleep) {
epoch_t e = get_osdmap()->get_epoch();
PGRef pgref(this);
- auto delete_requeue_callback = new FunctionContext([this, pgref, e](int r) {
+ auto delete_requeue_callback = new LambdaContext([this, pgref, e](int r) {
dout(20) << __func__ << " wake up at "
<< ceph_clock_now()
<< ", re-queuing delete" << dendl;
reply->objects = m->objects;
ConnectionRef conn = m->get_connection();
- gather.set_finisher(new FunctionContext(
+ gather.set_finisher(new LambdaContext(
[=](int r) {
if (r != -EAGAIN) {
get_parent()->send_message_osd_cluster(reply, conn.get());
ceph_assert(!recovering.count(soid));
recovering.insert(make_pair(soid, ObjectContextRef()));
epoch_t cur_epoch = get_osdmap_epoch();
- remove_missing_object(soid, v, new FunctionContext(
+ remove_missing_object(soid, v, new LambdaContext(
[=](int) {
std::scoped_lock locker{*this};
if (!pg_has_reset_since(cur_epoch)) {
recovery_info.version = v;
epoch_t cur_epoch = get_osdmap_epoch();
- t.register_on_complete(new FunctionContext(
+ t.register_on_complete(new LambdaContext(
[=](int) {
std::unique_lock locker{*this};
if (!pg_has_reset_since(cur_epoch)) {
m->entries, t, op_trim_to, op_roll_forward_to);
eversion_t new_lcod = info.last_complete;
- Context *complete = new FunctionContext(
+ Context *complete = new LambdaContext(
[=](int) {
const MOSDPGUpdateLogMissing *msg = static_cast<const MOSDPGUpdateLogMissing*>(
op->get_req());
if (gather.has_subs()) {
bool flushed = was_dirty && oset->dirty_or_tx == 0;
- gather.set_finisher(new FunctionContext(
+ gather.set_finisher(new LambdaContext(
[this, oset, flushed, on_finish](int) {
ceph_assert(ceph_mutex_is_locked(lock));
if (flushed && flush_set_callback)
}
}
- auto ctx = new FunctionContext([](int reg) {
+ auto ctx = new LambdaContext([](int reg) {
ASSERT_TRUE(reg == 0);
});
m_cache_client->register_client(ctx);
void test_register_client(uint64_t random_index) {
ASSERT_TRUE(m_cache_client_vec[random_index] == nullptr);
- auto ctx = new FunctionContext([](int ret){
+ auto ctx = new LambdaContext([](int ret){
ASSERT_TRUE(ret == 0);
});
auto session = create_session(random_index);
int ret = m_callback();
if (m_comp != NULL) {
if (m_finisher != NULL) {
- m_finisher->queue(new FunctionContext(boost::bind(
+ m_finisher->queue(new LambdaContext(boost::bind(
&finish_aio_completion, m_comp, ret)));
} else {
finish_aio_completion(m_comp, ret);
struct WaitForFlush {
int flushed() {
if (--count == 0) {
- aio_finisher->queue(new FunctionContext(boost::bind(
+ aio_finisher->queue(new LambdaContext(boost::bind(
&finish_aio_completion, c, 0)));
delete this;
}
int TestRadosClient::aio_watch_flush(AioCompletionImpl *c) {
c->get();
- Context *ctx = new FunctionContext(boost::bind(
+ Context *ctx = new LambdaContext(boost::bind(
&TestRadosClient::finish_aio_completion, this, c, _1));
get_watch_notify()->aio_flush(this, ctx);
return 0;
auto _pool_id = pool_id;
auto _nspace = nspace;
auto _oid = oid;
- auto ctx = new FunctionContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) {
+ auto ctx = new LambdaContext([_test_watch_notify, _pool_id, _nspace, _oid](int r) {
_test_watch_notify->handle_object_removed(_pool_id, _nspace, _oid);
});
test_rados_client->get_aio_finisher()->queue(ctx);
librados::WatchCtx *watch_ctx,
librados::WatchCtx2 *watch_ctx2,
Context *on_finish) {
- auto ctx = new FunctionContext([=](int) {
+ auto ctx = new LambdaContext([=](int) {
execute_watch(rados_client, pool_id, nspace, o, gid, handle, watch_ctx,
watch_ctx2, on_finish);
});
void TestWatchNotify::aio_unwatch(TestRadosClient *rados_client,
uint64_t handle, Context *on_finish) {
- auto ctx = new FunctionContext([this, rados_client, handle, on_finish](int) {
+ auto ctx = new LambdaContext([this, rados_client, handle, on_finish](int) {
execute_unwatch(rados_client, handle, on_finish);
});
rados_client->get_aio_finisher()->queue(ctx);
const std::string& oid, const bufferlist& bl,
uint64_t timeout_ms, bufferlist *pbl,
Context *on_notify) {
- auto ctx = new FunctionContext([=](int) {
+ auto ctx = new LambdaContext([=](int) {
execute_notify(rados_client, pool_id, nspace, oid, bl, pbl, on_notify);
});
rados_client->get_aio_finisher()->queue(ctx);
m_async_op_tracker.start_op();
uint64_t notifier_id = rados_client->get_instance_id();
- watch_handle.rados_client->get_aio_finisher()->queue(new FunctionContext(
+ watch_handle.rados_client->get_aio_finisher()->queue(new LambdaContext(
[this, pool_id, nspace, oid, bl, notify_id, watch_handle, notifier_id](int r) {
bufferlist notify_bl;
notify_bl.append(bl);
auto handle = watch_handle.handle;
auto watch_ctx2 = watch_handle.watch_ctx2;
if (watch_ctx2 != nullptr) {
- auto ctx = new FunctionContext([handle, watch_ctx2](int) {
+ auto ctx = new LambdaContext([handle, watch_ctx2](int) {
watch_ctx2->handle_error(handle, -ENOTCONN);
});
watch_handle.rados_client->get_aio_finisher()->queue(ctx);
expect_cache_run(*mock_parent_image_cache, 0);
C_SaferCond cond;
- Context* handle_connect = new FunctionContext([&cond](int ret) {
+ Context* handle_connect = new LambdaContext([&cond](int ret) {
ASSERT_EQ(ret, 0);
cond.complete(0);
});
expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
- Context* ctx = new FunctionContext([](bool reg) {
+ Context* ctx = new LambdaContext([](bool reg) {
ASSERT_EQ(reg, true);
});
expect_cache_register(*mock_parent_image_cache, ctx, 0);
expect_cache_run(*mock_parent_image_cache, 0);
C_SaferCond cond;
- Context* handle_connect = new FunctionContext([&cond](int ret) {
+ Context* handle_connect = new LambdaContext([&cond](int ret) {
ASSERT_EQ(ret, -1);
cond.complete(0);
});
expect_cache_run(*mock_parent_image_cache, 0);
C_SaferCond cond;
- Context* handle_connect = new FunctionContext([&cond](int ret) {
+ Context* handle_connect = new LambdaContext([&cond](int ret) {
ASSERT_EQ(ret, 0);
cond.complete(0);
});
expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
- Context* ctx = new FunctionContext([](bool reg) {
+ Context* ctx = new LambdaContext([](bool reg) {
ASSERT_EQ(reg, false);
});
expect_cache_register(*mock_parent_image_cache, ctx, -1);
expect_cache_run(*mock_parent_image_cache, 0);
C_SaferCond conn_cond;
- Context* handle_connect = new FunctionContext([&conn_cond](int ret) {
+ Context* handle_connect = new LambdaContext([&conn_cond](int ret) {
ASSERT_EQ(ret, 0);
conn_cond.complete(0);
});
expect_cache_async_connect(*mock_parent_image_cache, 0, handle_connect);
- Context* ctx = new FunctionContext([](bool reg) {
+ Context* ctx = new LambdaContext([](bool reg) {
ASSERT_EQ(reg, true);
});
expect_cache_register(*mock_parent_image_cache, ctx, 0);
if ((m_src_image_ctx->features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
return;
}
- EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(
- ReturnNew<FunctionContext>([](int) {}));
+ EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(Return(new LambdaContext([](int){})));
}
void expect_list_snaps(librbd::MockTestImageCtx &mock_image_ctx,
}
void expect_start_op(librbd::MockExclusiveLock &mock_exclusive_lock) {
- EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(
- ReturnNew<FunctionContext>([](int) {}));
+ EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(Return(new LambdaContext([](int){})));
}
void expect_test_features(librbd::MockTestImageCtx &mock_image_ctx,
if ((m_src_image_ctx->features & RBD_FEATURE_EXCLUSIVE_LOCK) == 0) {
return;
}
- EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(
- ReturnNew<FunctionContext>([](int) {}));
+ EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(Return(new LambdaContext([](int){})));
}
void expect_get_snap_namespace(librbd::MockTestImageCtx &mock_image_ctx,
}
void expect_start_op(librbd::MockExclusiveLock &mock_exclusive_lock) {
- EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(
- ReturnNew<FunctionContext>([](int) {}));
+ EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(Return(new LambdaContext([](int){})));
}
void expect_test_features(librbd::MockTestImageCtx &mock_image_ctx,
}
ACTION_P2(CompleteAioCompletion, r, image_ctx) {
- image_ctx->op_work_queue->queue(new FunctionContext([this, arg0](int r) {
+ image_ctx->op_work_queue->queue(new LambdaContext([this, arg0](int r) {
arg0->get();
arg0->init_time(image_ctx, librbd::io::AIO_TYPE_NONE);
arg0->set_request_count(1);
EXPECT_CALL(mock_request, send_op())
.WillOnce(Invoke([&mock_image_ctx, &mock_request, r]() {
mock_image_ctx.image_ctx->op_work_queue->queue(
- new FunctionContext([&mock_request, r](int _) {
+ new LambdaContext([&mock_request, r](int _) {
mock_request.send_op_impl(r);
}), 0);
}));
EXPECT_CALL(mock_io_image_dispatch_spec, send())
.WillOnce(Invoke([&mock_image_ctx, &mock_io_image_dispatch_spec, r]() {
auto aio_comp = mock_io_image_dispatch_spec.s_instance->aio_comp;
- auto ctx = new FunctionContext([aio_comp](int r) {
+ auto ctx = new LambdaContext([aio_comp](int r) {
aio_comp->fail(r);
});
mock_image_ctx.image_ctx->op_work_queue->queue(ctx, r);
throttle.start_op();
uint64_t object_no = (rand() % object_count);
- auto ctx = new FunctionContext([&throttle, object_no](int r) {
+ auto ctx = new LambdaContext([&throttle, object_no](int r) {
ASSERT_EQ(0, r) << "object_no=" << object_no;
throttle.end_op(r);
});
}
void expect_start_op(librbd::MockExclusiveLock &mock_exclusive_lock) {
- EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(
- ReturnNew<FunctionContext>([](int) {}));
+ EXPECT_CALL(mock_exclusive_lock, start_op(_)).WillOnce(Return(new LambdaContext([](int){})));
}
void expect_rollback_object_map(librbd::MockObjectMap &mock_object_map, int r) {
mock_image_ctx.image_ctx->op_work_queue->queue(ctx, r);
}
- on_flush = new FunctionContext([on_flush](int r) {
+ on_flush = new LambdaContext([on_flush](int r) {
derr << "FLUSH START" << dendl;
on_flush->complete(r);
derr << "FLUSH FINISH" << dendl;
}
c->get();
- mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([mock_rados_client, action, c](int r) {
+ mock_image_ctx.image_ctx->op_work_queue->queue(new LambdaContext([mock_rados_client, action, c](int r) {
if (action) {
action();
}
.WillOnce(DoAll(Invoke([this, &mock_image_ctx, mock_rados_client, r, action](
uint64_t handle, librados::AioCompletionImpl *c) {
c->get();
- mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([mock_rados_client, action, c](int r) {
+ mock_image_ctx.image_ctx->op_work_queue->queue(new LambdaContext([mock_rados_client, action, c](int r) {
if (action) {
action();
}
.WillOnce(DoAll(WithArgs<1, 2>(Invoke([&mock_image_ctx, &mock_io_ctx, r](librados::AioCompletionImpl *c, uint64_t *cookie) {
*cookie = 234;
c->get();
- mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
+ mock_image_ctx.image_ctx->op_work_queue->queue(new LambdaContext([&mock_io_ctx, c](int r) {
mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
}), r);
})),
.WillOnce(DoAll(Invoke([&mock_image_ctx, &mock_io_ctx, r](uint64_t handle,
librados::AioCompletionImpl *c) {
c->get();
- mock_image_ctx.image_ctx->op_work_queue->queue(new FunctionContext([&mock_io_ctx, c](int r) {
+ mock_image_ctx.image_ctx->op_work_queue->queue(new LambdaContext([&mock_io_ctx, c](int r) {
mock_io_ctx.get_mock_rados_client()->finish_aio_completion(c, r);
}), r);
}),
void expect_start_op(librbd::MockTestImageCtx &mock_image_ctx, bool success) {
EXPECT_CALL(*mock_image_ctx.exclusive_lock, start_op(_))
.WillOnce(Invoke([success](int* r) {
+ auto f = [](int r) {};
if (!success) {
*r = -EROFS;
- return static_cast<FunctionContext*>(nullptr);
+ return static_cast<LambdaContext<decltype(f)>*>(nullptr);
}
- return new FunctionContext([](int r) {});
+ return new LambdaContext(std::move(f));
}));
}
EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
.WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) {
auto wrapped_ctx =
- new FunctionContext([this, ctx](int r) {
+ new LambdaContext([this, ctx](int r) {
std::lock_guard timer_locker{m_threads->timer_lock};
ctx->complete(r);
});
void expect_add_event(MockThreads &mock_threads) {
EXPECT_CALL(*mock_threads.timer, add_event_after(_,_))
.WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) {
- auto wrapped_ctx = new FunctionContext([this, ctx](int r) {
+ auto wrapped_ctx = new LambdaContext([this, ctx](int r) {
std::lock_guard timer_locker{m_threads->timer_lock};
ctx->complete(r);
});
CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
cct->_conf.set_val("rbd_mirror_image_policy_rebalance_timeout", "0");
- auto wrapped_ctx = new FunctionContext([this, ctx](int r) {
+ auto wrapped_ctx = new LambdaContext([this, ctx](int r) {
std::lock_guard timer_locker{m_threads->timer_lock};
ctx->complete(r);
});
mock_threads.timer_cond.notify_one();
} else {
m_threads->work_queue->queue(
- new FunctionContext([&mock_threads, ctx](int) {
+ new LambdaContext([&mock_threads, ctx](int) {
std::lock_guard timer_lock{mock_threads.timer_lock};
ctx->complete(0);
}), 0);
const std::string& o, librados::AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
c->get();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[instance_watcher, &mock_io_ctx, c, pbl](int r) {
instance_watcher->cancel_notify_requests("other");
encode(librbd::watcher::NotifyResponse(), *pbl);
const std::string& o, librados::AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
c->get();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[instance_watcher, &mock_io_ctx, c, pbl](int r) {
instance_watcher->cancel_notify_requests("other");
encode(librbd::watcher::NotifyResponse(), *pbl);
const std::string& o, librados::AioCompletionImpl *c,
bufferlist& bl, uint64_t timeout_ms, bufferlist *pbl) {
c->get();
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[instance_watcher, &mock_io_ctx, c, pbl](int r) {
instance_watcher->cancel_notify_requests("other");
encode(librbd::watcher::NotifyResponse(), *pbl);
void shut_down(Context *on_shutdown) {
if (MockManagedLock::get_instance().m_release_lock_on_shutdown) {
- on_shutdown = new FunctionContext(
+ on_shutdown = new LambdaContext(
[this, on_shutdown](int r) {
MockManagedLock::get_instance().m_release_lock_on_shutdown = false;
shut_down(on_shutdown);
void try_acquire_lock(Context *on_acquired) {
Context *post_acquire_ctx = create_async_context_callback(
- m_work_queue, new FunctionContext(
+ m_work_queue, new LambdaContext(
[this, on_acquired](int r) {
post_acquire_lock_handler(r, on_acquired);
}));
ceph_assert(MockManagedLock::get_instance().m_on_released == nullptr);
MockManagedLock::get_instance().m_on_released = on_released;
- Context *post_release_ctx = new FunctionContext(
+ Context *post_release_ctx = new LambdaContext(
[this](int r) {
ceph_assert(MockManagedLock::get_instance().m_on_released != nullptr);
post_release_lock_handler(false, r,
MockManagedLock::get_instance().m_on_released = nullptr;
});
- Context *release_ctx = new FunctionContext(
+ Context *release_ctx = new LambdaContext(
[post_release_ctx](int r) {
if (r < 0) {
MockManagedLock::get_instance().m_on_released->complete(r);
}
});
- Context *pre_release_ctx = new FunctionContext(
+ Context *pre_release_ctx = new LambdaContext(
[this, release_ctx](int r) {
bool shutting_down =
MockManagedLock::get_instance().m_release_lock_on_shutdown;
if (on_finish != nullptr) {
auto on_released = mock_managed_lock.m_on_released;
ceph_assert(on_released != nullptr);
- mock_managed_lock.m_on_released = new FunctionContext(
+ mock_managed_lock.m_on_released = new LambdaContext(
[on_released, on_finish](int r) {
on_released->complete(r);
on_finish->complete(r);
// test namespace replayer init fails for non leader
C_SaferCond on_ns1_init;
- auto ctx = new FunctionContext(
+ Context* ctx = new LambdaContext(
[&mock_namespace, &on_ns1_init](int r) {
mock_namespace.remove("ns1");
on_ns1_init.complete(r);
expect_namespace_replayer_handle_acquire_leader(*mock_ns2_namespace_replayer,
-EINVAL);
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[&mock_namespace](int) {
mock_namespace.remove("ns2");
});
// test namespace replayer init fails on acquire leader
C_SaferCond on_ns3_shut_down;
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[&mock_namespace, &on_ns3_shut_down](int) {
mock_namespace.remove("ns3");
on_ns3_shut_down.complete(0);
EXPECT_CALL(*mock_threads.timer, add_event_after(_, _))
.WillOnce(DoAll(WithArg<1>(Invoke([this](Context *ctx) {
auto wrapped_ctx =
- new FunctionContext([this, ctx](int r) {
+ new LambdaContext([this, ctx](int r) {
std::lock_guard timer_locker{m_threads->timer_lock};
ctx->complete(r);
});
bool finished = false;
std::condition_variable cond;
std::mutex m;
- txn->register_on_complete(make_lambda_context([&]() {
+ txn->register_on_complete(make_lambda_context([&](int) {
std::unique_lock lock{m};
finished = true;
cond.notify_one();
int CacheClient::connect() {
int ret = -1;
C_SaferCond cond;
- Context* on_finish = new FunctionContext([&cond, &ret](int err) {
+ Context* on_finish = new LambdaContext([&cond, &ret](int err) {
ret = err;
cond.complete(err);
});
}
ceph_assert(current_request != nullptr);
- auto process_reply = new FunctionContext([current_request, reply]
+ auto process_reply = new LambdaContext([current_request, reply]
(bool dedicated) {
if (dedicated) {
// dedicated thrad to execute this context.
librados::bufferlist* read_buf = new librados::bufferlist();
- auto ctx = new FunctionContext([this, read_buf, cache_file_name](int ret) {
+ auto ctx = new LambdaContext([this, read_buf, cache_file_name](int ret) {
handle_promote_callback(ret, read_buf, cache_file_name);
});
dout(20) << this << " " << __func__ << ": image_name=" << m_image_name
<< dendl;
- auto ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
handle_finalize(r);
});
void ImageDeleter<I>::shut_down_trash_watcher(Context* on_finish) {
dout(10) << dendl;
ceph_assert(m_trash_watcher);
- auto ctx = new FunctionContext([this, on_finish](int r) {
+ auto ctx = new LambdaContext([this, on_finish](int r) {
delete m_trash_watcher;
m_trash_watcher = nullptr;
cancel_retry_timer();
}
- auto ctx = new FunctionContext([this, on_finish](int) {
+ auto ctx = new LambdaContext([this, on_finish](int) {
cancel_all_deletions(on_finish);
});
m_async_op_tracker.wait_for_ops(ctx);
Context* on_finish) {
dout(5) << "image_id=" << image_id << dendl;
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
m_threads->work_queue->queue(on_finish, r);
});
ceph_assert(delete_info);
auto on_start = create_async_context_callback(
- m_threads->work_queue, new FunctionContext(
+ m_threads->work_queue, new LambdaContext(
[this, delete_info](int r) {
if (r < 0) {
notify_on_delete(delete_info->image_id, r);
m_in_flight_delete_queue.push_back(delete_info);
m_async_op_tracker.start_op();
- auto ctx = new FunctionContext([this, delete_info](int r) {
+ auto ctx = new LambdaContext([this, delete_info](int r) {
handle_remove_image(delete_info, r);
m_async_op_tracker.finish_op();
});
dout(10) << dendl;
auto &delete_info = m_retry_delete_queue.front();
- m_timer_ctx = new FunctionContext([this](int r) {
+ m_timer_ctx = new LambdaContext([this](int r) {
handle_retry_timer();
});
m_threads->timer->add_event_at(delete_info->retry_time, m_timer_ctx);
// start (concurrent) removal of images
m_async_op_tracker.start_op();
- auto ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
remove_images();
m_async_op_tracker.finish_op();
});
dout(5) << "updates=[" << map_updates << "], "
<< "removes=[" << map_removals << "]" << dendl;
- Context *on_finish = new FunctionContext(
+ Context *on_finish = new LambdaContext(
[this, map_updates, map_removals](int r) {
handle_update_request(map_updates, map_removals, r);
finish_async_op();
}
}
- m_timer_task = new FunctionContext([this](int r) {
+ m_timer_task = new LambdaContext([this](int r) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_timer_task = nullptr;
m_threads->timer->cancel_event(m_rebalance_task);
}
- m_rebalance_task = new FunctionContext([this](int _) {
+ m_rebalance_task = new LambdaContext([this](int _) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_rebalance_task = nullptr;
template <typename I>
void ImageReplayer<I>::RemoteJournalerListener::handle_update(
::journal::JournalMetadata *) {
- FunctionContext *ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
replayer->handle_remote_journal_metadata_updated();
});
replayer->m_threads->work_queue->queue(ctx, 0);
void ImageReplayer<I>::on_start_fail(int r, const std::string &desc)
{
dout(10) << "r=" << r << dendl;
- Context *ctx = new FunctionContext([this, r, desc](int _r) {
+ Context *ctx = new LambdaContext([this, r, desc](int _r) {
{
std::lock_guard locker{m_lock};
ceph_assert(m_state == STATE_STARTING);
template <typename I>
void ImageReplayer<I>::restart(Context *on_finish)
{
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, on_finish](int r) {
if (r < 0) {
// Try start anyway.
}
dout(15) << dendl;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, on_flush](int r) {
handle_flush_local_replay(on_flush, r);
});
}
dout(15) << dendl;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, on_flush](int r) {
handle_flush_commit_position(on_flush, r);
});
// replayer to handle the new tag epoch
Context *ctx = create_context_callback<
ImageReplayer<I>, &ImageReplayer<I>::handle_replay_flush>(this);
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
m_local_image_ctx->journal->stop_external_replay();
m_local_replay = nullptr;
std::lock_guard timer_locker{m_threads->timer_lock};
ceph_assert(m_delayed_preprocess_task == nullptr);
- m_delayed_preprocess_task = new FunctionContext(
+ m_delayed_preprocess_task = new LambdaContext(
[this](int r) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_delayed_preprocess_task = nullptr;
g_perf_counters->tinc(l_rbd_mirror_replay_latency, latency);
}
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, bytes, latency](int r) {
std::lock_guard locker{m_lock};
if (m_perf_counters) {
void ImageReplayer<I>::queue_mirror_image_status_update(const OptionalState &state) {
dout(15) << dendl;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, state](int r) {
send_mirror_status_update(state);
});
// ensure pending IO is flushed and the commit position is updated
// prior to updating the mirror status
- ctx = new FunctionContext(
- [this, ctx](int r) {
+ auto ctx2 = new LambdaContext(
+ [this, ctx=std::move(ctx)](int r) {
flush_local_replay(ctx);
});
- m_threads->work_queue->queue(ctx, 0);
+ m_threads->work_queue->queue(ctx2, 0);
}
template <typename I>
case STATE_REPLAY_FLUSHING:
status.state = cls::rbd::MIRROR_IMAGE_STATUS_STATE_REPLAYING;
{
- Context *on_req_finish = new FunctionContext(
+ Context *on_req_finish = new LambdaContext(
[this](int r) {
dout(15) << "replay status ready: r=" << r << dendl;
if (r >= 0) {
if (new_interval >= 0 && is_running_() &&
start_mirror_image_status_update(true, false)) {
- m_update_status_task = new FunctionContext(
+ m_update_status_task = new LambdaContext(
[this](int r) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_update_status_task = nullptr;
if (m_in_flight_status_updates > 0) {
if (m_on_update_status_finish == nullptr) {
dout(15) << "waiting for in-flight status update" << dendl;
- m_on_update_status_finish = new FunctionContext(
+ m_on_update_status_finish = new LambdaContext(
[this, r](int _r) {
shut_down(r);
});
// case the remote cluster is unreachable
// chain the shut down sequence (reverse order)
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, r](int _r) {
update_mirror_image_status(true, STATE_STOPPED);
handle_shut_down(r);
// close the remote journal
if (m_remote_journaler != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
delete m_remote_journaler;
m_remote_journaler = nullptr;
ctx->complete(0);
});
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
m_remote_journaler->remove_listener(&m_remote_listener);
m_remote_journaler->shut_down(ctx);
});
// stop the replay of remote journal events
if (m_replay_handler != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
delete m_replay_handler;
m_replay_handler = nullptr;
m_event_replay_tracker.wait_for_ops(ctx);
});
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
m_remote_journaler->stop_replay(ctx);
});
}
// close the local image (release exclusive lock)
if (m_local_image_ctx) {
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
CloseImageRequest<I> *request = CloseImageRequest<I>::create(
&m_local_image_ctx, ctx);
request->send();
// shut down event replay into the local image
if (m_local_journal != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
m_local_journal = nullptr;
ctx->complete(0);
});
if (m_local_replay != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
m_local_journal->stop_external_replay();
m_local_replay = nullptr;
ctx->complete(0);
});
}
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
// blocks if listener notification is in-progress
m_local_journal->remove_listener(m_journal_listener);
ctx->complete(0);
}
// wait for all local in-flight replay events to complete
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
if (r < 0) {
derr << "error shutting down journal replay: " << cpp_strerror(r)
<< dendl;
// flush any local in-flight replay events
if (m_local_replay != nullptr) {
- ctx = new FunctionContext([this, ctx](int r) {
+ ctx = new LambdaContext([this, ctx](int r) {
m_local_replay->shut_down(true, ctx);
});
}
if (m_in_flight_status_updates > 0) {
if (m_on_update_status_finish == nullptr) {
dout(15) << "waiting for in-flight status update" << dendl;
- m_on_update_status_finish = new FunctionContext(
+ m_on_update_status_finish = new LambdaContext(
[this, r](int _r) {
handle_shut_down(r);
});
if (delete_requested || resync_requested) {
dout(5) << "moving image to trash" << dendl;
- auto ctx = new FunctionContext([this, r](int) {
+ auto ctx = new LambdaContext([this, r](int) {
handle_shut_down(r);
});
ImageDeleter<I>::trash_move(m_local_io_ctx, m_global_image_id,
m_updating_sync_point = false;
if (m_image_copy_request != nullptr) {
- m_update_sync_ctx = new FunctionContext(
+ m_update_sync_ctx = new LambdaContext(
[this](int r) {
std::lock_guard locker{m_lock};
this->send_update_sync_point();
void InstanceReplayer<I>::init(Context *on_finish) {
dout(10) << dendl;
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this, on_finish] (int r) {
{
std::lock_guard timer_locker{m_threads->timer_lock};
ceph_assert(m_on_shut_down == nullptr);
m_on_shut_down = on_finish;
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this] (int r) {
cancel_image_state_check_task();
wait_for_ops();
it = m_image_replayers.erase(it)) {
auto image_replayer = it->second;
auto ctx = gather_ctx->new_sub();
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[image_replayer, ctx] (int r) {
image_replayer->destroy();
ctx->complete(0);
auto image_replayer = it->second;
m_image_replayers.erase(it);
- on_finish = new FunctionContext(
+ on_finish = new LambdaContext(
[image_replayer, on_finish] (int r) {
image_replayer->destroy();
on_finish->complete(0);
m_async_op_tracker.start_op();
Context *ctx = create_async_context_callback(
- m_threads->work_queue, new FunctionContext(
+ m_threads->work_queue, new LambdaContext(
[this, image_replayer, on_finish] (int r) {
stop_image_replayer(image_replayer, on_finish);
m_async_op_tracker.finish_op();
int after = 1;
dout(10) << "scheduling image replayer " << image_replayer << " stop after "
<< after << " sec (task " << ctx << ")" << dendl;
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[this, after, ctx] (int r) {
std::lock_guard timer_locker{m_threads->timer_lock};
m_threads->timer->add_event_after(after, ctx);
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
ceph_assert(m_image_state_check_task == nullptr);
- m_image_state_check_task = new FunctionContext(
+ m_image_state_check_task = new LambdaContext(
[this](int r) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_image_state_check_task = nullptr;
bufferlist bl;
encode(NotifyMessage{SyncStartPayload{request_id, sync_id}}, bl);
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, sync_id] (int r) {
dout(10) << "finish: sync_id=" << sync_id << ", r=" << r << dendl;
std::lock_guard locker{m_lock};
m_requests.erase(it);
} else {
ctx = create_async_context_callback(
- m_work_queue, new FunctionContext(
+ m_work_queue, new LambdaContext(
[this, instance_id, request_id] (int r) {
complete_request(instance_id, request_id, r);
}));
const std::string &global_image_id, Context *on_finish) {
dout(10) << "global_image_id=" << global_image_id << dendl;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, global_image_id, on_finish] (int r) {
m_instance_replayer->acquire_image(this, global_image_id, on_finish);
m_notify_op_tracker.finish_op();
const std::string &global_image_id, Context *on_finish) {
dout(10) << "global_image_id=" << global_image_id << dendl;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, global_image_id, on_finish] (int r) {
m_instance_replayer->release_image(global_image_id, on_finish);
m_notify_op_tracker.finish_op();
dout(10) << "global_image_id=" << global_image_id << ", "
<< "peer_mirror_uuid=" << peer_mirror_uuid << dendl;
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, peer_mirror_uuid, global_image_id, on_finish] (int r) {
m_instance_replayer->remove_peer_image(global_image_id,
peer_mirror_uuid, on_finish);
}
Context *on_start = create_async_context_callback(
- m_work_queue, new FunctionContext(
+ m_work_queue, new LambdaContext(
[this, instance_id, sync_id, on_finish] (int r) {
dout(10) << "handle_sync_request: finish: instance_id=" << instance_id
<< ", sync_id=" << sync_id << ", r=" << r << dendl;
ceph_assert(m_on_finish == nullptr);
m_on_finish = on_finish;
- Context *ctx = new FunctionContext(
+ Context *ctx = new LambdaContext(
[this](int r) {
std::scoped_lock locker{m_threads->timer_lock, m_lock};
cancel_remove_task();
ceph_assert(!instance_ids.empty());
dout(10) << "instance_ids=" << instance_ids << dendl;
- Context* ctx = new FunctionContext([this, instance_ids](int r) {
+ Context* ctx = new LambdaContext([this, instance_ids](int r) {
handle_remove_instances(r, instance_ids);
});
ctx = create_async_context_callback(m_threads->work_queue, ctx);
dout(10) << dendl;
// schedule a time to fire when the oldest instance should be removed
- m_timer_task = new FunctionContext(
+ m_timer_task = new LambdaContext(
[this, oldest_time](int r) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
std::lock_guard locker{m_lock};
ceph_assert(!m_timer_op_tracker.empty());
m_timer_op_tracker.finish_op();
- auto ctx = new FunctionContext([this](int r) {
+ auto ctx = new LambdaContext([this](int r) {
Context *on_finish;
{
// ensure lock isn't held when completing shut down
cancel_timer_task();
- m_timer_task = new FunctionContext(
+ m_timer_task = new LambdaContext(
[this, leader, timer_callback](int r) {
ceph_assert(ceph_mutex_is_locked(m_threads->timer_lock));
m_timer_task = nullptr;
return;
}
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this](int r) {
std::string instance_id;
if (get_leader_instance_id(&instance_id)) {
LeaderWatcher<I>, &LeaderWatcher<I>::handle_notify_listener>(this));
if (is_leader(m_lock)) {
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[this, ctx](int r) {
m_listener->post_acquire_handler(ctx);
});
} else {
- ctx = new FunctionContext(
+ ctx = new LambdaContext(
[this, ctx](int r) {
m_listener->pre_release_handler(ctx);
});
void MirrorStatusWatcher<I>::init(Context *on_finish) {
dout(20) << dendl;
- on_finish = new FunctionContext(
+ on_finish = new LambdaContext(
[this, on_finish] (int r) {
if (r < 0) {
derr << "error removing down statuses: " << cpp_strerror(r) << dendl;
}
}
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this] (int r) {
std::lock_guard locker{m_lock};
stop_instance_replayer();
m_instance_watcher->get_instance_id(),
m_image_map_listener));
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, on_finish](int r) {
handle_init_image_map(r, on_finish);
});
dout(10) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to init image map: " << cpp_strerror(r) << dendl;
- on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish = new LambdaContext([on_finish, r](int) {
on_finish->complete(r);
});
shut_down_image_map(on_finish);
// ensure the initial set of local images is up-to-date
// after acquiring the leader role
- auto ctx = new FunctionContext([this, on_finish](int r) {
+ auto ctx = new LambdaContext([this, on_finish](int r) {
handle_init_local_pool_watcher(r, on_finish);
});
m_local_pool_watcher->init(create_async_context_callback(
dout(10) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to retrieve local images: " << cpp_strerror(r) << dendl;
- on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish = new LambdaContext([on_finish, r](int) {
on_finish->complete(r);
});
shut_down_pool_watchers(on_finish);
m_remote_pool_watcher.reset(PoolWatcher<I>::create(
m_threads, m_remote_io_ctx, m_remote_pool_watcher_listener));
- auto ctx = new FunctionContext([this, on_finish](int r) {
+ auto ctx = new LambdaContext([this, on_finish](int r) {
handle_init_remote_pool_watcher(r, on_finish);
});
m_remote_pool_watcher->init(create_async_context_callback(
dout(0) << "remote peer does not have mirroring configured" << dendl;
} else if (r < 0) {
derr << "failed to retrieve remote images: " << cpp_strerror(r) << dendl;
- on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish = new LambdaContext([on_finish, r](int) {
on_finish->complete(r);
});
shut_down_pool_watchers(on_finish);
std::lock_guard locker{m_lock};
ceph_assert(!m_image_deleter);
- on_finish = new FunctionContext([this, on_finish](int r) {
+ on_finish = new LambdaContext([this, on_finish](int r) {
handle_init_image_deleter(r, on_finish);
});
m_image_deleter.reset(ImageDeleter<I>::create(m_local_io_ctx, m_threads,
dout(10) << "r=" << r << dendl;
if (r < 0) {
derr << "failed to init image deleter: " << cpp_strerror(r) << dendl;
- on_finish = new FunctionContext([on_finish, r](int) {
+ on_finish = new LambdaContext([on_finish, r](int) {
on_finish->complete(r);
});
shut_down_image_deleter(on_finish);
{
std::lock_guard locker{m_lock};
if (m_image_deleter) {
- Context *ctx = new FunctionContext([this, on_finish](int r) {
+ Context *ctx = new LambdaContext([this, on_finish](int r) {
handle_shut_down_image_deleter(r, on_finish);
});
ctx = create_async_context_callback(m_threads->work_queue, ctx);
{
std::lock_guard locker{m_lock};
if (m_local_pool_watcher) {
- Context *ctx = new FunctionContext([this, on_finish](int r) {
+ Context *ctx = new LambdaContext([this, on_finish](int r) {
handle_shut_down_pool_watchers(r, on_finish);
});
ctx = create_async_context_callback(m_threads->work_queue, ctx);
std::lock_guard locker{m_lock};
if (m_image_map) {
- on_finish = new FunctionContext(
+ on_finish = new LambdaContext(
[this, on_finish](int r) {
handle_shut_down_image_map(r, on_finish);
});
auto iter = mirroring_namespaces.find(it->first);
if (iter == mirroring_namespaces.end()) {
auto namespace_replayer = it->second;
- auto on_shut_down = new FunctionContext(
+ auto on_shut_down = new LambdaContext(
[this, namespace_replayer, ctx=gather_ctx->new_sub()](int r) {
delete namespace_replayer;
ctx->complete(r);
m_threads, m_image_sync_throttler.get(),
m_image_deletion_throttler.get(), m_service_daemon,
m_cache_manager_handler);
- auto on_init = new FunctionContext(
+ auto on_init = new LambdaContext(
[this, namespace_replayer, name, &mirroring_namespaces,
ctx=gather_ctx->new_sub()](int r) {
if (r < 0) {
auto it = m_namespace_replayers.find(name);
ceph_assert(it != m_namespace_replayers.end());
- on_finish = new FunctionContext(
+ on_finish = new LambdaContext(
[this, name, on_finish](int r) {
if (r < 0) {
derr << "failed to handle acquire leader for namespace: "
auto namespace_replayer = m_namespace_replayers[name];
m_namespace_replayers.erase(name);
- auto on_shut_down = new FunctionContext(
+ auto on_shut_down = new LambdaContext(
[this, namespace_replayer, on_finish](int r) {
delete namespace_replayer;
on_finish->complete(r);
m_service_daemon->add_or_update_attribute(m_local_pool_id,
SERVICE_DAEMON_LEADER_KEY,
true);
- auto ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, on_finish](int r) {
if (r == 0) {
std::lock_guard locker{m_lock};
std::lock_guard locker{m_lock};
on_finish = librbd::util::create_async_context_callback(
- m_threads->work_queue, new FunctionContext(
+ m_threads->work_queue, new LambdaContext(
[this, on_finish](int r) {
{
std::lock_guard locker{m_lock};
on_finish->complete(r);
}));
- auto on_lock = new FunctionContext(
+ auto on_lock = new LambdaContext(
[this, callback, on_finish](int) {
std::lock_guard locker{m_lock};
ceph_assert(m_namespace_replayers_locked);
dout(5) << dendl;
m_async_op_tracker.start_op();
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
dout(5) << "unregister_watcher: r=" << r << dendl;
if (r < 0) {
derr << "error unregistering watcher for "
m_image_ids_invalid = true;
m_timer_ctx = m_threads->timer->add_event_after(
interval,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
process_refresh_images();
}));
}
// execute outside of the timer's lock
m_async_op_tracker.start_op();
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
register_watcher();
m_async_op_tracker.finish_op();
});
dout(20) << dendl;
m_async_op_tracker.start_op();
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
notify_listener();
m_async_op_tracker.finish_op();
});
return;
}
- m_timer_ctx = new FunctionContext([this](int) {
+ m_timer_ctx = new LambdaContext([this](int) {
m_timer_ctx = nullptr;
update_status();
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_snap_unprotect(r);
finish_op_ctx->complete(0);
});
return;
}
- auto ctx = new FunctionContext([this, finish_op_ctx](int r) {
+ auto ctx = new LambdaContext([this, finish_op_ctx](int r) {
handle_snap_remove(r);
finish_op_ctx->complete(0);
});
}
}
- auto ctx = new FunctionContext([this, on_finish](int r) {
+ auto ctx = new LambdaContext([this, on_finish](int r) {
unregister_watcher(on_finish);
});
m_async_op_tracker.wait_for_ops(ctx);
dout(5) << dendl;
m_async_op_tracker.start_op();
- Context *ctx = new FunctionContext([this, on_finish](int r) {
+ Context *ctx = new LambdaContext([this, on_finish](int r) {
handle_unregister_watcher(r, on_finish);
});
this->unregister_watch(ctx);
dout(5) << dendl;
m_timer_ctx = m_threads->timer->add_event_after(
interval,
- new FunctionContext([this](int r) {
+ new LambdaContext([this](int r) {
process_trash_list();
}));
}
// execute outside of the timer's lock
m_async_op_tracker.start_op();
- Context *ctx = new FunctionContext([this](int r) {
+ Context *ctx = new LambdaContext([this](int r) {
create_trash();
m_async_op_tracker.finish_op();
});
<< "deferment_end_time=" << deferment_end_time << dendl;
m_async_op_tracker.start_op();
- auto ctx = new FunctionContext([this, image_id, deferment_end_time](int r) {
+ auto ctx = new LambdaContext([this, image_id, deferment_end_time](int r) {
m_trash_listener.handle_trash_image(image_id,
deferment_end_time.to_real_time());
m_async_op_tracker.finish_op();
dout(20) << "master_tag_tid=" << master_tag_tid << ", mirror_tag_tid="
<< mirror_tag_tid << dendl;
- FunctionContext *ctx = new FunctionContext(
+ auto ctx = new LambdaContext(
[this, master_tag_tid, mirror_tag_tid](int r) {
handle_update_tag_cache(master_tag_tid, mirror_tag_tid, r);
});