{
}
+// ======================
+// PGBackend::Listener
+
+
+void ReplicatedPG::on_local_recover_start(
+ const hobject_t &oid,
+ ObjectStore::Transaction *t)
+{
+ pg_log.revise_have(oid, eversion_t());
+ remove_snap_mapped_object(*t, oid);
+ t->remove(coll, oid);
+}
+
+void ReplicatedPG::on_local_recover(
+ const hobject_t &hoid,
+ const object_stat_sum_t &stat_diff,
+ const ObjectRecoveryInfo &_recovery_info,
+ ObjectContextRef obc,
+ ObjectStore::Transaction *t
+ )
+{
+ ObjectRecoveryInfo recovery_info(_recovery_info);
+ if (recovery_info.soid.snap < CEPH_NOSNAP) {
+ assert(recovery_info.oi.snaps.size());
+ OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ set<snapid_t> snaps(
+ recovery_info.oi.snaps.begin(),
+ recovery_info.oi.snaps.end());
+ snap_mapper.add_oid(
+ recovery_info.soid,
+ snaps,
+ &_t);
+ }
+
+ if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
+ assert(is_primary());
+ const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
+ if (latest->op == pg_log_entry_t::LOST_REVERT &&
+ latest->reverting_to == recovery_info.version) {
+ dout(10) << " got old revert version " << recovery_info.version
+ << " for " << *latest << dendl;
+ recovery_info.version = latest->version;
+ // update the attr to the revert event version
+ recovery_info.oi.prior_version = recovery_info.oi.version;
+ recovery_info.oi.version = latest->version;
+ bufferlist bl;
+ ::encode(recovery_info.oi, bl);
+ t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
+ }
+ }
+
+ // keep track of active pushes for scrub
+ ++active_pushes;
+
+ recover_got(recovery_info.soid, recovery_info.version);
+
+ if (is_primary()) {
+ info.stats.stats.sum.add(stat_diff);
+
+ assert(obc);
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+
+
+ t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+ t->register_on_complete(
+ new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
+
+ publish_stats_to_osd();
+ if (waiting_for_missing_object.count(hoid)) {
+ dout(20) << " kicking waiters on " << hoid << dendl;
+ requeue_ops(waiting_for_missing_object[hoid]);
+ waiting_for_missing_object.erase(hoid);
+ if (pg_log.get_missing().missing.size() == 0) {
+ requeue_ops(waiting_for_all_missing);
+ waiting_for_all_missing.clear();
+ }
+ }
+ } else {
+ t->register_on_applied(
+ new C_OSD_AppliedRecoveredObjectReplica(this));
+
+ }
+
+ t->register_on_commit(
+ new C_OSD_CommittedPushedObject(
+ this,
+ get_osdmap()->get_epoch(),
+ info.last_complete));
+
+ // update pg
+ dirty_info = true;
+ write_if_dirty(*t);
+
+}
+
+void ReplicatedPG::on_global_recover(
+ const hobject_t &soid)
+{
+ publish_stats_to_osd();
+ pushing.erase(soid);
+ dout(10) << "pushed " << soid << " to all replicas" << dendl;
+ finish_recovery_op(soid);
+ if (waiting_for_degraded_object.count(soid)) {
+ requeue_ops(waiting_for_degraded_object[soid]);
+ waiting_for_degraded_object.erase(soid);
+ }
+ finish_degraded_object(soid);
+}
+
+void ReplicatedPG::on_peer_recover(
+ int peer,
+ const hobject_t &soid,
+ const ObjectRecoveryInfo &recovery_info)
+{
+ // done!
+ if (peer == backfill_target && backfills_in_flight.count(soid))
+ backfills_in_flight.erase(soid);
+ else
+ peer_missing[peer].got(soid, recovery_info.version);
+}
+
+void ReplicatedPG::begin_peer_recover(
+ int peer,
+ const hobject_t soid)
+{
+}
+
// =======================
// pg changes
}
if (first) {
- pg_log.revise_have(recovery_info.soid, eversion_t());
- remove_snap_mapped_object(*t, recovery_info.soid);
- t->remove(coll, recovery_info.soid);
+ on_local_recover_start(recovery_info.soid, t);
t->remove(get_temp_coll(t), recovery_info.soid);
t->touch(target_coll, recovery_info.soid);
t->omap_setheader(target_coll, recovery_info.soid, omap_header);
q.get_start(), q.get_len(), q.get_start());
}
}
-
- if (recovery_info.soid.snap < CEPH_NOSNAP) {
- assert(recovery_info.oi.snaps.size());
- OSDriver::OSTransaction _t(osdriver.get_transaction(t));
- set<snapid_t> snaps(
- recovery_info.oi.snaps.begin(),
- recovery_info.oi.snaps.end());
- snap_mapper.add_oid(
- recovery_info.soid,
- snaps,
- &_t);
- }
-
- if (pg_log.get_missing().is_missing(recovery_info.soid) &&
- pg_log.get_missing().missing.find(recovery_info.soid)->second.need > recovery_info.version) {
- assert(is_primary());
- const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
- if (latest->op == pg_log_entry_t::LOST_REVERT &&
- latest->reverting_to == recovery_info.version) {
- dout(10) << " got old revert version " << recovery_info.version
- << " for " << *latest << dendl;
- recovery_info.version = latest->version;
- // update the attr to the revert event version
- recovery_info.oi.prior_version = recovery_info.oi.version;
- recovery_info.oi.version = latest->version;
- bufferlist bl;
- ::encode(recovery_info.oi, bl);
- t->setattr(coll, recovery_info.soid, OI_ATTR, bl);
- }
- }
- recover_got(recovery_info.soid, recovery_info.version);
-
- // update pg
- dirty_info = true;
- write_if_dirty(*t);
}
ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recovery_info)
info.stats.stats.sum.num_keys_recovered += pop.omap_entries.size();
- if (complete) {
- info.stats.stats.sum.num_objects_recovered++;
-
- SnapSetContext *ssc;
- if (hoid.snap == CEPH_NOSNAP || hoid.snap == CEPH_SNAPDIR) {
- ssc = create_snapset_context(hoid.oid);
- ssc->snapset = pi.recovery_info.ss;
- } else {
- ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false,
- hoid.get_namespace());
- assert(ssc);
- }
- ObjectContextRef obc = create_object_context(pi.recovery_info.oi, ssc);
- obc->obs.exists = true;
-
- obc->ondisk_write_lock();
-
- // keep track of active pushes for scrub
- ++active_pushes;
-
- t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
- t->register_on_complete(
- new C_OSD_CompletedPull(this, hoid, get_osdmap()->get_epoch()));
- }
-
- t->register_on_commit(
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete));
-
if (complete) {
pulling.erase(hoid);
pull_from_peer[from].erase(hoid);
- publish_stats_to_osd();
- if (waiting_for_missing_object.count(hoid)) {
- dout(20) << " kicking waiters on " << hoid << dendl;
- requeue_ops(waiting_for_missing_object[hoid]);
- waiting_for_missing_object.erase(hoid);
- if (pg_log.get_missing().missing.size() == 0) {
- requeue_ops(waiting_for_all_missing);
- waiting_for_all_missing.clear();
- }
- }
+ info.stats.stats.sum.num_objects_recovered++;
+ on_local_recover(hoid, object_stat_sum_t(), pi.recovery_info, pi.obc, t);
return false;
} else {
response->soid = pop.soid;
bool complete = pop.after_progress.data_complete &&
pop.after_progress.omap_complete;
- // keep track of active pushes for scrub
- ++active_pushes;
-
response->soid = pop.recovery_info.soid;
- t->register_on_applied(
- new C_OSD_AppliedRecoveredObjectReplica(this));
submit_push_data(pop.recovery_info,
first,
complete,
pop.omap_entries,
t);
- t->register_on_commit(
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete));
+ if (complete)
+ on_local_recover(
+ pop.recovery_info.soid,
+ object_stat_sum_t(),
+ pop.recovery_info,
+ ObjectContextRef(), // ok, is replica
+ t);
}
void ReplicatedPG::send_pushes(int prio, map<int, vector<PushOp> > &pushes)
return true;
} else {
// done!
- if (peer == backfill_target && backfills_in_flight.count(soid))
- backfills_in_flight.erase(soid);
- else
- peer_missing[peer].got(soid, pi->recovery_info.version);
+ on_peer_recover(peer, soid, pi->recovery_info);
pushing[soid].erase(peer);
pi = NULL;
- publish_stats_to_osd();
if (pushing[soid].empty()) {
- pushing.erase(soid);
- dout(10) << "pushed " << soid << " to all replicas" << dendl;
- finish_recovery_op(soid);
- if (waiting_for_degraded_object.count(soid)) {
- requeue_ops(waiting_for_degraded_object[soid]);
- waiting_for_degraded_object.erase(soid);
- }
- finish_degraded_object(soid);
+ on_global_recover(soid);
} else {
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
t->register_on_complete(new C_OSD_SendMessageOnConn(
osd, reply, m->get_connection()));
}
- t->register_on_commit(new C_OnPushCommit(this, op));
osd->store->queue_transaction(osr.get(), t);
return;
}