}
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
- : Connection(cct, m), async_msgr(m), logger(p), global_seq(0), connect_seq(0), peer_global_seq(0),
- out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1), port(-1),
- write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
+ : Connection(cct, m), delay_thread(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0),
+ peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1),
+ port(-1), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
recv_max_prefetch(MIN(msgr->cct->_conf->ms_tcp_prefetch_max_size, TCP_PREFETCH_MIN_SIZE)),
recv_start(0), recv_end(0), got_bad_auth(false), authorizer(NULL), replacing(false),
delete[] recv_buf;
if (state_buffer)
delete[] state_buffer;
+ if (delay_thread)
+ delete delay_thread;
+}
+
+void AsyncConnection::maybe_start_delay_thread()
+{
+ // Lazily create and start the DelayedDelivery thread, but only when the
+ // ms_inject_delay_type config option names this connection's peer entity
+ // type. Safe to call repeatedly: the !delay_thread guard makes every call
+ // after the first a no-op.
+ if (!delay_thread &&
+ async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
+ lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Connection " << this << dendl;
+ delay_thread = new DelayedDelivery(this, async_msgr);
+ delay_thread->create("ms_async_delay");
+ }
+}
/* return -1 means `fd` occurs error or closed, it should be closed
return len - state_offset;
}
+// Debug helper: if ms_inject_internal_delays is non-zero, block the calling
+// thread for that many seconds. Centralizes the sleep-on-config pattern that
+// was previously copy-pasted at several accept/replace fault paths.
+void AsyncConnection::inject_delay() {
+ if (async_msgr->cct->_conf->ms_inject_internal_delays) {
+ ldout(async_msgr->cct, 10) << __func__ << " sleep for " <<
+ async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
+ utime_t t;
+ t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
+ t.sleep();
+ }
+}
+
void AsyncConnection::process()
{
ssize_t r = 0;
state = STATE_OPEN;
async_msgr->ms_fast_preprocess(message);
- if (async_msgr->ms_can_fast_dispatch(message)) {
+ if (delay_thread) {
+ utime_t release;
+ if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
+ release = message->get_recv_stamp();
+ release += async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " << message << " " << *message << dendl;
+ }
+ delay_thread->queue(release, message);
+ } else if (async_msgr->ms_can_fast_dispatch(message)) {
lock.Unlock();
async_msgr->ms_fast_dispatch(message);
lock.Lock();
if (is_queued())
center->dispatch_event_external(write_handler);
write_lock.Unlock();
-
+ maybe_start_delay_thread();
break;
}
r = read_until(sizeof(newly_acked_seq), state_buffer);
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read ack seq failed" << dendl;
- goto fail;
+ goto fail_registered;
} else if (r > 0) {
break;
}
if (is_queued())
center->dispatch_event_external(write_handler);
write_lock.Unlock();
+ maybe_start_delay_thread();
break;
}
return 0;
+fail_registered:
+ ldout(async_msgr->cct, 10) << "accept fault after register" << dendl;
+ inject_delay();
+
fail:
return -1;
}
lock.Unlock();
AsyncConnectionRef existing = async_msgr->lookup_conn(peer_addr);
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
- ldout(msgr->cct, 10) << __func__ << " sleep for "
- << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
- t.sleep();
- }
+ inject_delay();
lock.Lock();
if (state != STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH) {
replace:
ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
- ldout(msgr->cct, 10) << __func__ << " sleep for "
- << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
- t.sleep();
+ inject_delay();
+ if (existing->delay_thread) {
+ existing->delay_thread->steal_for_pipe(this);
+ delay_thread = existing->delay_thread;
+ existing->delay_thread = NULL;
+ delay_thread->flush();
}
if (existing->policy.lossy) {
// it's safe that here we don't acquire Connection's lock
r = async_msgr->accept_conn(this);
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
- ldout(msgr->cct, 10) << __func__ << " sleep for "
- << async_msgr->cct->_conf->ms_inject_internal_delays << dendl;
- utime_t t;
- t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
- t.sleep();
- }
-
+ inject_delay();
+
lock.Lock();
replacing = false;
if (r < 0) {
fail_registered:
ldout(async_msgr->cct, 10) << __func__ << " accept fault after register" << dendl;
-
- if (async_msgr->cct->_conf->ms_inject_internal_delays) {
- ldout(async_msgr->cct, 10) << __func__ << " sleep for "
- << async_msgr->cct->_conf->ms_inject_internal_delays
- << dendl;
- utime_t t;
- t.set_from_double(async_msgr->cct->_conf->ms_inject_internal_delays);
- t.sleep();
- }
+ inject_delay();
fail:
ldout(async_msgr->cct, 10) << __func__ << " failed to accept." << dendl;
ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
center->dispatch_event_external(reset_handler);
_stop();
+ if (delay_thread)
+ delay_thread->discard();
return ;
}
can_write = WriteStatus::NOWRITE;
open_write = false;
+ // queue delayed items immediately
+ if (delay_thread)
+ delay_thread->flush();
// requeue sent items
requeue_sent();
recv_start = recv_end = 0;
ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
assert(lock.is_locked());
Mutex::Locker l(write_lock);
+ if (delay_thread)
+ delay_thread->discard();
discard_out_queue();
center->dispatch_event_external(remote_reset_handler);
void AsyncConnection::_stop()
{
assert(lock.is_locked());
+ if (delay_thread && delay_thread->is_started()) {
+ ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
+ if (delay_thread->is_flushing()) {
+ delay_thread->wait_for_flush();
+ }
+ delay_thread->stop();
+ delay_thread->join();
+ }
if (state == STATE_CLOSED)
return ;
center->delete_time_event(*it);
// Make sure in-queue events will been processed
center->dispatch_event_external(EventCallbackRef(new C_clean_handler(this)));
+
}
void AsyncConnection::prepare_send_message(uint64_t features, Message *m, bufferlist &bl)
}
}
+// Drop every queued message without delivering it, releasing each message's
+// reference. Called on connection failure/reset and from the destructor.
+void AsyncConnection::DelayedDelivery::discard()
+{
+ Mutex::Locker l(delay_lock);
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ // TODO: what to use here?
+ //pipe->async_msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
+ m->put();
+ delay_queue.pop_front();
+ }
+}
+
+// Ask the delivery thread to dispatch everything currently queued without
+// waiting for release timestamps: flush_count is consumed by entry(), which
+// skips its timed wait while the count is non-zero.
+void AsyncConnection::DelayedDelivery::flush()
+{
+ Mutex::Locker l(delay_lock);
+ flush_count = delay_queue.size();
+ delay_cond.Signal();
+}
+
+// Thread main loop: sleep until the head message's release stamp (or until
+// signalled), then deliver it — fast-dispatchable messages inline on this
+// thread, everything else via the connection's event center.
+void *AsyncConnection::DelayedDelivery::entry()
+{
+ Mutex::Locker locker(delay_lock);
+
+ while (!stop_delayed_delivery) {
+ if (delay_queue.empty()) {
+ delay_cond.Wait(delay_lock);
+ continue;
+ }
+ utime_t release = delay_queue.front().first;
+ Message *m = delay_queue.front().second;
+ string delay_msg_type = connection->async_msgr->cct->_conf->ms_inject_delay_msg_type;
+ // Honor the release time only when not flushing, and (when a message-type
+ // filter is configured) only for messages of the filtered type.
+ if (!flush_count &&
+ (release > ceph_clock_now(connection->async_msgr->cct) &&
+ (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
+ delay_cond.WaitUntil(delay_lock, release);
+ continue;
+ }
+ delay_queue.pop_front();
+ if (flush_count > 0) {
+ --flush_count;
+ active_flush = true;
+ }
+ if (connection->async_msgr->ms_can_fast_dispatch(m)) {
+ // NOTE(review): if stop_fast_dispatching_flag is already set here, m is
+ // neither dispatched nor put() — looks like a message/ref leak; confirm.
+ if (!stop_fast_dispatching_flag) {
+ delay_dispatching = true;
+ // Drop the lock while calling into dispatch so queue() and
+ // stop_fast_dispatching() are not blocked for the dispatch duration.
+ delay_lock.Unlock();
+ connection->async_msgr->ms_fast_dispatch(m);
+ delay_lock.Lock();
+ delay_dispatching = false;
+ if (stop_fast_dispatching_flag) {
+ // we need to let the stopping thread proceed
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ delay_lock.Lock();
+ }
+ }
+ } else {
+ connection->center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(connection->async_msgr, m)));
+ }
+ active_flush = false;
+ }
+ return NULL;
+}
+
+// Forbid further inline fast dispatch from the delivery thread and block the
+// caller until any in-flight fast dispatch (delay_dispatching) has finished.
+void AsyncConnection::DelayedDelivery::stop_fast_dispatching() {
+ Mutex::Locker l(delay_lock);
+ stop_fast_dispatching_flag = true;
+ while (delay_dispatching)
+ delay_cond.Wait(delay_lock);
+}
+
void AsyncConnection::send_keepalive()
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
void handle_ack(uint64_t seq);
void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
ssize_t write_message(Message *m, bufferlist& bl, bool more);
+ void inject_delay();
ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
bufferlist &authorizer_reply) {
bufferlist reply_bl;
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
}
ssize_t r = try_send(reply_bl);
- if (r < 0)
+ if (r < 0) {
+ inject_delay();
return -1;
+ }
state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
return 0;
assert(write_lock.is_locked());
return !out_q.empty();
}
+
+ /**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * AsyncMessenger event queue.
+ * This is a nearly direct copy & paste from SimpleMessenger, and as
+ * such, there was a problem during AsyncConnection shutdown, fixed by
+ * checking whether delay_thread isn't already stopped and refraining
+ * from stopping it again.
+ */
+ class DelayedDelivery: public Thread {
+ AsyncConnection *connection; // owning (or stolen-to) connection; guarded by delay_lock
+ std::deque< pair<utime_t,Message*> > delay_queue; // (release stamp, message), FIFO per stamp
+ Mutex delay_lock; // guards all state below
+ Cond delay_cond;
+ int flush_count; // messages remaining to deliver immediately (set by flush())
+ AsyncMessenger *msgr;
+ bool active_flush; // entry() is currently delivering a flushed message
+ bool stop_delayed_delivery; // tells entry() to exit its loop
+ bool delay_dispatching; // we are in fast dispatch now
+ bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+
+ public:
+ explicit DelayedDelivery(AsyncConnection *p, AsyncMessenger *omsgr)
+ : connection(p),
+ delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), flush_count(0),
+ msgr(omsgr),
+ active_flush(false),
+ stop_delayed_delivery(false),
+ delay_dispatching(false),
+ stop_fast_dispatching_flag(false) { }
+ ~DelayedDelivery() { discard(); }
+ void *entry();
+ // Enqueue m for delivery no earlier than `release` and wake the thread.
+ void queue(utime_t release, Message *m) {
+ Mutex::Locker l(delay_lock);
+ delay_queue.push_back(make_pair(release, m));
+ delay_cond.Signal();
+ }
+ void discard();
+ void flush();
+ bool is_flushing() {
+ Mutex::Locker l(delay_lock);
+ return flush_count > 0 || active_flush;
+ }
+ // Block until a previously requested flush() has fully drained.
+ void wait_for_flush() {
+ Mutex::Locker l(delay_lock);
+ while (flush_count > 0 || active_flush)
+ delay_cond.Wait(delay_lock);
+ }
+ void stop() {
+ delay_lock.Lock();
+ stop_delayed_delivery = true;
+ delay_cond.Signal();
+ delay_lock.Unlock();
+ }
+ // Re-point delivery at new_owner when a connection is replaced (name kept
+ // from the SimpleMessenger/Pipe original this was copied from).
+ void steal_for_pipe(AsyncConnection *new_owner) {
+ Mutex::Locker l(delay_lock);
+ connection = new_owner;
+ }
+ /**
+ * We need to stop fast dispatching before we need to stop putting
+ * normal messages into the DispatchQueue.
+ */
+ void stop_fast_dispatching();
+ } *delay_thread;
public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);
~AsyncConnection();
+ void maybe_start_delay_thread();
ostream& _conn_prefix(std::ostream *_dout);