return with_blocking_future(handle.enter(pp(pg).get_obc));
}).then([this, &pg]() -> PG::load_obc_ertr::future<> {
op_info.set_from_op(&*m, *pg.get_osdmap());
- if (pg.is_primary()) {
- // primary can handle both normal ops and balanced reads
- } else if (is_misdirected(pg)) {
- logger().trace("process_op: dropping misdirected op");
- return seastar::now();
- } else if (!pg.get_peering_state().can_serve_replica_read(m->get_hobj())) {
- auto reply = make_message<MOSDOpReply>(
- m.get(), -EAGAIN, pg.get_osdmap_epoch(),
- m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
- !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
- return conn->send(reply);
- }
- return pg.with_locked_obc(
- m,
- op_info,
- this,
- [this, &pg](auto obc) {
- return with_blocking_future(handle.enter(pp(pg).process)
- ).then([this, &pg, obc]() {
- return pg.do_osd_ops(m, obc, op_info);
- }).then([this](Ref<MOSDOpReply> reply) {
- return conn->send(reply);
- });
+ return pg.with_locked_obc(m, op_info, this, [this, &pg](auto obc) {
+ return with_blocking_future(
+ handle.enter(pp(pg).process)
+ ).then([this, &pg, obc] {
+ if (!pg.is_primary()) {
+ // primary can handle both normal ops and balanced reads
+ if (is_misdirected(pg)) {
+ logger().trace("process_op: dropping misdirected op");
+ return seastar::make_ready_future<Ref<MOSDOpReply>>();
+ } else if (const hobject_t& hoid = m->get_hobj();
+ !pg.get_peering_state().can_serve_replica_read(hoid)) {
+ auto reply = make_message<MOSDOpReply>(
+ m.get(), -EAGAIN, pg.get_osdmap_epoch(),
+ m->get_flags() & (CEPH_OSD_FLAG_ACK|CEPH_OSD_FLAG_ONDISK),
+ !m->has_flag(CEPH_OSD_FLAG_RETURNVEC));
+ return seastar::make_ready_future<Ref<MOSDOpReply>>(std::move(reply));
+ }
+ }
+ return pg.do_osd_ops(m, obc, op_info);
+ }).then([this](Ref<MOSDOpReply> reply) {
+ if (reply) {
+ return conn->send(std::move(reply));
+ } else {
+ return seastar::now();
+ }
});
+ });
}).safe_then([pgref=std::move(pgref)] {
return seastar::now();
}, PG::load_obc_ertr::all_same_way([](auto &code) {