git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: shard completion_lock
author Yehuda Sadeh <yehuda@inktank.com>
Mon, 12 May 2014 23:58:31 +0000 (16:58 -0700)
committer John Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:33:59 +0000 (01:33 +0100)
Object op responses are now sharded: the completion lock taken for a reply is
chosen by hashing the object name. This guarantees ordering of completions on
the same object. Ordering across different objects is no longer guaranteed.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/common/config_opts.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index ad9ba2623a1545374cb56e2c3af69f79fde69d07..bc22e4ab3b55f199e9f6df4d47bd6d5f8643ddec 100644 (file)
@@ -288,6 +288,7 @@ OPTION(objecter_tick_interval, OPT_DOUBLE, 5.0)
 OPTION(objecter_timeout, OPT_DOUBLE, 10.0)    // before we ask for a map
 OPTION(objecter_inflight_op_bytes, OPT_U64, 1024*1024*100) // max in-flight data (both directions)
 OPTION(objecter_inflight_ops, OPT_U64, 1024)               // max in-flight ios
+OPTION(objecter_completion_locks_per_session, OPT_U64, 32) // num of completion locks per each session, for serializing same object responses
 OPTION(journaler_allow_split_entries, OPT_BOOL, true)
 OPTION(journaler_write_head_interval, OPT_INT, 15)
 OPTION(journaler_prefetch_periods, OPT_INT, 10)   // * journal object size
index 5b5e38d8f4350d63bb2e53860e94ae91402a98e3..ed364e43e87a4dd9b557991cc8d668fd5566dcdd 100644 (file)
@@ -136,6 +136,14 @@ static const char *config_keys[] = {
   NULL
 };
 
+Mutex *Objecter::OSDSession::get_lock(object_t& oid)  // returns the completion lock selected by hashing oid.name
+{
+#define HASH_PRIME 1021
+  uint32_t h = ceph_str_hash_linux(oid.name.c_str(), oid.name.size()) % HASH_PRIME;  // hash of the object name, reduced modulo HASH_PRIME
+
+  return completion_locks[h % num_locks];  // same name -> same slot; NOTE(review): modulo is undefined if num_locks is 0 - confirm the config value is validated
+}
+
 const char** Objecter::get_tracked_conf_keys() const
 {
   return config_keys;
@@ -256,6 +264,8 @@ void Objecter::init()
               << cpp_strerror(ret) << dendl;
   }
 
+  timer.init();
+
   rwlock.get_read();
 
   schedule_tick();
@@ -299,6 +309,9 @@ void Objecter::shutdown()
     delete logger;
     logger = NULL;
   }
+
+  timer.shutdown();
+
 }
 
 void Objecter::_send_linger(LingerOp *info)
@@ -368,6 +381,12 @@ void Objecter::_linger_commit(LingerOp *info, int r)
 }
 
 void Objecter::unregister_linger(uint64_t linger_id)
