}
AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p)
- : Connection(cct, m), delay_thread(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0),
+ : Connection(cct, m), delay_state(NULL), async_msgr(m), logger(p), global_seq(0), connect_seq(0),
peer_global_seq(0), out_seq(0), ack_left(0), in_seq(0), state(STATE_NONE), state_after_send(0), sd(-1),
port(-1), write_lock("AsyncConnection::write_lock"), can_write(WriteStatus::NOWRITE),
open_write(false), keepalive(false), lock("AsyncConnection::lock"), recv_buf(NULL),
delete[] recv_buf;
if (state_buffer)
delete[] state_buffer;
- if (delay_thread)
- delete delay_thread;
+ assert(!delay_state);
}
void AsyncConnection::maybe_start_delay_thread()
{
- if (!delay_thread &&
+ if (!delay_state &&
async_msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(peer_type)) != string::npos) {
- lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Connection " << this << dendl;
- delay_thread = new DelayedDelivery(this, async_msgr);
- delay_thread->create("ms_async_delay");
+ ldout(msgr->cct, 1) << __func__ << " setting up a delay queue" << dendl;
+ delay_state = new DelayedDelivery(async_msgr, center);
}
}
state = STATE_OPEN;
+ logger->inc(l_msgr_recv_messages);
+ logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
+
async_msgr->ms_fast_preprocess(message);
- if (delay_thread) {
- utime_t release;
+ if (delay_state) {
+ utime_t release = message->get_recv_stamp();
+ double delay_period = 0;
if (rand() % 10000 < async_msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
- release = message->get_recv_stamp();
- release += async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
- ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on " << message << " " << *message << dendl;
+ delay_period = async_msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
+ release += delay_period;
+ ldout(async_msgr->cct, 1) << "queue_received will delay until " << release << " on "
+ << message << " " << *message << dendl;
}
- delay_thread->queue(release, message);
+ delay_state->queue(delay_period, release, message);
} else if (async_msgr->ms_can_fast_dispatch(message)) {
lock.Unlock();
async_msgr->ms_fast_dispatch(message);
} else {
center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(async_msgr, message)));
}
- logger->inc(l_msgr_recv_messages);
- logger->inc(l_msgr_recv_bytes, message_size + sizeof(ceph_msg_header) + sizeof(ceph_msg_footer));
-
break;
}
ldout(async_msgr->cct, 10) << __func__ << " accept replacing " << existing << dendl;
inject_delay();
- if (existing->delay_thread) {
- existing->delay_thread->steal_for_pipe(this);
- delay_thread = existing->delay_thread;
- existing->delay_thread = NULL;
- delay_thread->flush();
- }
-
if (existing->policy.lossy) {
// disconnect from the Connection
existing->center->dispatch_event_external(existing->reset_handler);
// Clean up output buffer
existing->outcoming_bl.clear();
+ if (existing->delay_state) {
+ existing->delay_state->flush();
+ assert(!delay_state);
+ }
existing->requeue_sent();
swap(existing->sd, sd);
ldout(async_msgr->cct, 1) << __func__ << " on lossy channel, failing" << dendl;
center->dispatch_event_external(reset_handler);
_stop();
- if (delay_thread)
- delay_thread->discard();
return ;
}
open_write = false;
// queue delayed items immediately
- if (delay_thread)
- delay_thread->flush();
+ if (delay_state)
+ delay_state->flush();
// requeue sent items
requeue_sent();
recv_start = recv_end = 0;
ldout(async_msgr->cct,10) << __func__ << " started" << dendl;
assert(lock.is_locked());
Mutex::Locker l(write_lock);
- if (delay_thread)
- delay_thread->discard();
+ if (delay_state)
+ delay_state->discard();
discard_out_queue();
center->dispatch_event_external(remote_reset_handler);
void AsyncConnection::_stop()
{
assert(lock.is_locked());
- if (delay_thread && delay_thread->is_started()) {
- ldout(msgr->cct, 20) << "joining delay_thread" << dendl;
- if (delay_thread->is_flushing()) {
- delay_thread->wait_for_flush();
- }
- delay_thread->stop();
- delay_thread->join();
- }
if (state == STATE_CLOSED)
return ;
+ if (delay_state)
+ delay_state->flush();
+
ldout(async_msgr->cct, 1) << __func__ << dendl;
Mutex::Locker l(write_lock);
if (sd >= 0)
}
}
-void AsyncConnection::DelayedDelivery::discard()
+void AsyncConnection::DelayedDelivery::do_request(int id)
{
- Mutex::Locker l(delay_lock);
- while (!delay_queue.empty()) {
- Message *m = delay_queue.front().second;
- // TODO: what to use here?
- //pipe->async_msgr->dispatch_throttle_release(m->get_dispatch_throttle_size());
- m->put();
+ Message *m = nullptr;
+ {
+ Mutex::Locker l(delay_lock);
+ register_time_events.erase(id);
+ if (delay_queue.empty())
+ return ;
+ utime_t release = delay_queue.front().first;
+ m = delay_queue.front().second;
+ string delay_msg_type = msgr->cct->_conf->ms_inject_delay_msg_type;
+ utime_t now = ceph_clock_now(msgr->cct);
+ if ((release > now &&
+ (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
+ utime_t t = release - now;
+ t.sleep();
+ }
delay_queue.pop_front();
}
+ if (msgr->ms_can_fast_dispatch(m)) {
+ msgr->ms_fast_dispatch(m);
+ } else {
+ msgr->ms_deliver_dispatch(m);
+ }
}
-void AsyncConnection::DelayedDelivery::flush()
-{
- Mutex::Locker l(delay_lock);
- flush_count = delay_queue.size();
- delay_cond.Signal();
-}
-
-void *AsyncConnection::DelayedDelivery::entry()
-{
- Mutex::Locker locker(delay_lock);
-
- while (!stop_delayed_delivery) {
- if (delay_queue.empty()) {
- delay_cond.Wait(delay_lock);
- continue;
- }
- utime_t release = delay_queue.front().first;
- Message *m = delay_queue.front().second;
- string delay_msg_type = connection->async_msgr->cct->_conf->ms_inject_delay_msg_type;
- if (!flush_count &&
- (release > ceph_clock_now(connection->async_msgr->cct) &&
- (delay_msg_type.empty() || m->get_type_name() == delay_msg_type))) {
- delay_cond.WaitUntil(delay_lock, release);
- continue;
- }
- delay_queue.pop_front();
- if (flush_count > 0) {
- --flush_count;
- active_flush = true;
- }
- if (connection->async_msgr->ms_can_fast_dispatch(m)) {
- if (!stop_fast_dispatching_flag) {
- delay_dispatching = true;
- delay_lock.Unlock();
- connection->async_msgr->ms_fast_dispatch(m);
- delay_lock.Lock();
- delay_dispatching = false;
- if (stop_fast_dispatching_flag) {
- // we need to let the stopping thread proceed
- delay_cond.Signal();
- delay_lock.Unlock();
- delay_lock.Lock();
- }
+class C_flush_messages : public EventCallback {
+ std::deque<std::pair<utime_t, Message*> > delay_queue;
+ AsyncMessenger *msgr;
+ public:
+ C_flush_messages(std::deque<std::pair<utime_t, Message*> > &&q, AsyncMessenger *m): delay_queue(std::move(q)), msgr(m) {}
+ void do_request(int id) {
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ if (msgr->ms_can_fast_dispatch(m)) {
+ msgr->ms_fast_dispatch(m);
+ } else {
+ msgr->ms_deliver_dispatch(m);
}
- } else {
- connection->center->dispatch_event_external(EventCallbackRef(new C_handle_dispatch(connection->async_msgr, m)));
+ delay_queue.pop_front();
}
- active_flush = false;
+ delete this;
}
- return NULL;
-}
+};
-void AsyncConnection::DelayedDelivery::stop_fast_dispatching() {
+void AsyncConnection::DelayedDelivery::flush() {
Mutex::Locker l(delay_lock);
- stop_fast_dispatching_flag = true;
- while (delay_dispatching)
- delay_cond.Wait(delay_lock);
+ center->dispatch_event_external(new C_flush_messages(std::move(delay_queue), msgr));
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
}
void AsyncConnection::send_keepalive()
assert(write_lock.is_locked());
return !out_q.empty();
}
-
+
/**
* The DelayedDelivery is for injecting delays into Message delivery off
* the socket. It is only enabled if delays are requested, and if they
* are then it pulls Messages off the DelayQueue and puts them into the
* AsyncMessenger event queue.
- * This is a nearly direct copy & paste from SimpleMessenger, and as
- * such, there was a problem during AsyncConnection shutdown, fixed by
- * checking whether delay_thread isn't already stopped and refraining
- * from stopping it again.
*/
- class DelayedDelivery: public Thread {
- AsyncConnection *connection;
- std::deque< pair<utime_t,Message*> > delay_queue;
+ class DelayedDelivery : public EventCallback {
+ std::set<uint64_t> register_time_events; // need to delete it if stop
+ std::deque<std::pair<utime_t, Message*> > delay_queue;
Mutex delay_lock;
- Cond delay_cond;
- int flush_count;
AsyncMessenger *msgr;
- bool active_flush;
- bool stop_delayed_delivery;
- bool delay_dispatching; // we are in fast dispatch now
- bool stop_fast_dispatching_flag; // we need to stop fast dispatching
+ EventCenter *center;
- public:
- explicit DelayedDelivery(AsyncConnection *p, AsyncMessenger *omsgr)
- : connection(p),
- delay_lock("AsyncConnection::DelayedDelivery::delay_lock"), flush_count(0),
- msgr(omsgr),
- active_flush(false),
- stop_delayed_delivery(false),
- delay_dispatching(false),
- stop_fast_dispatching_flag(false) { }
- ~DelayedDelivery() { discard(); }
- void *entry();
- void queue(utime_t release, Message *m) {
- Mutex::Locker l(delay_lock);
- delay_queue.push_back(make_pair(release, m));
- delay_cond.Signal();
+ public:
+ explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c)
+ : delay_lock("AsyncConnection::DelayedDelivery::delay_lock"),
+ msgr(omsgr), center(c) { }
+ ~DelayedDelivery() {
+ assert(register_time_events.empty());
+ assert(delay_queue.empty());
}
- void discard();
- void flush();
- bool is_flushing() {
+ void do_request(int id) override;
+ void queue(double delay_period, utime_t release, Message *m) {
Mutex::Locker l(delay_lock);
- return flush_count > 0 || active_flush;
+ delay_queue.push_back(std::make_pair(release, m));
+ register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
- void wait_for_flush() {
+ void discard() {
Mutex::Locker l(delay_lock);
- while (flush_count > 0 || active_flush)
- delay_cond.Wait(delay_lock);
- }
- void stop() {
- delay_lock.Lock();
- stop_delayed_delivery = true;
- delay_cond.Signal();
- delay_lock.Unlock();
- }
- void steal_for_pipe(AsyncConnection *new_owner) {
- Mutex::Locker l(delay_lock);
- connection = new_owner;
+ while (!delay_queue.empty()) {
+ Message *m = delay_queue.front().second;
+ m->put();
+ delay_queue.pop_front();
+ }
+ for (auto i : register_time_events)
+ center->delete_time_event(i);
+ register_time_events.clear();
}
- /**
- * We need to stop fast dispatching before we need to stop putting
- * normal messages into the DispatchQueue.
- */
- void stop_fast_dispatching();
- } *delay_thread;
+ void flush();
+ } *delay_state;
public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c, PerfCounters *p);
delete connect_handler;
delete local_deliver_handler;
delete wakeup_handler;
+ if (delay_state) {
+ delete delay_state;
+ delay_state = NULL;
+ }
}
PerfCounters *get_perf_counter() {
return logger;