]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librgw: make rgw file handle versioned
authorXuehan Xu <xuxuehan@qianxin.com>
Sat, 2 Jan 2021 14:50:23 +0000 (22:50 +0800)
committerCory Snyder <csnyder@iland.com>
Thu, 17 Mar 2022 13:33:35 +0000 (09:33 -0400)
The reason that we need this is that there could be the following scenario:

1. rgw_setattr sets the file attr;
2. rgw_write writes some new data, and encodes its attr to store into rados;
3. before the actual persistence of the file's attr bl, rgw_lookup loads the file's
   previous attr and modifies the current file handle's metadata;
4. rgw_write's result persisted to rados;
5. rgw_setattr set the current file handle's metadata which is actually an old one to rados

In this case, the attr in rados would be out of date which means loss of data

Fixes: https://tracker.ceph.com/issues/50194
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
(cherry picked from commit 49a35d72e0982c03781d4845c800332bded1c658)

src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index 8fe95b964a60ebd2c995c6a99f9834a61d442183..6fa4e817155ccfbf872f2077ed41aa2f2c7fd9ad 100644 (file)
@@ -754,6 +754,7 @@ namespace rgw {
         rgw_fh->set_acls(*(req.get_attr(RGW_ATTR_ACL))); 
 
        get<0>(mkr) = rgw_fh;
+       rgw_fh->file_ondisk_version = 0; // inital version
        rgw_fh->mtx.unlock();
       } else
        rc = -EIO;
