}
ldout(async_msgr->cct, 0) << __func__ << " accept connect_seq " << connect.connect_seq
- << " vs existing " << existing->connect_seq
- << " state " << existing->state << dendl;
+ << " vs existing csq=" << existing->connect_seq << " state="
+ << get_state_name(existing->state) << dendl;
if (connect.connect_seq == 0 && existing->connect_seq > 0) {
ldout(async_msgr->cct,0) << __func__ << " accept peer reset, then tried to connect to us, replacing" << dendl;
if (existing->replacing || existing->state == STATE_CLOSED) {
ldout(async_msgr->cct, 1) << __func__ << " existing racing replace or mark_down happened while replacing."
<< " state=" << get_state_name(existing->state) << dendl;
- reply.connect_seq = existing->connect_seq + 1;
+ reply.connect_seq = connect.connect_seq + 1;
r = _reply_accept(CEPH_MSGR_TAG_RETRY_SESSION, connect, reply, authorizer_reply);
existing->lock.Unlock();
if (r < 0)
connect_seq = connect.connect_seq + 1;
peer_global_seq = connect.global_seq;
ldout(async_msgr->cct, 10) << __func__ << " accept success, connect_seq = "
- << connect_seq << ", sending READY" << dendl;
+ << connect_seq << " in_seq=" << in_seq << ", sending READY" << dendl;
int next_state;
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
-#define dout_prefix *_dout << "Event "
+#define dout_prefix _event_prefix(_dout)
+ostream& EventCenter::_event_prefix(std::ostream *_dout)
+{
+ return *_dout << "Event(" << this << " owner=" << get_owner() << " nevent=" << nevent
+ << " time_id=" << time_event_next_id << ").";
+}
class C_handle_notify : public EventCallback {
public:
}
EventCenter::FileEvent *event = _get_file_event(fd);
+ ldout(cct, 20) << __func__ << " create event started fd=" << fd << " mask=" << mask
+ << " original mask is " << event->mask << dendl;
+ if (event->mask == mask)
+ return 0;
r = driver->add_event(fd, event->mask, mask);
if (r < 0)
if (mask & EVENT_WRITABLE) {
event->write_cb = ctxt;
}
- ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask
- << " now mask is " << event->mask << dendl;
+ ldout(cct, 10) << __func__ << " create event end fd=" << fd << " mask=" << mask
+ << " original mask is " << event->mask << dendl;
return 0;
}
{
Mutex::Locker l(file_lock);
EventCenter::FileEvent *event = _get_file_event(fd);
+ ldout(cct, 20) << __func__ << " delete event started fd=" << fd << " mask=" << mask
+ << " original mask is " << event->mask << dendl;
if (!event->mask)
return ;
}
event->mask = event->mask & (~mask);
- ldout(cct, 10) << __func__ << " delete fd=" << fd << " mask=" << mask
- << " now mask is " << event->mask << dendl;
+ ldout(cct, 10) << __func__ << " delete event end fd=" << fd << " mask=" << mask
+ << " original mask is " << event->mask << dendl;
}
uint64_t EventCenter::create_time_event(uint64_t microseconds, EventCallbackRef ctxt)
last_time = time(NULL);
}
~EventCenter();
+ ostream& _event_prefix(std::ostream *_dout);
+
int init(int nevent);
void set_owner(pthread_t p) { owner = p; }
pthread_t get_owner() { return owner; }
#define dout_subsys ceph_subsys_ms
#undef dout_prefix
-#define dout_prefix *_dout << "net_handler: "
+#define dout_prefix *_dout << "NetHandler "
namespace ceph{
* will be able to close/open sockets a zillion of times */
if (reuse_addr) {
if (::setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
- lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: %s"
+ lderr(cct) << __func__ << " setsockopt SO_REUSEADDR failed: "
<< strerror(errno) << dendl;
close(s);
return -errno;
* Note that fcntl(2) for F_GETFL and F_SETFL can't be
* interrupted by a signal. */
if ((flags = fcntl(sd, F_GETFL)) < 0 ) {
- lderr(cct) << __func__ << " fcntl(F_GETFL) failed: %s" << strerror(errno) << dendl;
+ lderr(cct) << __func__ << " fcntl(F_GETFL) failed: " << strerror(errno) << dendl;
return -errno;
}
if (fcntl(sd, F_SETFL, flags | O_NONBLOCK) < 0) {
- lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): %s" << strerror(errno) << dendl;
+ lderr(cct) << __func__ << " fcntl(F_SETFL,O_NONBLOCK): " << strerror(errno) << dendl;
return -errno;
}
if (errno == EINPROGRESS && nonblock)
return s;
- lderr(cct) << __func__ << " connect: %s " << strerror(errno) << dendl;
+ lderr(cct) << __func__ << " connect: " << strerror(errno) << dendl;
close(s);
return -errno;
}
vector<bufferlist> rand_data;
public:
- static const unsigned max_in_flight = 512;
+ static const unsigned max_in_flight = 64;
static const unsigned max_connections = 128;
static const unsigned max_message_len = 1024 * 1024 * 4;
}
boost::uniform_int<> true_false(0, 99);
int val = true_false(rng);
- if (val > 85) {
+ if (val > 90) {
test_msg.generate_connection();
- } else if (val > 70) {
+ } else if (val > 80) {
test_msg.drop_connection();
} else if (val > 10) {
test_msg.send_message();
} else {
- usleep(rand() % 500 + 100);
+ usleep(rand() % 1000 + 500);
}
}
test_msg.wait_for_done();
TEST_P(MessengerTest, SyntheticInjectTest) {
- g_ceph_context->_conf->set_val("ms_inject_socket_failures", "10");
+ g_ceph_context->_conf->set_val("ms_inject_socket_failures", "30");
g_ceph_context->_conf->set_val("ms_inject_internal_delays", "0.1");
SyntheticWorkload test_msg(4, 16, GetParam(), 100);
for (int i = 0; i < 100; ++i) {
}
boost::uniform_int<> true_false(0, 99);
int val = true_false(rng);
- if (val > 85) {
+ if (val > 90) {
test_msg.generate_connection();
- } else if (val > 70) {
+ } else if (val > 80) {
test_msg.drop_connection();
} else if (val > 10) {
test_msg.send_message();
g_ceph_context->_conf->set_val("auth_client_required", "none");
g_ceph_context->_conf->set_val("enable_experimental_unrecoverable_data_corrupting_features", "ms-type-async");
g_ceph_context->_conf->set_val("ms_die_on_bad_msg", "true");
+ g_ceph_context->_conf->set_val("ms_max_backoff", "1");
common_init_finish(g_ceph_context);
::testing::InitGoogleTest(&argc, argv);