+{  // locking entry point: takes rwlock, then delegates to _unregister_linger()
+  RWLock::WLocker wl(rwlock);  // scoped write lock on the Objecter rwlock
+  _unregister_linger(linger_id);  // underscore variant is also called from paths that already hold rwlock for write (e.g. _scan_requests)
+}
+
+void Objecter::_unregister_linger(uint64_t linger_id)
 {
   map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
   if (iter != linger_ops.end()) {
@@ -376,6 +395,7 @@ void Objecter::unregister_linger(uint64_t linger_id)
     info->session->linger_ops.erase(linger_id);
     info->session->lock.unlock();
     linger_ops.erase(iter);
+    info->canceled = true;
     info->put();
 
     logger->dec(l_osdc_linger_active);
@@ -502,8 +522,11 @@ void Objecter::_scan_requests(OSDSession *s,
 {
   assert(rwlock.is_wlocked());
 
+  list<uint64_t> unregister_lingers;
+
   RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
-  RWLock::WLocker wl(s->lock);
+
+  s->lock.get_write();
 
   // check for changed linger mappings (_before_ regular ops)
   map<ceph_tid_t,LingerOp*>::iterator lp = s->linger_ops.begin();
@@ -511,6 +534,7 @@ void Objecter::_scan_requests(OSDSession *s,
     LingerOp *op = lp->second;
     ++lp;   // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
     ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
+    bool unregister;
     int r = _calc_target(&op->target);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
@@ -525,7 +549,11 @@ void Objecter::_scan_requests(OSDSession *s,
       _linger_cancel_map_check(op);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
-      _check_linger_pool_dne(op);
+      _check_linger_pool_dne(op, &unregister);
+      if (unregister) {
+        ldout(cct, 10) << " need to unregister linger op " << op->linger_id << dendl;
+        unregister_lingers.push_back(op->linger_id);
+      }
       break;
     }
   }
@@ -551,7 +579,7 @@ void Objecter::_scan_requests(OSDSession *s,
       _op_cancel_map_check(op);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
-      _check_op_pool_dne(op);
+      _check_op_pool_dne(op, true);
       break;
     }
   }
@@ -562,7 +590,7 @@ void Objecter::_scan_requests(OSDSession *s,
     CommandOp *c = cp->second;
     ++cp;
     ldout(cct, 10) << " checking command " << c->tid << dendl;
-    int r = _recalc_command_target(c);
+    int r = _calc_command_target(c);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       // resend if skipped map; otherwise do nothing.
@@ -571,6 +599,9 @@ void Objecter::_scan_requests(OSDSession *s,
       // -- fall-thru --
     case RECALC_OP_TARGET_NEED_RESEND:
       need_resend_command[c->tid] = c;
+      if (c->session) {
+        _session_command_op_remove(c);
+      }
       _command_cancel_map_check(c);
       break;
     case RECALC_OP_TARGET_POOL_DNE:
@@ -580,6 +611,12 @@ void Objecter::_scan_requests(OSDSession *s,
       break;
     }     
   }
+
+  s->lock.unlock();
+
+  for (list<uint64_t>::iterator iter = unregister_lingers.begin(); iter != unregister_lingers.end(); ++iter) {
+    _unregister_linger(*iter);
+  }
 }
 
 void Objecter::handle_osd_map(MOSDMap *m)
@@ -752,7 +789,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
   for (map<ceph_tid_t,CommandOp*>::iterator p = need_resend_command.begin();
        p != need_resend_command.end(); ++p) {
     CommandOp *c = p->second;
-    if (c->session) {
+    _assign_command_session(c);
+    if (c->session && !c->session->is_homeless()) {
       _send_command(c);
     }
   }
@@ -809,7 +847,7 @@ void Objecter::C_Op_Map_Latest::finish(int r)
   if (op->map_dne_bound == 0)
     op->map_dne_bound = latest;
 
-  objecter->_check_op_pool_dne(op);
+  objecter->_check_op_pool_dne(op, false);
 }
 
 int Objecter::pool_snap_by_name(int64_t poolid, const char *snap_name, snapid_t *snap)
@@ -865,7 +903,7 @@ int Objecter::pool_snap_list(int64_t poolid, vector<uint64_t> *snaps)
   return 0;
 }
 
-void Objecter::_check_op_pool_dne(Op *op)
+void Objecter::_check_op_pool_dne(Op *op, bool session_locked)
 {
   assert(rwlock.is_wlocked());
 
@@ -885,8 +923,13 @@ void Objecter::_check_op_pool_dne(Op *op)
       if (op->oncommit) {
        op->oncommit->complete(-ENOENT);
       }
-      RWLock::WLocker wl(op->session->lock);
+      if (!session_locked) {
+        op->session->lock.get_write();
+      }
       _finish_op(op);
+      if (!session_locked) {
+        op->session->lock.unlock();
+      }
     }
   } else {
     _send_op_map_check(op);
@@ -938,13 +981,20 @@ void Objecter::C_Linger_Map_Latest::finish(int r)
   if (op->map_dne_bound == 0)
     op->map_dne_bound = latest;
 
-  objecter->_check_linger_pool_dne(op);
+  bool unregister;
+  objecter->_check_linger_pool_dne(op, &unregister);
+
+  if (unregister) {
+    objecter->_unregister_linger(op->linger_id);
+  }
 }
 
-void Objecter::_check_linger_pool_dne(LingerOp *op)
+void Objecter::_check_linger_pool_dne(LingerOp *op, bool *need_unregister)
 {
   assert(rwlock.is_wlocked());
 
+  *need_unregister = false;
+
   ldout(cct, 10) << "_check_linger_pool_dne linger_id " << op->linger_id
                 << " current " << osdmap->get_epoch()
                 << " map_dne_bound " << op->map_dne_bound
@@ -957,7 +1007,7 @@ void Objecter::_check_linger_pool_dne(LingerOp *op)
       if (op->on_reg_commit) {
        op->on_reg_commit->complete(-ENOENT);
       }
-      unregister_linger(op->linger_id);
+      *need_unregister = true;
     }
   } else {
     _send_linger_map_check(op);
@@ -1079,7 +1129,7 @@ int Objecter::_get_session(int osd, OSDSession **session, RWLock::Context& lc)
   if (!lc.is_wlocked()) {
     return -EAGAIN;
   }
-  OSDSession *s = new OSDSession(osd);
+  OSDSession *s = new OSDSession(cct, osd);
   osd_sessions[osd] = s;
   s->con = messenger->get_connection(osdmap->get_inst(osd));
   logger->inc(l_osdc_osd_session_open);
@@ -1265,8 +1315,10 @@ void Objecter::_kick_requests(OSDSession *session)
   // resend lingers
   map<uint64_t, LingerOp*> lresend;  // resend in order
   for (map<ceph_tid_t, LingerOp*>::iterator j = session->linger_ops.begin(); j != session->linger_ops.end(); ++j) {
+    LingerOp *op = j->second;
+    op->get();
     logger->inc(l_osdc_linger_resend);
-    lresend[j->first] = j->second;
+    lresend[j->first] = op;
   }
 
   // resend commands
@@ -1282,7 +1334,11 @@ void Objecter::_kick_requests(OSDSession *session)
   session->lock.unlock();
 
   while (!lresend.empty()) {
-    _send_linger(lresend.begin()->second);
+    LingerOp *op = lresend.begin()->second;
+    if (!op->canceled) {
+      _send_linger(op);
+    }
+    op->put();
     lresend.erase(lresend.begin());
   }
 }
