#endif
}
};
+
+class C_AllSubWritesCommited : public Context {
+ seastar::promise<> on_complete;
+public:
+ C_AllSubWritesCommited() = default;
+
+ void finish(int) override {
+ on_complete.set_value();
+ }
+
+ PGBackend::interruptible_future<> get_future() {
+ return on_complete.get_future();
+ }
+};
+
ECBackend::rep_op_fut_t
ECBackend::submit_transaction(const std::set<pg_shard_t> &pg_shards,
crimson::osd::ObjectContextRef&& obc,
epoch_t min_epoch, epoch_t max_epoch,
std::vector<pg_log_entry_t>&& log_entries)
{
- // todo
- return make_ready_future<rep_op_ret_t>(seastar::now(), seastar::now());
+ const hobject_t& hoid = obc->obs.oi.soid;
+ logger().debug("ECBackend::{} hoid={}", __func__, hoid);
+ return make_ready_future<rep_op_ret_t>(
+ seastar::now(),
+ PGBackend::interruptor::async([=, this,
+ txn=std::move(txn),
+ osd_op_p=std::move(osd_op_p)]() mutable {
+ logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
+ auto op = std::make_unique<ECCrimsonOp>(std::move(txn), std::move(obc));
+ logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
+ op->hoid = hoid;
+ //op->delta_stats = delta_stats;
+ op->version = osd_op_p.at_version;
+ op->trim_to = osd_op_p.pg_trim_to;
+ op->pg_committed_to =
+ std::max(osd_op_p.pg_committed_to, rmw_pipeline.committed_to);
+ op->log_entries = std::move(log_entries);
+ //std::swap(op->updated_hit_set_history, hset_history);
+ // TODO: promsie future here
+ auto on_all_commit = new C_AllSubWritesCommited;
+ op->on_all_commit = on_all_commit;
+ op->tid = shard_services.get_tid();
+ op->reqid = osd_op_p.req_id;
+ op->client_op = nullptr; //client_op;
+ //if (client_op) {
+ // op->trace = client_op->pg_trace;
+ //}
+ op->plan = op->get_write_plan(
+ sinfo,
+ *(op->t),
+ [this](const hobject_t &i) {
+ ECUtil::HashInfoRef ref =
+ get_hash_info(i, true).handle_error_interruptible(
+ crimson::ct_error::assert_all{}
+ ).get();
+ logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
+ ceph_assert_always(ref);
+ return ref;
+ },
+ &dpp);
+ logger().info("{}: op {} starting", "_submit_transaction", ""); //*op);
+ rmw_pipeline.start_rmw(std::move(op));
+ logger().debug("ECBackend::{} started ec op", "_submit_transaction");
+ on_all_commit->get_future().get();
+ logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
+ }).then_interruptible([] {
+ logger().debug("ECBackend::{} LINE {}", "_submit_transaction", __LINE__);
+ return seastar::now();
+ })};
}
ECBackend::write_iertr::future<>