return count;
}
+ __u64 get_max() { return max; }
+
bool wait(__u64 m = 0) {
Mutex::Locker l(lock);
if (m)
OPTION(ms_die_on_failure, 0, OPT_BOOL, false),
OPTION(ms_nocrc, 0, OPT_BOOL, false),
OPTION(ms_die_on_bad_msg, 0, OPT_BOOL, false),
+ OPTION(ms_waiting_message_bytes, 0, OPT_INT, 104857600),
OPTION(mon_data, 0, OPT_STR, ""),
OPTION(mon_tick_interval, 0, OPT_INT, 5),
OPTION(mon_subscribe_interval, 0, OPT_DOUBLE, 300),
bool ms_die_on_failure;
bool ms_nocrc;
bool ms_die_on_bad_msg;
+ __u64 ms_waiting_message_bytes;
// mon
const char *mon_data;
dispatch_queue.qlen_lock.unlock();
pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
- {
+ {
if ((long)m == DispatchQueue::D_BAD_REMOTE_RESET) {
dispatch_queue.lock.Lock();
Connection *con = dispatch_queue.remote_reset_q.front();
ms_deliver_handle_reset(con);
con->put();
} else {
+ ceph_msg_header& header = m->get_header();
+ int msize = header.front_len + header.middle_len + header.data_len;
dout(1) << "<== " << m->get_source_inst()
<< " " << m->get_seq()
<< " ==== " << *m
<< " " << m
<< dendl;
ms_deliver_dispatch(m);
+ message_throttler.put(msize);
dout(20) << "done calling dispatch on " << m << dendl;
}
}
dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl;
return 0;
}
-
+ dout(1) << "getting message bytes now, currently using "
+ << messenger->message_throttler.get_current() << "/"
+ << messenger->message_throttler.get_max() << dendl;
+ messenger->message_throttler.get(header.front_len +
+ header.middle_len + header.data_len);
// read front
bufferlist front;
int front_len = header.front_len;
#include "include/Spinlock.h"
#include "common/Cond.h"
#include "common/Thread.h"
+#include "common/Throttle.h"
#include "Messenger.h"
#include "Message.h"
Cond wait_cond; // for wait()
bool started;
bool did_bind;
+ Throttle message_throttler;
// where i listen
bool need_addr;
SimpleMessenger() :
Messenger(entity_name_t()),
accepter(this),
- lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
+ lock("SimpleMessenger::lock"), started(false), did_bind(false),
+ message_throttler(g_conf.ms_waiting_message_bytes), need_addr(true),
destination_stopped(true), my_type(-1),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
dispatch_thread(this), messenger(this) {