]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crush, mon: unlink vs remove
authorSage Weil <sage@inktank.com>
Fri, 22 Mar 2013 19:32:15 +0000 (12:32 -0700)
committerSage Weil <sage@inktank.com>
Fri, 22 Mar 2013 22:15:38 +0000 (15:15 -0700)
Make an 'unlink' mode of remove that will remove a link to a bucket but
not remove the bucket itself.  This refactors remove_item[_under] and moves
some of the checks into common helpers where they are not duplicated.  Fix
callers to pass the extra arg.

Signed-off-by: Sage Weil <sage@inktank.com>
src/crush/CrushWrapper.cc
src/crush/CrushWrapper.h
src/crushtool.cc
src/mon/OSDMonitor.cc
src/test/cli/ceph/help.t
src/tools/ceph.cc

index 5a4a9107597c3bcd6e32aa7028620a707deba8d3..a2aab0a8dfc7ee25b6c062d1d25a33e57d3f2ff1 100644 (file)
@@ -50,13 +50,47 @@ bool CrushWrapper::subtree_contains(int root, int item) const
   return false;
 }
 
+bool CrushWrapper::_maybe_remove_last_instance(CephContext *cct, int item, bool unlink_only)
+{
+  // last instance?
+  if (_search_item_exists(item)) {
+    if (name_map.count(item)) {
+      ldout(cct, 5) << "_maybe_remove_last_instance removing name for item " << item << dendl;
+      name_map.erase(item);
+      have_rmaps = false;
+      return true;
+    }
+    return false;
+  }
+
+  if (item < 0 && !unlink_only) {
+    crush_bucket *t = get_bucket(item);
+    ldout(cct, 5) << "_maybe_remove_last_instance removing bucket " << item << dendl;
+    crush_remove_bucket(crush, t);
+  }
+  if ((item >= 0 || !unlink_only) && name_map.count(item)) {
+    ldout(cct, 5) << "_maybe_remove_last_instance removing name for item " << item << dendl;
+    name_map.erase(item);
+    have_rmaps = false;
+  }
+  return true;
+}
 
-int CrushWrapper::remove_item(CephContext *cct, int item)
+int CrushWrapper::remove_item(CephContext *cct, int item, bool unlink_only)
 {
-  ldout(cct, 5) << "remove_item " << item << dendl;
+  ldout(cct, 5) << "remove_item " << item << (unlink_only ? " unlink_only":"") << dendl;
 
   int ret = -ENOENT;
 
+  if (item < 0 && !unlink_only) {
+    crush_bucket *t = get_bucket(item);
+    if (t && t->size) {
+      ldout(cct, 1) << "remove_item bucket " << item << " has " << t->size
+                   << " items, not empty" << dendl;
+      return -ENOTEMPTY;
+    }
+  }
+
   for (int i = 0; i < crush->max_buckets; i++) {
     if (!crush->buckets[i])
       continue;
@@ -65,14 +99,6 @@ int CrushWrapper::remove_item(CephContext *cct, int item)
     for (unsigned i=0; i<b->size; ++i) {
       int id = b->items[i];
       if (id == item) {
-       if (item < 0) {
-         crush_bucket *t = get_bucket(item);
-         if (t && t->size) {
-           ldout(cct, 1) << "remove_item bucket " << item << " has " << t->size
-                         << " items, not empty" << dendl;
-           return -ENOTEMPTY;
-         }
-       }
        adjust_item_weight(cct, item, 0);
        ldout(cct, 5) << "remove_item removing item " << item
                      << " from bucket " << b->id << dendl;
@@ -82,19 +108,8 @@ int CrushWrapper::remove_item(CephContext *cct, int item)
     }
   }
 
-  // last instance?
-  if (!_search_item_exists(item)) {
-    if (item < 0) {
-      ldout(cct, 5) << "remove_item removing bucket " << item << dendl;
-      crush_bucket *t = get_bucket(item);
-      crush_remove_bucket(crush, t);
-      ret = 0;
-    }
-    if (name_map.count(item)) {
-      name_map.erase(item);
-      have_rmaps = false;
-    }
-  }
+  if (_maybe_remove_last_instance(cct, item, unlink_only))
+    ret = 0;
   
   return ret;
 }
@@ -113,9 +128,10 @@ bool CrushWrapper::_search_item_exists(int item) const
   return false;
 }
 
