{
const hobject_t& hoid = obc->obs.oi.soid;
logger().debug("ECBackend::{} hoid={}", __func__, hoid);
+ auto op = std::make_unique<ECCrimsonOp>(std::move(txn), std::move(obc));
+ op->hoid = hoid;
+ //op->delta_stats = delta_stats;
+ op->version = osd_op_p.at_version;
+ op->trim_to = osd_op_p.pg_trim_to;
+ op->pg_committed_to =
+ std::max(osd_op_p.pg_committed_to, rmw_pipeline.committed_to);
+ op->log_entries = std::move(log_entries);
+ //std::swap(op->updated_hit_set_history, hset_history);
+ auto on_all_commit = new C_AllSubWritesCommited;
+ op->on_all_commit = on_all_commit;
+ op->tid = shard_services.get_tid();
+ op->reqid = osd_op_p.req_id;
+ op->client_op = nullptr; //client_op;
+ //if (client_op) {
+ // op->trace = client_op->pg_trace;
+ //}
+ op->plan = op->get_write_plan(
+ sinfo,
+ *(op->t),
+ &dpp);
+ logger().info("{}: op {} starting", "_submit_transaction", ""); //*op);
+ rmw_pipeline.start_rmw(std::move(op));
+ logger().debug("ECBackend::{} started ec op", "_submit_transaction");
return make_ready_future<rep_op_ret_t>(
- seastar::now(),
- PGBackend::interruptor::async([=, this,
- txn=std::move(txn),
- osd_op_p=std::move(osd_op_p)]() mutable {
- logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
- auto op = std::make_unique<ECCrimsonOp>(std::move(txn), std::move(obc));
- logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
- op->hoid = hoid;
- //op->delta_stats = delta_stats;
- op->version = osd_op_p.at_version;
- op->trim_to = osd_op_p.pg_trim_to;
- op->pg_committed_to =
- std::max(osd_op_p.pg_committed_to, rmw_pipeline.committed_to);
- op->log_entries = std::move(log_entries);
- //std::swap(op->updated_hit_set_history, hset_history);
- // TODO: promsie future here
- auto on_all_commit = new C_AllSubWritesCommited;
- op->on_all_commit = on_all_commit;
- op->tid = shard_services.get_tid();
- op->reqid = osd_op_p.req_id;
- op->client_op = nullptr; //client_op;
- //if (client_op) {
- // op->trace = client_op->pg_trace;
- //}
- op->plan = op->get_write_plan(
- sinfo,
- *(op->t),
- [this](const hobject_t &i) {
- ECUtil::HashInfoRef ref =
- get_hash_info(i, true).handle_error_interruptible(
- crimson::ct_error::assert_all{}
- ).get();
- logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
- ceph_assert_always(ref);
- return ref;
- },
- &dpp);
- logger().info("{}: op {} starting", "_submit_transaction", ""); //*op);
- rmw_pipeline.start_rmw(std::move(op));
- logger().debug("ECBackend::{} started ec op", "_submit_transaction");
- on_all_commit->get_future().get();
- logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
- }).then_interruptible([] {
- logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
- return seastar::now();
- })};
+ seastar::now(),
+ on_all_commit->get_future().then_interruptible([] {
+ logger().debug("{}: op {} finished completely", "_submit_transaction", "");
+ return seastar::now();
+ })
+ );
}
ECBackend::write_iertr::future<>