for (set<hobject_t, hobject_t::BitwiseComparator>::iterator j = i->second.begin();
j != i->second.end();
++j) {
- assert(pulling.count(*j) == 1);
get_parent()->cancel_pull(*j);
- pulling.erase(*j);
+ clear_pull(pulling.find(*j), false);
}
pull_from_peer.erase(i++);
} else {
void ReplicatedBackend::clear_recovery_state()
{
// clear pushing/pulling maps
+ for (auto &&i: pushing) {
+ for (auto &&j: i.second) {
+ get_parent()->release_locks(j.second.lock_manager);
+ }
+ }
pushing.clear();
+
+ for (auto &&i: pulling) {
+ get_parent()->release_locks(i.second.lock_manager);
+ }
pulling.clear();
pull_from_peer.clear();
}
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
ReplicatedBackend *bc;
- list<hobject_t> to_continue;
+ list<ReplicatedBackend::pull_complete_info> to_continue;
int priority;
C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
: bc(bc), priority(priority) {}
void finish(ThreadPool::TPHandle &handle) {
ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
- for (list<hobject_t>::iterator i =
- to_continue.begin();
- i != to_continue.end();
- ++i) {
- map<hobject_t, ReplicatedBackend::PullInfo, hobject_t::BitwiseComparator>::iterator j =
- bc->pulling.find(*i);
- assert(j != bc->pulling.end());
- if (!bc->start_pushes(*i, j->second.obc, h)) {
+ for (auto &&i: to_continue) {
+ if (!bc->start_pushes(i.hoid, i.obc, h)) {
bc->get_parent()->on_global_recover(
- *i, j->second.stat);
+ i.hoid, i.stat);
}
- bc->pulling.erase(*i);
handle.reset_tp_timeout();
}
bc->run_recovery_op(h, priority);
vector<PullOp> replies(1);
ObjectStore::Transaction t;
- list<hobject_t> to_continue;
+ list<pull_complete_info> to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
+ map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+ ObcLockManager &manager)
{
dout(10) << "calc_head_subsets " << head
<< " clone_overlap " << snapset.clone_overlap << dendl;
c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
if (!missing.is_missing(c) &&
- cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+ cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+ get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_head_subsets " << head << " has prev " << c
<< " overlap " << prev << dendl;
clone_subsets[c] = prev;
if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
dout(10) << "skipping clone, too many holes" << dendl;
+ get_parent()->release_locks(manager);
clone_subsets.clear();
cloning.clear();
}
const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets)
+ map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+ ObcLockManager &manager)
{
dout(10) << "calc_clone_subsets " << soid
<< " clone_overlap " << snapset.clone_overlap << dendl;
c.snap = snapset.clones[j];
prev.intersection_of(snapset.clone_overlap[snapset.clones[j]]);
if (!missing.is_missing(c) &&
- cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+ cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+ get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_clone_subsets " << soid << " has prev " << c
<< " overlap " << prev << dendl;
clone_subsets[c] = prev;
c.snap = snapset.clones[j];
next.intersection_of(snapset.clone_overlap[snapset.clones[j-1]]);
if (!missing.is_missing(c) &&
- cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0) {
+ cmp(c, last_backfill, get_parent()->sort_bitwise()) < 0 &&
+ get_parent()->try_lock_for_read(c, manager)) {
dout(10) << "calc_clone_subsets " << soid << " has next " << c
<< " overlap " << next << dendl;
clone_subsets[c] = next;
if (cloning.num_intervals() > cct->_conf->osd_recover_clone_overlap_limit) {
dout(10) << "skipping clone, too many holes" << dendl;
+ get_parent()->release_locks(manager);
clone_subsets.clear();
cloning.clear();
}
}
ObjectRecoveryInfo recovery_info;
+ ObcLockManager lock_manager;
if (soid.is_snap()) {
assert(!get_parent()->get_local_missing().is_missing(
SnapSetContext *ssc = headctx->ssc;
assert(ssc);
dout(10) << " snapset " << ssc->snapset << dendl;
- calc_clone_subsets(ssc->snapset, soid, get_parent()->get_local_missing(),
- get_info().last_backfill,
- recovery_info.copy_subset,
- recovery_info.clone_subset);
+ calc_clone_subsets(
+ ssc->snapset, soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
+ recovery_info.copy_subset,
+ recovery_info.clone_subset,
+ lock_manager);
// FIXME: this may overestimate if we are pulling multiple clones in parallel...
dout(10) << " pulling " << recovery_info << dendl;
assert(!pulling.count(soid));
pull_from_peer[fromshard].insert(soid);
PullInfo &pi = pulling[soid];
+ pi.from = fromshard;
+ pi.soid = soid;
pi.head_ctx = headctx;
pi.recovery_info = op.recovery_info;
pi.recovery_progress = op.recovery_progress;
pi.cache_dont_need = h->cache_dont_need;
+ pi.lock_manager = std::move(lock_manager);
}
/*
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator> clone_subsets;
interval_set<uint64_t> data_subset;
+ ObcLockManager lock_manager;
// are we doing a clone on the replica?
if (soid.snap && soid.snap < CEPH_NOSNAP) {
hobject_t head = soid;
map<pg_shard_t, pg_info_t>::const_iterator pi =
get_parent()->get_shard_info().find(peer);
assert(pi != get_parent()->get_shard_info().end());
- calc_clone_subsets(ssc->snapset, soid,
- pm->second,
- pi->second.last_backfill,
- data_subset, clone_subsets);
+ calc_clone_subsets(
+ ssc->snapset, soid,
+ pm->second,
+ pi->second.last_backfill,
+ data_subset, clone_subsets,
+ lock_manager);
} else if (soid.snap == CEPH_NOSNAP) {
// pushing head or unversioned object.
// base this on partially on replica's clones?
obc,
ssc->snapset, soid, get_parent()->get_shard_missing().find(peer)->second,
get_parent()->get_shard_info().find(peer)->second.last_backfill,
- data_subset, clone_subsets);
+ data_subset, clone_subsets,
+ lock_manager);
}
- prep_push(obc, soid, peer, oi.version, data_subset, clone_subsets, pop, cache_dont_need);
+ prep_push(
+ obc,
+ soid,
+ peer,
+ oi.version,
+ data_subset,
+ clone_subsets,
+ pop,
+ cache_dont_need,
+ std::move(lock_manager));
}
void ReplicatedBackend::prep_push(ObjectContextRef obc,
prep_push(obc, soid, peer,
obc->obs.oi.version, data_subset, clone_subsets,
- pop, cache_dont_need);
+ pop, cache_dont_need, ObcLockManager());
}
void ReplicatedBackend::prep_push(
interval_set<uint64_t> &data_subset,
map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
PushOp *pop,
- bool cache_dont_need)
+ bool cache_dont_need,
+ ObcLockManager &&lock_manager)
{
get_parent()->begin_peer_recover(peer, soid);
// take note.
pi.recovery_progress.data_recovered_to = 0;
pi.recovery_progress.data_complete = 0;
pi.recovery_progress.omap_complete = 0;
+ pi.lock_manager = std::move(lock_manager);
ObjectRecoveryProgress new_progress;
int r = build_push_op(pi.recovery_info,
ObjectRecoveryInfo ReplicatedBackend::recalc_subsets(
const ObjectRecoveryInfo& recovery_info,
- SnapSetContext *ssc)
+ SnapSetContext *ssc,
+ ObcLockManager &manager)
{
if (!recovery_info.soid.snap || recovery_info.soid.snap >= CEPH_NOSNAP)
return recovery_info;
new_info.copy_subset.clear();
new_info.clone_subset.clear();
assert(ssc);
- calc_clone_subsets(ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
- get_info().last_backfill,
- new_info.copy_subset, new_info.clone_subset);
+ get_parent()->release_locks(manager); // might already have locks
+ calc_clone_subsets(
+ ssc->snapset, new_info.soid, get_parent()->get_local_missing(),
+ get_info().last_backfill,
+ new_info.copy_subset, new_info.clone_subset,
+ manager);
return new_info;
}
bool ReplicatedBackend::handle_pull_response(
pg_shard_t from, PushOp &pop, PullOp *response,
- list<hobject_t> *to_continue,
- ObjectStore::Transaction *t
- )
+ list<pull_complete_info> *to_continue,
+ ObjectStore::Transaction *t)
{
interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
}
pi.obc = get_parent()->get_obc(pi.recovery_info.soid, pop.attrset);
pi.recovery_info.oi = pi.obc->obs.oi;
- pi.recovery_info = recalc_subsets(pi.recovery_info, pi.obc->ssc);
+ pi.recovery_info = recalc_subsets(
+ pi.recovery_info,
+ pi.obc->ssc,
+ pi.lock_manager);
}
if (complete) {
pi.stat.num_objects_recovered++;
- to_continue->push_back(hoid);
+ to_continue->push_back({hoid, pi.obc, pi.stat});
get_parent()->on_local_recover(
hoid, pi.recovery_info, pi.obc, t);
- pull_from_peer[from].erase(hoid);
- if (pull_from_peer[from].empty())
- pull_from_peer.erase(from);
+ clear_pull(pulling.find(hoid));
return false;
} else {
response->soid = pop.soid;
stat.num_keys_recovered = reply->omap_entries.size();
stat.num_objects_recovered = 1;
+ get_parent()->release_locks(pi->lock_manager);
pushing[soid].erase(peer);
pi = NULL;
if (is_primary()) {
PullOp resp;
RPGHandle *h = _open_recovery_op();
- list<hobject_t> to_continue;
+ list<pull_complete_info> to_continue;
bool more = handle_pull_response(
m->from, pop, &resp,
&to_continue, &t);
{
list<pg_shard_t> fl = { from };
get_parent()->failed_push(fl, soid);
- pull_from_peer[from].erase(soid);
- if (pull_from_peer[from].empty())
- pull_from_peer.erase(from);
- pulling.erase(soid);
+
+ clear_pull(pulling.find(soid));
+}
+
+void ReplicatedBackend::clear_pull(
+ map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
+ bool clear_pull_from_peer)
+{
+ auto from = piter->second.from;
+ if (clear_pull_from_peer) {
+ pull_from_peer[from].erase(piter->second.soid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
+ }
+ get_parent()->release_locks(piter->second.lock_manager);
+ pulling.erase(piter);
}
int ReplicatedBackend::start_pushes(
ObjectRecoveryInfo recovery_info;
ObjectContextRef obc;
object_stat_sum_t stat;
+ ObcLockManager lock_manager;
void dump(Formatter *f) const {
{
// pull
struct PullInfo {
+ pg_shard_t from;
+ hobject_t soid;
ObjectRecoveryProgress recovery_progress;
ObjectRecoveryInfo recovery_info;
ObjectContextRef head_ctx;
ObjectContextRef obc;
object_stat_sum_t stat;
bool cache_dont_need;
+ ObcLockManager lock_manager;
void dump(Formatter *f) const {
{
// Reverse mapping from osd peer to objects beging pulled from that peer
map<pg_shard_t, set<hobject_t, hobject_t::BitwiseComparator> > pull_from_peer;
+ void clear_pull(
+ map<hobject_t, PullInfo, hobject_t::BitwiseComparator>::iterator piter,
+ bool clear_pull_from_peer = true);
void sub_op_push(OpRequestRef op);
void sub_op_push_reply(OpRequestRef op);
bool handle_push_reply(pg_shard_t peer, PushReplyOp &op, PushOp *reply);
void handle_pull(pg_shard_t peer, PullOp &op, PushOp *reply);
+
+ struct pull_complete_info {
+ hobject_t hoid;
+ ObjectContextRef obc;
+ object_stat_sum_t stat;
+ };
bool handle_pull_response(
pg_shard_t from, PushOp &op, PullOp *response,
- list<hobject_t> *to_continue,
+ list<pull_complete_info> *to_continue,
ObjectStore::Transaction *t);
void handle_push(pg_shard_t from, PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
SnapSet& snapset, const hobject_t& poid, const pg_missing_t& missing,
const hobject_t &last_backfill,
interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
+ map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+ ObcLockManager &lock_manager);
void prepare_pull(
eversion_t v,
const hobject_t& soid,
void prep_push_to_replica(
ObjectContextRef obc, const hobject_t& soid, pg_shard_t peer,
PushOp *pop, bool cache_dont_need = true);
- void prep_push(ObjectContextRef obc,
- const hobject_t& oid, pg_shard_t dest,
- PushOp *op,
- bool cache_dont_need);
- void prep_push(ObjectContextRef obc,
- const hobject_t& soid, pg_shard_t peer,
- eversion_t version,
- interval_set<uint64_t> &data_subset,
- map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
- PushOp *op,
- bool cache = false);
- void calc_head_subsets(ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
- const pg_missing_t& missing,
- const hobject_t &last_backfill,
- interval_set<uint64_t>& data_subset,
- map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets);
+ void prep_push(
+ ObjectContextRef obc,
+ const hobject_t& oid, pg_shard_t dest,
+ PushOp *op,
+ bool cache_dont_need);
+ void prep_push(
+ ObjectContextRef obc,
+ const hobject_t& soid, pg_shard_t peer,
+ eversion_t version,
+ interval_set<uint64_t> &data_subset,
+ map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+ PushOp *op,
+ bool cache,
+ ObcLockManager &&lock_manager);
+ void calc_head_subsets(
+ ObjectContextRef obc, SnapSet& snapset, const hobject_t& head,
+ const pg_missing_t& missing,
+ const hobject_t &last_backfill,
+ interval_set<uint64_t>& data_subset,
+ map<hobject_t, interval_set<uint64_t>, hobject_t::BitwiseComparator>& clone_subsets,
+ ObcLockManager &lock_manager);
ObjectRecoveryInfo recalc_subsets(
const ObjectRecoveryInfo& recovery_info,
- SnapSetContext *ssc
- );
+ SnapSetContext *ssc,
+ ObcLockManager &lock_manager);
/**
* Client IO