This lets the error handlers get at session state (if any).
return true;
}
- void ms_handle_failure(Message *m, const entity_addr_t& addr) {}
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) {}
- bool ms_handle_reset(const entity_addr_t& peer) {
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) {
lock.Lock();
if (observe)
send_observe_requests();
return true;
}
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
} dispatcher;
// ===============================
-void Client::ms_handle_failure(Message *m, const entity_addr_t& addr)
+void Client::ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr)
{
dout(0) << "ms_handle_failure " << *m << " to " << addr << dendl;
}
-bool Client::ms_handle_reset(const entity_addr_t& addr)
+bool Client::ms_handle_reset(Connection *con, const entity_addr_t& addr)
{
dout(0) << "ms_handle_reset on " << addr << dendl;
return false;
}
-void Client::ms_handle_remote_reset(const entity_addr_t& addr)
+void Client::ms_handle_remote_reset(Connection *con, const entity_addr_t& addr)
{
dout(0) << "ms_handle_remote_reset on " << addr << dendl;
#if 0
friend class SyntheticClient;
bool ms_dispatch(Message *m);
- bool ms_handle_reset(const entity_addr_t& peer);
- void ms_handle_failure(Message *m, const entity_addr_t& peer);
- void ms_handle_remote_reset(const entity_addr_t& peer);
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer);
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer);
public:
bool is_synchronous;
void _send_log();
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
public:
}
return true;
}
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
} dispatcher;
bool _dispatch(Message *m);
bool ms_dispatch(Message *m);
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
Objecter *objecter;
-void MDS::ms_handle_failure(Message *m, const entity_addr_t& addr)
+void MDS::ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr)
{
mds_lock.Lock();
dout(0) << "ms_handle_failure to " << addr << " on " << *m << dendl;
mds_lock.Unlock();
}
-bool MDS::ms_handle_reset(const entity_addr_t& addr)
+bool MDS::ms_handle_reset(Connection *con, const entity_addr_t& addr)
{
dout(0) << "ms_handle_reset on " << addr << dendl;
return false;
}
-void MDS::ms_handle_remote_reset(const entity_addr_t& addr)
+void MDS::ms_handle_remote_reset(Connection *con, const entity_addr_t& addr)
{
dout(0) << "ms_handle_remote_reset on " << addr << dendl;
objecter->ms_handle_remote_reset(addr);
// messages
bool _dispatch(Message *m);
- bool ms_handle_reset(const entity_addr_t& peer);
- void ms_handle_failure(Message *m, const entity_addr_t& peer);
- void ms_handle_remote_reset(const entity_addr_t& peer);
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer);
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer);
// special message types
void handle_mds_map(class MMDSMap *m);
}
}
-bool MonClient::ms_handle_reset(const entity_addr_t& peer)
+bool MonClient::ms_handle_reset(Connection *con, const entity_addr_t& peer)
{
dout(10) << "ms_handle_reset " << peer << dendl;
if (hunting)
SafeTimer timer;
bool ms_dispatch(Message *m);
- bool ms_handle_reset(const entity_addr_t& peer);
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
void handle_monmap(MMonMap *m);
private:
bool ms_dispatch(Message *m);
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer);
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
public:
Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map);
virtual bool ms_dispatch(Message *m) = 0;
// how i deal with transmission failures.
- virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) = 0;
+ virtual void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& addr) = 0;
/*
* on any connection reset.
* this indicates that the ordered+reliable delivery semantics have
* been violated. messages may have been lost.
*/
- virtual bool ms_handle_reset(const entity_addr_t& peer) = 0;
+ virtual bool ms_handle_reset(Connection *con, const entity_addr_t& peer) = 0;
// on deliberate reset of connection by remote
// implies incoming messages dropped; possibly/probably some of our previous outgoing too.
- virtual void ms_handle_remote_reset(const entity_addr_t& peer) = 0;
+ virtual void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) = 0;
};
#endif
virtual ~RefCountedObject() {}
RefCountedObject *get() {
+ //generic_dout(0) << "RefCountedObject::get " << this << " " << nref.test() << " -> " << (nref.test() + 1) << dendl;
nref.inc();
return this;
}
void put() {
+ //generic_dout(0) << "RefCountedObject::put " << this << " " << nref.test() << " -> " << (nref.test() - 1) << dendl;
if (nref.dec() == 0)
delete this;
}
public:
Connection() : nref(1), lock("Connection::lock"), priv(NULL) {}
~Connection() {
- if (priv)
+ //generic_dout(0) << "~Connection " << this << dendl;
+ if (priv) {
+ //generic_dout(0) << "~Connection " << this << " dropping priv " << priv << dendl;
priv->put();
+ }
}
Connection *get() {
Mutex::Locker l(lock);
if (priv)
priv->put();
- priv = o->get();
+ priv = o;
}
RefCountedObject *get_priv() {
Mutex::Locker l(lock);
class Messenger {
- private:
+private:
list<Dispatcher*> dispatchers;
protected:
<< dendl;
assert(0);
}
- void ms_deliver_handle_reset(const entity_addr_t& peer) {
+ void ms_deliver_handle_reset(Connection *con, const entity_addr_t& peer) {
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();
p++)
- if ((*p)->ms_handle_reset(peer))
+ if ((*p)->ms_handle_reset(con, peer))
return;
}
- void ms_deliver_handle_remote_reset(const entity_addr_t& peer) {
+ void ms_deliver_handle_remote_reset(Connection *con, const entity_addr_t& peer) {
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();
p++)
- (*p)->ms_handle_remote_reset(peer);
+ (*p)->ms_handle_remote_reset(con, peer);
}
- void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) {
+ void ms_deliver_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) {
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();
p++)
- (*p)->ms_handle_failure(m, peer);
+ (*p)->ms_handle_failure(con, m, peer);
}
// shutdown
ls.pop_front();
if ((long)m == BAD_REMOTE_RESET) {
lock.Lock();
- entity_addr_t a = remote_reset_q.front();
+ Connection *con = remote_reset_q.front().first;
+ entity_addr_t a = remote_reset_q.front().second;
remote_reset_q.pop_front();
lock.Unlock();
- ms_deliver_handle_remote_reset(a);
+ ms_deliver_handle_remote_reset(con, a);
+ con->put();
} else if ((long)m == BAD_RESET) {
lock.Lock();
- entity_addr_t a = reset_q.front();
+ Connection *con = reset_q.front().first;
+ entity_addr_t a = reset_q.front().second;
reset_q.pop_front();
lock.Unlock();
- ms_deliver_handle_reset(a);
+ ms_deliver_handle_reset(con, a);
+ con->put();
} else if ((long)m == BAD_FAILED) {
lock.Lock();
- m = failed_q.front().first;
- entity_addr_t a = failed_q.front().second;
+ Connection *con = failed_q.front().con;
+ m = failed_q.front().msg;
+ entity_addr_t a = failed_q.front().addr;
failed_q.pop_front();
lock.Unlock();
- ms_deliver_handle_failure(m, a);
+ ms_deliver_handle_failure(con, m, a);
+ con->put();
m->put();
} else {
dout(1) << "<== " << m->get_source_inst()
for (unsigned i=0; i<rank->local.size(); i++)
if (rank->local[i])
- rank->local[i]->queue_reset(peer_addr);
+ rank->local[i]->queue_reset(connection_state->get(), peer_addr);
// unregister
lock.Unlock();
report_failures();
for (unsigned i=0; i<rank->local.size(); i++)
if (rank->local[i])
- rank->local[i]->queue_remote_reset(peer_addr);
+ rank->local[i]->queue_remote_reset(connection_state->get(), peer_addr);
out_seq = 0;
in_seq = 0;
dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
} else {
dout(10) << "fail on " << *m << dendl;
- rank->local[srcrank]->queue_failure(m, peer_addr);
+ rank->local[srcrank]->queue_failure(connection_state->get(), m, peer_addr);
}
}
m->put();
}
enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED };
- list<entity_addr_t> remote_reset_q;
- list<entity_addr_t> reset_q;
- list<pair<Message*,entity_addr_t> > failed_q;
+ list<pair<Connection*,entity_addr_t> > remote_reset_q;
+ list<pair<Connection*,entity_addr_t> > reset_q;
+ struct fail_item {
+ Connection *con;
+ Message *msg;
+ entity_addr_t addr;
+ fail_item(Connection *c, Message *m, entity_addr_t a) : con(c), msg(m), addr(a) {}
+ };
+ list<fail_item> failed_q;
- void queue_remote_reset(entity_addr_t a) {
+ void queue_remote_reset(Connection *con, entity_addr_t a) {
lock.Lock();
- remote_reset_q.push_back(a);
+ remote_reset_q.push_back(pair<Connection*,entity_addr_t>(con, a));
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
cond.Signal();
lock.Unlock();
}
- void queue_reset(entity_addr_t a) {
+ void queue_reset(Connection *con, entity_addr_t a) {
lock.Lock();
- reset_q.push_back(a);
+ reset_q.push_back(pair<Connection*,entity_addr_t>(con, a));
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
cond.Signal();
lock.Unlock();
}
- void queue_failure(Message *m, entity_addr_t a) {
+ void queue_failure(Connection *con, Message *m, entity_addr_t a) {
lock.Lock();
m->get();
- failed_q.push_back(pair<Message*,entity_addr_t>(m, a));
+ failed_q.push_back(fail_item(con, m, a));
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED);
cond.Signal();
lock.Unlock();
bool ms_dispatch(Message *m) {
return osd->heartbeat_dispatch(m);
};
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
public:
OSD *osd;
HeartbeatDispatcher(OSD *o) : osd(o) {}
private:
bool ms_dispatch(Message *m);
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
public:
OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev = 0, const char *jdev = 0);
return true;
}
- bool ms_handle_reset(const entity_addr_t& peer) { return false; }
- void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
- void ms_handle_remote_reset(const entity_addr_t& peer) {}
+ bool ms_handle_reset(Connection *con, const entity_addr_t& peer) { return false; }
+ void ms_handle_failure(Connection *con, Message *m, const entity_addr_t& peer) { }
+ void ms_handle_remote_reset(Connection *con, const entity_addr_t& peer) {}
} dispatcher;