uint64_t notify_id, uint64_t cookie,
const char *buf, int buf_len);
+/**
+ * Flush watch/notify callbacks
+ *
+ * This call will block until all pending watch/notify callbacks have
+ * been executed and the queue is empty. It should usually be called
+ * after shutting down any watches before shutting down the ioctx or
+ * librados to ensure that any callbacks do not misuse the ioctx (for
+ * example by calling rados_notify_ack after the ioctx has been
+ * destroyed).
+ *
+ * @param cluster the cluster handle
+ */
+CEPH_RADOS_API int rados_watch_flush(rados_t cluster);
/** @} Watch/Notify */
config_t cct();
int connect();
void shutdown();
+ int watch_flush();
int conf_read_file(const char * const path) const;
int conf_parse_argv(int argc, const char ** argv) const;
int conf_parse_argv_remainder(int argc, const char ** argv,
ldout(cct, 1) << "shutdown" << dendl;
}
+int librados::RadosClient::watch_flush()
+{
+ ldout(cct, 10) << __func__ << " enter" << dendl;
+ objecter->linger_callback_flush();
+ ldout(cct, 10) << __func__ << " exit" << dendl;
+ return 0;
+}
+
uint64_t librados::RadosClient::get_instance_id()
{
return instance_id;
int connect();
void shutdown();
+ int watch_flush();
+
uint64_t get_instance_id();
int wait_for_latest_osdmap();
return (config_t)client->cct;
}
+int librados::Rados::watch_flush()
+{
+ if (!client)
+ return -EINVAL;
+ return client->watch_flush();
+}
+
void librados::Rados::shutdown()
{
if (!client)
return retval;
}
+extern "C" int rados_watch_flush(rados_t cluster)
+{
+ tracepoint(librados, rados_watch_flush_enter, cluster);
+ librados::RadosClient *client = (librados::RadosClient *)cluster;
+ int retval = client->watch_flush();
+ tracepoint(librados, rados_watch_flush_exit, retval);
+ return retval;
+}
+
extern "C" int rados_set_alloc_hint(rados_ioctx_t io, const char *o,
uint64_t expected_object_size,
uint64_t expected_write_size)
}
struct C_DoWatchError : public Context {
+ Objecter *objecter;
Objecter::LingerOp *info;
int err;
- C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) {
+ C_DoWatchError(Objecter *o, Objecter::LingerOp *i, int r)
+ : objecter(o), info(i), err(r) {
info->get();
info->_queued_async();
}
info->watch_context->handle_error(info->get_cookie(), err);
info->finished_async();
info->put();
+ objecter->_linger_callback_finish();
}
};
if (!info->last_error) {
r = _normalize_watch_error(r);
info->last_error = r;
- if (info->watch_context)
- finisher->queue(new C_DoWatchError(info, r));
+ if (info->watch_context) {
+ finisher->queue(new C_DoWatchError(this, info, r));
+ _linger_callback_queue();
+ }
}
info->watch_lock.put_write();
}
} else if (r < 0 && !info->last_error) {
r = _normalize_watch_error(r);
info->last_error = r;
- if (info->watch_context)
- finisher->queue(new C_DoWatchError(info, r));
+ if (info->watch_context) {
+ finisher->queue(new C_DoWatchError(this, info, r));
+ _linger_callback_queue();
+ }
}
} else {
ldout(cct, 20) << " ignoring old gen" << dendl;
if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
if (!info->last_error) {
info->last_error = -ENOTCONN;
- if (info->watch_context)
- finisher->queue(new C_DoWatchError(info, -ENOTCONN));
+ if (info->watch_context) {
+ finisher->queue(new C_DoWatchError(this, info, -ENOTCONN));
+ _linger_callback_queue();
+ }
}
} else if (!info->is_watch) {
// notify completion; we can do this inline since we know the only user
info->on_notify_finish->complete(m->return_code);
} else {
finisher->queue(new C_DoWatchNotify(this, info, m));
+ _linger_callback_queue();
}
}
ldout(cct, 10) << __func__ << " " << *m << dendl;
rwlock.get_read();
- if (!initialized.read()) {
- rwlock.put_read();
- goto out;
- }
+ assert(initialized.read());
if (info->canceled) {
rwlock.put_read();
info->finished_async();
info->put();
m->put();
+ _linger_callback_finish();
}
bool Objecter::ms_dispatch(Message *m)
map<uint64_t, LingerOp*> linger_ops;
// we use this just to confirm a cookie is valid before dereferencing the ptr
set<LingerOp*> linger_ops_set;
+ int num_linger_callbacks;
+ Mutex linger_callback_lock;
+ Cond linger_callback_cond;
map<ceph_tid_t,PoolStatOp*> poolstat_ops;
map<ceph_tid_t,StatfsOp*> statfs_ops;
void _linger_ping(LingerOp *info, int r, utime_t sent, uint32_t register_gen);
int _normalize_watch_error(int r);
+ void _linger_callback_queue() {
+ Mutex::Locker l(linger_callback_lock);
+ ++num_linger_callbacks;
+ }
+ void _linger_callback_finish() {
+ Mutex::Locker l(linger_callback_lock);
+ if (--num_linger_callbacks == 0)
+ linger_callback_cond.SignalAll();
+ assert(num_linger_callbacks >= 0);
+ }
+ friend class C_DoWatchError;
+public:
+ void linger_callback_flush() {
+ Mutex::Locker l(linger_callback_lock);
+ while (num_linger_callbacks > 0)
+ linger_callback_cond.Wait(linger_callback_lock);
+ }
+
+private:
void _check_op_pool_dne(Op *op, bool session_locked);
void _send_op_map_check(Op *op);
void _op_cancel_map_check(Op *op);
timer(cct, timer_lock, false),
logger(NULL), tick_event(NULL),
m_request_state_hook(NULL),
+ num_linger_callbacks(0),
+ linger_callback_lock("Objecter::linger_callback_lock"),
num_homeless_ops(0),
homeless_session(new OSDSession(cct, -1)),
mon_timeout(mon_timeout),
ASSERT_GT(rados_watch_check(ioctx, handle), 0);
rados_unwatch2(ioctx, handle);
+ rados_watch_flush(cluster);
}
TEST_P(LibRadosWatchNotifyPP, WatchNotify2Timeout) {
ASSERT_EQ(watches.size(), 1u);
ASSERT_EQ(0u, notify_cookies.size());
bufferlist bl2, bl_reply;
+ std::cout << " trying..." << std::endl;
ASSERT_EQ(-ETIMEDOUT, ioctx.notify2(notify_oid, bl2, 1000 /* 1s */,
&bl_reply));
+ std::cout << " timed out" << std::endl;
ASSERT_GT(ioctx.watch_check(handle), 0);
ioctx.unwatch2(handle);
+ std::cout << " flushing" << std::endl;
+ cluster.watch_flush();
+ std::cout << " flushed" << std::endl;
}
// --
)
)
+TRACEPOINT_EVENT(librados, rados_watch_flush_enter,
+ TP_ARGS(
+ rados_t, cluster),
+ TP_FIELDS(
+ ctf_integer_hex(rados_t, cluster, cluster)
+ )
+)
+
+TRACEPOINT_EVENT(librados, rados_watch_flush_exit,
+ TP_ARGS(
+ int, retval),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ )
+)
+
TRACEPOINT_EVENT(librados, rados_set_alloc_hint_enter,
TP_ARGS(
rados_ioctx_t, ioctx,