disk_tp(cct, "OSD::disk_tp", cct->_conf->osd_disk_threads, "osd_disk_threads"),
command_tp(cct, "OSD::command_tp", 1),
paused_recovery(false),
- session_waiting_for_map_lock("OSD::session_waiting_for_map_lock"),
+ session_waiting_lock("OSD::session_waiting_lock"),
heartbeat_lock("OSD::heartbeat_lock"),
heartbeat_stop(false), heartbeat_update_lock("OSD::heartbeat_update_lock"),
heartbeat_need_update(true), heartbeat_epoch(0),
pg_map[pgid] = pg;
pg->get("PGMap"); // because it's in pg_map
service.pg_add_epoch(pg->info.pgid, createmap->get_epoch());
- wake_pg_waiters(pg, pgid);
}
+ wake_pg_waiters(pg, pgid);
return pg;
}
pg->get("PGMap"); // For pg_map
pg_map[pg->info.pgid] = pg;
service.pg_add_epoch(pg->info.pgid, pg->get_osdmap()->get_epoch());
- wake_pg_waiters(pg, pg->info.pgid);
dout(10) << "Adding newly split pg " << *pg << dendl;
vector<int> up, acting;
PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
{
+  // Return the PG for pgid if it is instantiated and has no pending
+  // per-session waiters; otherwise queue op on this session's
+  // waiting_for_pg list (replacing the old OSD-global waiting_for_pg)
+  // and return NULL.
-  {
-    RWLock::RLocker l(pg_map_lock);
-    ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
-    if (i != pg_map.end())
-      return i->second;
-  }
-  RWLock::WLocker l(pg_map_lock);
+  RWLock::RLocker l(pg_map_lock);
+  // get_priv() takes a ref on the Session; dropped via put() below.
+  Session *session = static_cast<Session*>(
+    op->get_req()->get_connection()->get_priv());
+
ceph::unordered_map<spg_t, PG*>::iterator i = pg_map.find(pgid);
-  if (i != pg_map.end()) {
-    return i->second;
+  if (i == pg_map.end())
+    // operator[] default-inserts an empty list, guaranteeing the
+    // find() below succeeds and the op is routed to the wait list.
+    session->waiting_for_pg[pgid];
+
+  map<spg_t, list<OpRequestRef> >::iterator wlistiter =
+    session->waiting_for_pg.find(pgid);
+
+  PG *out = NULL;
+  if (wlistiter == session->waiting_for_pg.end()) {
+    // PG exists and nothing is queued ahead of us: dispatch directly.
+    out = i->second;
} else {
-    waiting_for_pg[pgid].push_back(op);
-    return NULL;
+    // Preserve op ordering: if earlier ops for this pg are already
+    // waiting on the session, this op must queue behind them.
+    wlistiter->second.push_back(op);
+    register_session_waiting_on_pg(session, pgid);
}
+  session->put();
+  return out;
}
bool OSD::_have_pg(spg_t pgid)
void OSD::dispatch_session_waiting(Session *session, OSDMapRef osdmap)
{
+ assert(session->session_dispatch_lock.is_locked());
+ assert(session->osdmap == osdmap);
for (list<OpRequestRef>::iterator i = session->waiting_on_map.begin();
i != session->waiting_on_map.end() && dispatch_op_fast(*i, osdmap);
session->waiting_on_map.erase(i++));
}
}
+
+/**
+ * Bring session->waiting_for_pg up to date with newmap.
+ *
+ * Wait lists for pools that no longer exist in newmap are dropped on
+ * the ground; wait lists for PGs that split between the session's map
+ * epoch and newmap are redistributed to the child PGs.  Caller must
+ * hold session->session_dispatch_lock.
+ */
+void OSD::update_waiting_for_pg(Session *session, OSDMapRef newmap)
+{
+  assert(session->session_dispatch_lock.is_locked());
+  // First map this session has seen: nothing queued yet, just record it.
+  if (!session->osdmap) {
+    session->osdmap = newmap;
+    return;
+  }
+
+  if (newmap->get_epoch() == session->osdmap->get_epoch())
+    return;
+
+  // Session maps only ever advance.
+  assert(newmap->get_epoch() > session->osdmap->get_epoch());
+
+  map<spg_t, list<OpRequestRef> > from;
+  from.swap(session->waiting_for_pg);
+
+  for (map<spg_t, list<OpRequestRef> >::iterator i = from.begin();
+       i != from.end();
+       from.erase(i++)) {
+    set<spg_t> children;
+    if (!newmap->have_pg_pool(i->first.pool())) {
+      // drop this wait list on the ground
+      continue;
+    }
+    assert(session->osdmap->have_pg_pool(i->first.pool()));
+    if (i->first.is_split(
+	  session->osdmap->get_pg_num(i->first.pool()),
+	  newmap->get_pg_num(i->first.pool()),
+	  &children)) {
+      for (set<spg_t>::iterator child = children.begin();
+	   child != children.end();
+	   ++child) {
+	unsigned split_bits = child->get_split_bits(
+	  newmap->get_pg_num(child->pool()));
+	list<OpRequestRef> child_ops;
+	// split_list moves the ops that hash to this child out of the
+	// parent's list -- presumably keyed on ps()/split_bits; see
+	// OSD::split_list.
+	OSD::split_list(&i->second, &child_ops, child->ps(), split_bits);
+	if (!child_ops.empty()) {
+	  session->waiting_for_pg[*child].swap(child_ops);
+	  register_session_waiting_on_pg(session, *child);
+	}
+      }
+    }
+    if (i->second.empty()) {
+      // All ops migrated (or none existed): this session no longer
+      // waits on the parent pgid.
+      clear_session_waiting_on_pg(session, i->first);
+    } else {
+      session->waiting_for_pg[i->first].swap(i->second);
+    }
+  }
+
+  session->osdmap = newmap;
+}
+
+/**
+ * Notify a session that pgid now exists: splice any ops it had queued
+ * for that pg onto waiting_on_map so they are (re)dispatched, and drop
+ * the session's registration for the pg.  Caller must hold
+ * session->session_dispatch_lock.
+ */
+void OSD::session_notify_pg_create(
+  Session *session, OSDMapRef osdmap, spg_t pgid)
+{
+  assert(session->session_dispatch_lock.is_locked());
+  // Refresh waiting_for_pg first so pgid reflects any splits.
+  update_waiting_for_pg(session, osdmap);
+  map<spg_t, list<OpRequestRef> >::iterator i =
+    session->waiting_for_pg.find(pgid);
+  if (i != session->waiting_for_pg.end()) {
+    session->waiting_on_map.splice(
+      session->waiting_on_map.end(),
+      i->second);
+    session->waiting_for_pg.erase(i);
+  }
+  clear_session_waiting_on_pg(session, pgid);
+}
+
+/**
+ * Notify a session that this OSD no longer hosts pgid: discard any ops
+ * queued for it and drop the session's registration.  Caller must hold
+ * session->session_dispatch_lock.
+ */
+void OSD::session_notify_pg_cleared(
+  Session *session, OSDMapRef osdmap, spg_t pgid)
+{
+  assert(session->session_dispatch_lock.is_locked());
+  update_waiting_for_pg(session, osdmap);
+  // Unlike pg_create, the queued ops are simply dropped here.
+  session->waiting_for_pg.erase(pgid);
+  clear_session_waiting_on_pg(session, pgid);
+}
+
void OSD::ms_fast_dispatch(Message *m)
{
OpRequestRef op = op_tracker.create_request<OpRequest>(m);
assert(session);
{
Mutex::Locker l(session->session_dispatch_lock);
+ update_waiting_for_pg(session, nextmap);
session->waiting_on_map.push_back(op);
dispatch_session_waiting(session, nextmap);
}
i != sessions_to_check.end();
sessions_to_check.erase(i++)) {
(*i)->session_dispatch_lock.Lock();
+ update_waiting_for_pg(*i, osdmap);
dispatch_session_waiting(*i, osdmap);
(*i)->session_dispatch_lock.Unlock();
(*i)->put();
}
- // remove any PGs which we no longer host from the waiting_for_pg list
- set<spg_t> pgs_to_delete;
- {
- RWLock::RLocker l(pg_map_lock);
- map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.begin();
- while (p != waiting_for_pg.end()) {
- spg_t pgid = p->first;
-
- vector<int> acting;
- int nrep = osdmap->pg_to_acting_osds(pgid.pgid, acting);
- int role = osdmap->calc_pg_role(whoami, acting, nrep);
-
- if (role < 0) {
- pgs_to_delete.insert(p->first);
- /* we can delete list contents under the read lock because
- * nobody will be adding to them -- everybody is now using a map
- * new enough that they will simply drop ops instead of adding
- * them to the list. */
- dout(10) << " discarding waiting ops for " << pgid << dendl;
- while (!p->second.empty()) {
- p->second.pop_front();
- }
+ // remove any PGs which we no longer host from the session waiting_for_pg lists
+ set<spg_t> pgs_to_check;
+ get_pgs_with_waiting_sessions(&pgs_to_check);
+ for (set<spg_t>::iterator p = pgs_to_check.begin();
+ p != pgs_to_check.end();
+ ++p) {
+ vector<int> acting;
+ int nrep = osdmap->pg_to_acting_osds(p->pgid, acting);
+ int role = osdmap->calc_pg_role(whoami, acting, nrep);
+
+ if (role < 0) {
+ set<Session*> concerned_sessions;
+ get_sessions_possibly_interested_in_pg(*p, &concerned_sessions);
+ for (set<Session*>::iterator i = concerned_sessions.begin();
+ i != concerned_sessions.end();
+ ++i) {
+ {
+ Mutex::Locker l((*i)->session_dispatch_lock);
+ session_notify_pg_cleared(*i, osdmap, *p);
+ }
+ (*i)->put();
}
- ++p;
}
}
- {
- RWLock::WLocker l(pg_map_lock);
- for (set<spg_t>::iterator i = pgs_to_delete.begin();
- i != pgs_to_delete.end();
- ++i) {
- map<spg_t, list<OpRequestRef> >::iterator p = waiting_for_pg.find(*i);
- assert(p->second.empty());
- waiting_for_pg.erase(p);
- }
- }
-
// scan pg's
{
osd->service.complete_split(to_complete);
}
osd->pg_map_lock.put_write();
+ osd->wake_pg_waiters(&**i, (*i)->info.pgid);
osd->dispatch_context_transaction(rctx, &**i);
to_complete.insert((*i)->info.pgid);
(*i)->unlock();
Mutex session_dispatch_lock;
list<OpRequestRef> waiting_on_map;
+ OSDMapRef osdmap; /// Map as of which waiting_for_pg is current
+ map<spg_t, list<OpRequestRef> > waiting_for_pg;
+
Mutex sent_epoch_lock;
epoch_t last_sent_epoch;
Mutex received_map_lock;
sent_epoch_lock("Session::sent_epoch_lock"), last_sent_epoch(0),
received_map_lock("Session::received_map_lock"), received_map_epoch(0)
{}
+
+
};
+ void update_waiting_for_pg(Session *session, OSDMapRef osdmap);
+ void session_notify_pg_create(Session *session, OSDMapRef osdmap, spg_t pgid);
+ void session_notify_pg_cleared(Session *session, OSDMapRef osdmap, spg_t pgid);
void dispatch_session_waiting(Session *session, OSDMapRef osdmap);
- Mutex session_waiting_for_map_lock;
+
+ Mutex session_waiting_lock;
set<Session*> session_waiting_for_map;
+ map<spg_t, set<Session*> > session_waiting_for_pg;
+
/// Caller assumes refs for included Sessions
void get_sessions_waiting_for_map(set<Session*> *out) {
+    // Transfers the registered refs to the caller via swap.
-    Mutex::Locker l(session_waiting_for_map_lock);
+    Mutex::Locker l(session_waiting_lock);
out->swap(session_waiting_for_map);
}
void register_session_waiting_on_map(Session *session) {
-    Mutex::Locker l(session_waiting_for_map_lock);
+    // session_waiting_lock now covers both map- and pg-waiting sets.
+    Mutex::Locker l(session_waiting_lock);
session->get();
session_waiting_for_map.insert(session);
}
void clear_session_waiting_on_map(Session *session) {
-    Mutex::Locker l(session_waiting_for_map_lock);
+    Mutex::Locker l(session_waiting_lock);
set<Session*>::iterator i = session_waiting_for_map.find(session);
if (i != session_waiting_for_map.end()) {
+      // Drop the ref taken at register time.
(*i)->put();
session_waiting_for_map.erase(i);
}
}
+  /// Record that session has ops queued for pgid.  Takes a ref on the
+  /// session the first time it is registered for this pg; idempotent.
+  void register_session_waiting_on_pg(Session *session, spg_t pgid) {
+    Mutex::Locker l(session_waiting_lock);
+    set<Session*> &s = session_waiting_for_pg[pgid];
+    set<Session*>::iterator i = s.find(session);
+    if (i == s.end()) {
+      session->get();
+      s.insert(session);
+    }
+  }
+  /// Remove session's registration for pgid (if any), dropping the ref
+  /// taken by register_session_waiting_on_pg, and erase the pg bucket
+  /// once it is empty.
+  void clear_session_waiting_on_pg(Session *session, spg_t pgid) {
+    Mutex::Locker l(session_waiting_lock);
+    map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
+    if (i == session_waiting_for_pg.end()) {
+      return;
+    }
+    set<Session*>::iterator j = i->second.find(session);
+    if (j != i->second.end()) {
+      (*j)->put();
+      i->second.erase(j);
+    }
+    if (i->second.empty()) {
+      session_waiting_for_pg.erase(i);
+    }
+  }
+  /// Collect sessions registered for pgid or for any of its ancestors
+  /// (walking get_parent() back to ps() == 0), since a session that
+  /// queued ops before a split may still be registered on the parent.
+  /// Takes a ref on each returned session; caller must put() them.
+  void get_sessions_possibly_interested_in_pg(
+    spg_t pgid, set<Session*> *sessions) {
+    Mutex::Locker l(session_waiting_lock);
+    while (1) {
+      map<spg_t, set<Session*> >::iterator i = session_waiting_for_pg.find(pgid);
+      if (i != session_waiting_for_pg.end()) {
+	sessions->insert(i->second.begin(), i->second.end());
+      }
+      if (pgid.pgid.ps() == 0) {
+	break;
+      } else {
+	pgid = pgid.get_parent();
+      }
+    }
+    // One ref per distinct session, even if found under several pgids.
+    for (set<Session*>::iterator i = sessions->begin();
+	 i != sessions->end();
+	 ++i) {
+      (*i)->get();
+    }
+  }
+  /// Snapshot the set of pgids that currently have sessions waiting on
+  /// them (no refs taken; pgids only).
+  void get_pgs_with_waiting_sessions(set<spg_t> *pgs) {
+    Mutex::Locker l(session_waiting_lock);
+    for (map<spg_t, set<Session*> >::iterator i =
+	   session_waiting_for_pg.begin();
+	 i != session_waiting_for_pg.end();
+	 ++i) {
+      pgs->insert(i->first);
+    }
+  }
private:
/**
// -- placement groups --
RWLock pg_map_lock; // this lock orders *above* individual PG _locks
ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
- map<spg_t, list<OpRequestRef> > waiting_for_pg; // protected by pg_map lock
map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
PGRecoveryStats pg_recovery_stats;
); ///< @return false if there was a map gap between from and now
void wake_pg_waiters(PG* pg, spg_t pgid) {
+    // Redispatch ops that sessions queued while waiting for pgid to be
+    // instantiated.  NOTE(review): the pg parameter appears unused by
+    // the new session-based path -- presumably kept for interface
+    // compatibility; confirm against callers.
+    assert(osd_lock.is_locked());
// Need write lock on pg_map_lock
-    map<spg_t, list<OpRequestRef> >::iterator i = waiting_for_pg.find(pgid);
-    if (i != waiting_for_pg.end()) {
-      for (list<OpRequestRef>::iterator j = i->second.begin();
-	   j != i->second.end();
-	   ++j) {
-	enqueue_op(pg, *j);
+    set<Session*> concerned_sessions;
+    get_sessions_possibly_interested_in_pg(pgid, &concerned_sessions);
+
+    for (set<Session*>::iterator i = concerned_sessions.begin();
+	 i != concerned_sessions.end();
+	 ++i) {
+      {
+	// Per-session dispatch lock taken in a narrow scope so it is
+	// released before put().
+	Mutex::Locker l((*i)->session_dispatch_lock);
+	session_notify_pg_create(*i, osdmap, pgid);
+	dispatch_session_waiting(*i, osdmap);
}
-      waiting_for_pg.erase(i);
+      // Drop the ref taken by get_sessions_possibly_interested_in_pg.
+      (*i)->put();
}
}