}
};
+class C_handle_stop : public EventCallback {
+ AsyncConnectionRef conn;
+
+ public:
+ C_handle_stop(AsyncConnectionRef c): conn(c) {}
+ void do_request(int id) {
+ conn->stop();
+ }
+};
+
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
write_handler.reset(new C_handle_write(this));
reset_handler.reset(new C_handle_reset(async_msgr, this));
remote_reset_handler.reset(new C_handle_remote_reset(async_msgr, this));
+ stop_handler.reset(new C_handle_stop(this));
memset(msgvec, 0, sizeof(msgvec));
}
in_seq_acked = 0;
}
-void AsyncConnection::_stop(bool external)
+void AsyncConnection::_stop()
{
ldout(async_msgr->cct, 10) << __func__ << dendl;
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
state = STATE_CLOSED;
::close(sd);
sd = -1;
- async_msgr->unregister_conn(peer_addr, external);
+ async_msgr->unregister_conn(peer_addr);
+ {
+ Mutex::Locker l(stop_lock);
+ stop_cond.Signal();
+ }
put();
}
int read_until(uint64_t needed, bufferptr &p);
int _process_connection();
void _connect();
- void _stop(bool external=false);
+ void _stop();
int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
void was_session_reset();
void send_keepalive();
// Don't call it from AsyncConnection
void mark_down() {
- Mutex::Locker l(lock);
- _stop(true);
+ Mutex::Locker l(stop_lock);
+ center.dispatch_event_external(stop_event);
+ mark_down_cond.Wait(stop_lock);
}
void mark_disposable() {
Mutex::Locker l(lock);
EventCallbackRef remote_reset_handler;
bool keepalive;
struct iovec msgvec[IOV_LEN];
+ Mutex stop_lock; // used to protect `mark_down_cond`
+ Cond stop_cond;
// Tis section are temp variables used by state transition
// used by eventcallback
void handle_write();
void process();
+ // Helper: only called by C_handle_stop
+ void stop() {
+ Mutex::Locker l(lock);
+ _stop();
+ }
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
/**
* Unregister connection from `conns`
- * `external` is used to indicate whether need to lock AsyncMessenger::lock,
- * it may call. If external is false, it means that AsyncConnection take the
- * initiative to unregister
*/
- void unregister_conn(const entity_addr_t &addr, bool external) {
- if (!external)
- lock.Lock();
+ void unregister_conn(const entity_addr_t &addr) {
+ Mutex::Locker l(lock);
conns.erase(addr);
- if (!external)
- lock.Unlock();
}
/**
* @} // AsyncMessenger Internals