rados_watcherrcb_t watcherrcb,
void *arg);
+/**
+ * Check on the status of a watch
+ *
+ * Return the number of milliseconds since the watch was last confirmed.
+ * Or, if there has been an error, return that.
+ *
+ * If there is an error, the watch is no longer valid, and should be
+ * destroyed with rados_unwatch2(). The the user is still interested
+ * in the object, a new watch should be created with rados_watch2().
+ *
+ * @param io the pool the object is in
+ * @param handle the watch handle
+ * @returns ms since last confirmed on success, negative error code on failure
+ */
+int rados_watch_check(rados_ioctx_t io, uint64_t handle);
+
/**
* Unregister an interest in an object
*
uint64_t handle, ///< our watch handle
bufferlist& bl); ///< optional reply payload
+ /***
+ * check on watch validity
+ *
+ * Check if a watch is valid. If so, return the number of
+ * milliseconds since we last confirmed its liveness. If there is
+ * a known error, return it.
+ *
+ * If there is an error, the watch is no longer valid, and should
+ * be destroyed with unwatch(). The the user is still interested
+ * in the object, a new watch should be created with watch().
+ *
+ * @param handle watch handle
+ * @returns ms since last confirmed valid, or error
+ */
+ int watch_check(uint64_t handle);
+
// old, deprecated versions
int watch(const std::string& o, uint64_t ver, uint64_t *handle,
librados::WatchCtx *ctx) __attribute__ ((deprecated));
uint64_t to_nsec() const {
return (uint64_t)tv.tv_nsec + (uint64_t)tv.tv_sec * 1000000000ull;
}
+ uint64_t to_msec() const {
+ return (uint64_t)tv.tv_nsec / 1000000ull + (uint64_t)tv.tv_sec * 1000ull;
+ }
void copy_to_timeval(struct timeval *v) const {
v->tv_sec = tv.tv_sec;
int librados::IoCtxImpl::watch_check(uint64_t cookie)
{
- Mutex::Locker(*lock);
+ Mutex::Locker l(*lock);
return client->watch_check(cookie);
}
void set_sync_op_version(version_t ver);
int watch(const object_t& oid, uint64_t *cookie, librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2);
+ int watch_check(uint64_t cookie);
int unwatch(uint64_t cookie);
int notify(const object_t& oid, bufferlist& bl, uint64_t timeout_ms,
bufferlist *preplybl, char **preply_buf, size_t *preply_buf_len);
const object_t oid; // the object
uint64_t linger_id; // we use this to unlinger when we are done
uint64_t cookie; // callback cookie
+ int err;
// watcher. only one of these will be defined.
librados::WatchCtx *watch_ctx;
oid(_oc),
linger_id(0),
cookie(0),
+ err(0),
watch_ctx(NULL),
watch_ctx2(NULL),
notify_lock(NULL),
lock.Lock();
return 0;
}
+
+int librados::RadosClient::watch_check(uint64_t cookie)
+{
+ ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
+ map<uint64_t, WatchNotifyInfo *>::iterator iter = watch_notify_info.find(cookie);
+ if (iter == watch_notify_info.end())
+ return -EBADF;
+ WatchNotifyInfo *ctx = iter->second;
+ if (ctx->err)
+ return ctx->err;
+ return objecter->linger_check(ctx->linger_id);
}
struct C_DoWatchNotify : public Context {
if (iter != watch_notify_info.end()) {
WatchNotifyInfo *wc = iter->second;
assert(wc);
+ wc->err = err;
if (wc->watch_ctx2) {
wc->get();
ldout(cct,10) << __func__ << " cookie " << cookie
} else if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
// we failed to ping or reconnect and our watch was canceled.
ldout(cct,10) << __func__ << " disconnect " << *m << dendl;
+ wc->err = -ENOTCONN;
if (wc->watch_ctx2) {
wc->get();
// trigger the callback
void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
uint64_t *cookie);
int unregister_watch_notify_callback(uint64_t cookie, object_t *poid);
+ int watch_check(uint64_t cookie);
void handle_watch_notify(MWatchNotify *m);
void do_watch_notify(MWatchNotify *m);
void do_watch_error(uint64_t cookie, int err);
return io_ctx_impl->unwatch(handle);
}
+int librados::IoCtx::watch_check(uint64_t handle)
+{
+ return io_ctx_impl->watch_check(handle);
+}
+
int librados::IoCtx::notify(const string& oid, uint64_t ver, bufferlist& bl)
{
object_t obj(oid);
return retval;
}
+extern "C" int rados_watch_check(rados_ioctx_t io, uint64_t handle)
+{
+ tracepoint(librados, rados_watch_check_enter, io, handle);
+ uint64_t cookie = handle;
+ librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+ int retval = ctx->watch_check(cookie);
+ tracepoint(librados, rados_watch_check_exit, retval);
+ return retval;
+}
+
extern "C" int rados_notify(rados_ioctx_t io, const char *o,
uint64_t ver, const char *buf, int buf_len)
{
info->watch_lock.Unlock();
}
+int Objecter::linger_check(uint64_t linger_id)
+{
+ RWLock::WLocker wl(rwlock);
+ map<uint64_t, LingerOp*>::iterator iter = linger_ops.find(linger_id);
+ if (iter == linger_ops.end()) {
+ ldout(cct, 10) << __func__ << " " << linger_id << " dne" << dendl;
+ return -EBADF;
+ }
+
+ LingerOp *info = iter->second;
+ utime_t age = ceph_clock_now(NULL) - info->watch_valid_thru;
+ ldout(cct, 10) << __func__ << " " << linger_id
+ << " err " << info->last_error
+ << " age " << age << dendl;
+ if (info->last_error)
+ return info->last_error;
+ return age.to_msec();
+}
+
void Objecter::unregister_linger(uint64_t linger_id)
{
RWLock::WLocker wl(rwlock);
info->on_reg_ack = onack;
info->on_reg_commit = oncommit;
info->on_error = onerror;
+ info->watch_valid_thru = ceph_clock_now(NULL);
RWLock::WLocker wl(rwlock);
_linger_submit(info);
snapid_t snap, bufferlist& inbl, bufferlist *poutbl, int flags,
Context *onack,
version_t *objver);
+ int linger_check(uint64_t linger_id);
void unregister_linger(uint64_t linger_id);
void _unregister_linger(uint64_t linger_id);
memset(buf, 0xcc, sizeof(buf));
ASSERT_EQ(0, rados_write(ioctx, notify_oid, buf, sizeof(buf), 0));
uint64_t handle;
+ time_t start = time(0);
ASSERT_EQ(0,
rados_watch2(ioctx, notify_oid, &handle,
watch_notify2_test_cb,
watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
+ int age = rados_watch_check(ioctx, handle);
+ time_t age_bound = time(0) + 1 - start;
+ ASSERT_LT(age, age_bound * 1000);
+ ASSERT_GT(age, 0);
rados_conf_set(cluster, "objecter_inject_no_watch_ping", "true");
int left = 180;
+ std::cout << "waiting up to " << left << " for osd to time us out ..."
+ << std::endl;
while (notify_err == 0 && --left) {
sleep(1);
}
ASSERT_TRUE(left > 0);
rados_conf_set(cluster, "objecter_inject_no_watch_ping", "false");
ASSERT_EQ(-ENOTCONN, notify_err);
+ ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
// a subsequent notify should not reach us
char *reply_buf;
ASSERT_EQ(0u, reply_map.size());
}
ASSERT_EQ(0u, notify_cookies.size());
+ ASSERT_EQ(-ENOTCONN, rados_watch_check(ioctx, handle));
// re-watch
rados_unwatch2(ioctx, handle);
watch_notify2_test_cb,
watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
+ ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
+
// and now a notify will work.
ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
"notify", 6, 0,
ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
}
ASSERT_EQ(1u, notify_cookies.size());
+ ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
rados_unwatch2(ioctx, handle);
}
watch_notify2_test_cb,
watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
+ ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(0, rados_notify2(ioctx, notify_oid,
ASSERT_EQ(1, notify_cookies.count(handle));
ASSERT_EQ(5, reply_map.begin()->second.length());
ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+ ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
rados_unwatch2(ioctx, handle);
}
uint64_t handle;
WatchNotifyTestCtx2 ctx;
ASSERT_EQ(0, ioctx.watch(notify_oid, &handle, &ctx));
+ ASSERT_TRUE(ioctx.watch_check(handle) > 0);
std::list<obj_watch_t> watches;
ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
ASSERT_EQ(watches.size(), 1u);
ASSERT_EQ(1u, reply_map.size());
ASSERT_EQ(5, reply_map.begin()->second.length());
ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+ ASSERT_TRUE(ioctx.watch_check(handle) > 0);
ioctx.unwatch(handle);
}
watch_notify2_test_cb,
watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
+ ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
+ ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
ASSERT_NE(handle1, handle2);
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(1, notify_cookies.count(handle1));
ASSERT_EQ(1, notify_cookies.count(handle2));
ASSERT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5));
+ ASSERT_TRUE(rados_watch_check(ioctx, handle1) > 0);
+ ASSERT_TRUE(rados_watch_check(ioctx, handle2) > 0);
rados_unwatch2(ioctx, handle1);
rados_unwatch2(ioctx, handle2);
}
watch_notify2_test_cb,
watch_notify2_test_failcb,
watch_notify2_test_errcb, NULL));
+ ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
char *reply_buf;
size_t reply_buf_len;
ASSERT_EQ(-ETIMEDOUT, rados_notify2(ioctx, notify_oid,
"notify", 6, 30000, // 30s
&reply_buf, &reply_buf_len));
ASSERT_EQ(1u, notify_cookies.size());
+ ASSERT_TRUE(rados_watch_check(ioctx, handle) > 0);
rados_unwatch2(ioctx, handle);
}
uint64_t handle;
WatchNotifyTestCtx2 ctx;
ASSERT_EQ(0, ioctx.watch(notify_oid, &handle, &ctx));
+ ASSERT_TRUE(ioctx.watch_check(handle) > 0);
std::list<obj_watch_t> watches;
ASSERT_EQ(0, ioctx.list_watchers(notify_oid, &watches));
ASSERT_EQ(watches.size(), 1u);
while (!notify_failed && --wait)
sleep(1);
ASSERT_TRUE(notify_failed);
+ ASSERT_TRUE(ioctx.watch_check(handle) > 0);
ioctx.unwatch(handle);
}
)
)
+TRACEPOINT_EVENT(librados, rados_watch_check_enter,
+ TP_ARGS(
+ rados_ioctx_t, ioctx,
+ uint64_t, handle),
+ TP_FIELDS(
+ ctf_integer_hex(rados_ioctx_t, ioctx, ioctx)
+ ctf_integer(uint64_t, handle, handle)
+ )
+)
+
+TRACEPOINT_EVENT(librados, rados_watch_check_exit,
+ TP_ARGS(
+ int, retval),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ )
+)
+
TRACEPOINT_EVENT(librados, rados_notify_enter,
TP_ARGS(
rados_ioctx_t, ioctx,