-int CrushWrapper::_remove_item_under(CephContext *cct, int item, int ancestor)
+int CrushWrapper::_remove_item_under(CephContext *cct, int item, int ancestor, bool unlink_only)
 {
-  ldout(cct, 5) << "remove_item_under " << item << " under " << ancestor << dendl;
+  ldout(cct, 5) << "_remove_item_under " << item << " under " << ancestor
+               << (unlink_only ? " unlink_only":"") << dendl;
 
   if (ancestor >= 0) {
     return -EINVAL;
@@ -130,19 +146,12 @@ int CrushWrapper::_remove_item_under(CephContext *cct, int item, int ancestor)
   for (unsigned i=0; i<b->size; ++i) {
     int id = b->items[i];
     if (id == item) {
-      if (item < 0) {
-       crush_bucket *t = get_bucket(item);
-       if (t && t->size) {
-         ldout(cct, 1) << "remove_item_under bucket " << item << " has " << t->size << " items, not empty" << dendl;
-         return -ENOTEMPTY;
-       }
-      }
       adjust_item_weight(cct, item, 0);
-      ldout(cct, 5) << "remove_item_under removing item " << item << " from bucket " << b->id << dendl;
+      ldout(cct, 5) << "_remove_item_under removing item " << item << " from bucket " << b->id << dendl;
       crush_bucket_remove_item(b, item);
       ret = 0;
     } else if (id < 0) {
-      int r = remove_item_under(cct, item, id);
+      int r = remove_item_under(cct, item, id, unlink_only);
       if (r == 0)
        ret = 0;
     }
@@ -150,25 +159,26 @@ int CrushWrapper::_remove_item_under(CephContext *cct, int item, int ancestor)
   return ret;
 }
 
-int CrushWrapper::remove_item_under(CephContext *cct, int item, int ancestor)
+int CrushWrapper::remove_item_under(CephContext *cct, int item, int ancestor, bool unlink_only)
 {
-  int ret = _remove_item_under(cct, item, ancestor);
+  ldout(cct, 5) << "remove_item_under " << item << " under " << ancestor
+               << (unlink_only ? " unlink_only":"") << dendl;
+  int ret = _remove_item_under(cct, item, ancestor, unlink_only);
   if (ret < 0)
     return ret;
 
-  // last instance?
-  if (!_search_item_exists(item)) {
-    if (item < 0) {
-      ldout(cct, 5) << "remove_item removing bucket " << item << dendl;
-      crush_bucket *t = get_bucket(item);
-      crush_remove_bucket(crush, t);
-    }
-    if (name_map.count(item)) {
-      name_map.erase(item);
-      have_rmaps = false;
+  if (item < 0 && !unlink_only) {
+    crush_bucket *t = get_bucket(item);
+    if (t && t->size) {
+      ldout(cct, 1) << "remove_item_undef bucket " << item << " has " << t->size
+                   << " items, not empty" << dendl;
+      return -ENOTEMPTY;
     }
   }
 
+  if (_maybe_remove_last_instance(cct, item, unlink_only))
+    ret = 0;
+
   return ret;
 }
 
@@ -475,7 +485,7 @@ int CrushWrapper::create_or_move_item(CephContext *cct, int item, float weight,
     if (item_exists(item)) {
       weight = get_item_weightf(item);
       ldout(cct, 10) << "create_or_move_item " << item << " exists with weight " << weight << dendl;
-      remove_item(cct, item);
+      remove_item(cct, item, true);
     }
     ldout(cct, 5) << "create_or_move_item adding " << item << " weight " << weight
                  << " at " << loc << dendl;
@@ -511,7 +521,7 @@ int CrushWrapper::update_item(CephContext *cct, int item, float weight, string n
     }
   } else {
     if (item_exists(item)) {
-      remove_item(cct, item);
+      remove_item(cct, item, true);
     }
     ldout(cct, 5) << "update_item adding " << item << " weight " << weight
                  << " at " << loc << dendl;
index 1cbdf1192a76ddc173f8b99c59bc20161ef6c219..9109156d066d3ae8ced697964203ee0826b6e0af 100644 (file)
@@ -421,9 +421,10 @@ public:
    *
    * @param cct cct
    * @param id item id to remove
+   * @param unlink_only unlink but do not remove bucket (useful if multiple links or not empty)
    * @return 0 on success, negative on error
    */
-  int remove_item(CephContext *cct, int id);
+  int remove_item(CephContext *cct, int id, bool unlink_only);
 
   /**
    * remove all instances of an item nested beneath a certain point from the map
@@ -431,12 +432,14 @@ public:
    * @param cct cct
    * @param id item id to remove
    * @param ancestor ancestor item id under which to search for id
+   * @param unlink_only unlink but do not remove bucket (useful if bucket has multiple links or is not empty)
    * @return 0 on success, negative on error
    */
 private:
-  int _remove_item_under(CephContext *cct, int id, int ancestor);
+  bool _maybe_remove_last_instance(CephContext *cct, int id, bool unlink_only);
+  int _remove_item_under(CephContext *cct, int id, int ancestor, bool unlink_only);
 public:
-  int remove_item_under(CephContext *cct, int id, int ancestor);
+  int remove_item_under(CephContext *cct, int id, int ancestor, bool unlink_only);
 
   /**
    * get an item's weight
index 750d79f097294bea6f60937187695ef2b7764257..054c4df86988f5e35da85d7f1a50145981668034 100644 (file)
@@ -632,7 +632,7 @@ int main(int argc, const char **argv)
       r = -ENOENT;
     } else {
       int remove_item = crush.get_item_id(remove_name.c_str());
-      r = crush.remove_item(g_ceph_context, remove_item);
+      r = crush.remove_item(g_ceph_context, remove_item, false);
     }
     if (r == 0)
       modified = true;
index 41df0d8b36ed64dca7a7bc5cc1bd05075adb9162..b8a82cc3d8d19a1bdb99e006542909b357b27b20 100644 (file)
@@ -2485,7 +2485,9 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
        }
       } while (false);
     }
-    else if (m->cmd.size() > 3 && m->cmd[1] == "crush" && (m->cmd[2] == "rm" || m->cmd[2] == "remove")) {
+    else if (m->cmd.size() > 3 && m->cmd[1] == "crush" && (m->cmd[2] == "rm" ||
+                                                          m->cmd[2] == "remove" ||
+                                                          m->cmd[2] == "unlink")) {
       do {
        // osd crush rm <id> [ancestor]
        bufferlist bl;
@@ -2504,6 +2506,7 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
          break;
        }
        int id = newcrush.get_item_id(m->cmd[3].c_str());
+       bool unlink_only = m->cmd[2] == "unlink";
        if (m->cmd.size() > 4) {
          if (!newcrush.name_exists(m->cmd[4])) {
            err = -ENOENT;
@@ -2511,9 +2514,9 @@ bool OSDMonitor::prepare_command(MMonCommand *m)
            break;
          }
          int ancestor = newcrush.get_item_id(m->cmd[4]);
-         err = newcrush.remove_item_under(g_ceph_context, id, ancestor);
+         err = newcrush.remove_item_under(g_ceph_context, id, ancestor, unlink_only);
        } else {
-         err = newcrush.remove_item(g_ceph_context, id);
+         err = newcrush.remove_item(g_ceph_context, id, unlink_only);
        }
        if (err == 0) {
          pending_inc.crush.clear();
index c3c0a984beed6b9d19be48b60384619f1fbb07c6..6370c036791d8f21cc9afcf890a61c72a84b4623 100644 (file)
     ceph osd crush set <osd-id> <weight> <loc1> [<loc2> ...]
     ceph osd crush add <osd-id> <weight> <loc1> [<loc2> ...]
     ceph osd crush create-or-move <osd-id> <initial-weight> <loc1> [<loc2> ...]
+    ceph osd crush rm <name> [ancestor]
     ceph osd crush move <bucketname> <loc1> [<loc2> ...]
     ceph osd crush link <bucketname> <loc1> [<loc2> ...]
+    ceph osd crush unlink <bucketname> [ancestor]
     ceph osd crush add-bucket <bucketname> <type>
     ceph osd crush reweight <name> <weight>
     ceph osd crush tunables <legacy|argonaut|bobtail|optimal|default>
index 3a7aa068a49b67441f6ae7b7f4a3bfca2a999c1a..cb51a8dad5aada74423b10c188d3b557ba1add9d 100644 (file)
@@ -90,8 +90,10 @@ static void usage()
   cout << "  ceph osd crush set <osd-id> <weight> <loc1> [<loc2> ...]\n";
   cout << "  ceph osd crush add <osd-id> <weight> <loc1> [<loc2> ...]\n";
   cout << "  ceph osd crush create-or-move <osd-id> <initial-weight> <loc1> [<loc2> ...]\n";
+  cout << "  ceph osd crush rm <name> [ancestor]\n";
   cout << "  ceph osd crush move <bucketname> <loc1> [<loc2> ...]\n";
   cout << "  ceph osd crush link <bucketname> <loc1> [<loc2> ...]\n";
+  cout << "  ceph osd crush unlink <bucketname> [ancestor]\n";
   cout << "  ceph osd crush add-bucket <bucketname> <type>\n";
   cout << "  ceph osd crush reweight <name> <weight>\n";
   cout << "  ceph osd crush tunables <legacy|argonaut|bobtail|optimal|default>\n";