p.m_seed,
p.get_split_bits(curmap->get_pg_num(_pool.id)),
_pool.id),
+ map_lock("PG::map_lock"),
osdmap_ref(curmap), pool(_pool),
_lock("PG::_lock"),
ref(0),
return cap;
}
+void PG::take_op_map_waiters()
+{
+ Mutex::Locker l(map_lock);
+ for (list<OpRequestRef>::iterator i = waiting_for_map.begin();
+ i != waiting_for_map.end();
+ ) {
+ if (op_must_wait_for_map(get_osdmap_with_maplock(), *i)) {
+ break;
+ } else {
+ osd->op_wq.queue(make_pair(PGRef(this), *i));
+ waiting_for_map.erase(i++);
+ }
+ }
+}
+
+void PG::queue_op(OpRequestRef op)
+{
+ Mutex::Locker l(map_lock);
+ if (!waiting_for_map.empty()) {
+ // preserve ordering
+ waiting_for_map.push_back(op);
+ return;
+ }
+ if (op_must_wait_for_map(get_osdmap_with_maplock(), op)) {
+ waiting_for_map.push_back(op);
+ return;
+ }
+ osd->op_wq.queue(make_pair(PGRef(this), op));
+}
+
void PG::do_request(OpRequestRef op)
{
// do any pending flush
osd->reply_op_error(op, -EPERM);
return;
}
- if (op_must_wait_for_map(get_osdmap(), op)) {
- dout(20) << " waiting for map on " << op << dendl;
- waiting_for_map.push_back(op);
- return;
- }
+ assert(!op_must_wait_for_map(get_osdmap(), op));
if (can_discard_request(op)) {
return;
}
void PG::split_ops(PG *child, unsigned split_bits) {
unsigned match = child->info.pgid.m_seed;
- assert(waiting_for_map.empty());
assert(waiting_for_all_missing.empty());
assert(waiting_for_missing_object.empty());
assert(waiting_for_degraded_object.empty());
osd->dequeue_pg(this, &waiting_for_active);
split_list(&waiting_for_active, &(child->waiting_for_active), match, split_bits);
+ {
+ Mutex::Locker l(map_lock); // to avoid a race with the osd dispatch
+ split_list(&waiting_for_map, &(child->waiting_for_map), match, split_bits);
+ }
}
void PG::split_into(pg_t child_pgid, PG *child, unsigned split_bits)
{
child->update_snap_mapper_bits(split_bits);
- child->osdmap_ref = osdmap_ref;
+ child->update_osdmap_ref(get_osdmap());
child->pool = pool;
void PG::take_waiters()
{
dout(10) << "take_waiters" << dendl;
- requeue_ops(waiting_for_map);
+ take_op_map_waiters();
for (list<CephPeeringEvtRef>::iterator i = peering_waiters.begin();
i != peering_waiters.end();
++i) osd->queue_for_peering(this);
assert(lastmap->get_epoch() == osdmap_ref->get_epoch());
assert(lastmap == osdmap_ref);
dout(10) << "handle_advance_map " << newup << "/" << newacting << dendl;
- osdmap_ref = osdmap;
+ update_osdmap_ref(osdmap);
pool.update(osdmap);
AdvMap evt(osdmap, lastmap, newup, newacting);
recovery_state.handle_event(evt, rctx);
snap_mapper.update_bits(bits);
}
protected:
+ // Ops waiting for map, should be queued at back
+ Mutex map_lock;
+ list<OpRequestRef> waiting_for_map;
OSDMapRef osdmap_ref;
PGPool pool;
+ void queue_op(OpRequestRef op);
+ void take_op_map_waiters();
+
+ void update_osdmap_ref(OSDMapRef newmap) {
+ assert(_lock.is_locked_by_me());
+ Mutex::Locker l(map_lock);
+ osdmap_ref = newmap;
+ }
+
+ OSDMapRef get_osdmap_with_maplock() const {
+ assert(map_lock.is_locked());
+ assert(osdmap_ref);
+ return osdmap_ref;
+ }
+
OSDMapRef get_osdmap() const {
assert(is_locked());
assert(osdmap_ref);
// Ops waiting on backfill_pos to change
list<OpRequestRef> waiting_for_backfill_pos;
-
- list<OpRequestRef> waiting_for_map;
list<OpRequestRef> waiting_for_active;
list<OpRequestRef> waiting_for_all_missing;
map<hobject_t, list<OpRequestRef> > waiting_for_missing_object,