git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
Fixes : #12018
author xinxin shu <xinxin.shu@intel.com>
Thu, 18 Jun 2015 01:44:47 +0000 (09:44 +0800)
committer xinxin shu <xinxin.shu@intel.com>
Mon, 13 Jul 2015 20:43:55 +0000 (04:43 +0800)
resend writes after pool loses full flag

Signed-off-by: xinxin shu <xinxin.shu@intel.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index f82f6c7d064ae2da60e95b3e5b1f9902d4b3b11c..d20c4fb8ff5f55ab52492ef9747de8a3e0eaa2ee 100644 (file)
@@ -884,7 +884,8 @@ bool Objecter::ms_dispatch(Message *m)
 
 void Objecter::_scan_requests(OSDSession *s,
                              bool force_resend,
-                            bool force_resend_writes,
+                            bool cluster_full,
+                             map<int64_t, bool> *pool_full_map,
                             map<ceph_tid_t, Op*>& need_resend,
                             list<LingerOp*>& need_resend_linger,
                             map<ceph_tid_t, CommandOp*>& need_resend_command)
@@ -904,8 +905,10 @@ void Objecter::_scan_requests(OSDSession *s,
     assert(op->session == s);
     ++lp;   // check_linger_pool_dne() may touch linger_ops; prevent iterator invalidation
     ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
-    bool unregister;
+    bool unregister, force_resend_writes = cluster_full;
     int r = _recalc_linger_op_target(op, lc);
+    if (pool_full_map)
+      force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool];
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
       if (!force_resend && !force_resend_writes)
