[FNAME, this, pg, this_instance_id, obc, &ihref]() mutable {
DEBUGDPP("{}.{}: in process stage, calling do_process",
*pg, *this, this_instance_id);
- return do_process(ihref, pg, obc, this_instance_id);
- });
- }
- ).handle_error_interruptible(
- PG::load_obc_ertr::all_same_way(
- [FNAME, this, pg=std::move(pg), this_instance_id](
- const auto &code
- ) -> interruptible_future<> {
- DEBUGDPP("{}.{}: saw error code {}",
- *pg, *this, this_instance_id, code);
- assert(code.value() > 0);
- return reply_op_error(pg, -code.value());
- })
- );
+ return do_process(
+ ihref, pg, obc, this_instance_id
+ ).handle_error_interruptible(
+ crimson::ct_error::eagain::handle(
+ [this, pg, this_instance_id, &ihref]() mutable {
+ return process_op(ihref, pg, this_instance_id);
+ })
+ );
+ }
+ );
+ }).handle_error_interruptible(
+ PG::load_obc_ertr::all_same_way(
+ [FNAME, this, pg=std::move(pg), this_instance_id](
+ const auto &code
+ ) -> interruptible_future<> {
+ DEBUGDPP("{}.{}: saw error code {}",
+ *pg, *this, this_instance_id, code);
+ assert(code.value() > 0);
+ return reply_op_error(pg, -code.value());
+ })
+ );
}
-ClientRequest::interruptible_future<>
+ClientRequest::do_process_iertr::future<>
ClientRequest::do_process(
instance_handle_t &ihref,
Ref<PG> pg, crimson::osd::ObjectContextRef obc,
{
LOG_PREFIX(ClientRequest::do_process);
if (m->has_flag(CEPH_OSD_FLAG_PARALLELEXEC)) {
- return reply_op_error(pg, -EINVAL);
+ co_await reply_op_error(pg, -EINVAL);
+ co_return;
}
const pg_pool_t pool = pg->get_pgpool().info;
if (pool.has_flag(pg_pool_t::FLAG_EIO)) {
if (m->has_flag(CEPH_OSD_FLAG_SUPPORTSPOOLEIO)) {
DEBUGDPP("{}.{}: discarding op due to pool EIO flag",
*pg, *this, this_instance_id);
- return seastar::now();
+ co_return;
} else {
DEBUGDPP("{}.{}: replying EIO due to pool EIO flag",
*pg, *this, this_instance_id);
- return reply_op_error(pg, -EIO);
+ co_await reply_op_error(pg, -EIO);
+ co_return;
}
}
if (m->get_oid().name.size()
> crimson::common::local_conf()->osd_max_object_name_len) {
- return reply_op_error(pg, -ENAMETOOLONG);
+ co_await reply_op_error(pg, -ENAMETOOLONG);
+ co_return;
} else if (m->get_hobj().get_key().size()
> crimson::common::local_conf()->osd_max_object_name_len) {
- return reply_op_error(pg, -ENAMETOOLONG);
+ co_await reply_op_error(pg, -ENAMETOOLONG);
+ co_return;
} else if (m->get_hobj().nspace.size()
> crimson::common::local_conf()->osd_max_object_namespace_len) {
- return reply_op_error(pg, -ENAMETOOLONG);
+ co_await reply_op_error(pg, -ENAMETOOLONG);
+ co_return;
} else if (m->get_hobj().oid.name.empty()) {
- return reply_op_error(pg, -EINVAL);
+ co_await reply_op_error(pg, -EINVAL);
+ co_return;
} else if (m->get_hobj().is_internal_pg_local()) {
// clients are not allowed to write to hobject_t::INTERNAL_PG_LOCAL_NS
- return reply_op_error(pg, -EINVAL);
+ co_await reply_op_error(pg, -EINVAL);
+ co_return;
} else if (pg->get_osdmap()->is_blocklisted(
get_foreign_connection().get_peer_addr())) {
DEBUGDPP("{}.{}: {} is blocklisted",
*pg, *this, this_instance_id, get_foreign_connection().get_peer_addr());
- return reply_op_error(pg, -EBLOCKLISTED);
+ co_await reply_op_error(pg, -EBLOCKLISTED);
+ co_return;
}
if (!obc->obs.exists && !op_info.may_write()) {
- return reply_op_error(pg, -ENOENT);
+ co_await reply_op_error(pg, -ENOENT);
+ co_return;
}
SnapContext snapc = get_snapc(*pg,obc);
*pg, *this, this_instance_id,
snapc.seq, obc->ssc->snapset.seq,
obc->obs.oi.soid);
- return reply_op_error(pg, -EOLDSNAPC);
+ co_await reply_op_error(pg, -EOLDSNAPC);
+ co_return;
}
if (!pg->is_primary()) {
if (is_misdirected(*pg)) {
DEBUGDPP("{}.{}: dropping misdirected op",
*pg, *this, this_instance_id);
- return seastar::now();
+ co_return;
} else if (const hobject_t& hoid = m->get_hobj();
!pg->get_peering_state().can_serve_replica_read(hoid)) {
DEBUGDPP("{}.{}: unstable write on replica, bouncing to primary",
*pg, *this, this_instance_id);
- return reply_op_error(pg, -EAGAIN);
+ co_await reply_op_error(pg, -EAGAIN);
+ co_return;
} else {
DEBUGDPP("{}.{}: serving replica read on oid {}",
*pg, *this, this_instance_id, m->get_hobj());
}
}
- return pg->do_osd_ops(
+
+ auto [submitted, all_completed] = co_await pg->do_osd_ops(
m, r_conn, obc, op_info, snapc
- ).safe_then_unpack_interruptible(
- [FNAME, this, pg, this_instance_id, &ihref](
- auto submitted, auto all_completed) mutable {
- return submitted.then_interruptible(
- [this, pg, &ihref] {
- return ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
- }).then_interruptible(
- [FNAME, this, pg, this_instance_id,
- all_completed=std::move(all_completed), &ihref]() mutable {
- return all_completed.safe_then_interruptible(
- [FNAME, this, pg, this_instance_id, &ihref](
- MURef<MOSDOpReply> reply) {
- return ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this
- ).then_interruptible(
- [FNAME, this, pg, this_instance_id,
- reply=std::move(reply)]() mutable {
- DEBUGDPP("{}.{}: sending response",
- *pg, *this, this_instance_id);
- // TODO: gate the crosscore sending
- return get_foreign_connection(
- ).send_with_throttling(std::move(reply));
- });
- }, crimson::ct_error::eagain::handle(
- [this, pg, this_instance_id, &ihref]() mutable {
- return process_op(ihref, pg, this_instance_id);
- }));
- });
- }, crimson::ct_error::eagain::handle(
- [this, pg, this_instance_id, &ihref]() mutable {
- return process_op(ihref, pg, this_instance_id);
- }));
+ );
+ co_await std::move(submitted);
+
+ co_await ihref.enter_stage<interruptor>(client_pp(*pg).wait_repop, *this);
+
+ auto reply = co_await std::move(all_completed);
+
+ co_await ihref.enter_stage<interruptor>(client_pp(*pg).send_reply, *this);
+ DEBUGDPP("{}.{}: sending response",
+ *pg, *this, this_instance_id);
+ // TODO: gate the crosscore sending
+ co_await interruptor::make_interruptible(
+ get_foreign_connection().send_with_throttling(std::move(reply))
+ );
}
bool ClientRequest::is_misdirected(const PG& pg) const