osd->send_message_osd_cluster(m, con);
}
+void PrimaryLogPG::on_primary_error(
+ const hobject_t &oid,
+ eversion_t v)
+{
+ dout(0) << __func__ << ": oid " << oid << " version " << v << dendl;
+ list<pg_shard_t> fl = { pg_whoami };
+ failed_push(fl, oid);
+ primary_error(oid, v);
+ backfills_in_flight.erase(oid);
+ missing_loc.add_missing(oid, v, eversion_t());
+}
+
ConnectionRef PrimaryLogPG::get_con_osd_cluster(
int peer, epoch_t from_epoch)
{
ObjectRecoveryProgress &new_progress = *out_progress;
new_progress = progress;
- dout(7) << "send_push_op " << recovery_info.soid
+ dout(7) << __func__ << " " << recovery_info.soid
<< " v " << recovery_info.version
<< " size " << recovery_info.size
<< " recovery_info: " << recovery_info
return false;
} else {
PushInfo *pi = &pushing[soid][peer];
+ bool error = pushing[soid].begin()->second.recovery_progress.error;
- if (!pi->recovery_progress.data_complete) {
+ if (!pi->recovery_progress.data_complete && !error) {
dout(10) << " pushing more from, "
<< pi->recovery_progress.data_recovered_to
<< " of " << pi->recovery_info.copy_subset << dendl;
pi->recovery_info,
pi->recovery_progress, &new_progress, reply,
&(pi->stat));
- // XXX: What can we do here?
- assert(r == 0);
+ // Handle the case of a read error right after we wrote, which is
+ // hopefuilly extremely rare.
+ if (r < 0) {
+ dout(5) << __func__ << ": oid " << soid << " error " << r << dendl;
+
+ error = true;
+ goto done;
+ }
pi->recovery_progress = new_progress;
return true;
} else {
// done!
- get_parent()->on_peer_recover(
- peer, soid, pi->recovery_info);
+done:
+ if (!error)
+ get_parent()->on_peer_recover( peer, soid, pi->recovery_info);
get_parent()->release_locks(pi->lock_manager);
object_stat_sum_t stat = pi->stat;
+ eversion_t v = pi->recovery_info.version;
pushing[soid].erase(peer);
pi = NULL;
if (pushing[soid].empty()) {
- get_parent()->on_global_recover(soid, stat);
+ if (!error)
+ get_parent()->on_global_recover(soid, stat);
+ else
+ get_parent()->on_primary_error(soid, v);
+
pushing.erase(soid);
} else {
+ // This looks weird, but we erased the current peer and need to remember
+ // the error on any other one, while getting more acks.
+ if (error)
+ pushing[soid].begin()->second.recovery_progress.error = true;
dout(10) << "pushed " << soid << ", still waiting for push ack from "
<< pushing[soid].size() << " others" << dendl;
}