From: Yehuda Sadeh Date: Tue, 28 Aug 2018 21:23:53 +0000 (-0700) Subject: rgw: sysobj service: initial cache conversion X-Git-Tag: v14.1.0~965^2~38 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0f39a29dc8429d15f53084e4dc381b4a6841cb26;p=ceph.git rgw: sysobj service: initial cache conversion Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 5ace7d82b70..703375cad26 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -42,6 +42,7 @@ set(librgw_common_srcs services/svc_quota.cc services/svc_rados.cc services/svc_sys_obj.cc + services/svc_sys_obj_cache.cc services/svc_sys_obj_core.cc services/svc_zone.cc services/svc_zone_utils.cc diff --git a/src/rgw/services/svc_sys_obj.cc b/src/rgw/services/svc_sys_obj.cc index 85da7a0e743..722c43a46f1 100644 --- a/src/rgw/services/svc_sys_obj.cc +++ b/src/rgw/services/svc_sys_obj.cc @@ -98,6 +98,22 @@ int RGWSI_SysObj::Obj::WOp::write(bufferlist& bl) bl, objv_tracker, mtime); } +int RGWSI_SysObj::Obj::WOp::write_data(bufferlist& bl) +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->write_data(obj, bl, exclusive, objv_tracker); +} + +int RGWSI_SysObj::Obj::WOp::write_attrs() +{ + RGWSI_SysObj_Core *svc = source.core_svc; + rgw_raw_obj& obj = source.get_obj(); + + return svc->set_attrs(obj, attrs, objv_tracker); +} + int RGWSI_SysObj::Pool::Op::list_prefixed_objs(const string& prefix, list *result) { bool is_truncated; diff --git a/src/rgw/services/svc_sys_obj.h b/src/rgw/services/svc_sys_obj.h index 2209727bdc6..c85b5b2607d 100644 --- a/src/rgw/services/svc_sys_obj.h +++ b/src/rgw/services/svc_sys_obj.h @@ -123,6 +123,9 @@ public: int remove(); int write(bufferlist& bl); + + int write_data(bufferlist& bl); /* write data only */ + int write_attrs(); /* write attrs only */ }; struct OmapOp { diff --git a/src/rgw/services/svc_sys_obj_cache.cc b/src/rgw/services/svc_sys_obj_cache.cc new file mode 100644 index 00000000000..7f2c55a2963 --- /dev/null +++ b/src/rgw/services/svc_sys_obj_cache.cc @@ -0,0 +1,356 @@ +#include "svc_sys_obj_cache.h" + + +int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + rgw_raw_obj& obj) + +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + string name = normal_name(pool, oid); + cache.remove(name); + + ObjectCacheInfo info; + distribute_cache(name, obj, info, REMOVE_OBJ); + + return RGWSI_SysObj_Core::remove(obj_ctx, objv_tracker, obj); +} + +int RGWSI_SysObj_Cache::read(RGWSysObjectCtxBase& obj_ctx, + GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, + rgw_raw_obj& obj, + bufferlist *obl, off_t ofs, off_t end, + map *attrs, + rgw_cache_entry_info *cache_info, + boost::optional refresh_version) +{ + rgw_pool pool; + string oid; + if (ofs != 0) { + return RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker, + obj, obl, ofs, end, attrs, + cache_info, refresh_version); + } + + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + string name = normal_name(pool, oid); + + ObjectCacheInfo info; + + uint32_t flags = CACHE_FLAG_DATA; + if (objv_tracker) + flags |= CACHE_FLAG_OBJV; + if (attrs) + flags |= CACHE_FLAG_XATTRS; + + if ((cache.get(name, info, flags, cache_info) == 0) && + (!refresh_version || !info.version.compare(&(*refresh_version)))) { + if (info.status < 0) + return info.status; + + bufferlist& bl = info.data; + + bufferlist::iterator i = bl.begin(); + + obl->clear(); + + i.copy_all(obl); + if (objv_tracker) + objv_tracker->read_version = info.version; + if (attrs) + *attrs = info.xattrs; + return obl->length(); + } + int r = RGWSI_SysObj_Core::read(obj_ctx, read_state, objv_tracker, + obj, obl, ofs, end, + attrs, cache_info, + refresh_version); + if (r < 0) { + if (r == -ENOENT) { // only update ENOENT, we'd rather retry other errors + info.status = r; + cache.put(name, info, cache_info); + } + return r; + } + + if (obl->length() == end + 1) { + /* in this case, most likely object contains more data, we can't cache it */ + return r; + } + + bufferptr p(r); + bufferlist& bl = info.data; + bl.clear(); + bufferlist::iterator o = obl->begin(); + o.copy_all(bl); + info.status = 0; + info.flags = flags; + if (objv_tracker) { + info.version = objv_tracker->read_version; + } + if (attrs) { + info.xattrs = *attrs; + } + cache.put(name, info, cache_info); + return r; +} + +int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj, + map& attrs, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.xattrs = attrs; + info.status = 0; + info.flags = CACHE_FLAG_MODIFY_XATTRS; + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + int ret = RGWSI_SysObj_Core::set_attrs(obj, attrs, rmattrs, objv_tracker); + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +int RGWSI_SysObjCache::write(rgw_raw_obj& obj, + real_time *pmtime, + map& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) override; +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.xattrs = attrs; + info.status = 0; + info.data = data; + info.flags = CACHE_FLAG_XATTRS | CACHE_FLAG_DATA | CACHE_FLAG_META; + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + ceph::real_time result_mtime; + int ret = RGWSI_SysObj_Core::write(obj, &result_mtime, attrs, + exclusive, data, + objv_tracker, set_mtime); + if (pmtime) { + *pmtime = result_mtime; + } + info.meta.mtime = result_mtime; + info.meta.size = data.length(); + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + // Only distribute the cache information if we did not just create + // the object with the exclusive flag. Note: PUT_OBJ_EXCL implies + // PUT_OBJ_CREATE. Generally speaking, when successfully creating + // a system object with the exclusive flag it is not necessary to + // call distribute_cache, as a) it's unclear whether other RGWs + // will need that system object in the near-term and b) it + // generates additional network traffic. + if (!exclusive) { + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + } + } else { + cache.remove(name); + } + + return ret; +} + +int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + ObjectCacheInfo info; + info.data = data; + info.meta.size = data.length(); + info.status = 0; + info.flags = CACHE_FLAG_DATA; + if (objv_tracker) { + info.version = objv_tracker->write_version; + info.flags |= CACHE_FLAG_OBJV; + } + int ret = RGWSI_SysObj_Core::write_data(obj, data, ofs, exclusive, objv_tracker); + string name = normal_name(pool, oid); + if (ret >= 0) { + cache.put(name, info, NULL); + int r = distribute_cache(name, obj, info, UPDATE_OBJ); + if (r < 0) + mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl; + } else { + cache.remove(name); + } + + return ret; +} + +int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, + map *attrs, bufferlist *first_chunk, + RGWObjVersionTracker *objv_tracker) +{ + rgw_pool pool; + string oid; + normalize_pool_and_obj(obj.pool, obj.oid, pool, oid); + + string name = normal_name(pool, oid); + + uint64_t size; + real_time mtime; + uint64_t epoch; + + ObjectCacheInfo info; + uint32_t flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; + if (objv_tracker) + flags |= CACHE_FLAG_OBJV; + int r = cache.get(name, info, flags, NULL); + if (r == 0) { + if (info.status < 0) + return info.status; + + size = info.meta.size; + mtime = info.meta.mtime; + epoch = info.epoch; + if (objv_tracker) + objv_tracker->read_version = info.version; + goto done; + } + r = RGWSI_SysObj_Core::raw_stat(obj, &size, &mtime, &epoch, &info.xattrs, first_chunk, objv_tracker); + if (r < 0) { + if (r == -ENOENT) { + info.status = r; + cache.put(name, info, NULL); + } + return r; + } + info.status = 0; + info.epoch = epoch; + info.meta.mtime = mtime; + info.meta.size = size; + info.flags = CACHE_FLAG_META | CACHE_FLAG_XATTRS; + if (objv_tracker) { + info.flags |= CACHE_FLAG_OBJV; + info.version = objv_tracker->read_version; + } + cache.put(name, info, NULL); +done: + if (psize) + *psize = size; + if (pmtime) + *pmtime = mtime; + if (pepoch) + *pepoch = epoch; + if (attrs) + *attrs = info.xattrs; + return 0; +} + +int RGWSI_SysObj_Cache::distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op) +{ + RGWCacheNotifyInfo info; + + info.op = op; + + info.obj_info = obj_info; + info.obj = obj; + bufferlist bl; + encode(info, bl); + return T::distribute(normal_name, bl); +} + +int RGWSI_SysObj_Cache::watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) +{ + RGWCacheNotifyInfo info; + + try { + auto iter = bl.cbegin(); + decode(info, iter); + } catch (buffer::end_of_buffer& err) { + mydout(0) << "ERROR: got bad notification" << dendl; + return -EIO; + } catch (buffer::error& err) { + mydout(0) << "ERROR: buffer::error" << dendl; + return -EIO; + } + + rgw_pool pool; + string oid; + normalize_pool_and_obj(info.obj.pool, info.obj.oid, pool, oid); + string name = normal_name(pool, oid); + + switch (info.op) { + case UPDATE_OBJ: + cache.put(name, info.obj_info, NULL); + break; + case REMOVE_OBJ: + cache.remove(name); + break; + default: + mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl; + return -EINVAL; + } + + return 0; +} + +int RGWSI_SysObj_Cache::call_list(const std::optional& filter, Formatter* f) +{ + cache.for_each( + [this, &filter, f] (const string& name, const ObjectCacheEntry& entry) { + if (!filter || name.find(*filter) != name.npos) { + T::cache_list_dump_helper(f, name, entry.info.meta.mtime, + entry.info.meta.size); + } + }); +} + +int RGWSI_SysObj_Cache::call_inspect(const std::string& target, Formatter* f) +{ + if (const auto entry = cache.get(target)) { + f->open_object_section("cache_entry"); + f->dump_string("name", target.c_str()); + entry->dump(f); + f->close_section(); + return true; + } else { + return false; + } +} + +int RGWSI_SysObj_Cache::call_erase(const std::string& target) +{ + return cache.remove(target); +} + +int RGWSI_SysObj_Cache::call_zap() +{ + cache.invalidate_all(); +} diff --git a/src/rgw/services/svc_sys_obj_cache.h b/src/rgw/services/svc_sys_obj_cache.h new file mode 100644 index 00000000000..ece465d91f8 --- /dev/null +++ b/src/rgw/services/svc_sys_obj_cache.h @@ -0,0 +1,68 @@ + +#ifndef CEPH_RGW_SERVICES_SYS_OBJ_CACHE_H +#define CEPH_RGW_SERVICES_SYS_OBJ_CACHE_H + + +#include "rgw/rgw_service.h" + +#include "svc_rados.h" + + +class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core +{ +protected: + std::map get_deps() override; + int load(const std::string& conf, std::map& dep_refs) override; + + int raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, + map *attrs, bufferlist *first_chunk, + RGWObjVersionTracker *objv_tracker) override; + + int read(RGWSysObjectCtxBase& obj_ctx, + GetObjState& read_state, + RGWObjVersionTracker *objv_tracker, + rgw_raw_obj& obj, + bufferlist *bl, off_t ofs, off_t end, + map *attrs, + boost::optional) override; + + int get_attr(rgw_raw_obj& obj, const char *name, bufferlist *dest) override; + + int set_attrs(rgw_raw_obj& obj, + map& attrs, + RGWObjVersionTracker *objv_tracker); + + int remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + rgw_raw_obj& obj) override; + + int write(rgw_raw_obj& obj, + real_time *pmtime, + map& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime) override; + + int write_data(rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker); + + int distribute_cache(const string& normal_name, rgw_raw_obj& obj, ObjectCacheInfo& obj_info, int op); + + int watch_cb(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl); + +public: + RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct): RGW_SysObj_Core(svc, cct) {} + + int call_list(const std::optional& filter, Formatter* f); + int call_inspect(const std::string& target, Formatter* f); + int call_erase(const std::string& target); + int call_zap(); +}; + +#endif diff --git a/src/rgw/services/svc_sys_obj_core.cc b/src/rgw/services/svc_sys_obj_core.cc index 491828abd2b..39c88483e1c 100644 --- a/src/rgw/services/svc_sys_obj_core.cc +++ b/src/rgw/services/svc_sys_obj_core.cc @@ -279,6 +279,53 @@ int RGWSI_SysObj_Core::get_attr(rgw_raw_obj& obj, return 0; } +int RGWSI_SysObj_Core::set_attrs(rgw_raw_obj& obj, + map& attrs, + RGWObjVersionTracker *objv_tracker) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc.get(), obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + ObjectWriteOperation op; + + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + + map::iterator iter; + if (rmattrs) { + for (iter = rmattrs->begin(); iter != rmattrs->end(); ++iter) { + const string& name = iter->first; + op.rmxattr(name.c_str()); + } + } + + for (iter = attrs.begin(); iter != attrs.end(); ++iter) { + const string& name = iter->first; + bufferlist& bl = iter->second; + + if (!bl.length()) + continue; + + op.setxattr(name.c_str(), bl); + } + + if (!op.size()) + return 0; + + bufferlist bl; + + r = rados_obj.operate(&op); + if (r < 0) + return r; + + return 0; +} + int RGWSI_SysObj_Core::omap_get_vals(rgw_raw_obj& obj, const string& marker, uint64_t count, @@ -503,3 +550,39 @@ int RGWSI_SysObj_Core::write(rgw_raw_obj& obj, } +int RGWSI_SysObj_Core::write_data(rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker) +{ + RGWSI_RADOS::Obj rados_obj; + int r = get_rados_obj(zone_svc.get(), obj, &rados_obj); + if (r < 0) { + ldout(cct, 20) << "get_rados_obj() on obj=" << obj << " returned " << r << dendl; + return r; + } + + ObjectWriteOperation op; + + if (exclusive) { + op.create(true); + } + + if (objv_tracker) { + objv_tracker->prepare_op_for_write(&op); + } + if (ofs == -1) { + op.write_full(bl); + } else { + op.write(ofs, bl); + } + r = rados_obj.operate(&op); + if (r < 0) + return r; + + if (objv_tracker) { + objv_tracker->apply_write(); + } + return 0; +} + diff --git a/src/rgw/services/svc_sys_obj_core.h b/src/rgw/services/svc_sys_obj_core.h index b79c32bc8d9..ec00f08a7d0 100644 --- a/src/rgw/services/svc_sys_obj_core.h +++ b/src/rgw/services/svc_sys_obj_core.h @@ -121,23 +121,12 @@ protected: std::map get_deps() override; int load(const std::string& conf, std::map& dep_refs) override; - virtual int get_rados_obj(RGWSI_Zone *zone_svc, rgw_raw_obj& obj, RGWSI_RADOS::Obj *pobj); - - virtual int get_system_obj_state_impl(RGWSysObjectCtxBase *rctx, rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker); - virtual int get_system_obj_state(RGWSysObjectCtxBase *rctx, rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker); + int get_rados_obj(RGWSI_Zone *zone_svc, rgw_raw_obj& obj, RGWSI_RADOS::Obj *pobj); virtual int raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch, map *attrs, bufferlist *first_chunk, RGWObjVersionTracker *objv_tracker); - virtual int stat(RGWSysObjectCtxBase& obj_ctx, - GetObjState& state, - rgw_raw_obj& obj, - map *attrs, - real_time *lastmod, - uint64_t *obj_size, - RGWObjVersionTracker *objv_tracker); - virtual int read(RGWSysObjectCtxBase& obj_ctx, GetObjState& read_state, RGWObjVersionTracker *objv_tracker, @@ -146,8 +135,29 @@ protected: map *attrs, boost::optional); + virtual int remove(RGWSysObjectCtxBase& obj_ctx, + RGWObjVersionTracker *objv_tracker, + rgw_raw_obj& obj); + + virtual int write(rgw_raw_obj& obj, + real_time *pmtime, + map& attrs, + bool exclusive, + const bufferlist& data, + RGWObjVersionTracker *objv_tracker, + real_time set_mtime); + + virtual int write_data(rgw_raw_obj& obj, + const bufferlist& bl, + bool exclusive, + RGWObjVersionTracker *objv_tracker); + virtual int get_attr(rgw_raw_obj& obj, const char *name, bufferlist *dest); + virtual int set_attrs(rgw_raw_obj& obj, + map& attrs, + RGWObjVersionTracker *objv_tracker); + virtual int omap_get_all(rgw_raw_obj& obj, std::map *m); virtual int omap_get_vals(rgw_raw_obj& obj, const string& marker, @@ -158,17 +168,17 @@ protected: virtual int omap_set(rgw_raw_obj& obj, const map& m, bool must_exist = false); virtual int omap_del(rgw_raw_obj& obj, const std::string& key); - virtual int remove(RGWSysObjectCtxBase& obj_ctx, - RGWObjVersionTracker *objv_tracker, - rgw_raw_obj& obj); - - virtual int write(rgw_raw_obj& obj, - real_time *pmtime, - map& attrs, - bool exclusive, - const bufferlist& data, - RGWObjVersionTracker *objv_tracker, - real_time set_mtime); + /* wrappers */ + int get_system_obj_state_impl(RGWSysObjectCtxBase *rctx, rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker); + int get_system_obj_state(RGWSysObjectCtxBase *rctx, rgw_raw_obj& obj, RGWSysObjState **state, RGWObjVersionTracker *objv_tracker); + + int stat(RGWSysObjectCtxBase& obj_ctx, + GetObjState& state, + rgw_raw_obj& obj, + map *attrs, + real_time *lastmod, + uint64_t *obj_size, + RGWObjVersionTracker *objv_tracker); public: RGWSI_SysObj_Core(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}