op.soid,
op.recovery_info,
recovery_ops[op.soid].obc,
+ false,
&m->t);
} else {
get_parent()->on_local_recover(
op.soid,
op.recovery_info,
ObjectContextRef(),
+ false,
&m->t);
}
}
stat.num_bytes_recovered = op.recovery_info.size;
stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
stat.num_objects_recovered = 1;
- get_parent()->on_global_recover(op.hoid, stat);
+ get_parent()->on_global_recover(op.hoid, stat, false);
dout(10) << __func__ << ": WRITING return " << op << dendl;
recovery_ops.erase(op.hoid);
return;
RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
continue_recovery_op(op, &m);
}
+
dispatch_recovery_messages(m, priority);
+ send_recovery_deletes(priority, h->deletes);
delete _h;
}
}
bool ECBackend::can_handle_while_inactive(
- OpRequestRef op)
+ OpRequestRef _op)
{
- dout(10) << __func__ << ": " << op << dendl;
- switch (op->get_req()->get_type()) {
- case MSG_OSD_PG_RECOVERY_DELETE:
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
- return true;
- default:
- return false;
- }
+ // _op is intentionally unused: after this change the EC backend reports
+ // false for every message type, including the recovery delete types the
+ // removed switch used to accept.
+ return false;
}
-bool ECBackend::handle_message(
+bool ECBackend::_handle_message(
OpRequestRef _op)
{
dout(10) << __func__ << ": " << *_op->get_req() << dendl;
RecoveryHandle *h
) override;
- bool handle_message(
+ bool _handle_message(
OpRequestRef op
) override;
bool can_handle_while_inactive(
#include "OSDMap.h"
#include "PGLog.h"
#include "common/LogClient.h"
+#include "messages/MOSDPGRecoveryDelete.h"
+#include "messages/MOSDPGRecoveryDeleteReply.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
return *_dout << pgb->get_parent()->gen_dbg_prefix();
}
+/**
+ * Queue a recovery delete of oid at version v for every acting/backfill
+ * shard, other than this one, whose missing set still lists oid.
+ *
+ * Each queued delete is appended to h->deletes under its target shard and
+ * begin_peer_recover() is called on the parent for that shard.  Nothing is
+ * sent from here.
+ */
+void PGBackend::recover_delete_object(const hobject_t &oid, eversion_t v,
+                                      RecoveryHandle *h)
+{
+  auto parent = get_parent();
+  assert(parent->get_actingbackfill_shards().size() > 0);
+  for (const auto& peer : parent->get_actingbackfill_shards()) {
+    // Skip ourselves first, then any peer that is not missing the object.
+    const bool is_self = (peer == parent->whoami_shard());
+    if (is_self || !parent->get_shard_missing(peer).is_missing(oid))
+      continue;
+    dout(20) << __func__ << " will remove " << oid << " " << v << " from "
+             << peer << dendl;
+    h->deletes[peer].push_back(make_pair(oid, v));
+    parent->begin_peer_recover(peer, oid);
+  }
+}
+
+/**
+ * Send the queued recovery deletes to their target shards.
+ *
+ * @param prio    priority set on every outgoing message
+ * @param deletes per-shard list of (object, version) pairs to delete
+ *
+ * The objects for one shard are split across several MOSDPGRecoveryDelete
+ * messages: a message is closed once its accumulated cost reaches
+ * osd_max_push_cost or it carries osd_max_push_objects entries.  Shards for
+ * which get_con_osd_cluster() returns no connection are skipped.
+ */
+void PGBackend::send_recovery_deletes(int prio,
+  const map<pg_shard_t, vector<pair<hobject_t, eversion_t> > > &deletes)
+{
+  for (const auto& p : deletes) {
+    const auto& shard = p.first;
+    const auto& objects = p.second;
+    ConnectionRef con = get_parent()->get_con_osd_cluster(
+      shard.osd,
+      get_osdmap()->get_epoch());
+    if (!con)
+      continue;
+    auto it = objects.begin();
+    while (it != objects.end()) {
+      uint64_t cost = 0;
+      // Named so that it no longer shadows the `deletes` parameter.
+      uint64_t num_queued = 0;
+      spg_t target_pg = spg_t(get_parent()->get_info().pgid.pgid, shard.shard);
+      MOSDPGRecoveryDelete *msg =
+        new MOSDPGRecoveryDelete(get_parent()->whoami_shard(),
+                                 target_pg,
+                                 get_osdmap()->get_epoch());
+      msg->set_priority(prio);
+
+      // do/while rather than while: every message takes at least one object,
+      // so the outer loop always advances even if osd_max_push_cost or
+      // osd_max_push_objects is configured as 0.  With positive limits the
+      // batching is unchanged.
+      do {
+        dout(20) << __func__ << ": sending recovery delete " << it->first
+                 << " " << it->second << " to osd." << shard << dendl;
+        msg->objects.push_back(*it);
+        cost += cct->_conf->osd_push_per_object_cost;
+        ++num_queued;
+        ++it;
+      } while (it != objects.end() &&
+               cost < cct->_conf->osd_max_push_cost &&
+               num_queued < cct->_conf->osd_max_push_objects);
+
+      msg->set_cost(cost);
+      get_parent()->send_message_osd_cluster(msg, con);
+    }
+  }
+}
+
+/**
+ * Entry point for messages offered to the backend.
+ *
+ * Recovery delete requests and replies are consumed here; every other
+ * message type is passed on to _handle_message().
+ *
+ * @return true for the two recovery delete types, otherwise whatever
+ *         _handle_message() returns
+ */
+bool PGBackend::handle_message(OpRequestRef op)
+{
+  const auto msg_type = op->get_req()->get_type();
+
+  if (msg_type == MSG_OSD_PG_RECOVERY_DELETE) {
+    handle_recovery_delete(op);
+    return true;
+  }
+  if (msg_type == MSG_OSD_PG_RECOVERY_DELETE_REPLY) {
+    handle_recovery_delete_reply(op);
+    return true;
+  }
+
+  // Not a recovery delete message: let the concrete backend decide.
+  return _handle_message(op);
+}
+
+// Handle an incoming MOSDPGRecoveryDelete: ask the parent to remove every
+// listed object, and send a MOSDPGRecoveryDeleteReply back on the connection
+// the request arrived on from the gather's finisher.
+void PGBackend::handle_recovery_delete(OpRequestRef op)
+{
+ const MOSDPGRecoveryDelete *m = static_cast<const MOSDPGRecoveryDelete *>(op->get_req());
+ assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE);
+ dout(20) << __func__ << " " << op << dendl;
+
+ op->mark_started();
+
+ // One gather sub-context per object is handed to remove_missing_object()
+ // as its completion callback.
+ C_GatherBuilder gather(cct);
+ for (const auto &p : m->objects) {
+ get_parent()->remove_missing_object(p.first, p.second, gather.new_sub());
+ }
+
+ // The reply echoes the request's map epoch and object list; its pgid is
+ // built from the local pg id combined with the sender's shard id.
+ MOSDPGRecoveryDeleteReply *reply = new MOSDPGRecoveryDeleteReply;
+ reply->from = get_parent()->whoami_shard();
+ reply->set_priority(m->get_priority());
+ reply->pgid = spg_t(get_parent()->get_info().pgid.pgid, m->from.shard);
+ reply->map_epoch = m->map_epoch;
+ reply->objects = m->objects;
+ ConnectionRef conn = m->get_connection();
+
+ // NOTE(review): `reply` is a raw pointer captured by value and is only
+ // handed to send_message_osd_cluster() inside the finisher; the finisher's
+ // int result is ignored.  Presumably the finisher runs once every sub has
+ // completed - if a sub is never completed, nothing visible here releases
+ // `reply`.  Confirm against remove_missing_object().
+ gather.set_finisher(new FunctionContext(
+ [=](int) {
+ get_parent()->send_message_osd_cluster(reply, conn.get());
+ }));
+ gather.activate();
+}
+
+/**
+ * Process a MOSDPGRecoveryDeleteReply from a peer.
+ *
+ * For each object in the reply, on_peer_recover() is called on the parent
+ * for the sending shard.  If afterwards neither any other acting/backfill
+ * shard nor the local missing set lists the object as missing,
+ * on_global_recover() is called with is_delete = true.
+ */
+void PGBackend::handle_recovery_delete_reply(OpRequestRef op)
+{
+  const MOSDPGRecoveryDeleteReply *m =
+    static_cast<const MOSDPGRecoveryDeleteReply *>(op->get_req());
+  assert(m->get_type() == MSG_OSD_PG_RECOVERY_DELETE_REPLY);
+  dout(20) << __func__ << " " << op << dendl;
+
+  for (const auto &entry : m->objects) {
+    hobject_t oid = entry.first;
+    ObjectRecoveryInfo recovery_info;
+    recovery_info.version = entry.second;
+    get_parent()->on_peer_recover(m->from, oid, recovery_info);
+
+    // Look for any other shard that still has the object in its missing set.
+    bool missing_on_peer = false;
+    for (const auto& shard : get_parent()->get_actingbackfill_shards()) {
+      if (shard == get_parent()->whoami_shard())
+        continue;
+      if (!get_parent()->get_shard_missing(shard).is_missing(oid))
+        continue;
+      dout(20) << __func__ << " " << oid << " still missing on at least "
+               << shard << dendl;
+      missing_on_peer = true;
+      break;
+    }
+
+    // The local missing set is only consulted when no peer is missing it.
+    if (missing_on_peer || get_parent()->get_local_missing().is_missing(oid))
+      continue;
+
+    dout(20) << __func__ << " completed recovery, local_missing = "
+             << get_parent()->get_local_missing() << dendl;
+    object_stat_sum_t stat_diff;
+    stat_diff.num_objects_recovered = 1;
+    get_parent()->on_global_recover(oid, stat_diff, true);
+  }
+}
+
void PGBackend::rollback(
const pg_log_entry_t &entry,
ObjectStore::Transaction *t)
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info,
ObjectContextRef obc,
+ bool is_delete,
ObjectStore::Transaction *t
) = 0;
*/
virtual void on_global_recover(
const hobject_t &oid,
- const object_stat_sum_t &stat_diff
+ const object_stat_sum_t &stat_diff,
+ bool is_delete
) = 0;
/**
virtual void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) = 0;
virtual void primary_failed(const hobject_t &soid) = 0;
virtual bool primary_error(const hobject_t& soid, eversion_t v) = 0;
-
virtual void cancel_pull(const hobject_t &soid) = 0;
virtual void apply_stats(
eversion_t v
) = 0;
+ virtual void remove_missing_object(const hobject_t &oid,
+ eversion_t v,
+ Context *on_complete) = 0;
/**
* Bless a context
*/
struct RecoveryHandle {
bool cache_dont_need;
+ map<pg_shard_t, vector<pair<hobject_t, eversion_t> > > deletes;
RecoveryHandle(): cache_dont_need(false) {}
virtual ~RecoveryHandle() {}
int priority ///< [in] msg priority
) = 0;
+ void recover_delete_object(const hobject_t &oid, eversion_t v,
+ RecoveryHandle *h);
+ void send_recovery_deletes(int prio,
+ const map<pg_shard_t, vector<pair<hobject_t, eversion_t> > > &deletes);
+
/**
* recover_object
*
virtual bool can_handle_while_inactive(OpRequestRef op) = 0;
/// gives PGBackend a crack at an incoming message
- virtual bool handle_message(
+ bool handle_message(
OpRequestRef op ///< [in] message received
- ) = 0; ///< @return true if the message was handled
+ ); ///< @return true if the message was handled
+
+ /// the variant of handle_message that is overridden by child classes
+ virtual bool _handle_message(OpRequestRef op) = 0;
virtual void check_recovery_sources(const OSDMapRef& osdmap) = 0;
ObjectStore::Transaction *t);
protected:
+
+ void handle_recovery_delete(OpRequestRef op);
+ void handle_recovery_delete_reply(OpRequestRef op);
+
/// Reapply old attributes
void rollback_setattrs(
const hobject_t &hoid,
const hobject_t &hoid,
const ObjectRecoveryInfo &_recovery_info,
ObjectContextRef obc,
+ bool is_delete,
ObjectStore::Transaction *t
)
{
ObjectRecoveryInfo recovery_info(_recovery_info);
clear_object_snap_mapping(t, hoid);
- if (recovery_info.soid.is_snap()) {
+ if (!is_delete && recovery_info.soid.is_snap()) {
OSDriver::OSTransaction _t(osdriver.get_transaction(t));
set<snapid_t> snaps;
dout(20) << " snapset " << recovery_info.ss
snaps,
&_t);
}
- if (pg_log.get_missing().is_missing(recovery_info.soid) &&
+ if (!is_delete && pg_log.get_missing().is_missing(recovery_info.soid) &&
pg_log.get_missing().get_items().find(recovery_info.soid)->second.need > recovery_info.version) {
assert(is_primary());
const pg_log_entry_t *latest = pg_log.get_log().objects.find(recovery_info.soid)->second;
recover_got(recovery_info.soid, recovery_info.version);
if (is_primary()) {
- assert(obc);
- obc->obs.exists = true;
- obc->ondisk_write_lock();
-
- bool got = obc->get_recovery_read();
- assert(got);
+ if (!is_delete) {
+ obc->obs.exists = true;
+ obc->ondisk_write_lock();
- assert(recovering.count(obc->obs.oi.soid));
- recovering[obc->obs.oi.soid] = obc;
- obc->obs.oi = recovery_info.oi; // may have been updated above
+ bool got = obc->get_recovery_read();
+ assert(got);
+ assert(recovering.count(obc->obs.oi.soid));
+ recovering[obc->obs.oi.soid] = obc;
+ obc->obs.oi = recovery_info.oi; // may have been updated above
+ t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
+ }
t->register_on_applied(new C_OSD_AppliedRecoveredObject(this, obc));
- t->register_on_applied_sync(new C_OSD_OndiskWriteUnlock(obc));
publish_stats_to_osd();
assert(missing_loc.needs_recovery(hoid));
- missing_loc.add_location(hoid, pg_whoami);
+ if (!is_delete)
+ missing_loc.add_location(hoid, pg_whoami);
release_backoffs(hoid);
if (!is_unreadable_object(hoid)) {
auto unreadable_object_entry = waiting_for_unreadable_object.find(hoid);
void PrimaryLogPG::on_global_recover(
const hobject_t &soid,
- const object_stat_sum_t &stat_diff)
+ const object_stat_sum_t &stat_diff,
+ bool is_delete)
{
info.stats.stats.sum.add(stat_diff);
missing_loc.recovered(soid);
map<hobject_t, ObjectContextRef>::iterator i = recovering.find(soid);
assert(i != recovering.end());
- // recover missing won't have had an obc, but it gets filled in
- // during on_local_recover
- assert(i->second);
- list<OpRequestRef> requeue_list;
- i->second->drop_recovery_read(&requeue_list);
- requeue_ops(requeue_list);
+ if (!is_delete) {
+ // recover missing won't have had an obc, but it gets filled in
+ // during on_local_recover
+ assert(i->second);
+ list<OpRequestRef> requeue_list;
+ i->second->drop_recovery_read(&requeue_list);
+ requeue_ops(requeue_list);
+ }
backfills_in_flight.erase(soid);
PGBackend::RecoveryHandle *h = pgbackend->open_recovery_op();
if (is_missing_object(soid)) {
recover_missing(soid, v, cct->_conf->osd_client_op_priority, h);
+ } else if (missing_loc.is_deleted(soid)) {
+ prep_object_replica_deletes(soid, v, h);
} else {
prep_object_replica_pushes(soid, v, h);
}
return PULL_NONE;
}
+ if (missing_loc.is_deleted(soid)) {
+ start_recovery_op(soid);
+ assert(!recovering.count(soid));
+ recovering.insert(make_pair(soid, ObjectContextRef()));
+ epoch_t cur_epoch = get_osdmap()->get_epoch();
+ remove_missing_object(soid, v, new FunctionContext(
+ [=](int) {
+ lock();
+ if (!pg_has_reset_since(cur_epoch)) {
+ bool object_missing = false;
+ for (const auto& shard : actingbackfill) {
+ if (shard == pg_whoami)
+ continue;
+ if (peer_missing[shard].is_missing(soid)) {
+ dout(20) << __func__ << ": soid " << soid << " needs to be deleted from replica " << shard << dendl;
+ object_missing = true;
+ break;
+ }
+ }
+ if (!object_missing) {
+ object_stat_sum_t stat_diff;
+ stat_diff.num_objects_recovered = 1;
+ on_global_recover(soid, stat_diff, true);
+ } else {
+ auto recovery_handle = pgbackend->open_recovery_op();
+ pgbackend->recover_delete_object(soid, v, recovery_handle);
+ pgbackend->run_recovery_op(recovery_handle, priority);
+ }
+ }
+ unlock();
+ }));
+ return PULL_YES;
+ }
+
// is this a snapped object? if so, consult the snapset.. we may not need the entire object!
ObjectContextRef obc;
ObjectContextRef head_obc;
osd->send_message_osd_cluster(peer.osd, subop, get_osdmap()->get_epoch());
}
+// Remove soid locally using two queued transactions.  The first removes the
+// object via remove_snap_mapped_object(); when it completes, a second
+// transaction is built by on_local_recover() with is_delete = true, and
+// on_complete is registered on that second transaction.
+void PrimaryLogPG::remove_missing_object(const hobject_t &soid,
+ eversion_t v, Context *on_complete)
+{
+ dout(20) << __func__ << " " << soid << " " << v << dendl;
+ assert(on_complete != nullptr);
+ // delete locally
+ ObjectStore::Transaction t;
+ remove_snap_mapped_object(t, soid);
+
+ ObjectRecoveryInfo recovery_info;
+ recovery_info.soid = soid;
+ recovery_info.version = v;
+
+ // Epoch captured now so the completion callback can tell whether the PG
+ // was reset while the first transaction was in flight.
+ epoch_t cur_epoch = get_osdmap()->get_epoch();
+ t.register_on_complete(new FunctionContext(
+ [=](int) {
+ lock();
+ // NOTE(review): when pg_has_reset_since(cur_epoch) is true, the second
+ // transaction is never queued and on_complete is neither completed nor
+ // deleted in this function - confirm whether callers rely on being
+ // notified (or on the Context being freed) in that case.
+ if (!pg_has_reset_since(cur_epoch)) {
+ ObjectStore::Transaction t2;
+ on_local_recover(soid, recovery_info, ObjectContextRef(), true, &t2);
+ t2.register_on_complete(on_complete);
+ int r = osd->store->queue_transaction(osr.get(), std::move(t2), nullptr);
+ assert(r == 0);
+ }
+ unlock();
+ }));
+ int r = osd->store->queue_transaction(osr.get(), std::move(t), nullptr);
+ assert(r == 0);
+}
void PrimaryLogPG::finish_degraded_object(const hobject_t& oid)
{
dout(10) << "finish_degraded_object " << oid << dendl;
- ObjectContextRef obc(object_contexts.lookup(oid));
if (callbacks_for_degraded_object.count(oid)) {
list<Context*> contexts;
contexts.swap(callbacks_for_degraded_object[oid]);
void PrimaryLogPG::_applied_recovered_object(ObjectContextRef obc)
{
lock();
- dout(10) << "_applied_recovered_object " << *obc << dendl;
-
+ dout(20) << __func__ << dendl;
+ if (obc) {
+ dout(20) << "obc = " << *obc << dendl;
+ }
assert(active_pushes >= 1);
--active_pushes;
requeue_scrub(false);
}
}
-
unlock();
}
void PrimaryLogPG::_applied_recovered_object_replica()
{
lock();
- dout(10) << "_applied_recovered_object_replica" << dendl;
-
+ dout(20) << __func__ << dendl;
assert(active_pushes >= 1);
--active_pushes;
PGQueueable(scrubber.active_rep_scrub, get_osdmap()->get_epoch()));
scrubber.active_rep_scrub = OpRequestRef();
}
-
unlock();
}
if (pg_log.get_log().objects.count(p->second)) {
latest = pg_log.get_log().objects.find(p->second)->second;
- assert(latest->is_update());
+ assert(latest->is_update() || latest->is_delete());
soid = latest->soid;
} else {
latest = 0;
return uhoh;
}
+/**
+ * Begin recovery of a deleted object on the replicas.
+ *
+ * Starts a recovery op for soid, records it in `recovering` with an empty
+ * object context, and asks the backend to queue the per-shard deletes on h.
+ *
+ * @return always 1
+ */
+int PrimaryLogPG::prep_object_replica_deletes(
+  const hobject_t& soid, eversion_t v,
+  PGBackend::RecoveryHandle *h)
+{
+  assert(is_primary());
+  dout(10) << __func__ << ": on " << soid << dendl;
+
+  start_recovery_op(soid);
+
+  // The object must not already be tracked; a delete has no object context.
+  assert(recovering.find(soid) == recovering.end());
+  recovering.emplace(soid, ObjectContextRef());
+
+  pgbackend->recover_delete_object(soid, v, h);
+  return 1;
+}
+
int PrimaryLogPG::prep_object_replica_pushes(
const hobject_t& soid, eversion_t v,
PGBackend::RecoveryHandle *h)
continue;
}
+ if (missing_loc.is_deleted(soid)) {
+ dout(10) << __func__ << ": " << soid << " is a delete, removing" << dendl;
+ map<hobject_t,pg_missing_item>::const_iterator r = m.get_items().find(soid);
+ started += prep_object_replica_deletes(soid, r->second.need, h);
+ continue;
+ }
+
if (soid.is_snap() && pg_log.get_missing().is_missing(soid.get_head())) {
dout(10) << __func__ << ": " << soid.get_head()
<< " still missing on primary" << dendl;
for (unsigned int i = 0 ; i < peers.size(); ++i) {
map<pg_shard_t, pg_missing_t>::iterator bpm = peer_missing.find(peers[i]);
assert(bpm != peer_missing.end());
- bpm->second.add(oid, eversion_t(), eversion_t());
+ bpm->second.add(oid, eversion_t(), eversion_t(), false);
}
assert(!recovering.count(oid));
continue;
did.insert(p->soid);
- if (p->is_delete()) {
+ if (p->is_delete() && !is_missing_object(p->soid)) {
dout(10) << " checking " << p->soid
<< " at " << p->version << dendl;
struct stat st;
const hobject_t &oid,
const ObjectRecoveryInfo &recovery_info,
ObjectContextRef obc,
+ bool is_delete,
ObjectStore::Transaction *t
) override;
void on_peer_recover(
const hobject_t oid) override;
void on_global_recover(
const hobject_t &oid,
- const object_stat_sum_t &stat_diff) override;
+ const object_stat_sum_t &stat_diff,
+ bool is_delete) override;
void failed_push(const list<pg_shard_t> &from, const hobject_t &soid) override;
void primary_failed(const hobject_t &soid) override;
bool primary_error(const hobject_t& soid, eversion_t v) override;
const hobject_t &soid,
const object_stat_sum_t &delta_stats) override;
void on_primary_error(const hobject_t &oid, eversion_t v) override;
+ void remove_missing_object(const hobject_t &oid,
+ eversion_t v,
+ Context *on_complete) override;
template<class T> class BlessedGenContext;
class BlessedContext;
int prep_object_replica_pushes(const hobject_t& soid, eversion_t v,
PGBackend::RecoveryHandle *h);
+ int prep_object_replica_deletes(const hobject_t& soid, eversion_t v,
+ PGBackend::RecoveryHandle *h);
void finish_degraded_object(const hobject_t& oid);
RPGHandle *h = static_cast<RPGHandle *>(_h);
send_pushes(priority, h->pushes);
send_pulls(priority, h->pulls);
+ send_recovery_deletes(priority, h->deletes);
delete h;
}
dout(10) << __func__ << ": " << op << dendl;
switch (op->get_req()->get_type()) {
case MSG_OSD_PG_PULL:
- case MSG_OSD_PG_RECOVERY_DELETE:
- case MSG_OSD_PG_RECOVERY_DELETE_REPLY:
return true;
default:
return false;
}
}
-bool ReplicatedBackend::handle_message(
+bool ReplicatedBackend::_handle_message(
OpRequestRef op
)
{
bc->get_parent()->primary_error(i.hoid, obc->obs.oi.version);
} else if (!started) {
bc->get_parent()->on_global_recover(
- i.hoid, i.stat);
+ i.hoid, i.stat, false);
}
handle.reset_tp_timeout();
}
clear_pull_from(piter);
to_continue->push_back({hoid, pi.stat});
get_parent()->on_local_recover(
- hoid, pi.recovery_info, pi.obc, t);
+ hoid, pi.recovery_info, pi.obc, false, t);
return false;
} else {
response->soid = pop.soid;
pop.recovery_info.soid,
pop.recovery_info,
ObjectContextRef(), // ok, is replica
+ false,
t);
}
if (pushing[soid].empty()) {
if (!error)
- get_parent()->on_global_recover(soid, stat);
+ get_parent()->on_global_recover(soid, stat, false);
else
get_parent()->on_primary_error(soid, v);
-
pushing.erase(soid);
} else {
// This looks weird, but we erased the current peer and need to remember
bool can_handle_while_inactive(OpRequestRef op) override;
/// @see PGBackend::handle_message
- bool handle_message(
+ bool _handle_message(
OpRequestRef op
) override;