git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: respect backoffs
author: Sage Weil <sage@redhat.com>
Wed, 1 Feb 2017 22:35:36 +0000 (17:35 -0500)
committer: Sage Weil <sage@redhat.com>
Fri, 10 Feb 2017 23:55:58 +0000 (18:55 -0500)
Signed-off-by: Sage Weil <sage@redhat.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 7e87f4ab05d8e04e844bbe9302d7dc0c506217ad..f592811b6fcc23cc0a97e19bf031411bed668567 100644 (file)
@@ -24,6 +24,7 @@
 #include "messages/MPing.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
+#include "messages/MOSDBackoff.h"
 #include "messages/MOSDMap.h"
 
 #include "messages/MPoolOp.h"
@@ -969,6 +970,10 @@ bool Objecter::ms_dispatch(Message *m)
     handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
     return true;
 
+  case CEPH_MSG_OSD_BACKOFF:
+    handle_osd_backoff(static_cast<MOSDBackoff*>(m));
+    return true;
+
   case CEPH_MSG_WATCH_NOTIFY:
     handle_watch_notify(static_cast<MWatchNotify*>(m));
     m->put();
@@ -1948,6 +1953,10 @@ void Objecter::_kick_requests(OSDSession *session,
 {
   // rwlock is locked unique
 
+  // clear backoffs
+  session->backoffs.clear();
+  session->backoffs_by_id.clear();
+
   // resend ops
   map<ceph_tid_t,Op*> resend;  // resend in tid order
   for (map<ceph_tid_t, Op*>::iterator p = session->ops.begin();
@@ -3061,6 +3070,26 @@ void Objecter::_send_op(Op *op, MOSDOp *m)
   // rwlock is locked
   // op->session->lock is locked
 
+  // backoff?
+  hobject_t hoid = op->target.get_hobj();
+  auto q = op->session->backoffs.lower_bound(hoid);
+  if (q != op->session->backoffs.begin()) {
+    --q;
+    if (cmp_bitwise(hoid, q->second.end) >= 0) {
+      ++q;
+    }
+  }
+  if (q != op->session->backoffs.end()) {
+    ldout(cct, 20) << __func__ << " ? " << q->first << " [" << q->second.begin
+                  << "," << q->second.end << ")" << dendl;
+    int r = cmp_bitwise(hoid, q->second.begin);
+    if (r == 0 || (r > 0 && cmp_bitwise(hoid, q->second.end) < 0)) {
+      ldout(cct, 10) << __func__ << " backoff on " << hoid << ", queuing "
+                    << op << " tid " << op->tid << dendl;
+      return;
+    }
+  }
+
   if (!m) {
     assert(op->tid > 0);
     m = _prepare_osd_op(op);
@@ -3357,6 +3386,90 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   put_session(s);
 }
 
+/**
+ * Handle an MOSDBackoff message received from an OSD.
+ *
+ * CEPH_OSD_BACKOFF_OP_BLOCK records the [begin,end) range on the session
+ * (keyed both by begin and by backoff id) and replies with an ACK_BLOCK.
+ * _send_op() consults s->backoffs and holds back ops that fall inside a
+ * registered range.
+ *
+ * CEPH_OSD_BACKOFF_OP_UNBLOCK removes every backoff registered under the
+ * message's id and re-sends the session's ops whose target lies within
+ * [m->begin, m->end).
+ *
+ * The reference on @p m is dropped on every path.
+ *
+ * @param m  the backoff message; its connection identifies the OSD session
+ */
+void Objecter::handle_osd_backoff(MOSDBackoff *m)
+{
+  ldout(cct, 10) << __func__ << " " << *m << dendl;
+  shunique_lock sul(rwlock, ceph::acquire_shared);
+  if (!initialized.read()) {
+    m->put();
+    return;
+  }
+
+  ConnectionRef con = m->get_connection();
+  OSDSession *s = static_cast<OSDSession*>(con->get_priv());
+  if (!s || s->con != con) {
+    ldout(cct, 7) << __func__ << " no session on con " << con << dendl;
+    if (s) {
+      // get_priv() handed us a reference; drop it so a stale session
+      // attached to this connection is not leaked.
+      s->put();
+    }
+    m->put();
+    return;
+  }
+
+  get_session(s);
+  s->put();  // from get_priv() above
+
+  OSDSession::unique_lock sl(s->lock);
+
+  switch (m->op) {
+  case CEPH_OSD_BACKOFF_OP_BLOCK:
+    {
+      // register
+      OSDBackoff& b = s->backoffs[m->begin];
+      s->backoffs_by_id.insert(make_pair(m->id, &b));
+      b.id = m->id;
+      b.begin = m->begin;
+      b.end = m->end;
+
+      // ack
+      Message *r = new MOSDBackoff(CEPH_OSD_BACKOFF_OP_ACK_BLOCK,
+                                  m->id, m->begin, m->end,
+                                  osdmap->get_epoch());
+      // this priority must match the MOSDOps from _prepare_osd_op
+      r->set_priority(cct->_conf->osd_client_op_priority);
+      con->send_message(r);
+    }
+    break;
+
+  case CEPH_OSD_BACKOFF_OP_UNBLOCK:
+    {
+      auto p = s->backoffs_by_id.find(m->id);
+      while (p != s->backoffs_by_id.end() &&
+            p->second->id == m->id) {
+        OSDBackoff *b = p->second;
+        // The ranges disagree if EITHER bound differs, not only when both do.
+        if (b->begin != m->begin ||
+            b->end != m->end) {
+          lderr(cct) << __func__ << " got id " << m->id << " unblock on ["
+                     << m->begin << "," << m->end << ") but backoff is ["
+                     << b->begin << "," << b->end << ")" << dendl;
+          // hrmpf, unblock it anyway.
+        }
+        ldout(cct, 10) << __func__ << " unblock backoff " << b->id
+                       << " [" << b->begin << "," << b->end
+                       << ")" << dendl;
+        // b points into s->backoffs; it must not be used after this erase.
+        s->backoffs.erase(b->begin);
+        p = s->backoffs_by_id.erase(p);
+
+        // check for any ops to resend
+        for (auto& q : s->ops) {
+          int r = q.second->target.contained_by(m->begin, m->end);
+          ldout(cct, 20) << __func__ <<  " contained_by " << r << " on "
+                         << q.second->target.get_hobj() << dendl;
+          if (r) {
+            _send_op(q.second);
+          }
+        }
+      }
+    }
+    break;
+
+  default:
+    ldout(cct, 10) << __func__ << " unrecognized op " << (int)m->op << dendl;
+  }
+
+  sul.unlock();
+  sl.unlock();
+
+  m->put();
+  put_session(s);
+}
 
 uint32_t Objecter::list_nobjects_seek(NListContext *list_context,
                                      uint32_t pos)
index 0f34458e7cc181e30c3d46479d5319fd625fa72d..8682a61d7ad41a6489fdb57a2346178b5b5cbf77 100644 (file)
@@ -1225,6 +1225,21 @@ public:
 
     op_target_t() = default;
 
+    /// Build the hobject_t that identifies this target, using snapshot id
+    /// CEPH_NOSNAP.  The hash is target_oloc.hash when that is
+    /// non-negative, otherwise pgid.ps().
+    /// Const-qualified so it can be called on a const op_target_t.
+    hobject_t get_hobj() const {
+      return hobject_t(target_oid,
+                      target_oloc.key,
+                      CEPH_NOSNAP,
+                      target_oloc.hash >= 0 ? target_oloc.hash : pgid.ps(),
+                      target_oloc.pool,
+                      target_oloc.nspace);
+    }
+
+    /// Return true when this target's hobject lies in [begin,end) under
+    /// bitwise ordering.  An object equal to @p begin always matches,
+    /// even when begin == end.
+    bool contained_by(const hobject_t& begin, const hobject_t& end) const {
+      const hobject_t h = get_hobj();
+      const int r = cmp_bitwise(h, begin);
+      return r == 0 || (r > 0 && cmp_bitwise(h, end) < 0);
+    }
+
     void dump(Formatter *f) const;
   };
 
@@ -1774,6 +1789,11 @@ public:
   };
 
   // -- osd sessions --
+  /// One backoff registered by an OSD: ops targeting objects in
+  /// [begin,end) are held back until the backoff with this id is released.
+  struct OSDBackoff {
+    uint64_t id;      ///< id carried by the MOSDBackoff that created it
+    hobject_t begin;  ///< first object covered (inclusive)
+    hobject_t end;    ///< end of the covered range (exclusive)
+  };
+
   struct OSDSession : public RefCountedObject {
     boost::shared_mutex lock;
     using lock_guard = std::lock_guard<decltype(lock)>;
@@ -1786,6 +1806,10 @@ public:
     map<uint64_t, LingerOp*> linger_ops;
     map<ceph_tid_t,CommandOp*> command_ops;
 
+    // backoffs
+    map<hobject_t,OSDBackoff,hobject_t::BitwiseComparator> backoffs;
+    multimap<uint64_t,OSDBackoff*> backoffs_by_id;
+
     int osd;
     int incarnation;
     ConnectionRef con;
@@ -2074,6 +2098,7 @@ private:
   }
 
   void handle_osd_op_reply(class MOSDOpReply *m);
+  void handle_osd_backoff(class MOSDBackoff *m);
   void handle_watch_notify(class MWatchNotify *m);
   void handle_osd_map(class MOSDMap *m);
   void wait_for_osd_map();