info.pgid, create, false, role, up, acting, history, pi,
*rctx.transaction);
pg->handle_create(&rctx);
- dispatch_context(rctx, pg);
+ dispatch_context(rctx, pg, osdmap);
created++;
dout(10) << *pg << " is new" << dendl;
int peer = inst.name.num();
- assert(is_active());
-
// send map?
epoch_t pe = get_peer_epoch(peer);
if (pe) {
pg->handle_create(&rctx);
pg->write_if_dirty(*rctx.transaction);
pg->update_stats();
- dispatch_context(rctx, pg);
pg->unlock();
+ dispatch_context(rctx, pg, osdmap);
num_created++;
}
}
- do_queries(query_map);
- do_infos(info_map);
-
maybe_update_heartbeat_peers();
}
}
}
-void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg)
+void OSD::dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap)
{
- do_notifies(*ctx.notify_list);
+ do_notifies(*ctx.notify_list, curmap);
delete ctx.notify_list;
- do_queries(*ctx.query_map);
+ do_queries(*ctx.query_map, curmap);
delete ctx.query_map;
- do_infos(*ctx.info_map);
+ do_infos(*ctx.info_map, curmap);
delete ctx.info_map;
if (ctx.transaction->empty() || !pg) {
delete ctx.transaction;
*/
void OSD::do_notifies(
- map< int,vector<pair<pg_notify_t,pg_interval_map_t> > >& notify_list)
+ map< int,vector<pair<pg_notify_t,pg_interval_map_t> > >& notify_list,
+ OSDMapRef curmap)
{
for (map< int, vector<pair<pg_notify_t,pg_interval_map_t> > >::iterator it = notify_list.begin();
it != notify_list.end();
dout(7) << "do_notify osd." << it->first << " is self, skipping" << dendl;
continue;
}
- if (!osdmap->is_up(it->first))
+ if (!curmap->is_up(it->first))
continue;
dout(7) << "do_notify osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
- MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(),
+ MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
- _share_map_outgoing(osdmap->get_cluster_inst(it->first));
- cluster_messenger->send_message(m, osdmap->get_cluster_inst(it->first));
+ _share_map_outgoing(curmap->get_cluster_inst(it->first), curmap);
+ cluster_messenger->send_message(m, curmap->get_cluster_inst(it->first));
}
}
/** do_queries
* send out pending queries for info | summaries
*/
-void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map)
+void OSD::do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
+ OSDMapRef curmap)
{
for (map< int, map<pg_t,pg_query_t> >::iterator pit = query_map.begin();
pit != query_map.end();
pit++) {
- if (!osdmap->is_up(pit->first))
+ if (!curmap->is_up(pit->first))
continue;
int who = pit->first;
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
- MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(), pit->second);
- _share_map_outgoing(osdmap->get_cluster_inst(who));
- cluster_messenger->send_message(m, osdmap->get_cluster_inst(who));
+ MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
+ _share_map_outgoing(curmap->get_cluster_inst(who), curmap);
+ cluster_messenger->send_message(m, curmap->get_cluster_inst(who));
}
}
-void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map)
+void OSD::do_infos(map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map,
+ OSDMapRef curmap)
{
for (map<int,vector<pair<pg_notify_t, pg_interval_map_t> > >::iterator p = info_map.begin();
p != info_map.end();
- ++p) {
- if (!osdmap->is_up(p->first))
+ ++p) {
+ if (!curmap->is_up(p->first))
continue;
for (vector<pair<pg_notify_t,pg_interval_map_t> >::iterator i = p->second.begin();
i != p->second.end();
++i) {
dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl;
}
- MOSDPGInfo *m = new MOSDPGInfo(osdmap->get_epoch());
+ MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
- cluster_messenger->send_message(m, osdmap->get_cluster_inst(p->first));
+ cluster_messenger->send_message(m, curmap->get_cluster_inst(p->first));
}
info_map.clear();
}
pg_interval_map_t()));
}
}
- do_notifies(notify_list);
+ do_notifies(notify_list, osdmap);
}
}
}
-
pg->write_if_dirty(*rctx.transaction);
- dispatch_context(rctx, pg);
+ OSDMapRef curmap = pg->get_osdmap();
pg->unlock();
+ dispatch_context(rctx, pg, curmap);
}
pg->put();
}
same_interval_since = pg->info.history.same_interval_since;
pg->unlock();
}
- {
- Mutex::Locker l(osd_lock);
- if (need_up_thru)
- queue_want_up_thru(same_interval_since);
- dispatch_context(rctx, 0);
- }
+ if (need_up_thru)
+ queue_want_up_thru(same_interval_since);
+ dispatch_context(rctx, 0, curmap);
+
service.send_pg_temp();
}
// -- generic pg peering --
PG::RecoveryCtx create_context();
- void dispatch_context(PG::RecoveryCtx &ctx, PG *pg);
+ void dispatch_context(PG::RecoveryCtx &ctx, PG *pg, OSDMapRef curmap);
void dispatch_context_transaction(PG::RecoveryCtx &ctx, PG *pg);
- void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list);
- void do_queries(map< int, map<pg_t,pg_query_t> >& query_map);
- void do_infos(map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map);
+ void do_notifies(map< int,vector<pair<pg_notify_t, pg_interval_map_t> > >& notify_list,
+ OSDMapRef map);
+ void do_queries(map< int, map<pg_t,pg_query_t> >& query_map,
+ OSDMapRef map);
+ void do_infos(map<int, vector<pair<pg_notify_t, pg_interval_map_t> > >& info_map,
+ OSDMapRef map);
void repeer(PG *pg, map< int, map<pg_t,pg_query_t> >& query_map);
bool require_mon_peer(Message *m);