ObjectStore::Transaction *t = new ObjectStore::Transaction;
Context *onreadable = 0;
Context *onreadable_sync = 0;
+ Context *oncomplete = 0;
submit_push_data(pi.recovery_info, first,
data_included, data,
m->omap_header,
onreadable = new C_OSD_AppliedRecoveredObject(this, t, obc);
onreadable_sync = new C_OSD_OndiskWriteUnlock(obc);
+ oncomplete = new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch());
} else {
onreadable = new ObjectStore::C_DeleteTransaction(t);
}
int r = osd->store->
- queue_transaction(osr.get(), t,
- onreadable,
- new C_OSD_CommittedPushedObject(this, op,
- info.history.same_interval_since,
- info.last_complete),
- onreadable_sync);
+ queue_transaction(
+ osr.get(), t,
+ onreadable,
+ new C_OSD_CommittedPushedObject(this, op,
+ info.history.same_interval_since,
+ info.last_complete),
+ onreadable_sync,
+ oncomplete,
+ TrackedOpRef()
+ );
assert(r == 0);
if (complete) {
- finish_recovery_op(hoid);
pulling.erase(hoid);
pull_from_peer[m->get_source().num()].erase(hoid);
update_stats();
osd->send_message_osd_cluster(reply, conn.get());
}
};
+ struct C_OSD_CompletedPull : public Context {
+ boost::intrusive_ptr<ReplicatedPG> pg;
+ hobject_t hoid;
+ epoch_t epoch;
+ C_OSD_CompletedPull(
+ ReplicatedPG *pg,
+ const hobject_t &hoid,
+ epoch_t epoch) : pg(pg), hoid(hoid), epoch(epoch) {}
+ void finish(int) {
+ pg->lock();
+ if (epoch >= pg->last_peering_reset) {
+ pg->finish_recovery_op(hoid);
+ }
+ pg->unlock();
+ }
+ };
+ friend class C_OSD_CompletedPull;
struct C_OSD_AppliedRecoveredObjectReplica : public Context {
boost::intrusive_ptr<ReplicatedPG> pg;
ObjectStore::Transaction *t;