}
};
+
+class C_clean_handler : public EventCallback {
+ AsyncConnectionRef conn;
+ public:
+ C_clean_handler(AsyncConnectionRef c): conn(c) {}
+ void do_request(int id) {
+ conn->cleanup_handler();
+ }
+};
+
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
// create a buffer to read into that matches the data alignment
assert(sent.empty());
assert(!authorizer);
if (recv_buf)
- delete recv_buf;
+ delete[] recv_buf;
if (state_buffer)
- delete state_buffer;
+ delete[] state_buffer;
}
/* return -1 means `fd` occurs error or closed, it should be closed
} else {
if (session_security->check_message_signature(message)) {
ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl;
+ message->put();
goto fail;
}
}
if (existing->policy.lossy) {
// disconnect from the Connection
- center->dispatch_event_external(EventCallbackRef(new C_handle_reset(async_msgr, existing)));
+ existing->center->dispatch_event_external(existing->reset_handler);
existing->_stop();
} else {
// queue a reset on the new connection, which we're dumping for the old
// we want to handle fault within internal thread
center->dispatch_event_external(write_handler);
}
+ } else if (state == STATE_CLOSED) {
+ ldout(async_msgr->cct, 10) << __func__ << " connection closed."
+ << " Drop message " << m << dendl;
+ } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
+ ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
+ local_messages.push_back(m);
+ center->dispatch_event_external(local_deliver_handler);
} else {
out_q[m->get_priority()].push_back(m);
- if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) {
+ if (state == STATE_STANDBY && !policy.server) {
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< " policy.server is false" << dendl;
_connect();
- } else if (async_msgr->get_myaddr() == get_peer_addr()) { //loopback connection
- ldout(async_msgr->cct, 20) << __func__ << " " << *m << " local" << dendl;
- local_messages.push_back(m);
- center->dispatch_event_external(local_deliver_handler);
} else if (sd > 0 && !open_write) {
center->dispatch_event_external(write_handler);
}
return ;
}
- if (policy.lossy && state != STATE_CONNECTING) {
+ if (policy.lossy && !(state >= STATE_CONNECTING && state < STATE_CONNECTING_READY)) {
ldout(async_msgr->cct, 10) << __func__ << " on lossy channel, failing" << dendl;
center->dispatch_event_external(reset_handler);
_stop();
once_session_reset = true;
}
-// *note: `async` is true only happen when replacing connection process
void AsyncConnection::_stop()
{
assert(lock.is_locked());
for (set<uint64_t>::iterator it = register_time_events.begin();
it != register_time_events.end(); ++it)
center->delete_time_event(*it);
+ // Make sure in-queue events will been processed
+ center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
}
int AsyncConnection::_send(Message *m)
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
Mutex::Locker l(lock);
- keepalive = true;
- center->dispatch_event_external(write_handler);
+ if (state != STATE_CLOSED) {
+ keepalive = true;
+ center->dispatch_event_external(write_handler);
+ }
}
void AsyncConnection::mark_down()
void wakeup_from(uint64_t id);
void local_deliver();
void stop() {
- mark_down();
center->dispatch_event_external(reset_handler);
+ mark_down();
+ }
+ void cleanup_handler() {
+ read_handler.reset();
+ write_handler.reset();
+ reset_handler.reset();
+ remote_reset_handler.reset();
+ connect_handler.reset();
+ accept_handler.reset();
+ local_deliver_handler.reset();
}
}; /* AsyncConnection */
AsyncMessenger::~AsyncMessenger()
{
assert(!did_bind); // either we didn't bind or we shut down the Processor
+ local_connection->mark_down();
}
void AsyncMessenger::ready()
set<AsyncConnectionRef>::iterator it = deleted_conns.begin();
AsyncConnectionRef p = *it;
ldout(cct, 5) << __func__ << " delete " << p << dendl;
- p->put();
deleted_conns.erase(it);
}
}
Mutex::Locker l(deleted_lock);
if (deleted_conns.count(p->second)) {
deleted_conns.erase(p->second);
- p->second->put();
conns.erase(p);
return NULL;
}
Mutex::Locker l(deleted_lock);
if (deleted_conns.count(existing)) {
deleted_conns.erase(existing);
- existing->put();
} else if (conn != existing) {
return -1;
}
void EventCenter::delete_file_event(int fd, int mask)
{
Mutex::Locker l(file_lock);
+ if (fd > nevent) {
+ ldout(cct, 1) << __func__ << " delete event fd=" << fd << " exceed nevent=" << nevent
+ << "mask=" << mask << dendl;
+ return ;
+ }
EventCenter::FileEvent *event = _get_file_event(fd);
ldout(cct, 20) << __func__ << " delete event started fd=" << fd << " mask=" << mask
<< " original mask is " << event->mask << dendl;
EventCallbackRef e = external_events.front();
external_events.pop_front();
external_lock.Unlock();
- e->do_request(0);
+ if (e)
+ e->do_request(0);
external_lock.Lock();
}
external_lock.Unlock();
int process_time_events();
FileEvent *_get_file_event(int fd) {
+ assert(fd < nevent);
FileEvent *p = &file_events[fd];
if (!p->mask)
new(p) FileEvent();
#if GTEST_HAS_PARAM_TEST
#define CHECK_AND_WAIT_TRUE(expr) do { \
- int n = 50; \
+ int n = 1000; \
while (--n) { \
if (expr) \
break; \
- usleep(100); \
+ usleep(1000); \
} \
} while(0);
}
got_new = true;
cond.Signal();
+ m->put();
return true;
}
bool ms_handle_reset(Connection *con) {
}
got_new = true;
cond.Signal();
+ m->put();
}
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
// 2. test for client lossy
server_conn->mark_down();
ASSERT_FALSE(server_conn->is_connected());
+ conn->send_keepalive();
CHECK_AND_WAIT_TRUE(!conn->is_connected());
ASSERT_FALSE(conn->is_connected());
conn = client_msgr->get_connection(server_msgr->get_myinst());
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
ConnectionRef server_conn = server_msgr->get_connection(client_msgr->get_myinst());
ASSERT_FALSE(cli_dispatcher.got_remote_reset);
+ cli_dispatcher.got_connect = false;
server_conn->mark_down();
ASSERT_FALSE(server_conn->is_connected());
// client should be standby
usleep(300*1000);
// client should be standby, so we use original connection
{
- m = new MPing();
conn->send_keepalive();
+ {
+ Mutex::Locker l(cli_dispatcher.lock);
+ while (!cli_dispatcher.got_remote_reset)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_remote_reset = false;
+ while (!cli_dispatcher.got_connect)
+ cli_dispatcher.cond.Wait(cli_dispatcher.lock);
+ cli_dispatcher.got_connect = false;
+ }
CHECK_AND_WAIT_TRUE(conn->is_connected());
ASSERT_TRUE(conn->is_connected());
+ m = new MPing();
ASSERT_EQ(conn->send_message(m), 0);
Mutex::Locker l(cli_dispatcher.lock);
while (!cli_dispatcher.got_new)
cli_dispatcher.cond.Wait(cli_dispatcher.lock);
cli_dispatcher.got_new = false;
}
- // resetcheck for client, so it discard state previously
- ASSERT_TRUE(cli_dispatcher.got_remote_reset);
- cli_dispatcher.got_remote_reset = false;
ASSERT_TRUE(static_cast<Session*>(conn->get_priv())->get_count() == 1);
server_conn = server_msgr->get_connection(client_msgr->get_myinst());
ASSERT_TRUE(static_cast<Session*>(server_conn->get_priv())->get_count() == 1);
}
got_new = true;
cond.Signal();
+ m->put();
}
bool ms_verify_authorizer(Connection *con, int peer_type, int protocol,
boost::uniform_int<> u(32, max_message_len);
uint64_t value_len = u(rng);
bufferptr bp(value_len);
- for (uint64_t j = 0; j < value_len; ) {
+ bp.zero();
+ for (uint64_t j = 0; j < value_len-sizeof(i); ) {
memcpy(bp.c_str()+j, &i, sizeof(i));
j += 4096;
}
it != available_servers.end(); ++it) {
(*it)->shutdown();
(*it)->wait();
+ delete (*it);
}
+ available_servers.clear();
+
for (set<Messenger*>::iterator it = available_clients.begin();
it != available_clients.end(); ++it) {
(*it)->shutdown();
(*it)->wait();
+ delete (*it);
}
+ available_clients.clear();
}
};
class MarkdownDispatcher : public Dispatcher {
Mutex lock;
- set<Connection*> conns;
+ set<ConnectionRef> conns;
bool last_mark;
public:
atomic_t count;
cerr << __func__ << " conn: " << m->get_connection() << std::endl;
Mutex::Locker l(lock);
count.inc();
- conns.insert(m->get_connection().get());
+ conns.insert(m->get_connection());
if (conns.size() < 2 && !last_mark)
return true;
last_mark = true;
usleep(rand() % 500);
- for (set<Connection*>::iterator it = conns.begin(); it != conns.end(); ++it) {
+ for (set<ConnectionRef>::iterator it = conns.begin(); it != conns.end(); ++it) {
if ((*it) != m->get_connection().get()) {
(*it)->mark_down();
conns.erase(it);
}
if (conns.empty())
last_mark = false;
+ m->put();
return true;
}
bool ms_handle_reset(Connection *con) {