OPTION(objecter_tick_interval, 0, OPT_DOUBLE, 5.0),
OPTION(objecter_mon_retry_interval, 0, OPT_DOUBLE, 5.0),
OPTION(objecter_timeout, 0, OPT_DOUBLE, 10.0), // before we ask for a map
+ OPTION(objecter_inflight_op_bytes, 0, OPT_LONGLONG, 1024*1024*100), //max in-flight data (both directions)
OPTION(journaler_allow_split_entries, 0, OPT_BOOL, true),
OPTION(journaler_safe, 0, OPT_BOOL, true), // wait for COMMIT on journal writes
OPTION(journaler_write_head_interval, 0, OPT_INT, 15),
double objecter_map_request_interval;
double objecter_tick_interval;
double objecter_timeout;
+ uint64_t objecter_inflight_op_bytes;
// journaler
bool journaler_allow_split_entries;
hash_map<tid_t, Op*>::iterator p = op_osd.find(tid);
if (p != op_osd.end()) {
Op *op = p->second;
+ put_op_budget(op);
op_osd.erase(p);
if (op->onack)
} else {
dout(20) << " note: not requesting commit" << dendl;
}
+ take_op_budget(op);
op_osd[op->tid] = op;
pg.active_tids.insert(op->tid);
pg.last = g_clock.now();
return op->tid;
}
+int Objecter::calc_op_budget(Op *op)
+{
+ int op_budget = 0;
+ for (vector<OSDOp>::iterator i = op->ops.begin();
+ i != op->ops.end();
+ ++i) {
+ if (i->op.op & CEPH_OSD_OP_MODE_WR) {
+ op_budget += i->data.length();
+ } else if (i->op.op & CEPH_OSD_OP_MODE_RD) {
+ if (i->op.op & CEPH_OSD_OP_TYPE_DATA)
+ op_budget += i->op.extent.length;
+ else if (i->op.op & CEPH_OSD_OP_TYPE_ATTR)
+ op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
+ }
+ }
+ return op_budget;
+}
+
+void Objecter::throttle_op(Op *op)
+{
+ int op_budget = calc_op_budget(op);
+ if (!op_throttler.get_or_fail(op_budget)) { //couldn't take right now
+ client_lock.Unlock();
+ op_throttler.get(op_budget);
+ client_lock.Lock();
+ }
+}
+
/* This function DOES put the passed message before returning */
void Objecter::handle_osd_op_reply(MOSDOpReply *m)
{
<< " still has " << pg.active_tids << dendl;
if (pg.active_tids.empty())
close_pg( m->get_pg() );
+ put_op_budget(op);
op_osd.erase( tid );
delete op;
}
void resend_mon_ops();
+ /**
+ * handle a budget for in-flight ops
+ * budget is taken whenever an op goes into the op_osd map
+ * and returned whenever an op is removed from the map
+ * If throttle_op needs to throttle it will unlock client_lock.
+ */
+ int calc_op_budget(Op *op);
+ void throttle_op(Op *op);
+ void take_op_budget(Op *op) {
+ int op_budget = calc_op_budget(op);
+ op_throttler.take(op_budget);
+ }
+ void put_op_budget(Op *op) {
+ int op_budget = calc_op_budget(op);
+ op_throttler.put(op_budget);
+ }
+ Throttle op_throttler;
+
public:
Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l) :
messenger(m), monc(mc), osdmap(om),
num_unacked(0), num_uncommitted(0),
last_seen_osdmap_version(0),
last_seen_pgmap_version(0),
- client_lock(l), timer(l)
+ client_lock(l), timer(l),
+ op_throttler(g_conf.objecter_inflight_op_bytes)
{ }
~Objecter() { }