OPTION(ms_rwthread_stack_bytes, OPT_U64, 1024 << 10)
OPTION(ms_tcp_read_timeout, OPT_U64, 900)
OPTION(ms_inject_socket_failures, OPT_U64, 0)
+OPTION(ms_inject_delay_type, OPT_STR, "") // "osd mds mon client" allowed
+OPTION(ms_inject_delay_max, OPT_DOUBLE, 1) // seconds
+OPTION(ms_inject_delay_probability, OPT_DOUBLE, 0) // range [0, 1]
OPTION(mon_data, OPT_STR, "/var/lib/ceph/mon/$cluster-$id")
OPTION(mon_initial_members, OPT_STR, "") // list of initial cluster mon ids; if specified, need majority to form initial quorum and create new cluster
Pipe::Pipe(SimpleMessenger *r, int st, Connection *con)
: reader_thread(this), writer_thread(this),
+ dispatch_thread(NULL), delay_queue(NULL),
+ delay_lock(NULL), delay_cond(NULL), stop_delayed_delivery(true),
msgr(r),
conn_id(r->dispatch_queue.get_id()),
sd(-1), port(0),
msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms
if (msgr->timeout == 0)
msgr->timeout = -1;
+
+ if (msgr->cct->_conf->ms_inject_delay_type.find(ceph_entity_type_name(connection_state->peer_type))
+ != string::npos) {
+ lsubdout(msgr->cct, ms, 1) << "setting up a delay queue on Pipe " << this << dendl;
+ dispatch_thread = new DelayedDelivery(this);
+ delay_queue = new std::deque< Message * >();
+ delay_lock = new Mutex("delay_lock");
+ delay_cond = new Cond();
+ }
}
Pipe::~Pipe()
if (connection_state)
connection_state->put();
delete session_security;
+ if (dispatch_thread) {
+ delete dispatch_thread;
+ assert(delay_queue->empty());
+ delete delay_queue;
+ assert(!delay_lock->is_locked());
+ delete delay_lock;
+ delete delay_cond;
+ }
}
void Pipe::handle_ack(uint64_t seq)
}
reader_running = true;
reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes);
+ if (dispatch_thread && stop_delayed_delivery) {
+ lsubdout(msgr->cct, ms, 1) << "running delayed dispatch thread on Pipe " << this << dendl;
+ delay_lock->Lock();
+ stop_delayed_delivery = false;
+ dispatch_thread->create();
+ delay_lock->Unlock();
+ }
}
void Pipe::start_writer()
void Pipe::queue_received(Message *m, int priority)
{
assert(pipe_lock.is_locked());
+ if (delay_queue) {
+ lsubdout(msgr->cct, ms, 1) << "queuing message " << m << " for delayed delivery" << dendl;
+ Mutex::Locker locker(*delay_lock);
+ delay_queue->push_back(m);
+ delay_cond->Signal();
+ return;
+ }
in_q->enqueue(m, priority, conn_id);
}
-
+void Pipe::delayed_delivery() {
+ Mutex::Locker locker(*delay_lock);
+ if (delay_queue->empty())
+ lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond because delay queue is empty" << dendl;
+ delay_cond->Wait(*delay_lock);
+ while (!stop_delayed_delivery) {
+ Message *m = delay_queue->front();
+ lsubdout(msgr->cct, ms, 1) << "dequeuing message " << m << " for delayed delivery" << dendl;
+ delay_queue->pop_front();
+ in_q->enqueue(m, m->get_priority(), conn_id);
+ if (delay_queue->empty()) {
+ lsubdout(msgr->cct, ms, 1) << "sleeping on delay_cond" << dendl;
+ delay_cond->Wait(*delay_lock);
+ }
+ }
+}
int Pipe::accept()
{
state = STATE_CLOSED;
cond.Signal();
shutdown_socket();
+ if (dispatch_thread) {
+ lsubdout(msgr->cct, ms, 1) << "signalling to stop delayed dispatch thread and clear out messages" << dendl;
+ Mutex::Locker locker(*delay_lock);
+ stop_delayed_delivery = true;
+ delay_cond->Signal();
+ }
}
} writer_thread;
friend class Writer;
+ /**
+ * The DelayedDelivery is for injecting delays into Message delivery off
+ * the socket. It is only enabled if delays are requested, and if they
+ * are then it pulls Messages off the DelayQueue and puts them into the
+ * in_q (SimpleMessenger::dispatch_queue).
+ * Please note that this probably has issues with Pipe shutdown and
+ * replacement semantics. I've tried, but no guarantees.
+ */
+ class DelayedDelivery: public Thread {
+ Pipe *pipe;
+ public:
+ DelayedDelivery(Pipe *p) : pipe(p) {}
+ void *entry() { pipe->delayed_delivery(); return 0; }
+ };
+ friend class DelayedDelivery;
+
+ DelayedDelivery *dispatch_thread;
+ // TODO: clean up the delay_queue better on shutdown
+ std::deque< Message * > *delay_queue;
+ Mutex *delay_lock;
+ Cond *delay_cond;
+ bool stop_delayed_delivery;
+
public:
Pipe(SimpleMessenger *r, int st, Connection *con);
~Pipe();
queue_received(m, m->get_priority());
}
+ void delayed_delivery();
+
__u32 get_out_seq() { return out_seq; }
bool is_queued() { return !out_q.empty() || keepalive; }
writer_thread.join();
if (reader_thread.is_started())
reader_thread.join();
+ if (dispatch_thread && dispatch_thread->is_started()) {
+ delay_lock->Lock();
+ stop_delayed_delivery = true;
+ delay_cond->Signal();
+ delay_lock->Unlock();
+ dispatch_thread->join();
+ }
}
void stop();