* @typedef rados_watchcb_t
*
* Callback activated when a notify is received on a watched
- * object. Parameters are:
- * - opcode undefined
- * - ver version of the watched object
- * - arg application-specific data
+ * object.
+ *
+ * @param opcode undefined
+ * @param ver version of the watched object
+ * @param arg application-specific data
*
* @note BUG: opcode is an internal detail that shouldn't be exposed
+ * @note BUG: ver is unused
*/
typedef void (*rados_watchcb_t)(uint8_t opcode, uint64_t ver, void *arg);
* @typedef rados_watchcb2_t
*
* Callback activated when a notify is received on a watched
- * object. Parameters are:
- * - arg opaque user-defined value provided to rados_watch2()
- * - notify_id an id for this notify event
- * - handle the watcher handle we are notifying
- * - notifier_id the unique client id for the notifier
- * - data payload from the notifier
- * - datalen length of payload buffer
+ * object.
+ *
+ * @param arg opaque user-defined value provided to rados_watch2()
+ * @param notify_id an id for this notify event
+ * @param handle the watcher handle we are notifying
+ * @param notifier_id the unique client id for the notifier
+ * @param data payload from the notifier
+ * @param datalen length of payload buffer
*/
typedef void (*rados_watchcb2_t)(void *arg,
uint64_t notify_id,
void *data,
size_t data_len);
+/**
+ * @typedef rados_watcherrcb_t
+ *
+ * Callback activated when we encounter an error with the watch session.
+ * This can happen when the location of the objects moves within the
+ * cluster and we fail to register our watch with the new object location,
+ * or when our connection with the object OSD is otherwise interrupted and
+ * we may have missed notify events.
+ *
+ * @param pre opaque user-defined value provided to rados_watch2()
+ * @param err error code
+ */
+ typedef void (*rados_watcherrcb_t)(void *pre, uint64_t cookie, int err);
+
/**
* Register an interest in an object
*
* @param o the object to watch
* @param handle where to store the internal id assigned to this watch
* @param watchcb2 what to do when a notify is received on this object
+ * @param watcherrcb what to do when the watch session encounters an error
* @param arg opaque value to pass to the callback
* @returns 0 on success, negative error code on failure
*/
int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
- rados_watchcb2_t watchcb, void *arg);
+ rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb,
+ void *arg);
/**
* Unregister an interest in an object
public:
virtual ~WatchCtx2();
/**
+ * Callback activated when we receive a notify event.
+ *
* @param notify_id unique id for this notify event
* @param cookie the watcher we are notifying
* @param notifier_id the unique client id of the notifier
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl) = 0;
+ /**
+ * Callback activated when we encounter an error with the watch.
+ *
+ * @param cookie the watcher with the problem
+ * @param err error
+ */
+ virtual void handle_error(uint64_t cookie, int err) = 0;
};
struct CEPH_RADOS_API AioCompletion {
struct C_WatchCB2 : public librados::WatchCtx2 {
rados_watchcb2_t wcb;
+ rados_watcherrcb_t errcb;
void *arg;
- C_WatchCB2(rados_watchcb2_t _wcb, void *_arg) : wcb(_wcb), arg(_arg) {}
+ C_WatchCB2(rados_watchcb2_t _wcb,
+ rados_watcherrcb_t _errcb,
+ void *_arg) : wcb(_wcb), errcb(_errcb), arg(_arg) {}
void handle_notify(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_gid,
bufferlist& bl) {
wcb(arg, notify_id, cookie, notifier_gid, bl.c_str(), bl.length());
}
+ void handle_error(uint64_t cookie, int err) {
+ errcb(arg, cookie, err);
+ }
};
extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
- rados_watchcb2_t watchcb, void *arg)
+ rados_watchcb2_t watchcb,
+ rados_watcherrcb_t watcherrcb,
+ void *arg)
{
tracepoint(librados, rados_watch2_enter, io, o, handle, watchcb, arg);
uint64_t *cookie = handle;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- C_WatchCB2 *wc = new C_WatchCB2(watchcb, arg);
+ C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
int ret = ctx->watch(oid, cookie, NULL, wc);
tracepoint(librados, rados_watch_exit, ret, *handle);
return ret;
bufferlist notify_bl;
rados_ioctx_t notify_io;
const char *notify_oid = 0;
+int notify_err = 0;
static void watch_notify2_test_cb(void *arg,
uint64_t notify_id,
sleep(notify_sleep);
notify_ioctx->notify_ack(notify_oid, notify_id, cookie, reply);
}
+
+ void handle_error(uint64_t cookie, int err) {
+ std::cout << __func__ << " cookie " << cookie << std::endl;
+ notify_err = err;
+ }
};
TEST_F(LibRadosWatchNotify, WatchNotifyTest) {
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle;
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb, NULL));
+ rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb,
+ watch_notify2_test_errcb, NULL));
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle1, handle2;
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle1, watch_notify2_test_cb, NULL));
+ rados_watch2(ioctx, notify_oid, &handle1, watch_notify2_test_cb,
+ watch_notify2_test_errcb, NULL));
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle2, watch_notify2_test_cb, NULL));
+ rados_watch2(ioctx, notify_oid, &handle2, watch_notify2_test_cb,
+ watch_notify2_test_errcb, NULL));
ASSERT_NE(handle1, handle2);
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle;
ASSERT_EQ(0,
- rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb, NULL));
+ rados_watch2(ioctx, notify_oid, &handle, watch_notify2_test_cb,
+ watch_notify2_test_errcb, NULL));
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,