]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: add accounting to keep track of total in-flight messages.
authorGreg Farnum <gregf@hq.newdream.net>
Mon, 20 Sep 2010 16:33:06 +0000 (09:33 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Mon, 20 Sep 2010 16:34:58 +0000 (09:34 -0700)
If the user wishes, they can call throttle_op to hold an operation
until it fits within the limits. The user is responsible for
consistency guarantees and making sure the locking will work!

src/config.cc
src/config.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 2b69af13b02eed87b43f492f83da1a3a5ff82a96..b0b02adae29492c4c9ddebe9906a6e48b908f1e1 100644 (file)
@@ -375,6 +375,7 @@ static struct config_option config_optionsp[] = {
        OPTION(objecter_tick_interval, 0, OPT_DOUBLE, 5.0),
        OPTION(objecter_mon_retry_interval, 0, OPT_DOUBLE, 5.0),
        OPTION(objecter_timeout, 0, OPT_DOUBLE, 10.0),    // before we ask for a map
+       OPTION(objecter_inflight_op_bytes, 0, OPT_LONGLONG, 1024*1024*100), //max in-flight data (both directions)
        OPTION(journaler_allow_split_entries, 0, OPT_BOOL, true),
        OPTION(journaler_safe, 0, OPT_BOOL, true),  // wait for COMMIT on journal writes
        OPTION(journaler_write_head_interval, 0, OPT_INT, 15),
index fb0324b45bd9dff03c1e3751916f5331acf91503..ec246d2ae1c228ac622b0177eb909d2639f3edbd 100644 (file)
@@ -206,6 +206,7 @@ struct md_config_t {
   double objecter_map_request_interval;
   double objecter_tick_interval;
   double objecter_timeout;
+  uint64_t objecter_inflight_op_bytes;
 
   // journaler
   bool  journaler_allow_split_entries;
index bebbefda45d84833dac3f35f67e895962f520cd8..4939e34a3b3c014bbadd56aeebc9b88ac707302d 100644 (file)
@@ -302,6 +302,7 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs)
       hash_map<tid_t, Op*>::iterator p = op_osd.find(tid);
       if (p != op_osd.end()) {
        Op *op = p->second;
+       put_op_budget(op);
        op_osd.erase(p);
 
        if (op->onack)
@@ -409,6 +410,7 @@ tid_t Objecter::op_submit(Op *op)
   } else {
     dout(20) << " note: not requesting commit" << dendl;
   }
+  take_op_budget(op);
   op_osd[op->tid] = op;
   pg.active_tids.insert(op->tid);
   pg.last = g_clock.now();
@@ -464,6 +466,34 @@ tid_t Objecter::op_submit(Op *op)
   return op->tid;
 }
 
+int Objecter::calc_op_budget(Op *op)
+{
+  int op_budget = 0;
+  for (vector<OSDOp>::iterator i = op->ops.begin();
+       i != op->ops.end();
+       ++i) {
+    if (i->op.op & CEPH_OSD_OP_MODE_WR) {
+      op_budget += i->data.length();
+    } else if (i->op.op & CEPH_OSD_OP_MODE_RD) {
+      if (i->op.op & CEPH_OSD_OP_TYPE_DATA)
+        op_budget += i->op.extent.length;
+      else if (i->op.op & CEPH_OSD_OP_TYPE_ATTR)
+        op_budget += i->op.xattr.name_len + i->op.xattr.value_len;
+    }
+  }
+  return op_budget;
+}
+
+void Objecter::throttle_op(Op *op)
+{
+  int op_budget = calc_op_budget(op);
+  if (!op_throttler.get_or_fail(op_budget)) { //couldn't take right now
+    client_lock.Unlock();
+    op_throttler.get(op_budget);
+    client_lock.Lock();
+  }
+}
+
 /* This function DOES put the passed message before returning */
 void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 {
@@ -539,6 +569,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
             << " still has " << pg.active_tids << dendl;
     if (pg.active_tids.empty()) 
       close_pg( m->get_pg() );
+    put_op_budget(op);
     op_osd.erase( tid );
     delete op;
   }
index b5d6dd5e71220f8c8128f9e551f1fdb39c61bcd4..44dbb07dcc436d057218fb62360cbffd9918b764 100644 (file)
@@ -409,6 +409,24 @@ public:
 
   void resend_mon_ops();
 
+  /**
+   * handle a budget for in-flight ops
+   * budget is taken whenever an op goes into the op_osd map
+   * and returned whenever an op is removed from the map
+   * If throttle_op needs to throttle it will unlock client_lock.
+   */
+  int calc_op_budget(Op *op);
+  void throttle_op(Op *op);
+  void take_op_budget(Op *op) {
+    int op_budget = calc_op_budget(op);
+    op_throttler.take(op_budget);
+  }
+  void put_op_budget(Op *op) {
+    int op_budget = calc_op_budget(op);
+    op_throttler.put(op_budget);
+  }
+  Throttle op_throttler;
+
  public:
   Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l) : 
     messenger(m), monc(mc), osdmap(om),
@@ -416,7 +434,8 @@ public:
     num_unacked(0), num_uncommitted(0),
     last_seen_osdmap_version(0),
     last_seen_pgmap_version(0),
-    client_lock(l), timer(l)
+    client_lock(l), timer(l),
+    op_throttler(g_conf.objecter_inflight_op_bytes)
   { }
   ~Objecter() { }