return 0;
}
+int RGWRados::watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx) {
+ int r = control_pool_ctx.watch2(oid, watch_handle, ctx);
+ if (r < 0)
+ return r;
+ return 0;
+}
+
+int RGWRados::unwatch(uint64_t watch_handle)
+{
+ int r = control_pool_ctx.unwatch2(watch_handle);
+ if (r < 0) {
+ return r;
+ }
+ return 0;
+}
+
+void RGWRados::add_watcher(int i)
+{
+ ldout(cct, 20) << "add_watcher() i=" << i << dendl;
+ Mutex::Locker l(watchers_lock);
+ watchers_set.insert(i);
+ if (watchers_set.size() == (size_t)num_watchers) {
+ ldout(cct, 2) << "all " << num_watchers << " watchers are set, enabling cache" << dendl;
+ set_cache_enabled(true);
+ }
+}
+
+void RGWRados::remove_watcher(int i)
+{
+ ldout(cct, 20) << "remove_watcher() i=" << i << dendl;
+ Mutex::Locker l(watchers_lock);
+ size_t orig_size = watchers_set.size();
+ watchers_set.erase(i);
+ if (orig_size == (size_t)num_watchers &&
+ watchers_set.size() < orig_size) { /* actually removed */
+ ldout(cct, 2) << "removed watcher, disabling cache" << dendl;
+ set_cache_enabled(false);
+ }
+}
+
class RGWWatcher : public librados::WatchCtx2 {
RGWRados *rados;
+ int index;
string oid;
+ uint64_t watch_handle;
class C_ReinitWatch : public Context {
RGWWatcher *watcher;
}
};
public:
- RGWWatcher(RGWRados *r, const string& o) : rados(r), oid(o) {}
+ RGWWatcher(RGWRados *r, int i, const string& o) : rados(r), index(i), oid(o), watch_handle(0) {}
void handle_notify(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_id,
void handle_error(uint64_t cookie, int err) {
lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
<< " err " << cpp_strerror(err) << dendl;
- rados->set_cache_enabled(false);
+ rados->remove_watcher(index);
rados->schedule_context(new C_ReinitWatch(this));
}
void reinit() {
- rados->finalize_watch();
- int ret = rados->init_watch();
+ int ret = unregister_watch();
if (ret < 0) {
- ldout(rados->ctx(), 0) << "ERROR: init_watch() returned ret=" << ret << ", cache is disabled" << dendl;
+ ldout(rados->ctx(), 0) << "ERROR: unregister_watch() returned ret=" << ret << dendl;
return;
}
+ ret = register_watch();
+ if (ret < 0) {
+ ldout(rados->ctx(), 0) << "ERROR: register_watch() returned ret=" << ret << dendl;
+ return;
+ }
+ }
+
+ int unregister_watch() {
+ int r = rados->unwatch(watch_handle);
+ if (r < 0) {
+ return r;
+ }
+ rados->remove_watcher(index);
+ return 0;
+ }
+
+ int register_watch() {
+ int r = rados->watch(oid, &watch_handle, this);
+ if (r < 0) {
+ return r;
+ }
+ rados->add_watcher(index);
+ return 0;
}
};
void RGWRados::finalize_watch()
{
for (int i = 0; i < num_watchers; i++) {
- string& notify_oid = notify_oids[i];
- if (notify_oid.empty())
- continue;
- uint64_t watch_handle = watch_handles[i];
- control_pool_ctx.unwatch2(watch_handle);
-
RGWWatcher *watcher = watchers[i];
+ watcher->unregister_watch();
delete watcher;
}
delete[] notify_oids;
- delete[] watch_handles;
delete[] watchers;
}
notify_oids = new string[num_watchers];
watchers = new RGWWatcher *[num_watchers];
- watch_handles = new uint64_t[num_watchers];
for (int i=0; i < num_watchers; i++) {
string& notify_oid = notify_oids[i];
if (r < 0 && r != -EEXIST)
return r;
- RGWWatcher *watcher = new RGWWatcher(this, notify_oid);
+ RGWWatcher *watcher = new RGWWatcher(this, i, notify_oid);
watchers[i] = watcher;
- r = control_pool_ctx.watch2(notify_oid, &watch_handles[i], watcher);
+ r = watcher->register_watch();
if (r < 0)
return r;
}
void get_bucket_instance_ids(RGWBucketInfo& bucket_info, int shard_id, map<int, string> *result);
Mutex lock;
+ Mutex watchers_lock;
SafeTimer *timer;
class C_Tick : public Context {
int num_watchers;
RGWWatcher **watchers;
- uint64_t *watch_handles;
+ std::set<int> watchers_set;
librados::IoCtx root_pool_ctx; // .rgw
librados::IoCtx control_pool_ctx; // .rgw.control
bool watch_initialized;
Finisher *finisher;
public:
- RGWRados() : lock("rados_timer_lock"), timer(NULL),
+ RGWRados() : lock("rados_timer_lock"), watchers_lock("watchers_lock"), timer(NULL),
gc(NULL), use_gc_thread(false), quota_threads(false),
- num_watchers(0), watchers(NULL), watch_handles(NULL),
+ num_watchers(0), watchers(NULL),
watch_initialized(false),
bucket_id_lock("rados_bucket_id"),
bucket_index_max_shards(0),
virtual int update_containers_stats(map<string, RGWBucketEnt>& m);
virtual int append_async(rgw_obj& obj, size_t size, bufferlist& bl);
+ int watch(const string& oid, uint64_t *watch_handle, librados::WatchCtx2 *ctx);
+ int unwatch(uint64_t watch_handle);
+ void add_watcher(int i);
+ void remove_watcher(int i);
virtual bool need_watch_notify() { return false; }
virtual int init_watch();
virtual void finalize_watch();