@@ -933,6 +936,9 @@ void Objecter::_scan_requests(OSDSession *s,
     Op *op = p->second;
     ++p;   // check_op_pool_dne() may touch ops; prevent iterator invalidation
     ldout(cct, 10) << " checking op " << op->tid << dendl;
+    bool force_resend_writes = cluster_full;
+    if (pool_full_map)
+      force_resend_writes = force_resend_writes || (*pool_full_map)[op->target.base_oloc.pool];
     int r = _calc_target(&op->target, &op->last_force_resend);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
@@ -959,6 +965,9 @@ void Objecter::_scan_requests(OSDSession *s,
     CommandOp *c = cp->second;
     ++cp;
     ldout(cct, 10) << " checking command " << c->tid << dendl;
+    bool force_resend_writes = cluster_full;
+    if (pool_full_map)
+      force_resend_writes = force_resend_writes || (*pool_full_map)[c->target_pg.pool()];
     int r = _calc_command_target(c);
     switch (r) {
     case RECALC_OP_TARGET_NO_ACTION:
@@ -1006,9 +1015,14 @@ void Objecter::handle_osd_map(MOSDMap *m)
   }
 
   bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
-  bool was_full = _osdmap_full_flag();
-  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || was_full;
+  bool cluster_full = _osdmap_full_flag();
+  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || cluster_full || _osdmap_has_pool_full();
+  map<int64_t, bool> pool_full_map;
+  for (map<int64_t, pg_pool_t>::const_iterator it = osdmap->get_pools().begin();
+       it != osdmap->get_pools().end(); it++)
+    pool_full_map[it->first] = it->second.has_flag(pg_pool_t::FLAG_FULL);
 
+  
   list<LingerOp*> need_resend_linger;
   map<ceph_tid_t, Op*> need_resend;
   map<ceph_tid_t, CommandOp*> need_resend_command;
@@ -1059,18 +1073,19 @@ void Objecter::handle_osd_map(MOSDMap *m)
        }
        logger->set(l_osdc_map_epoch, osdmap->get_epoch());
 
-       was_full = was_full || _osdmap_full_flag();
-       _scan_requests(homeless_session, skipped_map, was_full,
-                      need_resend, need_resend_linger,
-                      need_resend_command);
+       cluster_full = cluster_full || _osdmap_full_flag();
+        update_pool_full_map(pool_full_map);
+       _scan_requests(homeless_session, skipped_map, cluster_full,
+                       &pool_full_map, need_resend,
+                       need_resend_linger, need_resend_command);
 
        // osd addr changes?
        for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ) {
          OSDSession *s = p->second;
-         _scan_requests(s, skipped_map, was_full,
-                        need_resend, need_resend_linger,
-                        need_resend_command);
+         _scan_requests(s, skipped_map, cluster_full,
+                        &pool_full_map, need_resend,
+                         need_resend_linger, need_resend_command);
          ++p;
          if (!osdmap->is_up(s->osd) ||
              (s->con &&
@@ -1088,14 +1103,14 @@ void Objecter::handle_osd_map(MOSDMap *m)
         for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
             p != osd_sessions.end(); ++p) {
          OSDSession *s = p->second;
-         _scan_requests(s, false, false, need_resend, need_resend_linger,
-                        need_resend_command);
+         _scan_requests(s, false, false, NULL, need_resend,
+                         need_resend_linger, need_resend_command);
         }
        ldout(cct, 3) << "handle_osd_map decoding full epoch "
                      << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
 
-       _scan_requests(homeless_session, false, false,
+       _scan_requests(homeless_session, false, false, NULL,
                       need_resend, need_resend_linger,
                       need_resend_command);
       } else {
@@ -1108,7 +1123,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
   }
 
   bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
-  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag();
+  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || _osdmap_has_pool_full();
 
   // was/is paused?
   if (was_pauserd || was_pausewr || pauserd || pausewr || osdmap->get_epoch() < epoch_barrier) {
@@ -2162,7 +2177,8 @@ ceph_tid_t Objecter::_op_submit(Op *op, RWLock::Context& lc)
     ldout(cct, 10) << " paused read " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
     _maybe_request_map();
-  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) && _osdmap_full_flag()) {
+  } else if ((op->target.flags & CEPH_OSD_FLAG_WRITE) &&
+             (_osdmap_full_flag() || _osdmap_pool_full(op->target.base_oloc.pool))) {
     ldout(cct, 0) << " FULL, paused modify " << op << " tid " << last_tid.read() << dendl;
     op->target.paused = true;
     _maybe_request_map();
@@ -2353,8 +2369,9 @@ bool Objecter::is_pg_changed(
 
 bool Objecter::target_should_be_paused(op_target_t *t)
 {
+  const pg_pool_t *pi = osdmap->get_pg_pool(t->base_oloc.pool);
   bool pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
-  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag();
+  bool pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR) || _osdmap_full_flag() || pi->has_flag(pg_pool_t::FLAG_FULL);
 
   return (t->flags & CEPH_OSD_FLAG_READ && pauserd) ||
          (t->flags & CEPH_OSD_FLAG_WRITE && pausewr) ||
@@ -2379,6 +2396,11 @@ bool Objecter::osdmap_pool_full(const int64_t pool_id) const
     return true;
   }
 
+  return _osdmap_pool_full(pool_id);
+}
+
+bool Objecter::_osdmap_pool_full(const int64_t pool_id) const
+{
   const pg_pool_t *pool = osdmap->get_pg_pool(pool_id);
   if (pool == NULL) {
     ldout(cct, 4) << __func__ << ": DNE pool " << pool_id << dendl;
@@ -2388,6 +2410,16 @@ bool Objecter::osdmap_pool_full(const int64_t pool_id) const
   return pool->has_flag(pg_pool_t::FLAG_FULL);
 }
 
+bool Objecter::_osdmap_has_pool_full() const
+{
+  // True as soon as any pool in the current osdmap carries FLAG_FULL.
+  const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
+  map<int64_t, pg_pool_t>::const_iterator p = pools.begin();
+  while (p != pools.end() && !p->second.has_flag(pg_pool_t::FLAG_FULL))
+    ++p;
+  return p != pools.end();
+}
+
 /**
  * Wrapper around osdmap->test_flag for special handling of the FULL flag.
  */
@@ -2397,6 +2429,17 @@ bool Objecter::_osdmap_full_flag() const
   return osdmap->test_flag(CEPH_OSDMAP_FULL) && honor_osdmap_full;
 }
 
+void Objecter::update_pool_full_map(map<int64_t, bool>& pool_full_map)
+{
+  // Sticky merge: once a pool id is recorded as full it stays full; ids not
+  // yet tracked are added with the current osdmap's FLAG_FULL state.
+  const map<int64_t, pg_pool_t>& pools = osdmap->get_pools();
+  for (map<int64_t, pg_pool_t>::const_iterator p = pools.begin();
+       p != pools.end(); ++p) {
+    bool& seen_full = pool_full_map[p->first];  // absent ids start out false
+    seen_full = p->second.has_flag(pg_pool_t::FLAG_FULL) || seen_full;
+  }
+}
 
 int64_t Objecter::get_object_hash_position(int64_t pool, const string& key,
                                           const string& ns)
index 76cadf68274443ae47d73fa8ab4229dd46e8ec55..2c98b6b23cc3a1cbca25d490b3eb2cb7d8ae69ba 100644 (file)
@@ -1703,6 +1703,8 @@ public:
    *         the global full flag is set, else false
    */
   bool osdmap_pool_full(const int64_t pool_id) const;
+  bool _osdmap_pool_full(const int64_t pool_id) const;
+  void update_pool_full_map(map<int64_t, bool>& pool_full_map);
 
  private:
   map<uint64_t, LingerOp*>  linger_ops;
@@ -1749,6 +1751,7 @@ public:
     RECALC_OP_TARGET_OSD_DOWN,
   };
   bool _osdmap_full_flag() const;
+  bool _osdmap_has_pool_full() const;
 
   bool target_should_be_paused(op_target_t *op);
   int _calc_target(op_target_t *t, epoch_t *last_force_resend=0, bool any_change=false);
@@ -1912,7 +1915,8 @@ private:
 
   void _scan_requests(OSDSession *s,
                      bool force_resend,
-                    bool force_resend_writes,
+                    bool cluster_full,
+                     map<int64_t, bool> *pool_full_map,
                     map<ceph_tid_t, Op*>& need_resend,
                     list<LingerOp*>& need_resend_linger,
                     map<ceph_tid_t, CommandOp*>& need_resend_command);