]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librgw: make rgw file handle versioned
authorXuehan Xu <xuxuehan@qianxin.com>
Sat, 2 Jan 2021 14:50:23 +0000 (22:50 +0800)
committerCory Snyder <csnyder@iland.com>
Thu, 17 Mar 2022 13:34:34 +0000 (09:34 -0400)
The reason that we need this is that there could be the following scenario:

1. rgw_setattr sets the file attr;
2. rgw_write writes some new data, and encodes its attr to store into rados;
3. before the actual persistence of the file's attr bl, rgw_lookup loads the file's
   previous attr and modifies the current file handle's metadata;
4. rgw_write's result persisted to rados;
5. rgw_setattr set the current file handle's metadata which is actually an old one to rados

In this case, the attr in rados would be out of date which means loss of data

Fixes: https://tracker.ceph.com/issues/50194
Signed-off-by: Xuehan Xu <xuxuehan@qianxin.com>
(cherry picked from commit 49a35d72e0982c03781d4845c800332bded1c658)

src/rgw/rgw_file.cc
src/rgw/rgw_file.h

index b78d0506ef840745f50233e4fb34ae5736efa789..42de2ab918b52fc5030e396301ba2797a3b8b8c0 100644 (file)
@@ -736,6 +736,7 @@ namespace rgw {
         rgw_fh->set_acls(*(req.get_attr(RGW_ATTR_ACL))); 
 
        get<0>(mkr) = rgw_fh;
+       rgw_fh->file_ondisk_version = 0; // inital version
        rgw_fh->mtx.unlock();
       } else
        rc = -EIO;
