}
};
+// Event callback that carries one received Message and, when invoked,
+// passes it to the messenger through ms_deliver_dispatch(). It is used for
+// messages that are not eligible for fast dispatch.
+class C_handle_dispatch : public EventCallback {
+ public:
+  C_handle_dispatch(AsyncMessenger *messenger, Message *message)
+    : msgr(messenger), m(message) {}
+
+  // The event id argument is not used; the stored message is delivered as is.
+  void do_request(int) { msgr->ms_deliver_dispatch(m); }
+
+ private:
+  AsyncMessenger *msgr;  // messenger that performs the dispatch
+  Message *m;            // message to deliver
+};
+
static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off)
{
// create a buffer to read into that matches the data alignment
// Construct a connection attached to messenger m.
// All sequence counters (global_seq, connect_seq, out_seq, in_seq,
// in_seq_acked) start at 0, the state is STATE_NONE, no socket is attached
// (sd == -1) and the event center is taken from the messenger (&m->center).
// The added initializers also start with open_write == false (no writable
// event registered) and construct state_buffer with 4096 -- presumably its
// size in bytes; confirm against the state_buffer type.
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
: Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
state(STATE_NONE), state_after_send(0), sd(-1),
- lock("AsyncConnection::lock"),
- got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { }
+ lock("AsyncConnection::lock"), open_write(false),
+ got_bad_auth(false), authorizer(NULL),
+ state_buffer(4096), state_offset(0), net(cct), center(&m->center) { }
AsyncConnection::~AsyncConnection()
{
if (errno == EAGAIN || errno == EINTR) {
nread = 0;
} else {
- ldout(async_msgr->cct, 1) << __func__ << " Reading from fd %d " << fd
+ ldout(async_msgr->cct, 1) << __func__ << " Reading from fd=" << fd
<< " : "<< strerror(errno) << dendl;
return -1;
}
} else if (nread == 0) {
- ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor %d "
- << fd << dendl;
+ ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor "
+ << fd << dendl;
return -1;
}
return nread;
struct iovec *msgvec = new iovec[size];
memset(&msg, 0, sizeof(msg));
msg.msg_iovlen = 0;
+ msg.msg_iov = msgvec;
int msglen = 0;
while (size > 0) {
msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
if (r < 0)
return r;
+ delete msgvec;
// "r" is the remaining length
sended += msglen - r;
if (r > 0) {
- center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
ldout(async_msgr->cct, 5) << __func__ << " remaining " << r
<< " needed to be sent, creating event for writing"
<< dendl;
bl.swap(outcoming_bl);
}
- if (!outcoming_bl.length())
+ if (!open_write && is_queued()) {
+ center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+ open_write = true;
+ }
+
+ if (open_write && !is_queued()) {
center->delete_file_event(sd, EVENT_WRITABLE);
+ open_write = false;
+ }
return outcoming_bl.length();
}
void AsyncConnection::process()
{
int r = 0;
- int prev_state;
+ int prev_state = state;
Mutex::Locker l(lock);
do {
- prev_state = state;
ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state)
<< ", prev state is " << get_state_name(prev_state) << dendl;
+ prev_state = state;
switch (state) {
case STATE_OPEN:
{
}
// Reset state
+ data_buf.clear();
front.clear();
middle.clear();
data.clear();
- memset(&connect_msg, 0, sizeof(connect_msg));
- memset(&connect_reply, 0, sizeof(connect_reply));
recv_stamp = ceph_clock_now(async_msgr->cct);
current_header = header;
state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE;
<< policy.throttler_messages->get_current() << "/"
<< policy.throttler_messages->get_max() << dendl;
// FIXME: may block
- if (policy.throttler_messages->get())
- state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
+ policy.throttler_messages->get();
}
+ state = STATE_OPEN_MESSAGE_THROTTLE_BYTES;
break;
}
<< policy.throttler_bytes->get_current() << "/"
<< policy.throttler_bytes->get_max() << dendl;
// FIXME: may block
- if (policy.throttler_bytes->get(message_size))
- state = STATE_OPEN_MESSAGE_READ_FRONT;
+ policy.throttler_bytes->get(message_size);
}
}
+ state = STATE_OPEN_MESSAGE_READ_FRONT;
break;
}
// read data
uint64_t data_len = le32_to_cpu(current_header.data_len);
int data_off = le32_to_cpu(current_header.data_off);
- bufferlist bl;
if (data_len) {
// get a buffer
- lock.Lock();
map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
if (p != rx_buffers.end()) {
ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second
<< " at offset " << data_off
<< " len " << p->second.first.length() << dendl;
- bl = p->second.first;
+ data_buf = p->second.first;
// make sure it's big enough
- if (bl.length() < data_len)
- bl.push_back(buffer::create(data_len - bl.length()));
- data_blp = bl.begin();
+ if (data_buf.length() < data_len)
+ data_buf.push_back(buffer::create(data_len - data_buf.length()));
+ data_blp = data_buf.begin();
} else {
ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl;
- alloc_aligned_buffer(bl, data_len, data_off);
- data_blp = bl.begin();
+ alloc_aligned_buffer(data_buf, data_len, data_off);
+ data_blp = data_buf.begin();
}
- lock.Unlock();
}
msg_left = data_len;
case STATE_OPEN_MESSAGE_READ_DATA:
{
- do {
+ while (msg_left > 0) {
bufferptr bp = data_blp.get_current_ptr();
uint64_t read = MIN(bp.length(), msg_left);
r = read_until(read, bp);
data_blp.advance(read);
data.append(bp, 0, read);
- } while (msg_left > 0);
+ msg_left -= read;
+ }
if (msg_left == 0)
state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH;
//
ceph::shared_ptr<AuthSessionHandler> auth_handler = session_security;
- if (auth_handler) {
+ if (auth_handler == NULL) {
ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl;
} else {
if (auth_handler->check_message_signature(message)) {
if (async_msgr->ms_can_fast_dispatch(message)) {
async_msgr->ms_fast_dispatch(message);
} else {
- async_msgr->ms_deliver_dispatch(message);
+ center->create_time_event(0, new C_handle_dispatch(async_msgr, message));
}
state = STATE_OPEN;
case STATE_CLOSED:
{
ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl;
- center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
- break;
- }
-
- case STATE_FAULT:
- {
- ldout(async_msgr->cct, 20) << __func__ << " socket is in error" << dendl;
break;
}
{
if (_process_connection() < 0)
goto fail;
+ break;
}
}
+ continue;
+
fail:
// clean up state internal variables and states
if (state >= STATE_CONNECTING_SEND_CONNECT_MSG &&
}
}
fault();
- state = STATE_FAULT;
} while (prev_state != state);
}
goto fail;
}
+ ldout(async_msgr->cct, 10) << __func__ << " get banner, ready to send banner" << dendl;
+
bufferlist bl;
bl.append(state_buffer.c_str(), strlen(CEPH_BANNER));
r = _try_send(bl);
delete authorizer;
authorizer = async_msgr->get_authorizer(peer_type, false);
}
- assert(authorizer);
bufferlist bl;
- ceph_msg_connect connect;
- connect.features = policy.features_supported;
- connect.host_type = async_msgr->get_myinst().name.type();
- connect.global_seq = global_seq;
- connect.connect_seq = connect_seq;
- connect.protocol_version = async_msgr->get_proto_version(peer_type, true);
- connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
- connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
+ connect_msg.features = policy.features_supported;
+ connect_msg.host_type = async_msgr->get_myinst().name.type();
+ connect_msg.global_seq = global_seq;
+ connect_msg.connect_seq = connect_seq;
+ connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true);
+ connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0;
+ connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0;
if (authorizer)
- ldout(async_msgr->cct, 10) << __func__ << "connect.authorizer_len="
- << connect.authorizer_len << " protocol="
- << connect.authorizer_protocol << dendl;
- connect.flags = 0;
+ ldout(async_msgr->cct, 10) << __func__ << "connect_msg.authorizer_len="
+ << connect_msg.authorizer_len << " protocol="
+ << connect_msg.authorizer_protocol << dendl;
+ connect_msg.flags = 0;
if (policy.lossy)
- connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
- bl.append((char*)&connect, sizeof(connect));
+ connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
+ bl.append((char*)&connect_msg, sizeof(connect_msg));
if (authorizer) {
bl.append(authorizer->bl.c_str(), authorizer->bl.length());
}
ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq="
- << connect_seq << " proto=" << connect.protocol_version << dendl;
+ << connect_seq << " proto=" << connect_msg.protocol_version << dendl;
r = _try_send(bl);
if (r == 0) {
authorizer_reply.push_back(state_buffer);
bufferlist::iterator iter = authorizer_reply.begin();
- if (!authorizer->verify_reply(iter)) {
+ if (authorizer && !authorizer->verify_reply(iter)) {
ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl;
goto fail;
}
got_bad_auth = false;
delete authorizer;
authorizer = NULL;
- return 0;
+ memset(&connect_msg, 0, sizeof(connect_msg));
+ memset(&connect_reply, 0, sizeof(connect_reply));
+ break;
}
case STATE_ACCEPTING:
{
ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl;
state = STATE_OPEN;
+ memset(&connect_msg, 0, sizeof(connect_msg));
break;
}
if (got_bad_auth)
goto fail;
got_bad_auth = true;
- assert(authorizer);
delete authorizer;
authorizer = async_msgr->get_authorizer(peer_type, true); // try harder
state = STATE_CONNECTING_SEND_CONNECT_MSG;
lock.Lock();
} else {
lock.Lock();
- existing->lock.Unlock();
+ existing->lock.Lock();
}
if (existing->policy.lossy) {
// disconnect from the Connection
- async_msgr->ms_deliver_handle_reset(existing);
+ async_msgr->ms_deliver_handle_reset(existing.get());
} else {
// queue a reset on the new connection, which we're dumping for the old
async_msgr->ms_deliver_handle_reset(this);
process();
}
+// Queue message m for sending on this connection.
+//
+// The header's source is set to the messenger's name and, if the message has
+// no priority yet, it is given the messenger's default send priority. The
+// message is then appended to the out queue for its priority while holding
+// the connection lock. If a socket is attached and no writable event is
+// registered (open_write is false), a C_handle_write event is registered on
+// the socket and open_write is set.
+//
+// Always returns 0.
+int AsyncConnection::send_message(Message *m)
+{
+  m->get_header().src = async_msgr->get_myname();
+  if (!m->get_priority())
+    m->set_priority(async_msgr->get_default_send_priority());
+
+  Mutex::Locker l(lock);
+  out_q[m->get_priority()].push_back(m);
+  // The constructor initialises sd to -1 as the "no socket" marker, and 0 is
+  // a valid descriptor, so the test must be ">= 0" rather than "> 0".
+  if (sd >= 0 && !open_write) {
+    center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this));
+    open_write = true;
+  }
+  return 0;
+}
+
void AsyncConnection::requeue_sent()
{
if (sent.empty())
shutdown_socket();
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ open_write = false;
// requeue sent items
requeue_sent();
shutdown_socket();
discard_out_queue();
outcoming_bl.clear();
+ center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
+ open_write = false;
state = STATE_CLOSED;
}
bl.append(CEPH_MSGR_TAG_KEEPALIVE);
}
+ ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl;
_try_send(bl, false);
}
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
Mutex::Locker l(lock);
bufferlist bl;
- if (in_seq > in_seq_acked) {
- ceph_le64 s;
- s = in_seq;
- bl.append(CEPH_MSGR_TAG_ACK);
- bl.append((char*)&s, sizeof(s));
- }
+ int r;
+ if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) {
+ if (in_seq > in_seq_acked) {
+ ceph_le64 s;
+ s = in_seq;
+ bl.append(CEPH_MSGR_TAG_ACK);
+ bl.append((char*)&s, sizeof(s));
+ ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl;
+ }
- int r = _try_send(bl);
- if (r < 0)
- goto fail;
- else if (r > 0)
- return ;
+ r = _try_send(bl);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send msg ack failed :"
+ << strerror(errno) << dendl;
+ goto fail;
+ } else if (r > 0) {
+ return ;
+ }
- while (1) {
- Message *m = _get_next_outgoing();
- if (!m)
- break;
+ while (1) {
+ Message *m = _get_next_outgoing();
+ if (!m)
+ break;
- assert(m);
- r = _send(m);
- if (r < 0)
+ assert(m);
+ ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl;
+ r = _send(m);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send msg failed :"
+ << strerror(errno) << dendl;
+ goto fail;
+ } else if (r > 0) {
+ break;
+ }
+ }
+ } else {
+ r = _try_send(bl);
+ if (r < 0) {
+ ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed :"
+ << strerror(errno) << dendl;
goto fail;
- else if (r > 0)
- break;
+ }
}
return ;
// start thread
create();
- center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
+ if (listen_sd >= 0)
+ center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this));
return 0;
}
while (!done) {
ldout(msgr->cct,20) << __func__ << " calling poll" << dendl;
- r = center->process_events(30000);
+ r = center->process_events(1000);
if (r < 0) {
ldout(msgr->cct,20) << __func__ << " process events failed: "
<< cpp_strerror(errno) << dendl;
done = true;
ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
- center->delete_file_event(listen_sd, EVENT_READABLE);
+ if (listen_sd >= 0)
+ center->delete_file_event(listen_sd, EVENT_READABLE);
if (listen_sd >= 0) {
::shutdown(listen_sd, SHUT_RDWR);
}
lock("AsyncMessenger::lock"),
nonce(_nonce), did_bind(false),
global_seq(0),
- cluster_protocol(0),
+ cluster_protocol(0), stopped(true),
local_connection(new AsyncConnection(cct, this)),
center(cct)
{
ceph_spin_init(&global_seq_lock);
init_local_connection();
+ center.init(5000);
}
/**
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
lock.Lock();
- if (did_bind)
- processor.start();
+ processor.start();
lock.Unlock();
}
int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
+ center.stop();
mark_down_all();
// break ref cycles on the loopback connection
local_connection->set_priv(NULL);
+ stop_cond.Signal();
+ stopped = true;
return 0;
}
{
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
assert(did_bind);
+ center.stop();
processor.stop();
mark_down_all();
return processor.rebind(avoid_ports);
assert(!started);
started = true;
+ stopped = false;
if (!did_bind) {
my_inst.addr.nonce = nonce;
lock.Unlock();
- // FIXME
- center.init(5000);
+ center.start();
return 0;
}
lock.Unlock();
return;
}
+ if (!stopped)
+ stop_cond.Wait(lock);
lock.Unlock();
// done! clean up.
- if (did_bind) {
- ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
- processor.stop();
- did_bind = false;
- ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;
- }
-
- center.stop();
+ ldout(cct,20) << __func__ << ": stopping processor thread" << dendl;
+ processor.stop();
+ did_bind = false;
+ ldout(cct,20) << __func__ << ": stopped processor thread" << dendl;
// close all pipes
lock.Lock();
int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest)
{
- // set envelope
- m->get_header().src = get_myname();
-
- if (!m->get_priority())
- m->set_priority(get_default_send_priority());
-
ldout(cct, 1) << __func__ << "--> " << dest.name << " "
<< dest.addr << " -- " << *m << " -- ?+"
<< m->get_data().length() << " " << m << dendl;