dout(10) << " sending commit on " << *m << " " << reply << dendl;
osd->send_message_osd_client(reply, m->get_connection());
}
- ));
+ ),
+ op
+ );
}
ReplicatedPG::cache_result_t ReplicatedPG::maybe_handle_cache_detail(
boost::intrusive_ptr<ReplicatedPG::RepGather> ReplicatedPG::new_repop(
ObcLockManager &&manager,
+ OpRequestRef &&op,
boost::optional<std::function<void(void)> > &&on_complete)
{
RepGather *repop = new RepGather(
std::move(manager),
+ std::move(op),
std::move(on_complete),
osd->get_tid(),
info.last_complete);
void ReplicatedPG::submit_log_entries(
const list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
- boost::optional<std::function<void(void)> > &&on_complete)
+ boost::optional<std::function<void(void)> > &&on_complete,
+ OpRequestRef op)
{
dout(10) << __func__ << entries << dendl;
assert(is_primary());
if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
repop = new_repop(
std::move(manager),
+ std::move(op),
std::move(on_complete));
}
for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
// this will requeue ops we were working on but didn't finish, and
// any dups
apply_and_flush_repops(is_primary());
+ cancel_log_updates();
// do this *after* apply_and_flush_repops so that we catch any newly
// registered watches.
RepGather(
ObcLockManager &&manager,
+ OpRequestRef &&o,
boost::optional<std::function<void(void)> > &&on_complete,
ceph_tid_t rt,
eversion_t lc) :
+ op(o),
queue_item(this),
nref(1),
rep_tid(rt),
ceph_tid_t rep_tid);
boost::intrusive_ptr<RepGather> new_repop(
ObcLockManager &&manager,
+ OpRequestRef &&op,
boost::optional<std::function<void(void)> > &&on_complete);
void remove_repop(RepGather *repop);
void submit_log_entries(
const list<pg_log_entry_t> &entries,
ObcLockManager &&manager,
- boost::optional<std::function<void(void)> > &&on_complete);
+ boost::optional<std::function<void(void)> > &&on_complete,
+ OpRequestRef op = OpRequestRef());
struct LogUpdateCtx {
boost::intrusive_ptr<RepGather> repop;
set<pg_shard_t> waiting_on;