osd->logger->inc(l_osd_op_wip);
+ dout(10) << __func__ << ": " << *repop << dendl;
return repop;
}
boost::intrusive_ptr<ReplicatedPG::RepGather> ReplicatedPG::new_repop(
+ eversion_t version,
ObcLockManager &&manager,
OpRequestRef &&op,
boost::optional<std::function<void(void)> > &&on_complete)
std::move(on_complete),
osd->get_tid(),
info.last_complete);
+ repop->v = version;
repop->start = ceph_clock_now(cct);
osd->logger->inc(l_osd_op_wip);
+ dout(10) << __func__ << ": " << *repop << dendl;
return boost::intrusive_ptr<RepGather>(repop);
}
dout(10) << __func__ << " " << entries << dendl;
assert(is_primary());
+ eversion_t version;
if (!entries.empty()) {
assert(entries.rbegin()->version >= projected_last_update);
- projected_last_update = entries.rbegin()->version;
+ version = projected_last_update = entries.rbegin()->version;
}
boost::intrusive_ptr<RepGather> repop;
boost::optional<std::function<void(void)> > on_complete;
if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_JEWEL)) {
repop = new_repop(
+ version,
std::move(manager),
std::move(op),
std::move(_on_complete));
}
}
+// Dup op detection
+
+bool ReplicatedPG::already_complete(eversion_t v)
+{
+ dout(20) << __func__ << ": " << v << dendl;
+ for (xlist<RepGather*>::iterator i = repop_queue.begin();
+ !i.end();
+ ++i) {
+ dout(20) << __func__ << ": " << **i << dendl;
+ // skip copy from temp object ops
+ if ((*i)->v == eversion_t()) {
+ dout(20) << __func__ << ": " << **i
+ << " version is empty" << dendl;
+ continue;
+ }
+ if ((*i)->v > v) {
+ dout(20) << __func__ << ": " << **i
+ << " (*i)->v past v" << dendl;
+ break;
+ }
+ if (!(*i)->all_committed) {
+ dout(20) << __func__ << ": " << **i
+ << " not committed, returning false"
+ << dendl;
+ return false;
+ }
+ }
+ dout(20) << __func__ << ": returning true" << dendl;
+ return true;
+}
+
+bool ReplicatedPG::already_ack(eversion_t v)
+{
+ dout(20) << __func__ << ": " << v << dendl;
+ for (xlist<RepGather*>::iterator i = repop_queue.begin();
+ !i.end();
+ ++i) {
+ // skip copy from temp object ops
+ if ((*i)->v == eversion_t()) {
+ dout(20) << __func__ << ": " << **i
+ << " version is empty" << dendl;
+ continue;
+ }
+ if ((*i)->v > v) {
+ dout(20) << __func__ << ": " << **i
+ << " (*i)->v past v" << dendl;
+ break;
+ }
+ if (!(*i)->all_applied) {
+ dout(20) << __func__ << ": " << **i
+ << " not applied, returning false"
+ << dendl;
+ return false;
+ }
+ }
+ dout(20) << __func__ << ": returning true" << dendl;
+ return true;
+}
+
// ==========================================================================================
// SCRUB
ObjectContextRef obc,
ceph_tid_t rep_tid);
boost::intrusive_ptr<RepGather> new_repop(
+ eversion_t version,
ObcLockManager &&manager,
OpRequestRef &&op,
boost::optional<std::function<void(void)> > &&on_complete);
void agent_choose_mode_restart() override;
/// true if we can send an ondisk/commit for v
- bool already_complete(eversion_t v) {
- for (xlist<RepGather*>::iterator i = repop_queue.begin();
- !i.end();
- ++i) {
- // skip copy from temp object ops
- if ((*i)->v == eversion_t())
- continue;
- if ((*i)->v > v)
- break;
- if (!(*i)->all_committed)
- return false;
- }
- return true;
- }
+ bool already_complete(eversion_t v);
/// true if we can send an ack for v
- bool already_ack(eversion_t v) {
- for (xlist<RepGather*>::iterator i = repop_queue.begin();
- !i.end();
- ++i) {
- // skip copy from temp object ops
- if ((*i)->v == eversion_t())
- continue;
- if ((*i)->v > v)
- break;
- if (!(*i)->all_applied)
- return false;
- }
- return true;
- }
+ bool already_ack(eversion_t v);
// projected object info
SharedLRU<hobject_t, ObjectContext, hobject_t::ComparatorWithDefault> object_contexts;