@@ -1296,8 +1352,10 @@ void Objecter::schedule_tick()
 
 void Objecter::tick()
 {
-  if (!initialized.read())
+  if (!initialized.read()) {
+    schedule_tick();
     return;
+  }
 
   assert(rwlock.is_locked());
 
@@ -1549,7 +1607,7 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
   ldout(cct, 10) << "_op_submit oid " << op->target.base_oid
            << " " << op->target.base_oloc << " " << op->target.target_oloc
           << " " << op->ops << " tid " << op->tid
-           << " osd." << (!op->session->is_homeless() ? op->session->osd : -1)
+           << " osd." << (!s->is_homeless() ? s->osd : -1)
            << dendl;
 
   assert(op->target.flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
@@ -1882,6 +1940,17 @@ void Objecter::_session_linger_op_remove(LingerOp *info)
   info->session = NULL;
 }
 
+void Objecter::_session_command_op_remove(CommandOp *op)  // detaches a command op from the session it is currently attached to
+{
+  assert(rwlock.is_locked());  // caller must already hold the Objecter rwlock
+  OSDSession *s = op->session;
+  assert(s);  // op must currently be attached to a session
+  assert(s->lock.is_locked());  // caller must already hold that session's lock
+
+  s->command_ops.erase(op->tid);  // drop the op from the session's pending command map, keyed by tid
+  op->session = NULL;  // op no longer references any session after this call
+}
+
 int Objecter::_get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession)
 {
   int r;
@@ -1905,29 +1974,6 @@ int Objecter::_get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **p
   return _get_osd_session(op->target.osd, lc, psession);
 }
 
