seastar::future<>
ClientRequest::do_process(Ref<PG>& pg, crimson::osd::ObjectContextRef obc)
{
- using do_ops_return_t =
- crimson::errorator<crimson::ct_error::eagain>::future<Ref<MOSDOpReply>>;
- return [&pg, obc]() -> do_ops_return_t {
- if (!pg->is_primary()) {
- // primary can handle both normal ops and balanced reads
- if (is_misdirected(*pg)) {
- logger().trace("process_op: dropping misdirected op");
- return seastar::make_ready_future<Ref<MOSDOpReply>>();
- } else if (const hobject_t& hoid = m->get_hobj();
- !pg->get_peering_state().can_serve_replica_read(hoid)) {
- auto reply = make_message<MOSDOpReply>(
- m.get(), -EAGAIN, pg->get_osdmap_epoch(),
- m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
- !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
- return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
- }
- }
- return pg->do_osd_ops(m, obc, op_info);
- }().safe_then([this](Ref<MOSDOpReply> reply) {
- if (reply) {
- return conn->send(std::move(reply));
- } else {
+ if (!pg->is_primary()) {
+ // primary can handle both normal ops and balanced reads
+ if (is_misdirected(*pg)) {
+ logger().trace("process_op: dropping misdirected op");
return seastar::now();
+ } else if (const hobject_t& hoid = m->get_hobj();
+ !pg->get_peering_state().can_serve_replica_read(hoid)) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -EAGAIN, pg->get_osdmap_epoch(),
+ m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
+ !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
+ return conn->send(std::move(reply));
}
+ }
+ return pg->do_osd_ops(m, obc, op_info).safe_then([this](Ref<MOSDOpReply> reply) {
+ return conn->send(std::move(reply));
}, crimson::ct_error::eagain::handle([this, &pg] {
return process_op(pg);
}));