peering_pp(*pg).process
).then_interruptible([this] {
pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
+ return handle.complete();
+ }).then_interruptible([] {
return seastar::make_ready_future<bool>(false);
+ }).finally([this] {
+ handle.exit();
});
}
} else {
return process_op(ihref, pgref);
}
+ }).then_interruptible([this, this_instance_id, &ihref] {
+ logger().debug("{}.{}: complete", *this, this_instance_id);
+ return ihref.handle.complete();
}).then_interruptible([this, this_instance_id, pgref] {
logger().debug("{}.{}: after process*", *this, this_instance_id);
pgref->client_request_orderer.remove_request(*this);
}, [this, this_instance_id, pgref](std::exception_ptr eptr) {
// TODO: better debug output
logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
- }, pgref).finally(
- [opref=std::move(opref), pgref=std::move(pgref),
- instance_handle=std::move(instance_handle), &ihref] {
- ihref.handle.exit();
- });
+ },
+ pgref
+ ).finally(
+ [opref=std::move(opref), pgref,
+ instance_handle=std::move(instance_handle), &ihref,
+ this_instance_id, this] {
+ logger().debug("{}.{}: exit", *this, this_instance_id);
+ ihref.handle.exit();
+ });
}
seastar::future<> ClientRequest::with_pg(
});
});
});
- }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
- return seastar::now();
- })).then_interruptible([] {
+ }).si_then([this] {
+ logger().debug("{}: complete", *this);
+ return handle.complete();
+ }).handle_error_interruptible(
+ PG::load_obc_ertr::all_same_way([] {
+ return seastar::now();
+ })
+ ).then_interruptible([] {
return seastar::stop_iteration::yes;
});
}, [this](std::exception_ptr eptr) {
}, pg);
}).then([this] {
track_event<CompletionEvent>();
+ }).finally([this] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
});
});
}
});
}).then_interruptible([this, pg](auto) {
return pg->do_update_log_missing(req, conn);
+ }).then_interruptible([this] {
+ logger().debug("{}: complete", *this);
+ return handle.complete();
});
- }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+ }, [](std::exception_ptr) {
+ return seastar::now();
+ }, pg).finally([this, ref] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
+ });
}
}
IRef ref = this;
return interruptor::with_interruption([this, pg] {
- return pg->do_update_log_missing_reply(std::move(req));
- }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+ return pg->do_update_log_missing_reply(std::move(req)
+ ).then_interruptible([this] {
+ logger().debug("{}: complete", *this);
+ return handle.complete();
+ });
+ }, [](std::exception_ptr) {
+ return seastar::now();
+ }, pg).finally([this, ref] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
+ });
}
}
return this->template enter_stage<interruptor>(peering_pp(*pg).process);
}).then_interruptible([this, pg, &shard_services] {
return pg->do_peering_event(evt, ctx
- ).then_interruptible([this, pg, &shard_services] {
- that()->get_handle().exit();
+ ).then_interruptible([this] {
+ return that()->get_handle().complete();
+ }).then_interruptible([this, pg, &shard_services] {
return complete_rctx(shard_services, pg);
});
}).then_interruptible([pg, &shard_services]()
});
}, [this](std::exception_ptr ep) {
logger().debug("{}: interrupted with {}", *this, ep);
- }, pg);
+ }, pg).finally([this] {
+ logger().debug("{}: exit", *this);
+ that()->get_handle().exit();
+ });
}
template <class T>
return shard_services.send_pg_temp();
});
});
- }).then([this, ref=std::move(ref)] {
+ }).then([this] {
logger().debug("{}: complete", *this);
+ return handle.complete();
+ }).finally([this, ref=std::move(ref)] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
});
}
track_event<StartEvent>();
IRef opref = this;
return interruptor::with_interruption([this, pgref] {
- return pgref->get_recovery_backend()->handle_recovery_op(m, conn);
+ return pgref->get_recovery_backend()->handle_recovery_op(m, conn
+ ).then_interruptible([this] {
+ logger().debug("{}: complete", *this);
+ return handle.complete();
+ });
}, [](std::exception_ptr) {
return seastar::now();
- }, pgref).finally([this, opref, pgref] {
+ }, pgref).finally([this, opref=std::move(opref), pgref] {
+ logger().debug("{}: exit", *this);
track_event<CompletionEvent>();
+ handle.exit();
});
}
});
}).then_interruptible([this, pg] (auto) {
return pg->handle_rep_op(req);
+ }).then_interruptible([this] {
+ logger().debug("{}: complete", *this);
+ return handle.complete();
});
- }, [ref](std::exception_ptr) {
+ }, [](std::exception_ptr) {
return seastar::now();
- }, pg);
+ }, pg).finally([this, ref=std::move(ref)] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
+ });
}
}
f->close_section();
}
-SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
-SnapTrimEvent::start()
-{
- logger().debug("{}: {}", *this, __func__);
- return with_pg(
- pg->get_shard_services(), pg
- ).finally([ref=IRef{this}, this] {
- logger().debug("{}: complete", *ref);
- return handle.complete();
- });
-}
-
CommonPGPipeline& SnapTrimEvent::client_pp()
{
return pg->request_pg_pipeline;
}
SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
-SnapTrimEvent::with_pg(
- ShardServices &shard_services, Ref<PG> _pg)
+SnapTrimEvent::start()
{
+ ShardServices &shard_services = pg->get_shard_services();
return interruptor::with_interruption([&shard_services, this] {
return enter_stage<interruptor>(
client_pp().wait_for_active
return subop_blocker.wait_completion();
}).finally([this] {
pg->snaptrim_mutex.unlock();
- }).safe_then_interruptible([this] {
+ }).si_then([this] {
if (!needs_pause) {
return interruptor::now();
}
return seastar::sleep(
std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
});
- }).safe_then_interruptible([this] {
+ }).si_then([this] {
logger().debug("{}: all completed", *this);
return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
seastar::stop_iteration::no);
});
+ }).si_then([this](auto stop) {
+ return handle.complete().then([stop] {
+ return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(stop);
+ });
});
});
}, [this](std::exception_ptr eptr) -> snap_trim_ertr::future<seastar::stop_iteration> {
logger().debug("{}: interrupted {}", *this, eptr);
return crimson::ct_error::eagain::make();
- }, pg);
+ }, pg).finally([this] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
+ });
}
return pg->request_pg_pipeline;
}
-SnapTrimObjSubEvent::remove_or_update_iertr::future<>
-SnapTrimObjSubEvent::start()
-{
- logger().debug("{}: start", *this);
- return with_pg(
- pg->get_shard_services(), pg
- ).finally([ref=IRef{this}, this] {
- logger().debug("{}: complete", *ref);
- return handle.complete();
- });
-}
-
SnapTrimObjSubEvent::remove_or_update_iertr::future<>
SnapTrimObjSubEvent::remove_clone(
ObjectContextRef obc,
*this, coid, old_snaps, new_snaps);
ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries);
}
- return std::move(ret).safe_then_interruptible(
+ return std::move(ret).si_then(
[&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable {
osd_op_p.at_version = pg->next_version();
// num_objects_before_trim - delta_stats.num_objects;
//add_objects_trimmed_count(num_objects_trimmed);
}
- }).safe_then_interruptible(
+ }).si_then(
[&txn, log_entries=std::move(log_entries)] () mutable {
return remove_or_update_iertr::make_ready_future<remove_or_update_ret_t>(
std::make_pair(std::move(txn), std::move(log_entries)));
}
SnapTrimObjSubEvent::remove_or_update_iertr::future<>
-SnapTrimObjSubEvent::with_pg(
- ShardServices &shard_services, Ref<PG> _pg)
+SnapTrimObjSubEvent::start()
{
return enter_stage<interruptor>(
client_pp().wait_for_active
});
});
});
+ }).si_then([this] {
+ logger().debug("{}: completed", *this);
+ return handle.complete();
}).handle_error_interruptible(
remove_or_update_iertr::pass_further{},
crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"}
);
+ }).finally([this] {
+ logger().debug("{}: exit", *this);
+ handle.exit();
});
}
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
snap_trim_ertr::future<seastar::stop_iteration> start();
- snap_trim_ertr::future<seastar::stop_iteration> with_pg(
- ShardServices &shard_services, Ref<PG> pg);
private:
CommonPGPipeline& client_pp();
void print(std::ostream &) const final;
void dump_detail(ceph::Formatter* f) const final;
remove_or_update_iertr::future<> start();
- remove_or_update_iertr::future<> with_pg(
- ShardServices &shard_services, Ref<PG> pg);
CommonPGPipeline& client_pp();
opref.get_connection_pipeline().get_pg);
}).then([this, &opref] {
return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
- }).then([this, &logger, op=std::move(op)](auto core) mutable {
+ }).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable {
+ if (unlikely(fut.failed())) {
+ logger.error("{}: failed before with_pg", *op);
+ op->get_handle().exit();
+ return seastar::make_exception_future<>(fut.get_exception());
+ }
+
+ auto core = fut.get();
logger.debug("{}: can_create={}, target-core={}",
*op, T::can_create(), core);
return this->template with_remote_shard_state_and_op<T>(