return store->read(coll, ghobject_t{hoid}, off, len, flags);
}
+MURef<MOSDRepOp> ReplicatedBackend::new_repop_msg(
+ const pg_shard_t &pg_shard,
+ const hobject_t &hoid,
+ const bufferlist &encoded_txn,
+ const osd_op_params_t &osd_op_p,
+ epoch_t min_epoch,
+ epoch_t map_epoch,
+ const std::vector<pg_log_entry_t> &log_entries,
+ ceph_tid_t tid)
+{
+ ceph_assert(pg_shard != whoami);
+ auto m = crimson::make_message<MOSDRepOp>(
+ osd_op_p.req_id,
+ whoami,
+ spg_t{pgid, pg_shard.shard},
+ hoid,
+ CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
+ map_epoch,
+ min_epoch,
+ tid,
+ osd_op_p.at_version);
+ if (pg.should_send_op(pg_shard, hoid)) {
+ m->set_data(encoded_txn);
+ } else {
+ ceph::os::Transaction t;
+ bufferlist bl;
+ encode(t, bl);
+ m->set_data(bl);
+ }
+ encode(log_entries, m->logbl);
+ m->pg_trim_to = osd_op_p.pg_trim_to;
+ m->pg_committed_to = osd_op_p.pg_committed_to;
+ m->pg_stats = pg.get_info().stats;
+ return m;
+}
+
ReplicatedBackend::rep_op_fut_t
ReplicatedBackend::submit_transaction(const std::set<pg_shard_t>& pg_shards,
const hobject_t& hoid,
auto sends = std::make_unique<std::vector<seastar::future<>>>();
for (auto pg_shard : pg_shards) {
if (pg_shard != whoami) {
- auto m = crimson::make_message<MOSDRepOp>(
- osd_op_p.req_id,
- whoami,
- spg_t{pgid, pg_shard.shard},
- hoid,
- CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK,
- map_epoch,
- min_epoch,
- tid,
- osd_op_p.at_version);
- if (pg.should_send_op(pg_shard, hoid)) {
- m->set_data(encoded_txn);
- } else {
- ceph::os::Transaction t;
- bufferlist bl;
- encode(t, bl);
- m->set_data(bl);
- }
+ auto m = new_repop_msg(
+ pg_shard, hoid, encoded_txn, osd_op_p,
+ min_epoch, map_epoch, log_entries, tid);
pending_txn->second.acked_peers.push_back({pg_shard, eversion_t{}});
- encode(log_entries, m->logbl);
- m->pg_trim_to = osd_op_p.pg_trim_to;
- m->pg_committed_to = osd_op_p.pg_committed_to;
- m->pg_stats = pg.get_info().stats;
// TODO: set more stuff. e.g., pg_states
sends->emplace_back(
shard_services.send_to_osd(
pending_transactions_t pending_trans;
crimson::osd::PG& pg;
+ MURef<MOSDRepOp> new_repop_msg(
+ const pg_shard_t &pg_shard,
+ const hobject_t &hoid,
+ const bufferlist &encoded_txn,
+ const osd_op_params_t &osd_op_p,
+ epoch_t min_epoch,
+ epoch_t map_epoch,
+ const std::vector<pg_log_entry_t> &log_entries,
+ ceph_tid_t tid);
+
seastar::future<> request_committed(
const osd_reqid_t& reqid, const eversion_t& at_version) final;
};