}
class C_handle_read : public EventCallback {
- AsyncConnection *conn;
+ AsyncConnectionRef conn;
public:
- C_handle_read(AsyncConnection *c): conn(c) {
- conn->get();
- }
+ C_handle_read(AsyncConnectionRef c): conn(c) {}
void do_request(int fd) {
conn->process();
- conn->put();
}
};
class C_handle_write : public EventCallback {
- AsyncConnection *conn;
+ AsyncConnectionRef conn;
public:
- C_handle_write(AsyncConnection *c): conn(c) {
- conn->get();
- }
+ C_handle_write(AsyncConnectionRef c): conn(c) {}
void do_request(int fd) {
conn->handle_write();
- conn->put();
}
};
class C_handle_reset : public EventCallback {
AsyncMessenger *msgr;
- AsyncConnection *conn;
+ AsyncConnectionRef conn;
public:
- C_handle_reset(AsyncMessenger *m, AsyncConnection *c): msgr(m), conn(c) {
- conn->get();
- }
+ C_handle_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
void do_request(int id) {
- msgr->ms_deliver_handle_reset(conn);
- conn->put();
+ msgr->ms_deliver_handle_reset(conn.get());
}
};
session_security.reset();
}
- get();
async_msgr->ms_deliver_handle_connect(this);
- get();
async_msgr->ms_deliver_handle_fast_connect(this);
// reset connect state variables
}
bool authorizer_valid;
- get();
if (!async_msgr->verify_authorizer(this, peer_type, connect.authorizer_protocol, authorizer_bl,
authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) {
ldout(async_msgr->cct,0) << __func__ << ": got bad authorizer" << dendl;
ldout(async_msgr->cct, 10) << __func__ << " accept: setting up session_security." << dendl;
// existing?
- AsyncConnection *existing = async_msgr->lookup_conn(peer_addr);
+ AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
if (existing) {
if (connect.global_seq < existing->peer_global_seq) {
ldout(async_msgr->cct, 10) << __func__ << " accept existing " << existing
}
if (existing->policy.lossy) {
// disconnect from the Connection
- existing->get();
async_msgr->ms_deliver_handle_reset(existing);
} else {
// queue a reset on the new connection, which we're dumping for the old
- get();
async_msgr->ms_deliver_handle_reset(this);
// reset the in_seq if this is a hard reset from peer,
session_key, get_features()));
// notify
- get();
async_msgr->ms_deliver_handle_accept(this);
- get();
async_msgr->ms_deliver_handle_fast_accept(this);
// ok!
discard_out_queue();
outcoming_bl.clear();
- get();
async_msgr->ms_deliver_handle_remote_reset(this);
if (randomize_out_seq()) {
}
// associate message with Connection (for benefit of encode_payload)
- get();
m->set_connection(this);
uint64_t features = get_features();
ceph::shared_ptr<AuthSessionHandler> session_security;
}; /* AsyncConnection */
+typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
+
#endif
ldout(cct, 10) << __func__ << ": closing pipes" << dendl;
while (!conns.empty()) {
- AsyncConnection *p = conns.begin()->second;
+ AsyncConnectionRef p = conns.begin()->second;
_stop_conn(p);
}
}
started = false;
}
-AsyncConnection *AsyncMessenger::add_accept(int sd)
+AsyncConnectionRef AsyncMessenger::add_accept(int sd)
{
lock.Lock();
- AsyncConnection *conn = new AsyncConnection(cct, this);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this);
conn->accept(sd);
accepting_conns.insert(conn);
lock.Unlock();
return conn;
}
-AsyncConnection *AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
+AsyncConnectionRef AsyncMessenger::create_connect(const entity_addr_t& addr, int type)
{
assert(lock.is_locked());
assert(addr != my_inst.addr);
<< ", creating connection and registering" << dendl;
// create connection
- AsyncConnection *conn = new AsyncConnection(cct, this);
+ AsyncConnectionRef conn = new AsyncConnection(cct, this);
conn->connect(addr, type);
assert(!conns.count(addr));
conns[addr] = conn;
return local_connection;
}
- AsyncConnection *conn = _lookup_conn(dest.addr);
+ AsyncConnectionRef conn = _lookup_conn(dest.addr);
if (conn) {
ldout(cct, 10) << __func__ << " " << dest << " existing " << conn << dendl;
} else {
return -EINVAL;
}
- AsyncConnection *conn = _lookup_conn(dest.addr);
+ AsyncConnectionRef conn = _lookup_conn(dest.addr);
submit_message(m, conn, dest.addr, dest.name.type());
return 0;
}
-void AsyncMessenger::submit_message(Message *m, AsyncConnection *con,
+void AsyncMessenger::submit_message(Message *m, AsyncConnectionRef con,
const entity_addr_t& dest_addr, int dest_type)
{
if (cct->_conf->ms_dump_on_send) {
{
ldout(cct,1) << __func__ << " " << dendl;
lock.Lock();
- for (set<AsyncConnection*>::iterator q = accepting_conns.begin();
+ for (set<AsyncConnectionRef>::iterator q = accepting_conns.begin();
q != accepting_conns.end(); ++q) {
- AsyncConnection *p = *q;
+ AsyncConnectionRef p = *q;
ldout(cct, 5) << __func__ << " accepting_conn " << p << dendl;
p->mark_down();
p->get();
- ms_deliver_handle_reset(p);
+ ms_deliver_handle_reset(p.get());
}
accepting_conns.clear();
while (!conns.empty()) {
- ceph::unordered_map<entity_addr_t, AsyncConnection*>::iterator it = conns.begin();
- AsyncConnection *p = it->second;
+ ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator it = conns.begin();
+ AsyncConnectionRef p = it->second;
ldout(cct, 5) << __func__ << " " << it->first << " " << p << dendl;
conns.erase(it);
p->mark_down();
p->get();
- ms_deliver_handle_reset(p);
+ ms_deliver_handle_reset(p.get());
}
lock.Unlock();
}
void AsyncMessenger::mark_down(const entity_addr_t& addr)
{
lock.Lock();
- AsyncConnection *p = _lookup_conn(addr);
+ AsyncConnectionRef p = _lookup_conn(addr);
if (p) {
ldout(cct, 1) << __func__ << " " << addr << " -- " << p << dendl;
_stop_conn(p);
p->get();
- ms_deliver_handle_reset(p);
+ ms_deliver_handle_reset(p.get());
} else {
ldout(cct, 1) << __func__ << " " << addr << " -- pipe dne" << dendl;
}
* @return a pointer to the newly-created connection. Caller does not own a
* reference; take one if you need it.
*/
- AsyncConnection *create_connect(const entity_addr_t& addr, int type);
+ AsyncConnectionRef create_connect(const entity_addr_t& addr, int type);
/**
* Queue up a Message for delivery to the entity specified
* @param dest_type The peer type of the address we're sending to
* just drop silently under failure.
*/
- void submit_message(Message *m, AsyncConnection *con,
+ void submit_message(Message *m, AsyncConnectionRef con,
const entity_addr_t& dest_addr, int dest_type);
int _send_message(Message *m, const entity_inst_t& dest);
* NOTE: a Asyncconnection* with state CLOSED may still be in the map but is considered
* invalid and can be replaced by anyone holding the msgr lock
*/
- ceph::unordered_map<entity_addr_t, AsyncConnection*> conns;
+ ceph::unordered_map<entity_addr_t, AsyncConnectionRef> conns;
/**
* list of connection are in teh process of accepting
*
* These are not yet in the conns map.
*/
- set<AsyncConnection*> accepting_conns;
+ set<AsyncConnectionRef> accepting_conns;
/// internal cluster protocol version, if any, for talking to entities of the same type.
int cluster_protocol;
- AsyncConnection *_lookup_conn(const entity_addr_t& k) {
+ AsyncConnectionRef _lookup_conn(const entity_addr_t& k) {
assert(lock.is_locked());
- ceph::unordered_map<entity_addr_t, AsyncConnection*>::iterator p = conns.find(k);
+ ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
if (p == conns.end())
return NULL;
if (!p->second->is_connected()) {
return p->second;
}
- void _stop_conn(AsyncConnection *c) {
+ void _stop_conn(AsyncConnectionRef c) {
assert(lock.is_locked());
if (c) {
c->mark_down();
/**
* This wraps _lookup_conn.
*/
- AsyncConnection *lookup_conn(const entity_addr_t& k) {
+ AsyncConnectionRef lookup_conn(const entity_addr_t& k) {
Mutex::Locker l(lock);
return _lookup_conn(k);
}
- void accept_conn(AsyncConnection *conn) {
+ void accept_conn(AsyncConnectionRef conn) {
Mutex::Locker l(lock);
if (conns.count(conn->get_peer_addr()))
delete conns[conn->get_peer_addr()];
}
void learned_addr(const entity_addr_t &peer_addr_for_me);
- AsyncConnection *add_accept(int sd);
+ AsyncConnectionRef add_accept(int sd);
/**
* This wraps ms_deliver_get_authorizer. We use it for AsyncConnection.