}
int distribute_cache(const string& normal_name, rgw_obj& obj, ObjectCacheInfo& obj_info, int op);
- int watch_cb(int opcode, uint64_t ver, bufferlist& bl);
+ int watch_cb(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl);
public:
RGWCache() {}
}
template <class T>
-int RGWCache<T>::watch_cb(int opcode, uint64_t ver, bufferlist& bl)
+int RGWCache<T>::watch_cb(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl)
{
RGWCacheNotifyInfo info;
return 0;
}
-class RGWWatcher : public librados::WatchCtx {
+class RGWWatcher : public librados::WatchCtx2 {
RGWRados *rados;
+ string oid;
public:
- RGWWatcher(RGWRados *r) : rados(r) {}
- void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) {
- ldout(rados->ctx(), 10) << "RGWWatcher::notify() opcode=" << (int)opcode << " ver=" << ver << " bl.length()=" << bl.length() << dendl;
- rados->watch_cb(opcode, ver, bl);
+ RGWWatcher(RGWRados *r, const string& o) : rados(r), oid(o) {}
+ void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) {
+ ldout(rados->ctx(), 10) << "RGWWatcher::handle_notify() "
+ << " notify_id " << notify_id
+ << " cookie " << cookie
+ << " notifier " << notifier_id
+ << " bl.length()=" << bl.length() << dendl;
+ rados->watch_cb(notify_id, cookie, notifier_id, bl);
+
+ bufferlist reply_bl; // empty reply payload
+ rados->control_pool_ctx.notify_ack(oid, notify_id, cookie, reply_bl);
+ }
+ void handle_error(uint64_t cookie, int err) {
+ lderr(rados->ctx()) << "RGWWatcher::handle_error cookie " << cookie
+ << " err " << cpp_strerror(err) << dendl;
}
};
if (notify_oid.empty())
continue;
uint64_t watch_handle = watch_handles[i];
- control_pool_ctx.unwatch(notify_oid, watch_handle);
+ control_pool_ctx.unwatch2(watch_handle);
RGWWatcher *watcher = watchers[i];
delete watcher;
if (r < 0 && r != -EEXIST)
return r;
- RGWWatcher *watcher = new RGWWatcher(this);
+ RGWWatcher *watcher = new RGWWatcher(this, notify_oid);
watchers[i] = watcher;
- r = control_pool_ctx.watch(notify_oid, 0, &watch_handles[i], watcher);
+ r = control_pool_ctx.watch2(notify_oid, &watch_handles[i], watcher);
if (r < 0)
return r;
}
pick_control_oid(key, notify_oid);
ldout(cct, 10) << "distributing notification oid=" << notify_oid << " bl.length()=" << bl.length() << dendl;
- int r = control_pool_ctx.notify(notify_oid, 0, bl);
+ int r = control_pool_ctx.notify2(notify_oid, bl, 0, NULL);
return r;
}
librados::IoCtx control_pool_ctx; // .rgw.control
bool watch_initialized;
+ friend class RGWWatcher;
+
Mutex bucket_id_lock;
// This field represents the number of bucket index object shards
virtual int init_watch();
virtual void finalize_watch();
virtual int distribute(const string& key, bufferlist& bl);
- virtual int watch_cb(int opcode, uint64_t ver, bufferlist& bl) { return 0; }
+ virtual int watch_cb(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) { return 0; }
void pick_control_oid(const string& key, string& notify_oid);
void set_atomic(void *ctx, rgw_obj& obj) {