@@ -953,7 +954,7 @@ namespace rgw {
     rgw::sal::RGWRadosUser ruser(rgwlib.get_store(), user);
     RGWSetAttrsRequest req(cct, &ruser, rgw_fh->bucket_name(), obj_name);
 
-    rgw_fh->encode_attrs(ux_key, ux_attrs);
+    rgw_fh->encode_attrs(ux_key, ux_attrs, false);
 
     req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
     req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
@@ -1187,12 +1188,23 @@ namespace rgw {
   }
 
   void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
-                                  ceph::buffer::list& ux_attrs1)
+                                  ceph::buffer::list& ux_attrs1,
+                                  bool inc_ov)
   {
     using ceph::encode;
     fh_key fhk(this->fh.fh_hk);
     encode(fhk, ux_key1);
+    bool need_ondisk_version =
+      (fh.fh_type == RGW_FS_TYPE_FILE ||
+       fh.fh_type == RGW_FS_TYPE_SYMBOLIC_LINK);
+    if (need_ondisk_version &&
+       file_ondisk_version < 0) {
+      file_ondisk_version = 0;
+    }
     encode(*this, ux_attrs1);
+    if (need_ondisk_version && inc_ov) {
+      file_ondisk_version++;
+    }
   } /* RGWFileHandle::encode_attrs */
 
   DecodeAttrsResult RGWFileHandle::decode_attrs(const ceph::buffer::list* ux_key1,
@@ -1205,8 +1217,35 @@ namespace rgw {
     decode(fhk, bl_iter_key1);
     get<0>(dar) = true;
 
+    // decode to a temporary file handle which may not be
+    // copied to the current file handle if its file_ondisk_version
+    // is not newer
+    RGWFileHandle tmp_fh(fs);
+    tmp_fh.fh.fh_type = fh.fh_type;
     auto bl_iter_unix1 = ux_attrs1->cbegin();
-    decode(*this, bl_iter_unix1);
+    decode(tmp_fh, bl_iter_unix1);
+
+    fh.fh_type = tmp_fh.fh.fh_type;
+    // for file handles that represent files and whose file_ondisk_version
+    // is newer, no updates are need, otherwise, go updating the current
+    // file handle
+    if (!((fh.fh_type == RGW_FS_TYPE_FILE ||
+           fh.fh_type == RGW_FS_TYPE_SYMBOLIC_LINK) &&
+         file_ondisk_version >= tmp_fh.file_ondisk_version)) {
+      // make sure the following "encode" always encode a greater version
+      file_ondisk_version = tmp_fh.file_ondisk_version + 1;
+      state.dev = tmp_fh.state.dev;
+      state.size = tmp_fh.state.size;
+      state.nlink = tmp_fh.state.nlink;
+      state.owner_uid = tmp_fh.state.owner_uid;
+      state.owner_gid = tmp_fh.state.owner_gid;
+      state.unix_mode = tmp_fh.state.unix_mode;
+      state.ctime = tmp_fh.state.ctime;
+      state.mtime = tmp_fh.state.mtime;
+      state.atime = tmp_fh.state.atime;
+      state.version = tmp_fh.state.version;
+    }
+
     if (this->state.version < 2) {
       get<1>(dar) = true;
     }
index f3ec1e885c1df13a496026089505a94cfb45fcdd..74daa5929589bce49d58344d7d96ac24ed828703 100644 (file)
@@ -192,6 +192,7 @@ namespace rgw {
     RGWLibFS* fs;
     RGWFileHandle* bucket;
     RGWFileHandle* parent;
+    std::atomic_int64_t file_ondisk_version; // version of unix attrs, file only
     /* const */ std::string name; /* XXX file or bucket name */
     /* const */ fh_key fhk;
 
@@ -279,8 +280,8 @@ namespace rgw {
 
   private:
     explicit RGWFileHandle(RGWLibFS* _fs)
-      : fs(_fs), bucket(nullptr), parent(nullptr), variant_type{directory()},
-       depth(0), flags(FLAG_NONE)
+      : fs(_fs), bucket(nullptr), parent(nullptr), file_ondisk_version(-1),
+       variant_type{directory()}, depth(0), flags(FLAG_NONE)
       {
         fh.fh_hk.bucket = 0;
         fh.fh_hk.object = 0;
@@ -320,8 +321,8 @@ namespace rgw {
   public:
     RGWFileHandle(RGWLibFS* _fs, RGWFileHandle* _parent,
                  const fh_key& _fhk, std::string& _name, uint32_t _flags)
-      : fs(_fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
-       fhk(_fhk), flags(_flags) {
+      : fs(_fs), bucket(nullptr), parent(_parent), file_ondisk_version(-1),
+       name(std::move(_name)), fhk(_fhk), flags(_flags) {
 
       if (parent->is_root()) {
        fh.fh_type = RGW_FS_TYPE_DIRECTORY;
@@ -673,7 +674,7 @@ namespace rgw {
     }
 
     void encode(buffer::list& bl) const {
-      ENCODE_START(2, 1, bl);
+      ENCODE_START(3, 1, bl);
       encode(uint32_t(fh.fh_type), bl);
       encode(state.dev, bl);
       encode(state.size, bl);
@@ -685,11 +686,15 @@ namespace rgw {
        encode(real_clock::from_timespec(t), bl);
       }
       encode((uint32_t)2, bl);
+      encode(file_ondisk_version.load(), bl);
       ENCODE_FINISH(bl);
     }
 
+    //XXX: RGWFileHandle::decode method can only be called from
+    //    RGWFileHandle::decode_attrs, otherwise the file_ondisk_version
+    //    fied would be contaminated
     void decode(bufferlist::const_iterator& bl) {
-      DECODE_START(2, bl);
+      DECODE_START(3, bl);
       uint32_t fh_type;
       decode(fh_type, bl);
       if ((fh.fh_type != fh_type) &&
@@ -710,11 +715,17 @@ namespace rgw {
       if (struct_v >= 2) {
         decode(state.version, bl);
       }
+      if (struct_v >= 3) {
+       int64_t fov;
+       decode(fov, bl);
+       file_ondisk_version = fov;
+      }
       DECODE_FINISH(bl);
     }
 
     void encode_attrs(ceph::buffer::list& ux_key1,
-                     ceph::buffer::list& ux_attrs1);
+                     ceph::buffer::list& ux_attrs1,
+                     bool inc_ov = true);
 
     DecodeAttrsResult decode_attrs(const ceph::buffer::list* ux_key1,
                                    const ceph::buffer::list* ux_attrs1);