void *data,
size_t data_len);
+/**
+ * @typedef rados_watchfailcb_t
+ *
+ * Callback activated when a notify is not acked in a timely manner,
+ * resulting in a timeout for the notifier.
+ *
+ * @param arg opaque user-defined value provided to rados_watch2()
+ * @param notify_id an id for this notify event
+ * @param handle the watcher handle we are notifying
+ * @param notifier_id the unique client id for the notifier
+ */
+typedef void (*rados_watchfailcb_t)(void *arg,
+ uint64_t notify_id,
+ uint64_t handle,
+ uint64_t notifier_id);
+
/**
* @typedef rados_watcherrcb_t
*
* @param o the object to watch
* @param handle where to store the internal id assigned to this watch
* @param watchcb2 what to do when a notify is received on this object
+ * @param watchfailcb what to do when a notify is not acked in time
* @param watcherrcb what to do when the watch session encounters an error
* @param arg opaque value to pass to the callback
* @returns 0 on success, negative error code on failure
*/
int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
- rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb,
+ rados_watchcb2_t watchcb,
+ rados_watchfailcb_t watchfailcb,
+ rados_watcherrcb_t watcherrcb,
void *arg);
/**
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl) = 0;
+
+ /**
+ * Callback activated when we are too slow to ack a notify
+ *
+ * If we fail to ack a notify or our ack doesn't arrive in time
+ * and a notify timeout is triggered for another client, this
+ * callback will be triggered to let us know about it.
+ *
+ * @param notify_id unique id for this notify event
+ * @param cookie the watcher we are notifying
+ * @param notifier_id the unique client id of the notifier
+ */
+ virtual void handle_failed_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id) = 0;
+
/**
* Callback activated when we encounter an error with the watch.
*
assert(wc);
if (wc->notify_lock) {
// we sent a notify and it completed (or failed)
+ // NOTE: opcode may be either NOTIFY (older OSDs) or NOTIFY_COMPLETE
+ // (newer OSDs). In practice it doesn't matter because completion is the
+ // only kind of event we get on notify cookies.
ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
wc->notify_lock->Lock();
*wc->notify_done = true;
}
wc->notify_cond->Signal();
wc->notify_lock->Unlock();
- } else {
+ } else if (m->opcode == CEPH_WATCH_EVENT_NOTIFY) {
// we are watcher and got a notify
ldout(cct,10) << __func__ << " got notify " << *m << dendl;
wc->get();
lock.Lock();
ldout(cct,10) << __func__ << " notify done" << dendl;
wc->put();
+ } else if (m->opcode == CEPH_WATCH_EVENT_FAILED_NOTIFY) {
+ // we are watcher and failed to ack a notify in time, causing it to time
+ // out.
+ ldout(cct,10) << __func__ << " failed notify " << *m << dendl;
+ wc->get();
+ // trigger the callback
+ assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined
+ lock.Unlock();
+ if (wc->watch_ctx2) {
+ wc->watch_ctx2->handle_failed_notify(m->notify_id, m->cookie,
+ m->notifier_gid);
+ }
+ lock.Lock();
+ ldout(cct,10) << __func__ << " failed notify done" << dendl;
+ wc->put();
+ } else {
+ lderr(cct) << __func__ << " got unknown event " << m->opcode
+ << " " << ceph_watch_event_name(m->opcode) << dendl;
}
} else {
ldout(cct, 4) << __func__ << " unknown cookie " << m->cookie << dendl;
struct C_WatchCB2 : public librados::WatchCtx2 {
rados_watchcb2_t wcb;
+ rados_watchfailcb_t failcb;
rados_watcherrcb_t errcb;
void *arg;
C_WatchCB2(rados_watchcb2_t _wcb,
+ rados_watchfailcb_t _failcb,
rados_watcherrcb_t _errcb,
- void *_arg) : wcb(_wcb), errcb(_errcb), arg(_arg) {}
+ void *_arg) : wcb(_wcb), failcb(_failcb), errcb(_errcb), arg(_arg) {}
void handle_notify(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_gid,
bufferlist& bl) {
wcb(arg, notify_id, cookie, notifier_gid, bl.c_str(), bl.length());
}
+ void handle_failed_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_gid) {
+ if (failcb)
+ failcb(arg, notify_id, cookie, notifier_gid);
+ }
void handle_error(uint64_t cookie, int err) {
if (errcb)
errcb(arg, cookie, err);
extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
rados_watchcb2_t watchcb,
+ rados_watchfailcb_t watchfailcb,
rados_watcherrcb_t watcherrcb,
void *arg)
{
uint64_t *cookie = handle;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
+ C_WatchCB2 *wc = new C_WatchCB2(watchcb, watchfailcb, watcherrcb, arg);
ret = ctx->watch(oid, cookie, NULL, wc);
}
tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
rados_ioctx_t notify_io;
const char *notify_oid = 0;
int notify_err = 0;
+bool notify_failed = false;
static void watch_notify2_test_cb(void *arg,
uint64_t notify_id,
rados_notify_ack(notify_io, notify_oid, notify_id, cookie, "reply", 5);
}
+static void watch_notify2_test_failcb(void *arg,
+ uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_gid)
+{
+ std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
+ << " cookie " << cookie << std::endl;
+ notify_failed = true;
+}
+
static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err)
{
std::cout << __func__ << " cookie " << cookie << std::endl;
{
public:
void handle_notify(uint64_t notify_id, uint64_t cookie, uint64_t notifier_gid,
- bufferlist& bl)
- {
+ bufferlist& bl) {
std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
<< " notifier_gid " << notifier_gid << std::endl;
notify_bl = bl;
notify_ioctx->notify_ack(notify_oid, notify_id, cookie, reply);
}
+ void handle_failed_notify(uint64_t notify_id, uint64_t cookie,
+ uint64_t notifier_gid) {
+ std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
+ << " notifier_gid " << notifier_gid << std::endl;
+ notify_failed = true;
+ }
+
void handle_error(uint64_t cookie, int err) {
std::cout << __func__ << " cookie " << cookie << std::endl;
notify_err = err;
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle;
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb,
+ rados_watch2(ioctx, notify_oid, &handle,
+ watch_notify2_test_cb,
+ watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle1, handle2;
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle1, watch_notify2_test_cb,
+ rados_watch2(ioctx, notify_oid, &handle1,
+ watch_notify2_test_cb,
+ watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle2, watch_notify2_test_cb,
+ rados_watch2(ioctx, notify_oid, &handle2,
+ watch_notify2_test_cb,
+ watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_NE(handle1, handle2);
char *reply_buf;
notify_oid = "foo";
notify_sleep = 3; // 3s
notify_cookies.clear();
+ notify_failed = false;
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle;
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb,
+ rados_watch2(ioctx, notify_oid, &handle,
+ watch_notify2_test_cb,
+ watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
char *reply_buf;
size_t reply_buf_len;
"notify", 6, 1000, // 1s
&reply_buf, &reply_buf_len));
ASSERT_EQ(1, notify_cookies.size());
+ int wait = 10;
+ while (!notify_failed && --wait)
+ sleep(1);
+ ASSERT_TRUE(notify_failed);
rados_unwatch(ioctx, notify_oid, handle);
}
notify_ioctx = &ioctx;
notify_sleep = 3; // 3s
notify_cookies.clear();
+ notify_failed = true;
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
ASSERT_EQ(0, notify_cookies.size());
bufferlist bl2, bl_reply;
ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */, &bl_reply));
+ int wait = 10;
+ while (!notify_failed && --wait)
+ sleep(1);
+ ASSERT_TRUE(notify_failed);
ioctx.unwatch(notify_oid, handle);
}
bufferlist empty;
ioctx.notify_ack(name, notify_id, cookie, empty);
}
+ void handle_failed_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id) {
+ cout << "FAILED_NOTIFY"
+ << " cookie " << cookie
+ << " notify_id " << notify_id
+ << " from " << notifier_id
+ << std::endl;
+ }
void handle_error(uint64_t cookie, int err) {
cout << "ERROR"
<< " cookie " << cookie