class_handler(osd->class_handler),
publish_lock("OSDService::publish_lock"),
pre_publish_lock("OSDService::pre_publish_lock"),
+ peer_map_epoch_lock("OSDService::peer_map_epoch_lock"),
sched_scrub_lock("OSDService::sched_scrub_lock"), scrubs_pending(0),
scrubs_active(0),
agent_lock("OSD::agent_lock"),
op_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
peering_wq(this, cct->_conf->osd_op_thread_timeout, &op_tp),
map_lock("OSD::map_lock"),
- peer_map_epoch_lock("OSD::peer_map_epoch_lock"),
pg_map_lock("OSD::pg_map_lock"),
debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
debug_drop_pg_create_duration(cct->_conf->osd_debug_drop_pg_create_duration),
m->get_connection()->get_messenger()->send_message(r, m->get_connection());
if (curmap->is_up(from)) {
- note_peer_epoch(from, m->map_epoch);
+ service.note_peer_epoch(from, m->map_epoch);
if (is_active()) {
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- _share_map_outgoing(from, con.get());
+ service.share_map_outgoing(from, con.get());
}
}
} else if (!curmap->exists(from) ||
if (m->map_epoch &&
curmap->is_up(from)) {
- note_peer_epoch(from, m->map_epoch);
+ service.note_peer_epoch(from, m->map_epoch);
if (is_active()) {
ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- _share_map_outgoing(from, con.get());
+ service.share_map_outgoing(from, con.get());
}
}
}
}
const entity_inst_t& peer_inst = next_map->get_cluster_inst(peer);
Connection *peer_con = osd->cluster_messenger->get_connection(peer_inst).get();
- osd->_share_map_outgoing(peer, peer_con, next_map);
+ share_map_outgoing(peer, peer_con, next_map);
osd->cluster_messenger->send_message(m, peer_inst);
release_map(next_map);
}
// send them the latest diff to ensure they realize the mapping
// has changed.
- send_incremental_map(osdmap->get_epoch() - 1, con, osdmap);
+ service.send_incremental_map(osdmap->get_epoch() - 1, con, osdmap);
// do not reply; they will get newer maps and realize they
// need to resend.
// --------------------------------------
// dispatch
-epoch_t OSD::get_peer_epoch(int peer)
+epoch_t OSDService::get_peer_epoch(int peer)
{
Mutex::Locker l(peer_map_epoch_lock);
map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
return p->second;
}
-epoch_t OSD::note_peer_epoch(int peer, epoch_t e)
+epoch_t OSDService::note_peer_epoch(int peer, epoch_t e)
{
Mutex::Locker l(peer_map_epoch_lock);
map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
}
}
-void OSD::forget_peer_epoch(int peer, epoch_t as_of)
+void OSDService::forget_peer_epoch(int peer, epoch_t as_of)
{
Mutex::Locker l(peer_map_epoch_lock);
map<int,epoch_t>::iterator p = peer_map_epoch.find(peer);
(osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
osdmap->get_hb_back_addr(name.num()) == con->get_peer_addr())) {
// remember
- epoch_t has = MAX(osd->get_peer_epoch(name.num()), epoch);
+ epoch_t has = MAX(get_peer_epoch(name.num()), epoch);
// share?
if (has < osdmap->get_epoch()) {
<< " < " << osdmap->get_epoch() << dendl;
// we know the Session is valid or we wouldn't be sending
*sent_epoch_p = osdmap->get_epoch();
- osd->send_incremental_map(epoch, con, osdmap);
+ send_incremental_map(epoch, con, osdmap);
} else if (con->get_messenger() == osd->cluster_messenger &&
osdmap->is_up(name.num()) &&
(osdmap->get_cluster_addr(name.num()) == con->get_peer_addr() ||
dout(10) << name << " " << con->get_peer_addr()
<< " has old map " << epoch << " < "
<< osdmap->get_epoch() << dendl;
- osd->note_peer_epoch(name.num(), osdmap->get_epoch());
- osd->send_incremental_map(epoch, con, osdmap);
+ note_peer_epoch(name.num(), osdmap->get_epoch());
+ send_incremental_map(epoch, con, osdmap);
}
}
}
-void OSD::_share_map_outgoing(int peer, Connection *con, OSDMapRef map)
+void OSDService::share_map_outgoing(int peer, Connection *con, OSDMapRef map)
{
if (!map)
- map = service.get_osdmap();
+ map = get_osdmap();
// send map?
epoch_t pe = get_peer_epoch(peer);
void OSD::note_up_osd(int peer)
{
- forget_peer_epoch(peer, osdmap->get_epoch() - 1);
+ service.forget_peer_epoch(peer, osdmap->get_epoch() - 1);
}
struct C_OnMapApply : public Context {
}
-MOSDMap *OSD::build_incremental_map_msg(epoch_t since, epoch_t to,
- OSDSuperblock& superblock)
+MOSDMap *OSDService::build_incremental_map_msg(epoch_t since, epoch_t to,
+ OSDSuperblock& sblock)
{
MOSDMap *m = new MOSDMap(monc->get_fsid());
- m->oldest_map = superblock.oldest_map;
- m->newest_map = superblock.newest_map;
+ m->oldest_map = sblock.oldest_map;
+ m->newest_map = sblock.newest_map;
for (epoch_t e = to; e > since; e--) {
bufferlist bl;
return m;
}
-void OSD::send_map(MOSDMap *m, Connection *con)
+void OSDService::send_map(MOSDMap *m, Connection *con)
{
Messenger *msgr = client_messenger;
if (entity_name_t::TYPE_OSD == con->get_peer_type())
msgr->send_message(m, con);
}
-void OSD::send_incremental_map(epoch_t since, Connection *con,
- OSDMapRef& osdmap)
+void OSDService::send_incremental_map(epoch_t since, Connection *con,
+ OSDMapRef& osdmap)
{
epoch_t to = osdmap->get_epoch();
dout(10) << "send_incremental_map " << since << " -> " << to
MOSDMap *m = NULL;
while (!m) {
- OSDSuperblock superblock(service.get_superblock());
- if (since < superblock.oldest_map) {
+ OSDSuperblock sblock(get_superblock());
+ if (since < sblock.oldest_map) {
// just send latest full map
MOSDMap *m = new MOSDMap(monc->get_fsid());
- m->oldest_map = superblock.oldest_map;
- m->newest_map = superblock.newest_map;
+ m->oldest_map = sblock.oldest_map;
+ m->newest_map = sblock.newest_map;
get_map_bl(to, m->maps[to]);
send_map(m, con);
return;
if (to - since > (epoch_t)cct->_conf->osd_map_message_max)
to = since + cct->_conf->osd_map_message_max;
- m = build_incremental_map_msg(since, to, superblock);
+ m = build_incremental_map_msg(since, to, sblock);
}
send_map(m, con);
}
it->first, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(it->first, con.get(), curmap);
+ service.share_map_outgoing(it->first, con.get(), curmap);
if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_notify osd " << it->first
<< " on " << it->second.size() << " PGs" << dendl;
ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(who, con.get(), curmap);
+ service.share_map_outgoing(who, con.get(), curmap);
if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
p->first, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(p->first, con.get(), curmap);
+ service.share_map_outgoing(p->first, con.get(), curmap);
if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) {
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
it->second.from, it->second.to,
osdmap->get_epoch(), empty,
it->second.epoch_sent);
- _share_map_outgoing(from, con.get(), osdmap);
+ service.share_map_outgoing(from, con.get(), osdmap);
cluster_messenger->send_message(mlog, con.get());
}
} else {
}
}
+private:
+ Mutex peer_map_epoch_lock;
+ map<int, epoch_t> peer_map_epoch;
+public:
+ epoch_t get_peer_epoch(int p);
+ epoch_t note_peer_epoch(int p, epoch_t e);
+ void forget_peer_epoch(int p, epoch_t e);
+
+ void send_map(class MOSDMap *m, Connection *con);
+ void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
+ MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
+ OSDSuperblock& superblock);
bool should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
OSDMapRef& osdmap, const epoch_t *sent_epoch_p);
void share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch,
OSDMapRef& osdmap, epoch_t *sent_epoch_p);
+ void share_map_outgoing(int peer, Connection *con,
+ OSDMapRef map = OSDMapRef());
ConnectionRef get_con_osd_cluster(int peer, epoch_t from_epoch);
pair<ConnectionRef,ConnectionRef> get_con_osd_hb(int peer, epoch_t from_epoch); // (back, front)
RWLock map_lock;
list<OpRequestRef> waiting_for_osdmap;
- Mutex peer_map_epoch_lock;
- map<int, epoch_t> peer_map_epoch;
-
- epoch_t get_peer_epoch(int p);
- epoch_t note_peer_epoch(int p, epoch_t e);
- void forget_peer_epoch(int p, epoch_t e);
-
friend struct send_map_on_destruct;
- void _share_map_outgoing(int peer, Connection *con,
- OSDMapRef map = OSDMapRef());
-
void wait_for_new_map(OpRequestRef op);
void handle_osd_map(class MOSDMap *m);
void note_down_osd(int osd);
void pin_map_inc_bl(epoch_t e, bufferlist &bl) {
return service.pin_map_inc_bl(e, bl);
}
- bool get_inc_map_bl(epoch_t e, bufferlist& bl) {
- return service.get_inc_map_bl(e, bl);
- }
-
- MOSDMap *build_incremental_map_msg(epoch_t from, epoch_t to,
- OSDSuperblock& superblock);
- void send_incremental_map(epoch_t since, Connection *con, OSDMapRef& osdmap);
- void send_map(MOSDMap *m, Connection *con);
protected:
// -- placement groups --