]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/filestore: add merge_delete
authorManali Kulkarni <Manali.Kulkarni@sandisk.com>
Tue, 27 Sep 2016 14:23:17 +0000 (10:23 -0400)
committerSage Weil <sage@redhat.com>
Tue, 27 Sep 2016 15:56:44 +0000 (11:56 -0400)
Signed-off-by: Manali Kulkarni <Manali.Kulkarni@sandisk.com>
src/os/filestore/FileStore.cc
src/os/filestore/FileStore.h

index 9191bb70f4ba055dc4b2f7c60ba7ef6cbaedd21f..279caa41d5247212a36ed0a6f8387b09849fa91f 100644 (file)
@@ -14,6 +14,7 @@
  */
 #include "include/compat.h"
 #include "include/int_types.h"
+#include "boost/tuple/tuple.hpp"
 
 #include <unistd.h>
 #include <stdlib.h>
@@ -2698,6 +2699,22 @@ void FileStore::_do_transaction(
       }
       break;
 
+    case Transaction::OP_MERGE_DELETE:
+      {
+        ghobject_t src_oid = i.get_oid(op->oid);
+        coll_t cid = i.get_cid(op->cid);
+        ghobject_t oid = i.get_oid(op->dest_oid);
+        coll_t src_cid = i.get_cid(op->cid);
+        _kludge_temp_object_collection(cid, oid);
+        _kludge_temp_object_collection(src_cid, src_oid);
+        vector<boost::tuple<uint64_t, uint64_t, uint64_t>> move_info;
+        i.decode_move_info(move_info);
+        tracepoint(objectstore, move_ranges_destroy_src_enter, osr_name);
+        r = _move_ranges_destroy_src(src_cid, src_oid, cid, oid, move_info, spos);
+        tracepoint(objectstore, move_ranges_destroy_src_exit, r);
+      }
+      break;
+
     case Transaction::OP_MKCOLL:
       {
         coll_t cid = i.get_cid(op->cid);
@@ -3748,6 +3765,84 @@ int FileStore::_clone_range(const coll_t& oldcid, const ghobject_t& oldoid, cons
   return r;
 }
 
+/*
+ * Move contents of src object according to move_info to base object. Once the move_info is traversed completely, delete the src object.
+ */
+int FileStore::_move_ranges_destroy_src(const coll_t& src_cid, const ghobject_t& src_oid, const coll_t& cid, const ghobject_t& oid,
+                              const vector<boost::tuple<uint64_t, uint64_t, uint64_t>> move_info,
+                              const SequencerPosition& spos)
+{
+  int r = 0;
+
+  dout(10) << __func__ << src_cid << "/" << src_oid << " -> " << cid << "/" << oid << dendl;
+
+  // check replay guard for base object. If not possible to replay, return.
+  int dstcmp = _check_replay_guard(cid, oid, spos);
+  if (dstcmp < 0)
+    return 0;
+
+  // check the src name too; it might have a newer guard, and we don't
+  // want to clobber it
+  int srccmp = _check_replay_guard(src_cid, src_oid, spos);
+  if (srccmp < 0)
+    return 0;
+
+  FDRef b;
+  r = lfn_open(cid, oid, true, &b);
+  if (r < 0) {
+    return 0;
+  }
+
+  FDRef t;
+  r = lfn_open(src_cid, src_oid, false, &t);
+  //If we are replaying, it is possible that we do not find src obj as it is deleted before crashing.
+  if (r < 0) {
+    lfn_close(b);
+    dout(10) << __func__ << " replaying -->" << replaying  << dendl;
+    if (replaying) {
+      _set_replay_guard(**b, spos, &oid);
+      return 0;
+    } else {
+      return -ENOENT;
+    }
+  }
+
+  for (unsigned i = 0; i < move_info.size(); ++i) {
+     uint64_t srcoff = move_info[i].get<0>();
+     uint64_t dstoff = move_info[i].get<1>();
+     uint64_t len = move_info[i].get<2>();
+
+     r = _do_clone_range(**t, **b, srcoff, len, dstoff);
+  }
+
+  dout(10) << __func__  << cid << "/" << oid << " "  <<  " = " << r << dendl;
+
+  lfn_close(t);
+
+  //In case crash occurs here, replay will have to do cloning again.
+  //Only if do_clone_range is successful, go ahead with deleting the source object.
+  if (r < 0)
+    goto out;
+
+  r = lfn_unlink(src_cid, src_oid, spos, true);
+  // If crash occurs between unlink and set guard, correct the error.
+  // as during next time, it might not find the already deleted object.
+  if (r < 0 && replaying) {
+    r = 0;
+  }
+
+  if (r < 0)
+    goto out;
+
+  //set replay guard for base obj coll_t, as this api is not idempotent.
+  _set_replay_guard(**b, spos, &oid);
+
+out:
+  lfn_close(b);
+  dout(10) << __func__  << cid << "/" << oid << " "  <<  " = " << r << dendl;
+  return r;
+}
+
 class SyncEntryTimeout : public Context {
 public:
   explicit SyncEntryTimeout(int commit_timeo)
index 60718115957fe24ab562227c903e05757aadc6b4..cb83bfbe758fb4194139471fe78286d2098944c0 100644 (file)
@@ -560,6 +560,9 @@ public:
   int _clone_range(const coll_t& oldcid, const ghobject_t& oldoid, const coll_t& newcid, const ghobject_t& newoid,
                   uint64_t srcoff, uint64_t len, uint64_t dstoff,
                   const SequencerPosition& spos);
+  int _move_ranges_destroy_src(const coll_t& temp_cid, const ghobject_t& temp_oid, const coll_t& cid, const ghobject_t& oid,
+                     const vector<boost::tuple<uint64_t, uint64_t, uint64_t> > move_info,
+                     const SequencerPosition& spos);
   int _do_clone_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff);
   int _do_sparse_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff);
   int _do_copy_range(int from, int to, uint64_t srcoff, uint64_t len, uint64_t dstoff, bool skip_sloppycrc=false);