rgw_fh->set_acls(*(req.get_attr(RGW_ATTR_ACL)));
get<0>(mkr) = rgw_fh;
+ rgw_fh->file_ondisk_version = 0; // inital version
rgw_fh->mtx.unlock();
} else
rc = -EIO;
rgw::sal::RGWRadosUser ruser(rgwlib.get_store(), user);
RGWSetAttrsRequest req(cct, &ruser, rgw_fh->bucket_name(), obj_name);
- rgw_fh->encode_attrs(ux_key, ux_attrs);
+ rgw_fh->encode_attrs(ux_key, ux_attrs, false);
req.emplace_attr(RGW_ATTR_UNIX_KEY1, std::move(ux_key));
req.emplace_attr(RGW_ATTR_UNIX1, std::move(ux_attrs));
}
void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
- ceph::buffer::list& ux_attrs1)
+ ceph::buffer::list& ux_attrs1,
+ bool inc_ov)
{
using ceph::encode;
fh_key fhk(this->fh.fh_hk);
encode(fhk, ux_key1);
+ bool need_ondisk_version =
+ (fh.fh_type == RGW_FS_TYPE_FILE ||
+ fh.fh_type == RGW_FS_TYPE_SYMBOLIC_LINK);
+ if (need_ondisk_version &&
+ file_ondisk_version < 0) {
+ file_ondisk_version = 0;
+ }
encode(*this, ux_attrs1);
+ if (need_ondisk_version && inc_ov) {
+ file_ondisk_version++;
+ }
} /* RGWFileHandle::encode_attrs */
DecodeAttrsResult RGWFileHandle::decode_attrs(const ceph::buffer::list* ux_key1,
decode(fhk, bl_iter_key1);
get<0>(dar) = true;
+ // decode to a temporary file handle which may not be
+ // copied to the current file handle if its file_ondisk_version
+ // is not newer
+ RGWFileHandle tmp_fh(fs);
+ tmp_fh.fh.fh_type = fh.fh_type;
auto bl_iter_unix1 = ux_attrs1->cbegin();
- decode(*this, bl_iter_unix1);
+ decode(tmp_fh, bl_iter_unix1);
+
+ fh.fh_type = tmp_fh.fh.fh_type;
+ // for file handles that represent files and whose file_ondisk_version
+ // is newer, no updates are need, otherwise, go updating the current
+ // file handle
+ if (!((fh.fh_type == RGW_FS_TYPE_FILE ||
+ fh.fh_type == RGW_FS_TYPE_SYMBOLIC_LINK) &&
+ file_ondisk_version >= tmp_fh.file_ondisk_version)) {
+ // make sure the following "encode" always encode a greater version
+ file_ondisk_version = tmp_fh.file_ondisk_version + 1;
+ state.dev = tmp_fh.state.dev;
+ state.size = tmp_fh.state.size;
+ state.nlink = tmp_fh.state.nlink;
+ state.owner_uid = tmp_fh.state.owner_uid;
+ state.owner_gid = tmp_fh.state.owner_gid;
+ state.unix_mode = tmp_fh.state.unix_mode;
+ state.ctime = tmp_fh.state.ctime;
+ state.mtime = tmp_fh.state.mtime;
+ state.atime = tmp_fh.state.atime;
+ state.version = tmp_fh.state.version;
+ }
+
if (this->state.version < 2) {
get<1>(dar) = true;
}
RGWLibFS* fs;
RGWFileHandle* bucket;
RGWFileHandle* parent;
+ std::atomic_int64_t file_ondisk_version; // version of unix attrs, file only
/* const */ std::string name; /* XXX file or bucket name */
/* const */ fh_key fhk;
private:
explicit RGWFileHandle(RGWLibFS* _fs)
- : fs(_fs), bucket(nullptr), parent(nullptr), variant_type{directory()},
- depth(0), flags(FLAG_NONE)
+ : fs(_fs), bucket(nullptr), parent(nullptr), file_ondisk_version(-1),
+ variant_type{directory()}, depth(0), flags(FLAG_NONE)
{
fh.fh_hk.bucket = 0;
fh.fh_hk.object = 0;
public:
RGWFileHandle(RGWLibFS* _fs, RGWFileHandle* _parent,
const fh_key& _fhk, std::string& _name, uint32_t _flags)
- : fs(_fs), bucket(nullptr), parent(_parent), name(std::move(_name)),
- fhk(_fhk), flags(_flags) {
+ : fs(_fs), bucket(nullptr), parent(_parent), file_ondisk_version(-1),
+ name(std::move(_name)), fhk(_fhk), flags(_flags) {
if (parent->is_root()) {
fh.fh_type = RGW_FS_TYPE_DIRECTORY;
}
void encode(buffer::list& bl) const {
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
encode(uint32_t(fh.fh_type), bl);
encode(state.dev, bl);
encode(state.size, bl);
encode(real_clock::from_timespec(t), bl);
}
encode((uint32_t)2, bl);
+ encode(file_ondisk_version.load(), bl);
ENCODE_FINISH(bl);
}
+ //XXX: RGWFileHandle::decode method can only be called from
+ // RGWFileHandle::decode_attrs, otherwise the file_ondisk_version
+ // fied would be contaminated
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
uint32_t fh_type;
decode(fh_type, bl);
if ((fh.fh_type != fh_type) &&
if (struct_v >= 2) {
decode(state.version, bl);
}
+ if (struct_v >= 3) {
+ int64_t fov;
+ decode(fov, bl);
+ file_ondisk_version = fov;
+ }
DECODE_FINISH(bl);
}
void encode_attrs(ceph::buffer::list& ux_key1,
- ceph::buffer::list& ux_attrs1);
+ ceph::buffer::list& ux_attrs1,
+ bool inc_ov = true);
DecodeAttrsResult decode_attrs(const ceph::buffer::list* ux_key1,
const ceph::buffer::list* ux_attrs1);