}
}
+void OSDService::send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch)
+{
+ Mutex::Locker l(publish_lock);
+
+ // service map is always newer/newest
+ assert(from_epoch <= next_osdmap->get_epoch());
+
+ if (next_osdmap->is_down(peer) ||
+ next_osdmap->get_info(peer).up_from > from_epoch) {
+ m->put();
+ return;
+ }
+ osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer));
+}
+
+Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+{
+ Mutex::Locker l(publish_lock);
+
+ // service map is always newer/newest
+ assert(from_epoch <= next_osdmap->get_epoch());
+
+ if (next_osdmap->is_down(peer) ||
+ next_osdmap->get_info(peer).up_from > from_epoch) {
+ return NULL;
+ }
+ return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer));
+}
+
+Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+{
+ Mutex::Locker l(publish_lock);
+
+ // service map is always newer/newest
+ assert(from_epoch <= next_osdmap->get_epoch());
+
+ if (next_osdmap->is_down(peer) ||
+ next_osdmap->get_info(peer).up_from > from_epoch) {
+ return NULL;
+ }
+ return osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer));
+}
+
void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
{
Mutex::Locker l(pg_temp_lock);
OSDMapRef newmap = get_map(cur);
assert(newmap); // we just cached it above!
+ // start blacklisting messages sent to peers that go down.
+ service.pre_publish_map(newmap);
+
// kill connections to newly down osds
set<int> old;
osdmap->get_all_osds(old);
- for (set<int>::iterator p = old.begin(); p != old.end(); p++)
+ for (set<int>::iterator p = old.begin(); p != old.end(); p++) {
if (*p != whoami &&
osdmap->have_inst(*p) && // in old map
- (!newmap->exists(*p) || !newmap->is_up(*p))) // but not the new one
+ (!newmap->exists(*p) || !newmap->is_up(*p))) { // but not the new one
note_down_osd(*p);
+ }
+ }
osdmap = newmap;
Mutex::Locker l(publish_lock);
superblock = block;
}
- OSDMapRef osdmap;
+ OSDMapRef osdmap, next_osdmap;
OSDMapRef get_osdmap() {
Mutex::Locker l(publish_lock);
return osdmap;
}
+ void pre_publish_map(OSDMapRef map) {
+ Mutex::Locker l(publish_lock);
+ next_osdmap = map;
+ }
void publish_map(OSDMapRef map) {
Mutex::Locker l(publish_lock);
osdmap = map;
+ next_osdmap = map;
}
int get_nodeid() const { return whoami; }
+ // -- message helpers --
+ Connection *get_con_osd_cluster(int peer, epoch_t from_epoch);
+ Connection *get_con_osd_hb(int peer, epoch_t from_epoch);
+ void send_message_osd_cluster(int peer, Message *m, epoch_t from_epoch);
+
// -- scrub scheduling --
Mutex sched_scrub_lock;
int scrubs_pending;