-void Objecter::_session_op_validate(Op *op, RWLock::Context& lc, bool session_locked)
-{
-  assert(rwlock.is_locked());
-
-  if (op->session && op->session->osd != op->target.osd) {
-    OSDSession *orig_session = op->session;
-    _session_op_remove(op);
-    put_session(orig_session);
-  }
-}
-
-int Objecter::_recalc_op_target(Op *op, RWLock::Context& lc,
-                                bool src_session_locked)
-{
-  assert(rwlock.is_locked());
-
-  int r = _calc_target(&op->target);
-  if (r == RECALC_OP_TARGET_NEED_RESEND) {
-    _session_op_validate(op, lc, src_session_locked);
-  }
-  return r;
-}
-
 bool Objecter::_promote_lock_check_race(RWLock::Context& lc)
 {
   epoch_t epoch = osdmap->get_epoch();
@@ -1995,7 +2041,7 @@ void Objecter::_finish_op(Op *op)
     timer.cancel_event(op->ontimeout);
   }
 
-  op->session->put();
+  put_session(op->session);
 
   inflight_ops.dec();
 
@@ -2138,7 +2184,7 @@ void Objecter::unregister_op(Op *op)
   op->session->lock.get_write();
   op->session->ops.erase(op->tid);
   op->session->lock.unlock();
-  op->session->put();
+  put_session(op->session);
   op->session = NULL;
 
   inflight_ops.dec();
@@ -2314,6 +2360,9 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     op->outbl = 0;
   }
 
