}
-// map hobject range to PG(s)
+// map to the PG specified in the message
- bool queued = false;
- hobject_t pos = m->begin;
- do {
- pg_t _pgid(pos.get_hash(), pos.pool);
- if (osdmap->have_pg_pool(pos.pool)) {
- _pgid = osdmap->raw_pg_to_pg(_pgid);
- }
- if (!osdmap->have_pg_pool(_pgid.pool())) {
- // missing pool -- drop
- return;
- }
- spg_t pgid;
- if (osdmap->get_primary_shard(_pgid, &pgid)) {
- dout(10) << __func__ << " pos " << pos << " pgid " << pgid << dendl;
- PGRef pg = get_pg_or_queue_for_pg(pgid, op, s);
- if (pg) {
- if (!queued) {
- enqueue_op(pg, op);
- queued = true;
- } else {
- // use a fresh OpRequest
- m->get(); // take a ref for the new OpRequest
- OpRequestRef newop(op_tracker.create_request<OpRequest, Message*>(m));
- newop->mark_event("duplicated original op for another pg");
- enqueue_op(pg, newop);
- }
- }
- }
- // advance
- pos = _pgid.get_hobj_end(osdmap->get_pg_pool(pos.pool)->get_pg_num());
- dout(20) << __func__ << " next pg " << pos << dendl;
- } while (pos < m->end);
+ PGRef pg = get_pg_or_queue_for_pg(m->pgid, op, s);
+ if (pg) {
+ enqueue_op(pg, op);
+ }
}
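With the pgid now carried in MOSDBackoff, dispatch reduces to the single lookup above; the old loop had to walk the [begin,end) hash range, fan the op out to every PG it touched, and duplicate the OpRequest for each PG past the first. For reference, a sketch of the per-position resolution that loop performed, using the same OSDMap calls as the removed code (the missing-pool check is elided):

    spg_t resolve_backoff_pg(OSDMapRef osdmap, const hobject_t& pos)
    {
      // hash the object into a raw pg_t within its pool
      pg_t raw(pos.get_hash(), pos.pool);
      // fold by the pool's pg_num to get the actual PG
      pg_t actual = osdmap->raw_pg_to_pg(raw);
      // select the primary shard (meaningful for EC pools)
      spg_t pgid;
      osdmap->get_primary_shard(actual, &pgid);
      return pgid;
    }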
template<typename T, int MSGTYPE>
_split_into(child_pgid, child, split_bits);
- // release all backoffs so that Objecter doesn't need to handle unblock
- // on split backoffs
+ // release all backoffs for simplicity; this also spares the client-side
+ // Objecter from having to handle unblocks on split backoff ranges
release_backoffs(hobject_t(), hobject_t::get_max());
- // split any remaining (deleting) backoffs among child PGs
- split_backoffs(child, split_bits);
child->on_new_interval();
ConnectionRef con = s->con;
if (!con) // OSD::ms_handle_reset clears s->con without a lock
return;
- Backoff *b = s->have_backoff(begin);
+ Backoff *b = s->have_backoff(info.pgid, begin);
if (b) {
derr << __func__ << " already have backoff for " << s << " begin " << begin
<< " " << *b << dendl;
}
Mutex::Locker l(backoff_lock);
{
- Mutex::Locker l(s->backoff_lock);
- b = new Backoff(this, s, ++s->backoff_seq, begin, end);
- auto& ls = s->backoffs[begin];
- if (ls.empty()) {
- ++s->backoff_count;
- }
- assert(s->backoff_count == (int)s->backoffs.size());
- ls.insert(b);
+ b = new Backoff(info.pgid, this, s, ++s->backoff_seq, begin, end);
backoffs[begin].insert(b);
+ s->add_backoff(b);
dout(10) << __func__ << " session " << s << " added " << *b << dendl;
}
con->send_message(
}
}
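The session-side bookkeeping that used to be done inline here (taking s->backoff_lock and maintaining s->backoffs and s->backoff_count by hand) now goes through Session::add_backoff(), which keeps the count and the nested map in step in one place. The resulting lock order, shown as a hypothetical free-function sketch (the real code is the PG member above):

    void add_backoff_sketch(PG *pg, SessionRef s,
                            const hobject_t& begin, const hobject_t& end)
    {
      Mutex::Locker pgl(pg->backoff_lock);     // PG lock held first
      Backoff *b = new Backoff(pg->info.pgid, pg, s,
                               ++s->backoff_seq, begin, end);
      pg->backoffs[begin].insert(b);
      s->add_backoff(b);  // takes Session::backoff_lock *inside* PG's
    }

This matches the ordering declared on Session::backoff_lock below: it nests inside Backoff::lock and PG::backoff_lock.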
-void PG::split_backoffs(PG *child, unsigned split_bits)
-{
- dout(10) << __func__ << " split_bits " << split_bits << " child "
- << child->info.pgid.pgid << dendl;
- unsigned mask = ~((~0)<<split_bits);
- vector<BackoffRef> backoffs_to_dup; // pg backoffs
- vector<BackoffRef> backoffs_to_move; // oid backoffs
- {
- Mutex::Locker l(backoff_lock);
- auto p = backoffs.begin();
- while (p != backoffs.end()) {
- if (p->first == info.pgid.pgid.get_hobj_start()) {
- // if it is a full PG we always dup it for the child.
- for (auto& q : p->second) {
- dout(10) << __func__ << " pg backoff " << p->first
- << " " << q << dendl;
- backoffs_to_dup.push_back(q);
- }
- } else {
- // otherwise, we move it to the child only if falls into the
- // childs hash range.
- if ((p->first.get_hash() & mask) == child->info.pgid.pgid.ps()) {
- for (auto& q : p->second) {
- dout(20) << __func__ << " will move " << p->first
- << " " << q << dendl;
- backoffs_to_move.push_back(q);
- }
- p = backoffs.erase(p);
- continue;
- } else {
- dout(20) << __func__ << " will not move " << p->first
- << " " << p->second << dendl;
- }
- }
- ++p;
- }
- }
- for (auto b : backoffs_to_dup) {
- SessionRef s;
- {
- Mutex::Locker l(b->lock);
- b->end = info.pgid.pgid.get_hobj_end(split_bits);
- dout(10) << __func__ << " pg backoff " << *b << dendl;
- s = b->session;
- }
- if (s) {
- child->add_pg_backoff(b->session);
- } else {
- dout(20) << __func__ << " didn't dup pg backoff, session is null"
- << dendl;
- }
- }
- for (auto b : backoffs_to_move) {
- Mutex::Locker l(b->lock);
- if (b->pg == this) {
- dout(10) << __func__ << " move backoff " << *b << " to child" << dendl;
- b->pg = child;
- child->backoffs[b->begin].insert(b);
- } else {
- dout(20) << __func__ << " move backoff " << *b << " nowhere... pg is null"
- << dendl;
- }
- }
-}
-
void PG::clear_backoffs()
{
dout(10) << __func__ << " " << dendl;
release_backoffs(o, o);
}
void clear_backoffs();
- void split_backoffs(PG *child, unsigned split_bits);
void add_pg_backoff(SessionRef s) {
hobject_t begin = info.pgid.pgid.get_hobj_start();
}
dout(10) << __func__ << " backoff ack id " << m->id
<< " [" << begin << "," << end << ")" << dendl;
- session->ack_backoff(cct, m->id, begin, end);
+ session->ack_backoff(cct, m->pgid, m->id, begin, end);
}
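The ack handler reads pgid, id, begin, and end straight off the message. An abridged sketch of the MOSDBackoff fields this change relies on (not the full message definition; the real class derives from Message and also carries an op code distinguishing block/unblock/ack and a map epoch):

    struct MOSDBackoffFields {
      spg_t pgid;            // owning PG -- added by this change, echoed in acks
      uint64_t id;           // unique within the session
      hobject_t begin, end;  // [begin,end); a single object when begin == end
    };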
void PrimaryLogPG::do_request(
session->put(); // get_priv takes a ref, and so does the SessionRef
if (op->get_req()->get_type() == CEPH_MSG_OSD_OP) {
- Backoff *b = session->have_backoff(info.pgid.pgid.get_hobj_start());
+ Backoff *b = session->have_backoff(info.pgid,
+ info.pgid.pgid.get_hobj_start());
if (b) {
dout(10) << " have backoff " << *b << " " << *m << dendl;
assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff);
}
session->put(); // get_priv() takes a ref, and so does the intrusive_ptr
- Backoff *b = session->have_backoff(head);
+ Backoff *b = session->have_backoff(info.pgid, head);
if (b) {
dout(10) << __func__ << " have backoff " << *b << " " << *m << dendl;
assert(!b->is_acked() || !g_conf->osd_debug_crash_on_ignored_backoff);
void Session::clear_backoffs()
{
- map<hobject_t,set<BackoffRef>> ls;
+ map<spg_t,map<hobject_t,set<BackoffRef>>> ls;
{
Mutex::Locker l(backoff_lock);
ls.swap(backoffs);
backoff_count = 0;
}
- for (auto& p : ls) {
- for (auto& b : p.second) {
- Mutex::Locker l(b->lock);
- if (b->pg) {
- assert(b->session == this);
- assert(b->is_new() || b->is_acked());
- b->pg->rm_backoff(b);
- b->pg.reset();
- b->session.reset();
- } else if (b->session) {
- assert(b->session == this);
- assert(b->is_deleting());
- b->session.reset();
+ for (auto& i : ls) {
+ for (auto& p : i.second) {
+ for (auto& b : p.second) {
+ Mutex::Locker l(b->lock);
+ if (b->pg) {
+ assert(b->session == this);
+ assert(b->is_new() || b->is_acked());
+ b->pg->rm_backoff(b);
+ b->pg.reset();
+ b->session.reset();
+ } else if (b->session) {
+ assert(b->session == this);
+ assert(b->is_deleting());
+ b->session.reset();
+ }
}
}
}
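clear_backoffs() deliberately swaps the whole nested map out under backoff_lock and only then walks it, taking each Backoff::lock as it goes; holding the session lock across the per-backoff work would invert the documented lock order. The pattern in miniature, with toy types:

    #include <map>
    #include <mutex>
    #include <set>

    std::mutex lk;
    std::map<int, std::set<int>> table;

    void clear_all()
    {
      std::map<int, std::set<int>> detached;
      {
        std::lock_guard<std::mutex> g(lk);
        detached.swap(table);  // table is now empty for concurrent callers
      }
      for (auto& bin : detached) {
        // per-entry cleanup runs here without holding lk
        bin.second.clear();
      }
    }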
void Session::ack_backoff(
CephContext *cct,
+ spg_t pgid,
uint64_t id,
const hobject_t& begin,
const hobject_t& end)
{
Mutex::Locker l(backoff_lock);
- // NOTE that ack may be for [a,c] but osd may now have [a,b) and
- // [b,c) due to a PG split.
- auto p = backoffs.lower_bound(begin);
- while (p != backoffs.end()) {
- // note: must still examine begin=end=p->first case
- int r = cmp(p->first, end);
- if (r > 0 || (r == 0 && begin < end)) {
- break;
- }
- auto q = p->second.begin();
- while (q != p->second.end()) {
- Backoff *b = (*q).get();
- if (b->id == id) {
- if (b->is_new()) {
- b->state = Backoff::STATE_ACKED;
- dout(20) << __func__ << " now " << *b << dendl;
- } else if (b->is_deleting()) {
- dout(20) << __func__ << " deleting " << *b << dendl;
- q = p->second.erase(q);
- continue;
- }
+ auto p = backoffs.find(pgid);
+ if (p == backoffs.end()) {
+ dout(20) << __func__ << " " << pgid << " " << id << " [" << begin << ","
+ << end << ") pg not found" << dendl;
+ return;
+ }
+ auto q = p->second.find(begin);
+ if (q == p->second.end()) {
+ dout(20) << __func__ << " " << pgid << " " << id << " [" << begin << ","
+ << end << ") begin not found" << dendl;
+ return;
+ }
+ for (auto i = q->second.begin(); i != q->second.end(); ++i) {
+ Backoff *b = (*i).get();
+ if (b->id == id) {
+ if (b->is_new()) {
+ b->state = Backoff::STATE_ACKED;
+ dout(20) << __func__ << " now " << *b << dendl;
+ } else if (b->is_deleting()) {
+ dout(20) << __func__ << " deleting " << *b << dendl;
+ q->second.erase(i);
+ --backoff_count;
}
- ++q;
+ break;
}
+ }
+ if (q->second.empty()) {
+ dout(20) << __func__ << " clearing begin bin " << q->first << dendl;
+ p->second.erase(q);
if (p->second.empty()) {
- dout(20) << __func__ << " clearing bin " << p->first << dendl;
- p = backoffs.erase(p);
- --backoff_count;
- } else {
- ++p;
+ dout(20) << __func__ << " clearing pg bin " << p->first << dendl;
+ backoffs.erase(p);
}
}
- assert(backoff_count == (int)backoffs.size());
+ assert((backoff_count == 0) == backoffs.empty());
}
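The two levels of empty-bin cleanup above follow a bottom-up discipline: erase from the set, then drop the empty begin bin, then the empty pg bin, keeping backoff_count in step throughout. A self-contained toy of the same shape (stand-in types; the erase is unconditional here, whereas the real code only erases deleting backoffs):

    #include <cassert>
    #include <cstdint>
    #include <map>
    #include <set>
    #include <string>

    using Pgid = int;          // stands in for spg_t
    using Hobj = std::string;  // stands in for hobject_t

    std::map<Pgid, std::map<Hobj, std::set<uint64_t>>> backoffs;
    int backoff_count = 0;

    void add_one(Pgid pgid, const Hobj& begin, uint64_t id)
    {
      backoffs[pgid][begin].insert(id);
      ++backoff_count;
    }

    void ack_one(Pgid pgid, const Hobj& begin, uint64_t id)
    {
      auto p = backoffs.find(pgid);
      if (p == backoffs.end())
        return;                  // pg bin already gone
      auto q = p->second.find(begin);
      if (q == p->second.end())
        return;                  // begin bin already gone
      if (q->second.erase(id))
        --backoff_count;
      if (q->second.empty()) {
        p->second.erase(q);      // drop the empty begin bin
        if (p->second.empty())
          backoffs.erase(p);     // drop the empty pg bin
      }
      assert((backoff_count == 0) == backoffs.empty());
    }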
STATE_DELETING = 3 ///< backoff deleted, but un-acked
};
std::atomic_int state = {STATE_NEW};
+ spg_t pgid; ///< owning pgid
uint64_t id = 0; ///< unique id (within the Session)
bool is_new() const {
SessionRef session; ///< owning session
hobject_t begin, end; ///< [) range to block, unless ==, then single obj
- Backoff(PGRef pg, SessionRef s,
+ Backoff(spg_t pgid, PGRef pg, SessionRef s,
uint64_t i,
const hobject_t& b, const hobject_t& e)
: RefCountedObject(g_ceph_context, 0),
+ pgid(pgid),
id(i),
lock("Backoff::lock"),
pg(pg),
end(e) {}
friend ostream& operator<<(ostream& out, const Backoff& b) {
- return out << "Backoff(" << &b << " " << b.id
+ return out << "Backoff(" << &b << " " << b.pgid << " " << b.id
<< " " << b.get_state_name()
<< " [" << b.begin << "," << b.end << ") "
<< " session " << b.session
/// protects backoffs; orders inside Backoff::lock *and* PG::backoff_lock
Mutex backoff_lock;
std::atomic_int backoff_count= {0}; ///< simple count of backoffs
- map<hobject_t,set<BackoffRef>> backoffs;
+ map<spg_t,map<hobject_t,set<BackoffRef>>> backoffs;
std::atomic<uint64_t> backoff_seq = {0};
void ack_backoff(
CephContext *cct,
+ spg_t pgid,
uint64_t id,
const hobject_t& start,
const hobject_t& end);
- Backoff *have_backoff(const hobject_t& oid) {
- if (backoff_count.load()) {
- Mutex::Locker l(backoff_lock);
- assert(backoff_count == (int)backoffs.size());
- auto p = backoffs.lower_bound(oid);
- if (p != backoffs.begin() &&
- p->first > oid) {
- --p;
- }
- if (p != backoffs.end()) {
- int r = cmp(oid, p->first);
- if (r == 0 || r > 0) {
- for (auto& q : p->second) {
- if (r == 0 || oid < q->end) {
- return &(*q);
- }
+ Backoff *have_backoff(spg_t pgid, const hobject_t& oid) {
+ if (!backoff_count.load()) {
+ return nullptr;
+ }
+ Mutex::Locker l(backoff_lock);
+ assert((backoff_count == 0) == backoffs.empty());
+ auto i = backoffs.find(pgid);
+ if (i == backoffs.end()) {
+ return nullptr;
+ }
+ auto p = i->second.lower_bound(oid);
+ // step back unless lower_bound landed exactly on oid; guard against
+ // dereferencing end() when every key sorts before oid
+ if (p != i->second.begin() &&
+ (p == i->second.end() || p->first > oid)) {
+ --p;
+ }
+ if (p != i->second.end()) {
+ int r = cmp(oid, p->first);
+ if (r == 0 || r > 0) {
+ for (auto& q : p->second) {
+ if (r == 0 || oid < q->end) {
+ return &(*q);
}
}
}
return nullptr;
}
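have_backoff() is the classic lower_bound-then-step-back probe over ranges keyed by their begin; the p == i->second.end() guard in the step-back condition is what keeps the --p safe when every key sorts before oid. A minimal standalone version with toy types:

    #include <map>
    #include <string>

    struct Range { std::string begin, end; };  // [begin,end); single obj if ==

    const Range* find_covering(const std::map<std::string, Range>& m,
                               const std::string& oid)
    {
      auto p = m.lower_bound(oid);  // first range starting at or after oid
      // the covering range, if any, starts at or before oid: step back one
      // entry unless lower_bound landed exactly on oid (guard end() first)
      if (p != m.begin() && (p == m.end() || p->first > oid))
        --p;
      if (p == m.end())
        return nullptr;             // empty map
      if (p->first == oid)
        return &p->second;          // exact start, or a single-object range
      if (p->first < oid && oid < p->second.end)
        return &p->second;          // strictly inside [begin,end)
      return nullptr;
    }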
+ void add_backoff(Backoff *b) {
+ Mutex::Locker l(backoff_lock);
+ assert((backoff_count == 0) == backoffs.empty());
+ backoffs[b->pgid][b->begin].insert(b);
+ ++backoff_count;
+ }
+
// called by PG::release_*_backoffs and PG::clear_backoffs()
void rm_backoff(BackoffRef b) {
Mutex::Locker l(backoff_lock);
assert(b->lock.is_locked_by_me());
assert(b->session == this);
- auto p = backoffs.find(b->begin);
- // may race with clear_backoffs()
- if (p != backoffs.end()) {
- auto q = p->second.find(b);
- if (q != p->second.end()) {
- p->second.erase(q);
- if (p->second.empty()) {
- backoffs.erase(p);
+ auto i = backoffs.find(b->pgid);
+ if (i != backoffs.end()) {
+ // may race with clear_backoffs()
+ auto p = i->second.find(b->begin);
+ if (p != i->second.end()) {
+ auto q = p->second.find(b);
+ if (q != p->second.end()) {
+ p->second.erase(q);
--backoff_count;
+ if (p->second.empty()) {
+ i->second.erase(p);
+ if (i->second.empty()) {
+ backoffs.erase(i);
+ }
+ }
}
}
}
- assert(backoff_count == (int)backoffs.size());
+ assert((backoff_count == 0) == backoffs.empty());
}
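For orientation, the asserts in rm_backoff() and clear_backoffs() encode a small lifecycle: a backoff is new until the client acks it, and a released-but-un-acked backoff lingers in the session map as deleting until the ack finally erases it. An illustrative restatement (the enum mirrors Backoff's STATE_* values above; these helpers are not part of the patch):

    #include <cassert>

    enum State { NEW, ACKED, DELETING };

    State on_client_ack(State s)
    {
      assert(s == NEW || s == DELETING);
      // acking a new backoff pins it; acking a deleting one is the signal
      // for the session to erase it (see ack_backoff above)
      return s == NEW ? ACKED : DELETING;
    }

    State on_pg_release(State s)
    {
      assert(s == NEW || s == ACKED);
      return DELETING;  // PG dropped it; the session waits for the ack
    }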
void clear_backoffs();
};