endfunction()
set(librgw_common_srcs
+ services/svc_notify.cc
services/svc_quota.cc
services/svc_rados.cc
services/svc_sys_obj.cc
}
-int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
- int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
- if (r < 0)
- return r;
- return 0;
-}
-
-int RGWRados::aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c) {
- int r = control_pool_ctx.aio_watch2(oid, c, watch_handle, ctx, 0);
- if (r < 0)
- return r;
- return 0;
-}
-
-int RGWRados::unwatch(uint64_t watch_handle)
-{
- int r = control_pool_ctx.unwatch2(watch_handle);
- if (r < 0) {
- ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
- return r;
- }
- r = rados[0].watch_flush();
- if (r < 0) {
- ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
- return r;
- }
- return 0;
-}
-
-void RGWRados::add_watcher(int i)
-{
- ldout(cct, 20) << "add_watcher() i=" << i << dendl;
- Mutex::Locker l(watchers_lock);
- watchers_set.insert(i);
- if (watchers_set.size() == (size_t)num_watchers) {
- ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
- set_cache_enabled(true);
- }
-}
-
-void RGWRados::remove_watcher(int i)
-{
- ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
- Mutex::Locker l(watchers_lock);
- size_t orig_size = watchers_set.size();
- watchers_set.erase(i);
- if (orig_size == (size_t)num_watchers &&
- watchers_set.size() < orig_size) { /* actually removed */
- ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
- set_cache_enabled(false);
- }
-}
-
-class RGWWatcher : public librados::WatchCtx2 {
- RGWRados *rados;
- int index;
- string oid;
- uint64_t watch_handle;
- int register_ret{0};
- librados::AioCompletion *register_completion{nullptr};
-
- class C_ReinitWatch : public Context {
- RGWWatcher *watcher;
- public:
- explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
- void finish(int r) override {
- watcher->reinit();
- }
- };
-public:
- RGWWatcher(RGWRados *r, int i, const string& o) : rados(r), index(i), oid(o), watch_handle(0) {}
- void handle_notify(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id,
- bufferlist& bl) override {
- ldout(rados->ctx(), 10) << "RGWWatcher::handle_notify() "
- << " notify_id " << notify_id
- << " cookie " << cookie
- << " notifier " << notifier_id
- << " bl.length()=" << bl.length() << dendl;
-
- if (unlikely(rados->inject_notify_timeout_probability == 1) ||
- (rados->inject_notify_timeout_probability > 0 &&
- (rados->inject_notify_timeout_probability >
- ceph::util::generate_random_number(0.0, 1.0)))) {
- ldout(rados->ctx(), 0)
- << "RGWWatcher::handle_notify() dropping notification! "
- << "If this isn't what you want, set "
- << "rgw_inject_notify_timeout_probability to zero!" << dendl;
- return;
- }
-
-
-
- rados->watch_cb(notify_id, cookie, notifier_id, bl);
-
- bufferlist reply_bl; // empty reply payload
- rados->control_pool_ctx.notify_ack(oid, notify_id, cookie, reply_bl);
- }
- void handle_error(uint64_t cookie, int err) override {
- lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
- << " err " << cpp_strerror(err) << dendl;
- rados->remove_watcher(index);
- rados->schedule_context(new C_ReinitWatch(this));
- }
-
- void reinit() {
- int ret = unregister_watch();
- if (ret < 0) {
- ldout(rados->ctx(), 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
- return;
- }
- ret = register_watch();
- if (ret < 0) {
- ldout(rados->ctx(), 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
- return;
- }
- }
-
- int unregister_watch() {
- int r = rados->unwatch(watch_handle);
- if (r < 0) {
- return r;
- }
- rados->remove_watcher(index);
- return 0;
- }
-
- int register_watch_async() {
- if (register_completion) {
- register_completion->release();
- register_completion = nullptr;
- }
- register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
- register_ret = rados->aio_watch(oid, &watch_handle, this, register_completion);
- if (register_ret < 0) {
- register_completion->release();
- return register_ret;
- }
- return 0;
- }
-
- int register_watch_finish() {
- if (register_ret < 0) {
- return register_ret;
- }
- if (!register_completion) {
- return -EINVAL;
- }
- register_completion->wait_for_safe();
- int r = register_completion->get_return_value();
- register_completion->release();
- register_completion = nullptr;
- if (r < 0) {
- return r;
- }
- rados->add_watcher(index);
- return 0;
- }
-
- int register_watch() {
- int r = rados->watch(oid, &watch_handle, this);
- if (r < 0) {
- return r;
- }
- rados->add_watcher(index);
- return 0;
- }
-};
-
class RGWMetaNotifierManager : public RGWCoroutinesManager {
RGWRados *store;
RGWHTTPManager http_manager;
period_history.reset(new RGWPeriodHistory(cct, period_puller.get(),
svc.zone->get_current_period()));
- if (need_watch_notify()) {
- ret = init_watch();
- if (ret < 0) {
- lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
- return ret;
- }
- }
-
ret = open_root_pool_ctx();
if (ret < 0)
return ret;
return init_complete();
}
-void RGWRados::finalize_watch()
-{
- for (int i = 0; i < num_watchers; i++) {
- RGWWatcher *watcher = watchers[i];
- watcher->unregister_watch();
- delete watcher;
- }
-
- delete[] notify_oids;
- delete[] watchers;
-}
-
void RGWRados::schedule_context(Context *c) {
finisher->queue(c);
}
return rgw_init_ioctx(get_rados_handle(), svc.zone->get_zone_params().reshard_pool, reshard_pool_ctx, true);
}
-int RGWRados::init_watch()
-{
-#warning needs to be part of the sysobj/cache service
- int r = rgw_init_ioctx(&rados[0], svc.zone->get_zone_params().control_pool, control_pool_ctx, true);
- if (r < 0) {
- return r;
- }
-
- num_watchers = cct->_conf->rgw_num_control_oids;
-
- bool compat_oid = (num_watchers == 0);
-
- if (num_watchers <= 0)
- num_watchers = 1;
-
- notify_oids = new string[num_watchers];
- watchers = new RGWWatcher *[num_watchers];
-
- int error = 0;
-
- for (int i=0; i < num_watchers; i++) {
- string& notify_oid = notify_oids[i];
- notify_oid = notify_oid_prefix;
- if (!compat_oid) {
- char buf[16];
- snprintf(buf, sizeof(buf), ".%d", i);
- notify_oid.append(buf);
- }
- r = control_pool_ctx.create(notify_oid, false);
- if (r < 0 && r != -EEXIST)
- return r;
-
- RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid);
- watchers[i] = watcher;
-
- r = watcher->register_watch_async();
- if (r < 0) {
- ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
- error = r;
- continue;
- }
- }
-
- for (int i = 0; i < num_watchers; ++i) {
- int r = watchers[i]->register_watch_finish();
- if (r < 0) {
- ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
- error = r;
- }
- }
-
- if (error < 0) {
- return error;
- }
-
- watch_initialized = true;
-
- set_cache_enabled(true);
-
- return 0;
-}
-
-void RGWRados::pick_control_oid(const string& key, string& notify_oid)
-{
- uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
-
- int i = r % num_watchers;
- char buf[16];
- snprintf(buf, sizeof(buf), ".%d", i);
-
- notify_oid = notify_oid_prefix;
- notify_oid.append(buf);
-}
-
int RGWRados::open_pool_ctx(const rgw_pool& pool, librados::IoCtx& io_ctx)
{
constexpr bool create = true; // create the pool if it doesn't exist
int RGWRados::distribute(const string& key, bufferlist& bl)
{
- /*
- * we were called before watch was initialized. This can only happen if we're updating some system
- * config object (e.g., zone info) during init. Don't try to distribute the cache info for these
- * objects, they're currently only read on startup anyway.
- */
- if (!watch_initialized)
- return 0;
-
- string notify_oid;
- pick_control_oid(key, notify_oid);
+ RGWSI_RADOS::Obj notify_obj = pick_control_obj(key);
ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
return robust_notify(notify_oid, bl);
}
-int RGWRados::robust_notify(const string& notify_oid, bufferlist& bl)
+int RGWSI_Notify::robust_notify(RGWSI_RADOS::Obj& notify_obj, bufferlist& bl)
{
// The reply of every machine that acks goes in here.
boost::container::flat_set<std::pair<uint64_t, uint64_t>> acks;
bufferlist rbl;
// First, try to send, without being fancy about it.
- auto r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+ auto r = notify_obj.notify(bl, 0, &rbl);
// If that doesn't work, get serious.
if (r < 0) {
rbl.clear();
// Reset the timeouts, we're only concerned with new ones.
timeouts.clear();
- r = control_pool_ctx.notify2(notify_oid, bl, 0, &rbl);
+ r = notify_obj.notify(bl, 0, &rbl);
if (r < 0) {
ldout(cct, 1) << "robust_notify: retry " << tries << " failed: "
<< cpp_strerror(-r) << dendl;
std::atomic<int64_t> max_req_id = { 0 };
Mutex lock;
- Mutex watchers_lock;
SafeTimer *timer;
RGWGC *gc;
Mutex meta_sync_thread_lock;
Mutex data_sync_thread_lock;
- int num_watchers;
- RGWWatcher **watchers;
- std::set<int> watchers_set;
librados::IoCtx root_pool_ctx; // .rgw
- librados::IoCtx control_pool_ctx; // .rgw.control
- bool watch_initialized;
double inject_notify_timeout_probability = 0;
unsigned max_notify_retries = 0;
RGWServiceRegistryRef svc_registry;
RGWIndexCompletionManager *index_completion_manager{nullptr};
public:
- RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
+ RGWRados() : lock("rados_timer_lock"), timer(NULL),
gc(NULL), lc(NULL), obj_expirer(NULL), use_gc_thread(false), use_lc_thread(false), quota_threads(false),
run_sync_thread(false), run_reshard_thread(false), async_rados(nullptr), meta_notifier(NULL),
data_notifier(NULL), meta_sync_processor_thread(NULL),
meta_sync_thread_lock("meta_sync_thread_lock"), data_sync_thread_lock("data_sync_thread_lock"),
- num_watchers(0), watchers(NULL),
- watch_initialized(false),
bucket_id_lock("rados_bucket_id"),
bucket_index_max_shards(0),
max_bucket_id(0), cct(NULL),
--- /dev/null
+#include "include/random.h"
+#include "common/errno.h"
+
+#include "svc_notify.h"
+#include "svc_zone.h"
+#include "svc_rados.h"
+
+#include "rgw/rgw_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static string notify_oid_prefix = "notify";
+
+class RGWWatcher : public librados::WatchCtx2 {
+ CephContext *cct;
+ RGWSI_Notify *svc;
+ int index;
+ RGWSI_RADOS::Obj obj;
+ uint64_t watch_handle;
+ int register_ret{0};
+ librados::AioCompletion *register_completion{nullptr};
+
+ class C_ReinitWatch : public Context {
+ RGWWatcher *watcher;
+ public:
+ explicit C_ReinitWatch(RGWWatcher *_watcher) : watcher(_watcher) {}
+ void finish(int r) override {
+ watcher->reinit();
+ }
+ };
+public:
+ RGWWatcher(CephContext *_cct, RGWSI_Notify *s, int i, RGWSI_RADOS::Obj& o) : cct(_cct), svc(s), index(i), obj(o), watch_handle(0) {}
+ void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) override {
+ ldout(cct, 10) << "RGWWatcher::handle_notify() "
+ << " notify_id " << notify_id
+ << " cookie " << cookie
+ << " notifier " << notifier_id
+ << " bl.length()=" << bl.length() << dendl;
+
+ if (unlikely(svc->inject_notify_timeout_probability == 1) ||
+ (svc->inject_notify_timeout_probability > 0 &&
+ (svc->inject_notify_timeout_probability >
+ ceph::util::generate_random_number(0.0, 1.0)))) {
+ ldout(cct, 0)
+ << "RGWWatcher::handle_notify() dropping notification! "
+ << "If this isn't what you want, set "
+ << "rgw_inject_notify_timeout_probability to zero!" << dendl;
+ return;
+ }
+
+ svc->watch_cb(notify_id, cookie, notifier_id, bl);
+
+ bufferlist reply_bl; // empty reply payload
+ obj.notify_ack(notify_id, cookie, reply_bl);
+ }
+ void handle_error(uint64_t cookie, int err) override {
+ lderr(cct) << "RGWWatcher::handle_error cookie " << cookie
+ << " err " << cpp_strerror(err) << dendl;
+ svc->remove_watcher(index);
+ svc->schedule_context(new C_ReinitWatch(this));
+ }
+
+ void reinit() {
+ int ret = unregister_watch();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
+ return;
+ }
+ ret = register_watch();
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
+ return;
+ }
+ }
+
+ int unregister_watch() {
+ int r = svc->unwatch(obj, watch_handle);
+ if (r < 0) {
+ return r;
+ }
+ svc->remove_watcher(index);
+ return 0;
+ }
+
+ int register_watch_async() {
+ if (register_completion) {
+ register_completion->release();
+ register_completion = nullptr;
+ }
+ register_completion = librados::Rados::aio_create_completion(nullptr, nullptr, nullptr);
+ register_ret = obj.aio_watch(register_completion, &watch_handle, this);
+ if (register_ret < 0) {
+ register_completion->release();
+ return register_ret;
+ }
+ return 0;
+ }
+
+ int register_watch_finish() {
+ if (register_ret < 0) {
+ return register_ret;
+ }
+ if (!register_completion) {
+ return -EINVAL;
+ }
+ register_completion->wait_for_safe();
+ int r = register_completion->get_return_value();
+ register_completion->release();
+ register_completion = nullptr;
+ if (r < 0) {
+ return r;
+ }
+ svc->add_watcher(index);
+ return 0;
+ }
+
+ int register_watch() {
+ int r = obj.watch(&watch_handle, this);
+ if (r < 0) {
+ return r;
+ }
+ svc->add_watcher(index);
+ return 0;
+ }
+};
+
+int RGWS_Notify::create_instance(const string& conf, RGWServiceInstanceRef *instance)
+{
+ instance->reset(new RGWSI_Notify(this, cct));
+ return 0;
+}
+
+std::map<string, RGWServiceInstance::dependency> RGWSI_Notify::get_deps()
+{
+ map<string, RGWServiceInstance::dependency> deps;
+ deps["zone_dep"] = { .name = "zone",
+ .conf = "{}" };
+ deps["rados_dep"] = { .name = "rados",
+ .conf = "{}" };
+ return deps;
+}
+
+int RGWSI_Notify::load(const string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs)
+{
+ zone_svc = static_pointer_cast<RGWSI_Zone>(dep_refs["zone_dep"]);
+ assert(zone_svc);
+ rados_svc = static_pointer_cast<RGWSI_RADOS>(dep_refs["rados_dep"]);
+ assert(rados_svc);
+ return 0;
+}
+
+string RGWSI_Notify::get_control_oid(int i)
+{
+ char buf[notify_oid_prefix.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%d", notify_oid_prefix.c_str(), i);
+
+ return string(buf);
+}
+
+RGWSI_RADOS::Obj RGWSI_Notify::pick_control_obj(const string& key)
+{
+ uint32_t r = ceph_str_hash_linux(key.c_str(), key.size());
+
+ int i = r % num_watchers;
+ return notify_objs[i];
+}
+
+int RGWSI_Notify::init_watch()
+{
+ num_watchers = cct->_conf->rgw_num_control_oids;
+
+ bool compat_oid = (num_watchers == 0);
+
+ if (num_watchers <= 0)
+ num_watchers = 1;
+
+ watchers = new RGWWatcher *[num_watchers];
+
+ int error = 0;
+
+ notify_objs.resize(num_watchers);
+
+ for (int i=0; i < num_watchers; i++) {
+ string notify_oid;
+
+ if (!compat_oid) {
+ notify_oid = get_control_oid(i);
+ } else {
+ notify_oid = notify_oid_prefix;
+ }
+
+ notify_objs[i] = rados_svc->obj({control_pool, notify_oid});
+ auto& notify_obj = notify_objs[i];
+
+ librados::ObjectWriteOperation op;
+ op.create(false);
+ int r = notify_obj.operate(&op);
+ if (r < 0 && r != -EEXIST) {
+ return r;
+ }
+
+ RGWWatcher *watcher = new RGWWatcher(cct, this, i, notify_obj);
+ watchers[i] = watcher;
+
+ r = watcher->register_watch_async();
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: register_watch_aio() returned " << r << dendl;
+ error = r;
+ continue;
+ }
+ }
+
+ for (int i = 0; i < num_watchers; ++i) {
+ int r = watchers[i]->register_watch_finish();
+ if (r < 0) {
+ ldout(cct, 0) << "WARNING: async watch returned " << r << dendl;
+ error = r;
+ }
+ }
+
+ if (error < 0) {
+ return error;
+ }
+
+ return 0;
+}
+
+void RGWSI_Notify::finalize_watch()
+{
+ for (int i = 0; i < num_watchers; i++) {
+ RGWWatcher *watcher = watchers[i];
+ watcher->unregister_watch();
+ delete watcher;
+ }
+
+ delete[] watchers;
+}
+
+int RGWSI_Notify::init()
+{
+ control_pool = zone_svc->get_zone_params().control_pool;
+
+ int ret = init_watch();
+ if (ret < 0) {
+ lderr(cct) << "ERROR: failed to initialize watch: " << cpp_strerror(-ret) << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+void RGWSI_Notify::shutdown()
+{
+ finalize_watch();
+}
+
+int RGWSI_Notify::watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx)
+{
+ int r = obj.watch(watch_handle, ctx);
+ if (r < 0)
+ return r;
+ return 0;
+}
+
+int RGWSI_Notify::aio_watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c)
+{
+ int r = obj.aio_watch(c, watch_handle, ctx, 0);
+ if (r < 0)
+ return r;
+ return 0;
+}
+
+int RGWSI_Notify::unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle)
+{
+ int r = obj.unwatch(watch_handle);
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: rados->unwatch2() returned r=" << r << dendl;
+ return r;
+ }
+ r = rados[0].watch_flush();
+ if (r < 0) {
+ ldout(cct, 0) << "ERROR: rados->watch_flush() returned r=" << r << dendl;
+ return r;
+ }
+ return 0;
+}
+
+void RGWSI_Notify::add_watcher(int i)
+{
+ ldout(cct, 20) << "add_watcher() i=" << i << dendl;
+ Mutex::Locker l(watchers_lock);
+ watchers_set.insert(i);
+ if (watchers_set.size() == (size_t)num_watchers) {
+ ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
+#warning fixme
+#if 0
+ set_cache_enabled(true);
+#endif
+ }
+}
+
+void RGWSI_Notify::remove_watcher(int i)
+{
+ ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
+ Mutex::Locker l(watchers_lock);
+ size_t orig_size = watchers_set.size();
+ watchers_set.erase(i);
+ if (orig_size == (size_t)num_watchers &&
+ watchers_set.size() < orig_size) { /* actually removed */
+ ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
+#warning fixme
+#if 0
+ set_cache_enabled(false);
+#endif
+ }
+}
+
--- /dev/null
+#ifndef CEPH_RGW_SERVICES_NOTIFY_H
+#define CEPH_RGW_SERVICES_NOTIFY_H
+
+
+#include "rgw/rgw_service.h"
+
+#include "svc_rados.h"
+
+
+class RGWSI_Zone;
+
+class RGWWatcher;
+
+class RGWS_Notify : public RGWService
+{
+public:
+ RGWS_Notify(CephContext *cct) : RGWService(cct, "quota") {}
+
+ int create_instance(const std::string& conf, RGWServiceInstanceRef *instance) override;
+};
+
+class RGWSI_Notify : public RGWServiceInstance
+{
+ std::shared_ptr<RGWSI_Zone> zone_svc;
+ std::shared_ptr<RGWSI_RADOS> rados_svc;
+
+ std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
+ int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
+
+ Mutex watchers_lock{"watchers_lock"};
+ rgw_pool control_pool;
+
+ int num_watchers{0};
+ RGWWatcher **watchers{nullptr};
+ std::set<int> watchers_set;
+ vector<RGWSI_RADOS::Obj> notify_objs;
+
+ double inject_notify_timeout_probability{0};
+ unsigned max_notify_retries{0};
+
+ friend class RGWWatcher;
+
+ string get_control_oid(int i);
+ RGWSI_RADOS::Obj pick_control_obj(const string& key);
+
+ int init_watch();
+ void finalize_watch();
+
+ int init() override;
+ void shutdown() override;
+
+ int watch(RGWSI_RADOS::Obj& obj, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
+ int aio_watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx, librados::AioCompletion *c);
+ int unwatch(RGWSI_RADOS::Obj& obj, uint64_t watch_handle);
+ void add_watcher(int i);
+ void remove_watcher(int i);
+public:
+ RGWSI_Notify(RGWService *svc, CephContext *cct): RGWServiceInstance(svc, cct) {}
+
+};
+
+#endif
+
return ref.ioctx.aio_operate(ref.oid, c, op);
}
+int RGWSI_RADOS::Obj::watch(uint64_t *handle, librados::WatchCtx2 *ctx)
+{
+ return ref.ioctx.watch2(ref.oid, handle, ctx);
+}
+
+int RGWSI_RADOS::Obj::aio_watch(AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx)
+{
+ return ref.ioctx.aio_watch(ref.oid, c, handle, ctx);
+}
+
+int RGWSI_RADOS::Obj::unwatch(uint64_t handle)
+{
+ return ref.ioctx.unwatch2(handle);
+}
+
uint64_t RGWSI_RADOS::Obj::get_last_version()
{
return ref.ioctx.get_last_version();
int operate(librados::ObjectReadOperation *op, bufferlist *pbl);
int aio_operate(librados::AioCompletion *c, librados::ObjectWriteOperation *op);
+ int watch(uint64_t *handle, librados::WatchCtx2 *ctx);
+ int aio_watch(librados::AioCompletion *c, uint64_t *handle, librados::WatchCtx2 *ctx);
+ int unwatch(uint64_t handle);
+
uint64_t get_last_version();
};
objv_tracker,
obj, bl, ofs, end,
attrs,
+ cache_info,
refresh_version);
}
RGWSI_SysObj_Core *svc = source.core_svc;
rgw_raw_obj& obj = source.get_obj();
- return svc->set_attrs(obj, attrs, objv_tracker);
+ return svc->set_attrs(obj, attrs, nullptr, objv_tracker);
}
int RGWSI_SysObj::Pool::Op::list_prefixed_objs(const string& prefix, list<string> *result)
class RGWSI_SysObj;
class RGWSysObjectCtx;
+struct rgw_cache_entry_info;
+
class RGWS_SysObj : public RGWService
{
public:
boost::optional<obj_version> refresh_version{boost::none};
ceph::real_time *lastmod{nullptr};
uint64_t *obj_size{nullptr};
+ rgw_cache_entry_info *cache_info{nullptr};
ROp& set_last_mod(ceph::real_time *_lastmod) {
lastmod = _lastmod;
return *this;
}
+ ROp& set_cache_info(rgw_cache_entry_info *ci) {
+ cache_info = ci;
+ return *this;
+ }
+
ROp(Obj& _source) : source(_source) {}
int stat();
#include "svc_sys_obj_cache.h"
+#include "svc_zone.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+static string normal_name(rgw_pool& pool, const std::string& oid) {
+ std::string buf;
+ buf.reserve(pool.name.size() + pool.ns.size() + oid.size() + 2);
+ buf.append(pool.name).append("+").append(pool.ns).append("+").append(oid);
+ return buf;
+}
+
+void RGWSI_SysObj_Cache::normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj)
+{
+ if (src_obj.size()) {
+ dst_pool = src_pool;
+ dst_obj = src_obj;
+ } else {
+ dst_pool = zone_svc->get_zone_params().domain_root;
+ dst_obj = src_pool.name;
+ }
+}
int RGWSI_SysObj_Cache::remove(RGWSysObjectCtxBase& obj_ctx,
obl->clear();
- i.copy_all(obl);
+ i.copy_all(*obl);
if (objv_tracker)
objv_tracker->read_version = info.version;
if (attrs)
int RGWSI_SysObj_Cache::set_attrs(rgw_raw_obj& obj,
map<string, bufferlist>& attrs,
+ map<string, bufferlist> *rmattrs,
RGWObjVersionTracker *objv_tracker)
{
rgw_pool pool;
normalize_pool_and_obj(obj.pool, obj.oid, pool, oid);
ObjectCacheInfo info;
info.xattrs = attrs;
+ if (rmattrs) {
+ info.rm_xattrs = *rmattrs;
+ }
info.status = 0;
info.flags = CACHE_FLAG_MODIFY_XATTRS;
if (objv_tracker) {
cache.put(name, info, NULL);
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
- mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
+ ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
cache.remove(name);
}
return ret;
}
-int RGWSI_SysObjCache::write(rgw_raw_obj& obj,
+int RGWSI_SysObj_Cache::write(rgw_raw_obj& obj,
real_time *pmtime,
map<std::string, bufferlist>& attrs,
bool exclusive,
const bufferlist& data,
RGWObjVersionTracker *objv_tracker,
- real_time set_mtime) override;
+ real_time set_mtime)
{
rgw_pool pool;
string oid;
if (!exclusive) {
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
- mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
+ ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
}
} else {
cache.remove(name);
}
int RGWSI_SysObj_Cache::write_data(rgw_raw_obj& obj,
- const bufferlist& bl,
+ const bufferlist& data,
bool exclusive,
RGWObjVersionTracker *objv_tracker)
{
info.version = objv_tracker->write_version;
info.flags |= CACHE_FLAG_OBJV;
}
- int ret = RGWSI_SysObj_Core::write_data(obj, data, ofs, exclusive, objv_tracker);
+ int ret = RGWSI_SysObj_Core::write_data(obj, data, exclusive, objv_tracker);
string name = normal_name(pool, oid);
if (ret >= 0) {
cache.put(name, info, NULL);
int r = distribute_cache(name, obj, info, UPDATE_OBJ);
if (r < 0)
- mydout(0) << "ERROR: failed to distribute cache for " << obj << dendl;
+ ldout(cct, 0) << "ERROR: failed to distribute cache for " << obj << dendl;
} else {
cache.remove(name);
}
return ret;
}
-int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *epoch,
+int RGWSI_SysObj_Cache::raw_stat(rgw_raw_obj& obj, uint64_t *psize, real_time *pmtime, uint64_t *pepoch,
map<string, bufferlist> *attrs, bufferlist *first_chunk,
RGWObjVersionTracker *objv_tracker)
{
auto iter = bl.cbegin();
decode(info, iter);
} catch (buffer::end_of_buffer& err) {
- mydout(0) << "ERROR: got bad notification" << dendl;
+ ldout(cct, 0) << "ERROR: got bad notification" << dendl;
return -EIO;
} catch (buffer::error& err) {
- mydout(0) << "ERROR: buffer::error" << dendl;
+ ldout(cct, 0) << "ERROR: buffer::error" << dendl;
return -EIO;
}
cache.remove(name);
break;
default:
- mydout(0) << "WARNING: got unknown notification op: " << info.op << dendl;
+ ldout(cct, 0) << "WARNING: got unknown notification op: " << info.op << dendl;
return -EINVAL;
}
return 0;
}
-int RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
+void RGWSI_SysObj_Cache::call_list(const std::optional<std::string>& filter, Formatter* f)
{
cache.for_each(
[this, &filter, f] (const string& name, const ObjectCacheEntry& entry) {
#include "rgw/rgw_service.h"
+#include "rgw/rgw_cache.h"
-#include "svc_rados.h"
+#include "svc_sys_obj_core.h"
class RGWSI_SysObj_Cache : public RGWSI_SysObj_Core
{
+ ObjectCache cache;
+
+ void normalize_pool_and_obj(rgw_pool& src_pool, const string& src_obj, rgw_pool& dst_pool, string& dst_obj);
protected:
std::map<std::string, RGWServiceInstance::dependency> get_deps() override;
int load(const std::string& conf, std::map<std::string, RGWServiceInstanceRef>& dep_refs) override;
rgw_raw_obj& obj,
bufferlist *bl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
+ rgw_cache_entry_info *cache_info,
boost::optional<obj_version>) override;
int get_attr(rgw_raw_obj& obj, const char *name, bufferlist *dest) override;
int set_attrs(rgw_raw_obj& obj,
map<string, bufferlist>& attrs,
+ map<string, bufferlist> *rmattrs,
RGWObjVersionTracker *objv_tracker);
int remove(RGWSysObjectCtxBase& obj_ctx,
bufferlist& bl);
public:
- RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct): RGW_SysObj_Core(svc, cct) {}
+ RGWSI_SysObj_Cache(RGWService *svc, CephContext *cct) : RGWSI_SysObj_Core(svc, cct) {}
- int call_list(const std::optional<std::string>& filter, Formatter* f);
+ void call_list(const std::optional<std::string>& filter, Formatter* f);
int call_inspect(const std::string& target, Formatter* f);
int call_erase(const std::string& target);
int call_zap();
rgw_raw_obj& obj,
bufferlist *bl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
+ rgw_cache_entry_info *cache_info,
boost::optional<obj_version>)
{
uint64_t len;
int RGWSI_SysObj_Core::set_attrs(rgw_raw_obj& obj,
map<string, bufferlist>& attrs,
+ map<string, bufferlist> *rmattrs,
RGWObjVersionTracker *objv_tracker)
{
RGWSI_RADOS::Obj rados_obj;
return r;
}
- ObjectWriteOperation op;
+ librados::ObjectWriteOperation op;
if (objv_tracker) {
objv_tracker->prepare_op_for_write(&op);
return r;
}
- ObjectWriteOperation op;
+ librados::ObjectWriteOperation op;
if (exclusive) {
op.create(true);
if (objv_tracker) {
objv_tracker->prepare_op_for_write(&op);
}
- if (ofs == -1) {
- op.write_full(bl);
- } else {
- op.write(ofs, bl);
- }
+ op.write_full(bl);
r = rados_obj.operate(&op);
if (r < 0)
return r;
class RGWSI_Zone;
+struct rgw_cache_entry_info;
+
struct RGWSysObjState {
rgw_raw_obj obj;
bool has_attrs{false};
rgw_raw_obj& obj,
bufferlist *bl, off_t ofs, off_t end,
map<string, bufferlist> *attrs,
+ rgw_cache_entry_info *cache_info,
boost::optional<obj_version>);
virtual int remove(RGWSysObjectCtxBase& obj_ctx,
virtual int set_attrs(rgw_raw_obj& obj,
map<string, bufferlist>& attrs,
+ map<string, bufferlist> *rmattrs,
RGWObjVersionTracker *objv_tracker);
virtual int omap_get_all(rgw_raw_obj& obj, std::map<string, bufferlist> *m);