+  /* get it before we call _finish_op() */
+  Mutex *completion_lock = s->get_lock(op->target.base_oid);
+
   // done with this tid?
   if (!op->onack && !op->oncommit) {
     ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
@@ -2323,7 +2372,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() << " uncommitted" << dendl;
 
   // serialize completions
-  s->completion_lock.Lock();
+
+  completion_lock->Lock();
   s->lock.unlock();
 
   // do callbacks
@@ -2333,7 +2383,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   if (oncommit) {
     oncommit->complete(rc);
   }
-  s->completion_lock.Unlock();
+  completion_lock->Unlock();
 
   m->put();
   s->put();
@@ -3374,13 +3424,16 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
 {
   RWLock::WLocker wl(rwlock);
 
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
   ceph_tid_t tid = last_tid.inc();
   ldout(cct, 10) << "_submit_command " << tid << " " << c->cmd << dendl;
   c->tid = tid;
   homeless_session.command_ops[tid] = c;
   num_homeless_ops.inc();
   c->session = &homeless_session;
-  (void)_recalc_command_target(c);
+  (void)_calc_command_target(c);
+  _assign_command_session(c);
   if (osd_timeout > 0) {
     c->ontimeout = new C_CancelCommandOp(c->session, tid, this);
     timer.add_event_after(osd_timeout, c->ontimeout);
@@ -3401,14 +3454,14 @@ int Objecter::submit_command(CommandOp *c, ceph_tid_t *ptid)
   return 0;
 }
 
-int Objecter::_recalc_command_target(CommandOp *c)
+int Objecter::_calc_command_target(CommandOp *c)
 {
   assert(rwlock.is_wlocked());
 
   RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
 
-  OSDSession *s = &homeless_session;
   c->map_check_error = 0;
+
   if (c->target_osd >= 0) {
     if (!osdmap->exists(c->target_osd)) {
       c->map_check_error = -ENOENT;
@@ -3420,25 +3473,45 @@ int Objecter::_recalc_command_target(CommandOp *c)
       c->map_check_error_str = "osd down";
       return RECALC_OP_TARGET_OSD_DOWN;
     }
-    int r = _get_session(c->target_osd, &s, lc);
-    assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+    c->osd = c->target_osd;
   } else {
     if (!osdmap->have_pg_pool(c->target_pg.pool())) {
       c->map_check_error = -ENOENT;
       c->map_check_error_str = "pool dne";
       return RECALC_OP_TARGET_POOL_DNE;
     }
-    int primary;
     vector<int> acting;
-    osdmap->pg_to_acting_osds(c->target_pg, &acting, &primary);
-    if (primary != -1) {
-      int r = _get_session(primary, &s, lc);
-      assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
-    }
+    osdmap->pg_to_acting_osds(c->target_pg, &acting, &c->osd);
   }
 
+  OSDSession *s;
+  int r = _get_session(c->osd, &s, lc);
+  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+
   if (c->session != s) {
-    ldout(cct, 10) << "_recalc_command_target " << c->tid << " now " << c->session << dendl;
+    put_session(s);
+    return RECALC_OP_TARGET_NEED_RESEND;
+  }
+
+  put_session(s);
+
+  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl;
+
+  return RECALC_OP_TARGET_NO_ACTION;
+}
+
+void Objecter::_assign_command_session(CommandOp *c)
+{
+  assert(rwlock.is_wlocked());
+
+  RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
+
+  OSDSession *s;
+  int r = _get_session(c->osd, &s, lc);
+  assert(r != -EAGAIN); /* shouldn't happen as we're holding the write lock */
+
+  if (c->session != s) {
+    ldout(cct, 10) << "_assign_command_session " << c->tid << " now " << c->session << dendl;
     if (c->session) {
       if (c->session->is_homeless()) {
         num_homeless_ops.dec();
@@ -3458,13 +3531,9 @@ int Objecter::_recalc_command_target(CommandOp *c)
     if (s->is_homeless())
       num_homeless_ops.inc();
     s->lock.unlock();
-    return RECALC_OP_TARGET_NEED_RESEND;
   } else {
     put_session(s);
   }
-
-  ldout(cct, 20) << "_recalc_command_target " << c->tid << " no change, " << c->session << dendl;
-  return RECALC_OP_TARGET_NO_ACTION;
 }
 
 void Objecter::_send_command(CommandOp *c)
index e6e6c06966d610a575974664474b6c3559dada30..7111d3387dff1cfd77ac2df5368bed3b3135981d 100644 (file)
@@ -1326,6 +1326,7 @@ public:
     string *prs;
     int target_osd;
     pg_t target_pg;
+    int osd; /* calculated osd for sending request */
     epoch_t map_dne_bound;
     int map_check_error;           // error to return if map check fails
     const char *map_check_error_str;
@@ -1334,7 +1335,7 @@ public:
 
     CommandOp()
       : session(NULL),
-       tid(0), poutbl(NULL), prs(NULL), target_osd(-1),
+       tid(0), poutbl(NULL), prs(NULL), target_osd(-1), osd(-1),
        map_dne_bound(0),
        map_check_error(0),
        map_check_error_str(NULL),
@@ -1342,7 +1343,8 @@ public:
   };
 
   int submit_command(CommandOp *c, ceph_tid_t *ptid);
-  int _recalc_command_target(CommandOp *c);
+  int _calc_command_target(CommandOp *c);
+  void _assign_command_session(CommandOp *c);
   void _send_command(CommandOp *c);
   int command_op_cancel(OSDSession *s, ceph_tid_t tid, int r);
   void _finish_command(CommandOp *c, int r, string rs);
@@ -1366,6 +1368,7 @@ public:
     version_t *pobjver;
 
     bool registered;
+    bool canceled;
     Context *on_reg_ack, *on_reg_commit;
 
     OSDSession *session;
@@ -1378,6 +1381,7 @@ public:
                 snap(CEPH_NOSNAP),
                 poutbl(NULL), pobjver(NULL),
                 registered(false),
+                canceled(false),
                 on_reg_ack(NULL), on_reg_commit(NULL),
                 session(NULL),
                 register_tid(0),
@@ -1430,7 +1434,7 @@ public:
   // -- osd sessions --
   struct OSDSession : public RefCountedObject {
     RWLock lock;
-    Mutex completion_lock;
+    Mutex **completion_locks;
 
     // pending ops
     map<ceph_tid_t,Op*>            ops;
@@ -1439,11 +1443,27 @@ public:
 
     int osd;
     int incarnation;
+    int num_locks;
     ConnectionRef con;
 
-    OSDSession(int o) : lock("OSDSession"), completion_lock("OSDSession::completion_lock"), osd(o), incarnation(0), con(NULL) {}
+    OSDSession(CephContext *cct, int o) : lock("OSDSession"), osd(o), incarnation(0), con(NULL) {  // builds a session for osd 'o' with its array of completion locks
+      num_locks = cct->_conf->objecter_completion_locks_per_session;  // lock count is read from the objecter_completion_locks_per_session config option
+      completion_locks = new Mutex *[num_locks];  // array of pointers; each Mutex is allocated separately below
+      for (int i = 0; i < num_locks; i++) {
+        completion_locks[i] = new Mutex("OSDSession::completion_lock");  // every lock is created with the same name string
+      }
+    }
+
+    ~OSDSession() {  // releases the completion locks allocated by the constructor
+      for (int i = 0; i < num_locks; i++) {
+        delete completion_locks[i];  // free each individually allocated Mutex
+      }
+      delete[] completion_locks;  // then free the pointer array itself
+    }
 
     bool is_homeless() { return (osd == -1); }
+
+    Mutex *get_lock(object_t& oid);
   };
   map<int,OSDSession*> osd_sessions;
 
@@ -1494,12 +1514,11 @@ public:
                   RWLock::Context& lc);
   void _session_op_remove(Op *op);
   void _session_linger_op_remove(LingerOp *info);
+  void _session_command_op_remove(CommandOp *op);
   void _session_op_assign(Op *op, OSDSession *s);
   int _get_osd_session(int osd, RWLock::Context& lc, OSDSession **psession);
   int _assign_op_target_session(Op *op, RWLock::Context& lc, bool src_session_locked, bool dst_session_locked);
   int _get_op_target_session(Op *op, RWLock::Context& lc, OSDSession **psession);
-  void _session_op_validate(Op *op, RWLock::Context& lc, bool session_locked);
-  int _recalc_op_target(Op *op, RWLock::Context& lc, bool session_locked = false);
   int _recalc_linger_op_target(LingerOp *op, RWLock::Context& lc);
 
   void _linger_submit(LingerOp *info);
@@ -1507,10 +1526,10 @@ public:
   void _linger_ack(LingerOp *info, int r);
   void _linger_commit(LingerOp *info, int r);
 
-  void _check_op_pool_dne(Op *op);
+  void _check_op_pool_dne(Op *op, bool session_locked);
   void _send_op_map_check(Op *op);
   void _op_cancel_map_check(Op *op);
-  void _check_linger_pool_dne(LingerOp *op);
+  void _check_linger_pool_dne(LingerOp *op, bool *need_unregister);
   void _send_linger_map_check(LingerOp *op);
   void _linger_cancel_map_check(LingerOp *op);
   void _check_command_map_dne(CommandOp *op);
@@ -1574,7 +1593,7 @@ public:
     logger(NULL), tick_event(NULL),
     m_request_state_hook(NULL),
     num_homeless_ops(0),
-    homeless_session(-1),
+    homeless_session(cct, -1),
     mon_timeout(mon_timeout),
     osd_timeout(osd_timeout),
     op_throttle_bytes(cct, "objecter_bytes", cct->_conf->objecter_inflight_op_bytes),
@@ -1771,6 +1790,7 @@ public:
                    Context *onack,
                    version_t *objver);
   void unregister_linger(uint64_t linger_id);
+  void _unregister_linger(uint64_t linger_id);
 
   /**
    * set up initial ops in the op vector, and allocate a final op slot.