map<int,HeartbeatInfo>::iterator i = heartbeat_peers.find(p);
if (i == heartbeat_peers.end()) {
- Connection *con = service.get_con_osd_hb(p, osdmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_hb(p, osdmap->get_epoch());
if (!con)
return;
hi = &heartbeat_peers[p];
- hi->con = con;
+ hi->con = con.get();
+ hi->con->get();
hi->peer = p;
hi->con->set_priv(new HeartbeatSession(p));
dout(10) << "_add_heartbeat_peer: new peer osd." << p
if (curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
if (is_active()) {
- Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- _share_map_outgoing(from, con);
- con->put();
+ _share_map_outgoing(from, con.get());
}
}
}
curmap->is_up(from)) {
note_peer_epoch(from, m->map_epoch);
if (is_active()) {
- Connection *con = service.get_con_osd_cluster(from, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(from, curmap->get_epoch());
if (con) {
- _share_map_outgoing(from, con);
- con->put();
+ _share_map_outgoing(from, con.get());
}
}
}
map<int,HeartbeatInfo>::iterator p = heartbeat_peers.find(s->peer);
if (p != heartbeat_peers.end() &&
p->second.con == con) {
- Connection *newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
+ ConnectionRef newcon = service.get_con_osd_hb(p->second.peer, p->second.epoch);
if (!newcon) {
dout(10) << "heartbeat_reset reopen failed hb con " << con << " but failed to reopen" << dendl;
s->put();
return true;
}
dout(10) << "heartbeat_reset reopen failed hb con " << con << dendl;
- p->second.con = newcon;
+ p->second.con = newcon.get();
+ p->second.con->get();
p->second.con->set_priv(s);
} else {
dout(10) << "heartbeat_reset closing (old) failed hb con " << con << dendl;
osd->cluster_messenger->send_message(m, next_osdmap->get_cluster_inst(peer));
}
-Connection *OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
+ConnectionRef OSDService::get_con_osd_cluster(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
next_osdmap->get_info(peer).up_from > from_epoch) {
return NULL;
}
- return osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer));
+ ConnectionRef ret(
+ osd->cluster_messenger->get_connection(next_osdmap->get_cluster_inst(peer)));
+ ret->put(); // Ref from get_connection
+ return ret;
}
-Connection *OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
+ConnectionRef OSDService::get_con_osd_hb(int peer, epoch_t from_epoch)
{
Mutex::Locker l(pre_publish_lock);
next_osdmap->get_info(peer).up_from > from_epoch) {
return NULL;
}
- return osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer));
+ ConnectionRef ret(
+ osd->hbclient_messenger->get_connection(next_osdmap->get_hb_inst(peer)));
+ ret->put(); // Ref from get_connection
+ return ret;
}
void OSDService::queue_want_pg_temp(pg_t pgid, vector<int>& want)
}
if (!curmap->is_up(it->first))
continue;
- Connection *con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(it->first, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(it->first, con, curmap);
+ _share_map_outgoing(it->first, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_notify osd." << it->first
<< " on " << it->second.size() << " PGs" << dendl;
MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
it->second);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
} else {
dout(7) << "do_notify osd." << it->first
<< " sending seperate messages" << dendl;
list[0] = *i;
MOSDPGNotify *m = new MOSDPGNotify(i->first.epoch_sent,
list);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
}
}
- con->put();
}
}
if (!curmap->is_up(pit->first))
continue;
int who = pit->first;
- Connection *con = service.get_con_osd_cluster(who, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(who, con, curmap);
+ _share_map_outgoing(who, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
dout(7) << "do_queries querying osd." << who
<< " on " << pit->second.size() << " PGs" << dendl;
MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
} else {
dout(7) << "do_queries querying osd." << who
<< " sending seperate messages "
map<pg_t, pg_query_t> to_send;
to_send.insert(*i);
MOSDPGQuery *m = new MOSDPGQuery(i->second.epoch_sent, to_send);
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
}
}
- con->put();
}
}
++i) {
dout(20) << "Sending info " << i->first.info << " to osd." << p->first << dendl;
}
- Connection *con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(p->first, curmap->get_epoch());
if (!con)
continue;
- _share_map_outgoing(p->first, con, curmap);
+ _share_map_outgoing(p->first, con.get(), curmap);
if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) {
MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
m->pg_list = p->second;
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
} else {
for (vector<pair<pg_notify_t, pg_interval_map_t> >::iterator i =
p->second.begin();
to_send[0] = *i;
MOSDPGInfo *m = new MOSDPGInfo(i->first.epoch_sent);
m->pg_list = to_send;
- cluster_messenger->send_message(m, con);
+ cluster_messenger->send_message(m, con.get());
}
}
- con->put();
}
info_map.clear();
}
pg_info_t empty(pgid);
if (it->second.type == pg_query_t::LOG ||
it->second.type == pg_query_t::FULLLOG) {
- Connection *con = service.get_con_osd_cluster(from, osdmap->get_epoch());
+ ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
if (con) {
MOSDPGLog *mlog = new MOSDPGLog(osdmap->get_epoch(), empty,
it->second.epoch_sent);
- _share_map_outgoing(from, con, osdmap);
- cluster_messenger->send_message(mlog, con);
- con->put();
+ _share_map_outgoing(from, con.get(), osdmap);
+ cluster_messenger->send_message(mlog, con.get());
}
} else {
notify_list[from].push_back(make_pair(pg_notify_t(it->second.epoch_sent,
OSDMapRef curmap = osd->get_osdmap();
scrubber.is_chunky = true;
for (unsigned i=1; i<acting.size(); i++) {
- Connection *con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
+ ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch());
if (!con)
continue;
if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
<< " does not support chunky scrubs, falling back to classic"
<< dendl;
scrubber.is_chunky = false;
- con->put();
break;
}
- con->put();
}
if (scrubber.is_chunky) {
dout(10) << " sending " << mlog->log << " " << mlog->missing << dendl;
- Connection *con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
+ ConnectionRef con = osd->get_con_osd_cluster(from, get_osdmap()->get_epoch());
if (con) {
- osd->osd->_share_map_outgoing(from, con, get_osdmap());
- osd->cluster_messenger->send_message(mlog, con);
- con->put();
+ osd->osd->_share_map_outgoing(from, con.get(), get_osdmap());
+ osd->cluster_messenger->send_message(mlog, con.get());
} else {
mlog->put();
}
context< RecoveryMachine >().log_enter(state_name);
PG *pg = context< RecoveryMachine >().pg;
pg->state_set(PG_STATE_BACKFILL_WAIT);
- Connection *con = pg->osd->get_con_osd_cluster(pg->backfill_target, pg->get_osdmap()->get_epoch());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(
+ pg->backfill_target, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) {
pg->osd->cluster_messenger->send_message(
MBackfillReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- con);
+ con.get());
} else {
post_event(RemoteBackfillReserved());
}
- con->put();
}
}
}
if (acting_osd_it != context< Active >().sorted_acting_set.end()) {
- Connection *con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
pg->osd->cluster_messenger->send_message(
new MRecoveryReserve(MRecoveryReserve::REQUEST,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- con);
+ con.get());
} else {
post_event(RemoteRecoveryReserved());
}
- con->put();
}
++acting_osd_it;
} else {
++i) {
if (*i == pg->osd->whoami) // skip myself
continue;
- Connection *con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
+ ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch());
if (con) {
if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) {
pg->osd->cluster_messenger->send_message(
new MRecoveryReserve(MRecoveryReserve::RELEASE,
pg->info.pgid,
pg->get_osdmap()->get_epoch()),
- con);
+ con.get());
}
- con->put();
}
}
}