@@ -1198,7 +1199,7 @@ namespace rgw {
 
     RGWSetAttrsRequest req(cct, user->clone(), rgw_fh->bucket_name(), obj_name);
 
-    rgw_fh->encode_attrs(ux_key, ux_attrs);
+    rgw_fh->encode_attrs(ux_key, ux_attrs, false);
 
     req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
     req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
@@ -1432,12 +1433,23 @@ namespace rgw {
   }
 
   void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
-                                  ceph::buffer::list& ux_attrs1)
+                                  ceph::buffer::list& ux_attrs1,
+                                  bool inc_ov)
   {
     using ceph::encode;
     fh_key fhk(this->fh.fh_hk);
     encode(fhk, ux_key1);
+    bool need_ondisk_version =
+      (fh.fh_type == RGW_FS_TYPE_FILE ||
+       fh.fh_type == RGW_FS_TYPE_SYMBOLIC_LINK);
+    if (need_ondisk_version &&
+       file_ondisk_version < 0) {
+      file_ondisk_version = 0;
+    }
     encode(*this, ux_attrs1);
+    if (need_ondisk_version && inc_ov) {
+      file_ondisk_version++;
+    }
   } /* RGWFileHandle::encode_attrs */
 
   DecodeAttrsResult RGWFileHandle::decode_attrs(const ceph::buffer::list* ux_key1,
@@ -1450,8 +1462,35 @@ namespace rgw {
     decode(fhk, bl_iter_key1);
     get<0>(dar) = true;
 
+    // decode to a temporary file handle which may not be
+    // copied to the current file handle if its file_ondisk_version
+    // is not newer
+    RGWFileHandle tmp_fh(fs);
+    tmp_fh.fh.fh_type = fh.fh_type;
     auto bl_iter_unix1 = ux_attrs1->cbegin();
-    decode(*this, bl_iter_unix1);
+    decode(tmp_fh, bl_iter_unix1);
+
+    fh.fh_type = tmp_fh.fh.fh_type;
+    // for file handles that represent files and whose file_ondisk_version
+    // is newer, no updates are need, otherwise, go updating the current
+    // file handle
+    if (!((fh.fh_type == RGW_FS_TYPE_FILE ||
+           fh.fh_type == RGW_FS_TYPE_SYMBOLIC_LINK) &&
+         file_ondisk_version >= tmp_fh.file_ondisk_version)) {
+      // make sure the following "encode" always encode a greater version
+      file_ondisk_version = tmp_fh.file_ondisk_version + 1;
+      state.dev = tmp_fh.state.dev;
+      state.size = tmp_fh.state.size;
+      state.nlink = tmp_fh.state.nlink;
+      state.owner_uid = tmp_fh.state.owner_uid;
+      state.owner_gid = tmp_fh.state.owner_gid;
+      state.unix_mode = tmp_fh.state.unix_mode;
+      state.ctime = tmp_fh.state.ctime;
+      state.mtime = tmp_fh.state.mtime;
+      state.atime = tmp_fh.state.atime;
+      state.version = tmp_fh.state.version;
+    }
+
     if (this->state.version < 2) {
       get<1>(dar) = true;
     }
index 6e909f3ee172faea01d07ac242892c1c9d121bbd..e076054ebf95c7db2e6136c2725d1738250441b5 100644 (file)
@@ -192,6 +192,7 @@ namespace rgw {
     RGWLibFS* fs;
     RGWFileHandle* bucket;
     RGWFileHandle* parent;
+    std::atomic_int64_t file_ondisk_version; // version of unix attrs, file only
     /* const */ std::string name; /* XXX file or bucket name */
     /* const */ fh_key fhk;
 
@@ -280,8 +281,8 @@ namespace rgw {
 
   private:
     explicit RGWFileHandle(RGWLibFS* _fs)
-      : fs(_fs), bucket(nullptr), parent(nullptr), variant_type{directory()},
-       depth(0), flags(FLAG_NONE)
+      : fs(_fs), bucket(nullptr), parent(nullptr), file_ondisk_version(-1),
+       variant_type{directory()}, depth(0), flags(FLAG_NONE)
       {
         fh.fh_hk.bucket = 0;
         fh.fh_hk.object = 0;
@@ -321,8 +322,8 @@ namespace rgw {
   public:
     RGWFileHandle(RGWLibFS* _fs, RGWFileHandle* _parent,
                  const fh_key& _fhk, std::string& _name, uint32_t _flags)
-      : fs(_fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
-       fhk(_fhk), flags(_flags) {
+      : fs(_fs), bucket(nullptr), parent(_parent), file_ondisk_version(-1),
+       name(std::move(_name)), fhk(_fhk), flags(_flags) {
 
       if (parent->is_root()) {
        fh.fh_type = RGW_FS_TYPE_DIRECTORY;
@@ -686,7 +687,7 @@ namespace rgw {
     }
 
     void encode(buffer::list& bl) const {
-      ENCODE_START(2, 1, bl);
+      ENCODE_START(3, 1, bl);
       encode(uint32_t(fh.fh_type), bl);
       encode(state.dev, bl);
       encode(state.size, bl);
@@ -698,11 +699,15 @@ namespace rgw {
        encode(real_clock::from_timespec(t), bl);
       }
       encode((uint32_t)2, bl);
+      encode(file_ondisk_version.load(), bl);
       ENCODE_FINISH(bl);
     }
 
+    //XXX: RGWFileHandle::decode method can only be called from
+    //    RGWFileHandle::decode_attrs, otherwise the file_ondisk_version
+    //    fied would be contaminated
     void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(2, bl);
+      DECODE_START(3, bl);
       uint32_t fh_type;
       decode(fh_type, bl);
       if ((fh.fh_type != fh_type) &&
@@ -723,11 +728,17 @@ namespace rgw {
       if (struct_v >= 2) {
         decode(state.version, bl);
       }
+      if (struct_v >= 3) {
+       int64_t fov;
+       decode(fov, bl);
+       file_ondisk_version = fov;
+      }
       DECODE_FINISH(bl);
     }
 
     void encode_attrs(ceph::buffer::list& ux_key1,
-                     ceph::buffer::list& ux_attrs1);
+                     ceph::buffer::list& ux_attrs1,
+                     bool inc_ov = true);
 
     DecodeAttrsResult decode_attrs(const ceph::buffer::list* ux_key1,
                                    const ceph::buffer::list* ux_attrs1);