]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/FileStore: implement collection_move_rename
authorSage Weil <sage@inktank.com>
Tue, 3 Sep 2013 22:13:40 +0000 (15:13 -0700)
committerSage Weil <sage@inktank.com>
Wed, 11 Sep 2013 22:19:18 +0000 (15:19 -0700)
This is similar to a collection_add + collection_move sequence in that we
apply the same replay guards.  The difference is that we roll it up into
a single operation, change the filename, and make the omap content carry
over by calling DBObjectMap->clone (as there is no rename function or
collection awareness in the DBObjectMap).

Signed-off-by: Sage Weil <sage@inktank.com>
src/os/FileStore.cc
src/os/FileStore.h
src/test/filestore/store_test.cc

index 8b27246dfdfc0b6ed18b39d629c8c76c0ec0c6c6..66581bcde38bc8ca28ec5a7ce1085572efaf401f 100644 (file)
@@ -299,7 +299,8 @@ int FileStore::lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobje
 }
 
 int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
-                         const SequencerPosition &spos)
+                         const SequencerPosition &spos,
+                         bool force_clear_omap)
 {
   Index index;
   int r = get_index(cid, &index);
@@ -315,14 +316,17 @@ int FileStore::lfn_unlink(coll_t cid, const hobject_t& o,
       return r;
     }
 
-    struct stat st;
-    r = ::stat(path->path(), &st);
-    if (r < 0) {
-      r = -errno;
-      assert(!m_filestore_fail_eio || r != -EIO);
-      return r;
+    if (!force_clear_omap) {
+      struct stat st;
+      r = ::stat(path->path(), &st);
+      if (r < 0) {
+       r = -errno;
+       assert(!m_filestore_fail_eio || r != -EIO);
+       return r;
+      }
+      force_clear_omap = true;
     }
-    if (st.st_nlink == 1) {
+    if (force_clear_omap) {
       dout(20) << __func__ << ": clearing omap on " << o
               << " in cid " << cid << dendl;
       r = object_map->clear(o, &spos);
@@ -2176,6 +2180,16 @@ unsigned FileStore::_do_transaction(Transaction& t, uint64_t op_seq, int trans_n
       }
       break;
 
+    case Transaction::OP_COLL_MOVE_RENAME:
+      {
+       coll_t oldcid = i.get_cid();
+       hobject_t oldoid = i.get_oid();
+       coll_t newcid = i.get_cid();
+       hobject_t newoid = i.get_oid();
+       r = _collection_move_rename(oldcid, oldoid, newcid, newoid, spos);
+      }
+      break;
+
     case Transaction::OP_COLL_SETATTR:
       {
        coll_t cid = i.get_cid();
@@ -4086,7 +4100,7 @@ int FileStore::_destroy_collection(coll_t c)
 
 
 int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
-                              const SequencerPosition& spos) 
+                              const SequencerPosition& spos)
 {
   dout(15) << "collection_add " << c << "/" << o << " from " << oldcid << "/" << o << dendl;
   
@@ -4133,6 +4147,73 @@ int FileStore::_collection_add(coll_t c, coll_t oldcid, const hobject_t& o,
   return r;
 }
 
+int FileStore::_collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+                                      coll_t c, const hobject_t& o,
+                                      const SequencerPosition& spos)
+{
+  dout(15) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid << dendl;
+  int r;
+  int dstcmp, srccmp;
+
+  dstcmp = _check_replay_guard(c, o, spos);
+  if (dstcmp < 0)
+    goto out_rm_src;
+
+  // check the src name too; it might have a newer guard, and we don't
+  // want to clobber it
+  srccmp = _check_replay_guard(oldcid, oldoid, spos);
+  if (srccmp < 0)
+    return 0;
+
+  {
+    // open guard on object so we don't any previous operations on the
+    // new name that will modify the source inode.
+    FDRef fd;
+    r = lfn_open(oldcid, oldoid, 0, &fd);
+    if (r < 0) {
+      // the source collection/object does not exist. If we are replaying, we
+      // should be safe, so just return 0 and move on.
+      assert(replaying);
+      dout(10) << __func__ << " " << c << "/" << o << " from "
+              << oldcid << "/" << oldoid << " (dne, continue replay) " << dendl;
+      return 0;
+    }
+    if (dstcmp > 0) {      // if dstcmp == 0 the guard already says "in-progress"
+      _set_replay_guard(**fd, spos, &o, true);
+    }
+
+    r = lfn_link(oldcid, c, oldoid, o);
+    if (replaying && !backend->can_checkpoint() &&
+       r == -EEXIST)    // crashed between link() and set_replay_guard()
+      r = 0;
+
+    _inject_failure();
+
+    // the name changed; link the omap content
+    r = object_map->clone(oldoid, o, &spos);
+    if (r == -ENOENT)
+      r = 0;
+
+    _inject_failure();
+
+    // close guard on object so we don't do this again
+    if (r == 0) {
+      _close_replay_guard(**fd, spos);
+    }
+    lfn_close(fd);
+  }
+
+ out_rm_src:
+  // remove source
+  if (_check_replay_guard(oldcid, oldoid, spos) > 0) {
+    r = lfn_unlink(oldcid, oldoid, spos, true);
+  }
+
+  dout(10) << __func__ << " " << c << "/" << o << " from " << oldcid << "/" << oldoid
+          << " = " << r << dendl;
+  return r;
+}
+
 void FileStore::_inject_failure()
 {
   if (m_filestore_kill_at.read()) {
index 19178be4154c254f18cc935dbfae8c6885b2f73f..4f58df4d698dc25c65da3194f9466311ef714b15 100644 (file)
@@ -299,7 +299,8 @@ public:
     Index *index = 0);
   void lfn_close(FDRef fd);
   int lfn_link(coll_t c, coll_t newcid, const hobject_t& o, const hobject_t& newoid) ;
-  int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos);
+  int lfn_unlink(coll_t cid, const hobject_t& o, const SequencerPosition &spos,
+                bool force_clear_omap=false);
 
 public:
   FileStore(const std::string &base, const std::string &jdev, const char *internal_name = "filestore", bool update_to=false);
@@ -499,6 +500,9 @@ public:
   int _destroy_collection(coll_t c);
   int _collection_add(coll_t c, coll_t ocid, const hobject_t& o,
                      const SequencerPosition& spos);
+  int _collection_move_rename(coll_t oldcid, const hobject_t& oldoid,
+                             coll_t c, const hobject_t& o,
+                             const SequencerPosition& spos);
   void dump_start(const std::string& file);
   void dump_stop();
   void dump_transactions(list<ObjectStore::Transaction*>& ls, uint64_t seq, OpSequencer *osr);
index 80c775052eca31423cd12c07b23226a5551eebcc..9210496012709320e26c6bbe06ef1a9761fc5a7e 100644 (file)
@@ -898,6 +898,65 @@ TEST_F(StoreTest, TwoHash) {
   ASSERT_EQ(r, 0);
 }
 
+TEST_F(StoreTest, MoveRename) {
+  coll_t temp_cid("mytemp");
+  hobject_t temp_oid("tmp_oid", "", CEPH_NOSNAP, 0, 0, "");
+  coll_t cid("dest");
+  hobject_t oid("dest_oid", "", CEPH_NOSNAP, 0, 0, "");
+  int r;
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(cid);
+    t.touch(cid, oid);
+    r = store->apply_transaction(t);
+    ASSERT_EQ(r, 0);
+  }
+  ASSERT_TRUE(store->exists(cid, oid));
+  bufferlist data, attr;
+  map<string, bufferlist> omap;
+  data.append("data payload");
+  attr.append("attr value");
+  omap["omap_key"].append("omap value");
+  {
+    ObjectStore::Transaction t;
+    t.create_collection(temp_cid);
+    t.touch(temp_cid, temp_oid);
+    t.write(temp_cid, temp_oid, 0, data.length(), data);
+    t.setattr(temp_cid, temp_oid, "attr", attr);
+    t.omap_setkeys(temp_cid, temp_oid, omap);
+    r = store->apply_transaction(t);
+    ASSERT_EQ(r, 0);
+  }
+  ASSERT_TRUE(store->exists(temp_cid, temp_oid));
+  {
+    ObjectStore::Transaction t;
+    t.remove(cid, oid);
+    t.collection_move_rename(temp_cid, temp_oid, cid, oid);
+    r = store->apply_transaction(t);
+    ASSERT_EQ(r, 0);
+  }
+  ASSERT_TRUE(store->exists(cid, oid));
+  ASSERT_FALSE(store->exists(temp_cid, temp_oid));
+  {
+    bufferlist newdata;
+    r = store->read(cid, oid, 0, 1000, newdata);
+    ASSERT_GE(r, 0);
+    ASSERT_TRUE(newdata.contents_equal(data));
+    bufferlist newattr;
+    r = store->getattr(cid, oid, "attr", newattr);
+    ASSERT_GE(r, 0);
+    ASSERT_TRUE(newattr.contents_equal(attr));
+    set<string> keys;
+    keys.insert("omap_key");
+    map<string, bufferlist> newomap;
+    r = store->omap_get_values(cid, oid, keys, &newomap);
+    ASSERT_GE(r, 0);
+    ASSERT_EQ(1u, newomap.size());
+    ASSERT_TRUE(newomap.count("omap_key"));
+    ASSERT_TRUE(newomap["omap_key"].contents_equal(omap["omap_key"]));
+  }
+}
+
 //
 // support tests for qa/workunits/filestore/filestore.sh
 //