if (objecter) { delete objecter; objecter = 0; }
if (osdmap) { delete osdmap; osdmap = 0; }
if (mdsmap) { delete mdsmap; mdsmap = 0; }
-
- if (messenger) { delete messenger; messenger = 0; }
+ if (messenger)
+ messenger->destroy();
}
dout(0) << "ms_handle_failure " << *m << " to " << inst
<< ", resending to mon" << mon
<< dendl;
- messenger->send_message(m, monmap->get_inst(mon));
+ Message *n = decode_message(m->get_header(), m->get_footer(), m->get_payload(), m->get_data());
+ messenger->send_message(n, monmap->get_inst(mon));
}
else {
dout(0) << "ms_handle_failure " << *m << " to " << inst << ", dropping" << dendl;
- delete m;
}
}
// wait for messenger to finish
rank.wait();
-
+ messenger->destroy();
return 0;
}
if (filer) { delete filer; filer = 0; }
if (objecter) { delete objecter; objecter = 0; }
- if (messenger) { delete messenger; messenger = NULL; }
if (logger) { delete logger; logger = 0; }
if (logger2) { delete logger2; logger2 = 0; }
-
+
+ if (messenger)
+ messenger->destroy();
}
{
mds_lock.Lock();
dout(0) << "ms_handle_failure to " << inst << " on " << *m << dendl;
-
- delete m;
mds_lock.Unlock();
}
dout(2) << "get_monmap got monmap epoch " << pmonmap->epoch << " fsid " << pmonmap->fsid << dendl;
}
msgr->shutdown();
- //delete msgr; // FIXME: we need proper reference counting in messenger
+ msgr->destroy();
rank.wait();
if (monmap_bl.length())
delete mdsmon;
delete clientmon;
delete pgmon;
- delete messenger;
+ if (messenger)
+ messenger->destroy();
}
void Monitor::init()
entity_inst_t _myinst;
int default_send_priority;
+ atomic_t nref;
+
public:
- Messenger(entity_name_t w) : dispatcher(0), default_send_priority(CEPH_MSG_PRIO_DEFAULT) {
+ Messenger(entity_name_t w) : dispatcher(0),
+ default_send_priority(CEPH_MSG_PRIO_DEFAULT),
+ nref(1) {
_myinst.name = w;
}
- virtual ~Messenger() { }
+ virtual ~Messenger() {
+ assert(nref.test() == 0);
+ }
+
+ void get() {
+ nref.inc();
+ }
+ void put() {
+ if (nref.dec() == 0)
+ delete this;
+ }
+ virtual void destroy() {
+ put();
+ }
// accessors
entity_name_t get_myname() { return _myinst.name; }
local.resize(max_local);
stopped.resize(max_local);
+ msgr->get();
local[erank] = msgr;
stopped[erank] = false;
msgr->_myinst.addr = rank_addr;
dout(10) << "unregister_entity " << msgr->get_myname() << dendl;
// remove from local directory.
+ assert(msgr->my_rank >= 0);
+ assert(local[msgr->my_rank] == msgr);
local[msgr->my_rank] = 0;
stopped[msgr->my_rank] = true;
num_local--;
+ msgr->my_rank = -1;
+
+ assert(msgr->nref.test() > 1);
+ msgr->put();
wait_cond.Signal();
// deregister
rank.unregister_entity(this);
+ put();
}
void Rank::EntityMessenger::ready()
{
dout(10) << "ready " << get_myaddr() << dendl;
assert(!dispatch_thread.is_started());
-
- // start my dispatch thread
+ get();
dispatch_thread.create();
}
cond.Signal();
lock.Unlock();
}
-
return 0;
}
my_rank(r),
need_addr(false),
dispatch_thread(this) { }
- ~EntityMessenger() {
+ ~EntityMessenger() { }
+
+ void destroy() {
// join dispatch thread
if (dispatch_thread.is_started())
dispatch_thread.join();
+
+ Messenger::destroy();
}
void ready();
OSD::~OSD()
{
- if (threadpool) { delete threadpool; threadpool = 0; }
- if (osdmap) { delete osdmap; osdmap = 0; }
- //if (monitor) { delete monitor; monitor = 0; }
- if (messenger) { delete messenger; messenger = 0; }
- if (logger) { delete logger; logger = 0; }
- if (store) { delete store; store = 0; }
+ delete threadpool;
+ delete osdmap;
+ delete logger;
+ delete store;
+ if (messenger)
+ messenger->destroy();
}
bool got_sigterm = false;