Decided this wasn't useful.
Signed-off-by: Sage Weil <sage@redhat.com>
switch (e) {
case CEPH_WATCH_EVENT_NOTIFY: return "notify";
case CEPH_WATCH_EVENT_NOTIFY_COMPLETE: return "notify_complete";
- case CEPH_WATCH_EVENT_FAILED_NOTIFY: return "failed_notify";
case CEPH_WATCH_EVENT_DISCONNECT: return "disconnect";
}
return "???";
enum {
CEPH_WATCH_EVENT_NOTIFY = 1, /* notifying watcher */
CEPH_WATCH_EVENT_NOTIFY_COMPLETE = 2, /* notifier notified when done */
- CEPH_WATCH_EVENT_FAILED_NOTIFY = 3, /* we made a notify time out */
- CEPH_WATCH_EVENT_DISCONNECT = 4, /* we were disconnected */
+ CEPH_WATCH_EVENT_DISCONNECT = 3, /* we were disconnected */
};
const char *ceph_watch_event_name(int o);
void *data,
size_t data_len);
-/**
- * @typedef rados_watchfailcb_t
- *
- * Callback activated when a notify is not acked in a timely manner,
- * resulting in a timeout for the notifier.
- *
- * @param arg opaque user-defined value provided to rados_watch2()
- * @param notify_id an id for this notify event
- * @param handle the watcher handle we are notifying
- * @param notifier_id the unique client id for the notifier
- */
-typedef void (*rados_watchfailcb_t)(void *arg,
- uint64_t notify_id,
- uint64_t handle,
- uint64_t notifier_id);
-
/**
* @typedef rados_watcherrcb_t
*
* @param o the object to watch
* @param cookie where to store the internal id assigned to this watch
* @param watchcb2 what to do when a notify is received on this object
- * @param watchfailcb what to do when a notify is not acked in time
* @param watcherrcb what to do when the watch session encounters an error
* @param arg opaque value to pass to the callback
* @returns 0 on success, negative error code on failure
*/
CEPH_RADOS_API int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *cookie,
rados_watchcb2_t watchcb,
- rados_watchfailcb_t watchfailcb,
rados_watcherrcb_t watcherrcb,
void *arg);
* @param cookie the watch handle
* @returns ms since last confirmed on success, negative error code on failure
*/
-int rados_watch_check(rados_ioctx_t io, uint64_t cookie);
+CEPH_RADOS_API int rados_watch_check(rados_ioctx_t io, uint64_t cookie);
/**
* Unregister an interest in an object
uint64_t notifier_id,
bufferlist& bl) = 0;
- /**
- * Callback activated when we are too slow to ack a notify
- *
- * If we fail to ack a notify or our ack doesn't arrive in time
- * and a notify timeout is triggered for another client, this
- * callback will be triggered to let us know about it.
- *
- * @param notify_id unique id for this notify event
- * @param cookie the watcher we are notifying
- * @param notifier_id the unique client id of the notifier
- */
- virtual void handle_failed_notify(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id) = 0;
-
/**
* Callback activated when we encounter an error with the watch.
*
ioctx->notify_ack(oid, notify_id, cookie, empty);
}
}
- void handle_failed_notify(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id) {
- ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
- << " cookie " << cookie
- << " notifier_id " << notifier_id
- << dendl;
- if (ctx2)
- ctx2->handle_failed_notify(notify_id, cookie, notifier_id);
- }
void handle_error(uint64_t cookie, int err) {
ldout(ioctx->client->cct, 10) << __func__ << " cookie " << cookie
<< " err " << err
struct C_WatchCB2 : public librados::WatchCtx2 {
rados_watchcb2_t wcb;
- rados_watchfailcb_t failcb;
rados_watcherrcb_t errcb;
void *arg;
C_WatchCB2(rados_watchcb2_t _wcb,
- rados_watchfailcb_t _failcb,
rados_watcherrcb_t _errcb,
- void *_arg) : wcb(_wcb), failcb(_failcb), errcb(_errcb), arg(_arg) {}
+ void *_arg) : wcb(_wcb), errcb(_errcb), arg(_arg) {}
void handle_notify(uint64_t notify_id,
uint64_t cookie,
uint64_t notifier_gid,
bufferlist& bl) {
wcb(arg, notify_id, cookie, notifier_gid, bl.c_str(), bl.length());
}
- void handle_failed_notify(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_gid) {
- if (failcb)
- failcb(arg, notify_id, cookie, notifier_gid);
- }
void handle_error(uint64_t cookie, int err) {
if (errcb)
errcb(arg, cookie, err);
extern "C" int rados_watch2(rados_ioctx_t io, const char *o, uint64_t *handle,
rados_watchcb2_t watchcb,
- rados_watchfailcb_t watchfailcb,
rados_watcherrcb_t watcherrcb,
void *arg)
{
uint64_t *cookie = handle;
librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
object_t oid(o);
- C_WatchCB2 *wc = new C_WatchCB2(watchcb, watchfailcb, watcherrcb, arg);
+ C_WatchCB2 *wc = new C_WatchCB2(watchcb, watcherrcb, arg);
ret = ctx->watch(oid, cookie, NULL, wc);
}
tracepoint(librados, rados_watch_exit, ret, handle ? *handle : 0);
::encode(notify_replies, bl);
list<pair<uint64_t,uint64_t> > missed;
for (set<WatchRef>::iterator p = watchers.begin(); p != watchers.end(); ++p) {
- (*p)->send_failed_notify(this);
missed.push_back(make_pair((*p)->get_watcher_gid(),
(*p)->get_cookie()));
}
conn->send_message(notify_msg);
}
-void Watch::send_failed_notify(Notify *notif)
-{
- if (!conn)
- return;
- bufferlist empty;
- MWatchNotify *reply(new MWatchNotify(cookie, notif->version, notif->notify_id,
- CEPH_WATCH_EVENT_FAILED_NOTIFY, empty));
- reply->notifier_gid = notif->client_gid;
- conn->send_message(reply);
-}
-
void Watch::send_disconnect()
{
if (!conn)
return conn.get() != NULL;
}
- /// send a failed notify message
- void send_failed_notify(Notify *notif);
-
/// send a disconnect notice to the client
void send_disconnect();
m->notifier_gid, m->bl);
break;
- case CEPH_WATCH_EVENT_FAILED_NOTIFY:
- info->watch_context->handle_failed_notify(m->notify_id, m->cookie,
- m->notifier_gid);
- break;
-
case CEPH_WATCH_EVENT_DISCONNECT:
info->watch_context->handle_error(m->cookie, -ENOTCONN);
break;
uint64_t cookie,
uint64_t notifier_id,
bufferlist& bl) = 0;
- virtual void handle_failed_notify(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id) = 0;
virtual void handle_error(uint64_t cookie, int err) = 0;
virtual ~WatchContext() {}
};
rados_ioctx_t notify_io;
const char *notify_oid = 0;
int notify_err = 0;
-bool notify_failed = false;
static void watch_notify2_test_cb(void *arg,
uint64_t notify_id,
rados_notify_ack(notify_io, notify_oid, notify_id, cookie, "reply", 5);
}
-static void watch_notify2_test_failcb(void *arg,
- uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_gid)
-{
- std::cout << __func__ << " from " << notifier_gid << " notify_id " << notify_id
- << " cookie " << cookie << std::endl;
- notify_failed = true;
-}
-
static void watch_notify2_test_errcb(void *arg, uint64_t cookie, int err)
{
std::cout << __func__ << " cookie " << cookie << std::endl;
notify_ioctx->notify_ack(notify_oid, notify_id, cookie, reply);
}
- void handle_failed_notify(uint64_t notify_id, uint64_t cookie,
- uint64_t notifier_gid) {
- std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id
- << " notifier_gid " << notifier_gid << std::endl;
- notify_failed = true;
- }
-
void handle_error(uint64_t cookie, int err) {
std::cout << __func__ << " cookie " << cookie << std::endl;
notify_err = err;
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle,
watch_notify2_test_cb,
- watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
int age = rados_watch_check(ioctx, handle);
time_t age_bound = time(0) + 1 - start;
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle,
watch_notify2_test_cb,
- watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle,
watch_notify2_test_cb,
- watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
char *reply_buf = 0;
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle1,
watch_notify2_test_cb,
- watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle2,
watch_notify2_test_cb,
- watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
notify_oid = "foo";
notify_sleep = 3; // 3s
notify_cookies.clear();
- notify_failed = false;
char buf[128];
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle,
watch_notify2_test_cb,
- watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
char *reply_buf = 0;
"notify", 6, 1000, // 1s
&reply_buf, &reply_buf_len));
ASSERT_EQ(1u, notify_cookies.size());
- int wait = 10;
- while (!notify_failed && --wait)
- sleep(1);
- ASSERT_TRUE(notify_failed);
{
bufferlist reply;
reply.append(reply_buf, reply_buf_len);
rados_buffer_free(reply_buf);
// we should get the next notify, though!
- notify_failed = false;
notify_sleep = 0;
notify_cookies.clear();
ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
notify_ioctx = &ioctx;
notify_sleep = 3; // 3s
notify_cookies.clear();
- notify_failed = true;
char buf[128];
memset(buf, 0xcc, sizeof(buf));
bufferlist bl1;
bufferlist bl2, bl_reply;
ASSERT_EQ(-ETIMEDOUT, ioctx.notify(notify_oid, bl2, 1000 /* 1s */,
&bl_reply));
- int wait = 10;
- while (!notify_failed && --wait)
- sleep(1);
- ASSERT_TRUE(notify_failed);
ASSERT_TRUE(ioctx.watch_check(handle) > 0);
ioctx.unwatch(handle);
}
waiting = false;
cond.SignalAll();
}
- void handle_failed_notify(uint64_t notify_id, uint64_t cookie,
- uint64_t notifier_id) {
- Mutex::Locker l(lock);
- cout << "watch handle_failed_notify" << std::endl;
- }
void handle_error(uint64_t cookie, int err) {
Mutex::Locker l(lock);
cout << "watch handle_error " << err << std::endl;
bl.hexdump(cout);
ioctx.notify_ack(name, notify_id, cookie, bl);
}
- void handle_failed_notify(uint64_t notify_id,
- uint64_t cookie,
- uint64_t notifier_id) {
- cout << "FAILED_NOTIFY"
- << " cookie " << cookie
- << " notify_id " << notify_id
- << " from " << notifier_id
- << std::endl;
- }
void handle_error(uint64_t cookie, int err) {
cout << "ERROR"
<< " cookie " << cookie