From 0c800a9d6c6b3ef46bbfe4ba9f66ef9be4796656 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Mon, 20 Sep 2010 09:33:06 -0700 Subject: [PATCH] objecter: add accounting to keep track of total in-flight messages. If the user wishes, they can call throttle_op to hold an operation until it fits within the limits. The user is responsible for consistency guarantees and making sure the locking will work! --- src/config.cc | 1 + src/config.h | 1 + src/osdc/Objecter.cc | 31 +++++++++++++++++++++++++++++++ src/osdc/Objecter.h | 21 ++++++++++++++++++++- 4 files changed, 53 insertions(+), 1 deletion(-) diff --git a/src/config.cc b/src/config.cc index 2b69af13b02ee..b0b02adae2949 100644 --- a/src/config.cc +++ b/src/config.cc @@ -375,6 +375,7 @@ static struct config_option config_optionsp[] = { OPTION(objecter_tick_interval, 0, OPT_DOUBLE, 5.0), OPTION(objecter_mon_retry_interval, 0, OPT_DOUBLE, 5.0), OPTION(objecter_timeout, 0, OPT_DOUBLE, 10.0), // before we ask for a map + OPTION(objecter_inflight_op_bytes, 0, OPT_LONGLONG, 1024*1024*100), //max in-flight data (both directions) OPTION(journaler_allow_split_entries, 0, OPT_BOOL, true), OPTION(journaler_safe, 0, OPT_BOOL, true), // wait for COMMIT on journal writes OPTION(journaler_write_head_interval, 0, OPT_INT, 15), diff --git a/src/config.h b/src/config.h index fb0324b45bd9d..ec246d2ae1c22 100644 --- a/src/config.h +++ b/src/config.h @@ -206,6 +206,7 @@ struct md_config_t { double objecter_map_request_interval; double objecter_tick_interval; double objecter_timeout; + uint64_t objecter_inflight_op_bytes; // journaler bool journaler_allow_split_entries; diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index bebbefda45d84..4939e34a3b3c0 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -302,6 +302,7 @@ void Objecter::kick_requests(set& changed_pgs) hash_map::iterator p = op_osd.find(tid); if (p != op_osd.end()) { Op *op = p->second; + put_op_budget(op); op_osd.erase(p); if (op->onack) @@ -409,6 +410,7 @@ tid_t Objecter::op_submit(Op *op) } else { dout(20) << " note: not requesting commit" << dendl; } + take_op_budget(op); op_osd[op->tid] = op; pg.active_tids.insert(op->tid); pg.last = g_clock.now(); @@ -464,6 +466,34 @@ tid_t Objecter::op_submit(Op *op) return op->tid; } +int Objecter::calc_op_budget(Op *op) +{ + int op_budget = 0; + for (vector::iterator i = op->ops.begin(); + i != op->ops.end(); + ++i) { + if (i->op.op & CEPH_OSD_OP_MODE_WR) { + op_budget += i->data.length(); + } else if (i->op.op & CEPH_OSD_OP_MODE_RD) { + if (i->op.op & CEPH_OSD_OP_TYPE_DATA) + op_budget += i->op.extent.length; + else if (i->op.op & CEPH_OSD_OP_TYPE_ATTR) + op_budget += i->op.xattr.name_len + i->op.xattr.value_len; + } + } + return op_budget; +} + +void Objecter::throttle_op(Op *op) +{ + int op_budget = calc_op_budget(op); + if (!op_throttler.get_or_fail(op_budget)) { //couldn't take right now + client_lock.Unlock(); + op_throttler.get(op_budget); + client_lock.Lock(); + } +} + /* This function DOES put the passed message before returning */ void Objecter::handle_osd_op_reply(MOSDOpReply *m) { @@ -539,6 +569,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << " still has " << pg.active_tids << dendl; if (pg.active_tids.empty()) close_pg( m->get_pg() ); + put_op_budget(op); op_osd.erase( tid ); delete op; } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index b5d6dd5e71220..44dbb07dcc436 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -409,6 +409,24 @@ public: void resend_mon_ops(); + /** + * handle a budget for in-flight ops + * budget is taken whenever an op goes into the op_osd map + * and returned whenever an op is removed from the map + * If throttle_op needs to throttle it will unlock client_lock. + */ + int calc_op_budget(Op *op); + void throttle_op(Op *op); + void take_op_budget(Op *op) { + int op_budget = calc_op_budget(op); + op_throttler.take(op_budget); + } + void put_op_budget(Op *op) { + int op_budget = calc_op_budget(op); + op_throttler.put(op_budget); + } + Throttle op_throttler; + public: Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l) : messenger(m), monc(mc), osdmap(om), @@ -416,7 +434,8 @@ public: num_unacked(0), num_uncommitted(0), last_seen_osdmap_version(0), last_seen_pgmap_version(0), - client_lock(l), timer(l) + client_lock(l), timer(l), + op_throttler(g_conf.objecter_inflight_op_bytes) { } ~Objecter() { } -- 2.39.5