typedef int (*librbd_progress_fn_t)(uint64_t offset, uint64_t total, void *ptr);
+typedef void (*rbd_update_callback_t)(void *arg);
+
typedef struct {
uint64_t id;
uint64_t size;
CEPH_RBD_API int rbd_group_remove(rados_ioctx_t p, const char *name);
CEPH_RBD_API int rbd_group_list(rados_ioctx_t p, char *names, size_t *size);
+/**
+ * Register an image metadata change watcher.
+ *
+ * @param image the image to watch
+ * @param handle where to store the internal id assigned to this watch
+ * @param watch_cb what to do when a notify is received on this image
+ * @param arg opaque value to pass to the callback
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RBD_API int rbd_update_watch(rbd_image_t image, uint64_t *handle,
+ rbd_update_callback_t watch_cb, void *arg);
+
+/**
+ * Unregister an image watcher.
+ *
+ * @param image the image to unwatch
+ * @param handle which watch to unregister
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RBD_API int rbd_update_unwatch(rbd_image_t image, uint64_t handle);
+
+
#ifdef __cplusplus
}
#endif
rbd_image_options_t opts;
};
+class CEPH_RBD_API UpdateWatchCtx {
+public:
+ virtual ~UpdateWatchCtx() {}
+ /**
+ * Callback activated when we receive a notify event.
+ */
+ virtual void handle_notify() = 0;
+};
+
class CEPH_RBD_API Image
{
public:
int mirror_image_get_status(mirror_image_status_t *mirror_image_status,
size_t status_size);
+ int update_watch(UpdateWatchCtx *ctx, uint64_t *handle);
+ int update_unwatch(uint64_t handle);
+
private:
friend class RBD;
// vim: ts=8 sw=2 smarttab
#include "librbd/ImageState.h"
+#include "include/rbd/librbd.hpp"
#include "common/dout.h"
#include "common/errno.h"
#include "common/Cond.h"
using util::create_async_context_callback;
using util::create_context_callback;
+class ImageUpdateWatchers {
+public:
+
+ ImageUpdateWatchers(CephContext *cct) : m_cct(cct),
+ m_lock(util::unique_lock_name("librbd::ImageUpdateWatchers::m_lock", this)) {
+ }
+
+ ~ImageUpdateWatchers() {
+ assert(m_watchers.empty());
+ assert(m_in_flight.empty());
+ assert(m_pending_unregister.empty());
+ assert(m_on_shut_down_finish == nullptr);
+
+ destroy_work_queue();
+ }
+
+ void flush(Context *on_finish) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ if (!m_in_flight.empty()) {
+ Context *ctx = new FunctionContext(
+ [this, on_finish](int r) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+ << ": completing flush" << dendl;
+ on_finish->complete(r);
+ });
+ m_work_queue->queue(ctx, 0);
+ return;
+ }
+ }
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+ << ": completing flush" << dendl;
+ on_finish->complete(0);
+ }
+
+ void shut_down(Context *on_finish) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
+ {
+ Mutex::Locker locker(m_lock);
+ assert(m_on_shut_down_finish == nullptr);
+ m_watchers.clear();
+ if (!m_in_flight.empty()) {
+ m_on_shut_down_finish = on_finish;
+ return;
+ }
+ }
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+ << ": completing shut down" << dendl;
+ on_finish->complete(0);
+ }
+
+ void register_watcher(UpdateWatchCtx *watcher, uint64_t *handle) {
+ ldout(m_cct, 20) << __func__ << ": watcher=" << watcher << dendl;
+
+ Mutex::Locker locker(m_lock);
+ assert(m_on_shut_down_finish == nullptr);
+
+ create_work_queue();
+
+ *handle = m_next_handle++;
+ m_watchers.insert(std::make_pair(*handle, watcher));
+ }
+
+ void unregister_watcher(uint64_t handle, Context *on_finish) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
+ << handle << dendl;
+ int r = 0;
+ {
+ Mutex::Locker locker(m_lock);
+ auto it = m_watchers.find(handle);
+ if (it == m_watchers.end()) {
+ r = -ENOENT;
+ } else {
+ if (m_in_flight.find(handle) != m_in_flight.end()) {
+ assert(m_pending_unregister.find(handle) == m_pending_unregister.end());
+ m_pending_unregister[handle] = on_finish;
+ on_finish = nullptr;
+ }
+ m_watchers.erase(it);
+ }
+ }
+
+ if (on_finish) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+ << ": completing unregister" << dendl;
+ on_finish->complete(r);
+ }
+ }
+
+ void notify() {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << dendl;
+
+ Mutex::Locker locker(m_lock);
+ for (auto it : m_watchers) {
+ send_notify(it.first, it.second);
+ }
+ }
+
+ void send_notify(uint64_t handle, UpdateWatchCtx *watcher) {
+ assert(m_lock.is_locked());
+
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
+ << handle << ", watcher=" << watcher << dendl;
+
+ m_in_flight.insert(handle);
+
+ Context *ctx = new FunctionContext(
+ [this, handle, watcher](int r) {
+ handle_notify(handle, watcher);
+ });
+
+ m_work_queue->queue(ctx, 0);
+ }
+
+ void handle_notify(uint64_t handle, UpdateWatchCtx *watcher) {
+
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__ << ": handle="
+ << handle << ", watcher=" << watcher << dendl;
+
+ watcher->handle_notify();
+
+ Context *on_unregister_finish = nullptr;
+ Context *on_shut_down_finish = nullptr;
+
+ {
+ Mutex::Locker locker(m_lock);
+
+ auto in_flight_it = m_in_flight.find(handle);
+ assert(in_flight_it != m_in_flight.end());
+ m_in_flight.erase(in_flight_it);
+
+ // If there is no more in flight notifications for this watcher
+ // and it is pending unregister, complete it now.
+ if (m_in_flight.find(handle) == m_in_flight.end()) {
+ auto it = m_pending_unregister.find(handle);
+ if (it != m_pending_unregister.end()) {
+ on_unregister_finish = it->second;
+ m_pending_unregister.erase(it);
+ }
+ }
+
+ if (m_in_flight.empty()) {
+ assert(m_pending_unregister.empty());
+ if (m_on_shut_down_finish != nullptr) {
+ std::swap(m_on_shut_down_finish, on_shut_down_finish);
+ }
+ }
+ }
+
+ if (on_unregister_finish != nullptr) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+ << ": completing unregister" << dendl;
+ on_unregister_finish->complete(0);
+ }
+
+ if (on_shut_down_finish != nullptr) {
+ ldout(m_cct, 20) << "ImageUpdateWatchers::" << __func__
+ << ": completing shut down" << dendl;
+ on_shut_down_finish->complete(0);
+ }
+ }
+
+private:
+ class ThreadPoolSingleton : public ThreadPool {
+ public:
+ explicit ThreadPoolSingleton(CephContext *cct)
+ : ThreadPool(cct, "librbd::ImageUpdateWatchers::thread_pool", "tp_librbd",
+ 1) {
+ start();
+ }
+ virtual ~ThreadPoolSingleton() {
+ stop();
+ }
+ };
+
+ CephContext *m_cct;
+ Mutex m_lock;
+ ContextWQ *m_work_queue = nullptr;
+ std::map<uint64_t, UpdateWatchCtx*> m_watchers;
+ uint64_t m_next_handle = 0;
+ std::multiset<uint64_t> m_in_flight;
+ std::map<uint64_t, Context*> m_pending_unregister;
+ Context *m_on_shut_down_finish = nullptr;
+
+ void create_work_queue() {
+ if (m_work_queue != nullptr) {
+ return;
+ }
+ ThreadPoolSingleton *thread_pool_singleton;
+ m_cct->lookup_or_create_singleton_object<ThreadPoolSingleton>(
+ thread_pool_singleton, "librbd::ImageUpdateWatchers::thread_pool");
+ m_work_queue = new ContextWQ("librbd::ImageUpdateWatchers::op_work_queue",
+ m_cct->_conf->rbd_op_thread_timeout,
+ thread_pool_singleton);
+ }
+
+ void destroy_work_queue() {
+ if (m_work_queue == nullptr) {
+ return;
+ }
+ m_work_queue->drain();
+ delete m_work_queue;
+ }
+};
+
template <typename I>
ImageState<I>::ImageState(I *image_ctx)
: m_image_ctx(image_ctx), m_state(STATE_UNINITIALIZED),
m_lock(util::unique_lock_name("librbd::ImageState::m_lock", this)),
- m_last_refresh(0), m_refresh_seq(0) {
+ m_last_refresh(0), m_refresh_seq(0),
+ m_update_watchers(new ImageUpdateWatchers(image_ctx->cct)) {
}
template <typename I>
ImageState<I>::~ImageState() {
assert(m_state == STATE_UNINITIALIZED || m_state == STATE_CLOSED);
+ delete m_update_watchers;
}
template <typename I>
++m_refresh_seq;
CephContext *cct = m_image_ctx->cct;
- ldout(cct, 20) << "refresh_seq = " << m_refresh_seq << ", "
+ ldout(cct, 20) << __func__ << ": refresh_seq = " << m_refresh_seq << ", "
<< "last_refresh = " << m_last_refresh << dendl;
+
+ if (m_state == STATE_OPEN) {
+ m_update_watchers->notify();
+ }
}
template <typename I>
execute_action_unlock(action, on_finish);
}
+template <typename I>
+int ImageState<I>::register_update_watcher(UpdateWatchCtx *watcher,
+ uint64_t *handle) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << __func__ << dendl;
+
+ m_update_watchers->register_watcher(watcher, handle);
+
+ ldout(cct, 20) << __func__ << ": handle=" << *handle << dendl;
+ return 0;
+}
+
+template <typename I>
+int ImageState<I>::unregister_update_watcher(uint64_t handle) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << __func__ << ": handle=" << handle << dendl;
+
+ C_SaferCond ctx;
+ m_update_watchers->unregister_watcher(handle, &ctx);
+ return ctx.wait();
+}
+
+template <typename I>
+void ImageState<I>::flush_update_watchers(Context *on_finish) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << __func__ << dendl;
+
+ m_update_watchers->flush(on_finish);
+}
+
+template <typename I>
+void ImageState<I>::shut_down_update_watchers(Context *on_finish) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 20) << __func__ << dendl;
+
+ m_update_watchers->shut_down(on_finish);
+}
+
template <typename I>
bool ImageState<I>::is_transition_state() const {
switch (m_state) {
namespace librbd {
class ImageCtx;
+class ImageUpdateWatchers;
+class UpdateWatchCtx;
template <typename ImageCtxT = ImageCtx>
class ImageState {
void snap_set(const std::string &snap_name, Context *on_finish);
+ int register_update_watcher(UpdateWatchCtx *watcher, uint64_t *handle);
+ int unregister_update_watcher(uint64_t handle);
+ void flush_update_watchers(Context *on_finish);
+ void shut_down_update_watchers(Context *on_finish);
+
private:
enum State {
STATE_UNINITIALIZED,
uint64_t m_last_refresh;
uint64_t m_refresh_seq;
+ ImageUpdateWatchers *m_update_watchers;
+
bool is_transition_state() const;
bool is_closed() const;
m_image_ctx.state->handle_update_notification();
m_image_ctx.perfcounter->inc(l_librbd_notify);
+ if (ack_ctx != nullptr) {
+ m_image_ctx.state->flush_update_watchers(new C_ResponseMessage(ack_ctx));
+ return false;
+ }
return true;
}
template <typename I>
void CloseRequest<I>::send() {
+ send_shut_down_update_watchers();
+}
+
+template <typename I>
+void CloseRequest<I>::send_shut_down_update_watchers() {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 10) << this << " " << __func__ << dendl;
+
+ m_image_ctx->state->shut_down_update_watchers(create_async_context_callback(
+ *m_image_ctx, create_context_callback<
+ CloseRequest<I>, &CloseRequest<I>::handle_shut_down_update_watchers>(this)));
+}
+
+template <typename I>
+void CloseRequest<I>::handle_shut_down_update_watchers(int r) {
+ CephContext *cct = m_image_ctx->cct;
+ ldout(cct, 10) << this << " " << __func__ << ": r=" << r << dendl;
+
+ save_result(r);
+ if (r < 0) {
+ lderr(cct) << "failed to shut down update watchers: " << cpp_strerror(r)
+ << dendl;
+ }
+
send_unregister_image_watcher();
}
* <start>
* |
* v
+ * SHUT_DOWN_UPDATE_WATCHERS
+ * |
+ * v
* UNREGISTER_IMAGE_WATCHER
* |
* v
decltype(m_image_ctx->exclusive_lock) m_exclusive_lock;
+ void send_shut_down_update_watchers();
+ void handle_shut_down_update_watchers(int r);
+
void send_unregister_image_watcher();
void handle_unregister_image_watcher(int r);
}
};
+struct C_UpdateWatchCB : public librbd::UpdateWatchCtx {
+ rbd_update_callback_t watch_cb;
+ void *arg;
+ uint64_t handle = 0;
+
+ C_UpdateWatchCB(rbd_update_callback_t watch_cb, void *arg) :
+ watch_cb(watch_cb), arg(arg) {
+ }
+ void handle_notify() {
+ watch_cb(arg);
+ }
+};
+
void mirror_image_info_cpp_to_c(const librbd::mirror_image_info_t &cpp_info,
rbd_mirror_image_info_t *c_info) {
c_info->global_id = strdup(cpp_info.global_id.c_str());
status_size);
}
+ int Image::update_watch(UpdateWatchCtx *wctx, uint64_t *handle) {
+ ImageCtx *ictx = (ImageCtx *)ctx;
+ tracepoint(librbd, update_watch_enter, ictx, wctx);
+ int r = ictx->state->register_update_watcher(wctx, handle);
+ tracepoint(librbd, update_watch_exit, r, *handle);
+ return r;
+ }
+
+ int Image::update_unwatch(uint64_t handle) {
+ ImageCtx *ictx = (ImageCtx *)ctx;
+ tracepoint(librbd, update_unwatch_enter, ictx, handle);
+ int r = ictx->state->unregister_update_watcher(handle);
+ tracepoint(librbd, update_unwatch_exit, r);
+ return r;
+ }
+
} // namespace librbd
extern "C" void rbd_version(int *major, int *minor, int *extra)
return 0;
}
+extern "C" int rbd_update_watch(rbd_image_t image, uint64_t *handle,
+ rbd_update_callback_t watch_cb, void *arg)
+{
+ librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
+ C_UpdateWatchCB *wctx = new C_UpdateWatchCB(watch_cb, arg);
+ tracepoint(librbd, update_watch_enter, ictx, wctx);
+ int r = ictx->state->register_update_watcher(wctx, &wctx->handle);
+ tracepoint(librbd, update_watch_exit, r, wctx->handle);
+ *handle = reinterpret_cast<uint64_t>(wctx);
+ return r;
+}
+
+extern "C" int rbd_update_unwatch(rbd_image_t image, uint64_t handle)
+{
+ librbd::ImageCtx *ictx = (librbd::ImageCtx *)image;
+ C_UpdateWatchCB *wctx = reinterpret_cast<C_UpdateWatchCB *>(handle);
+ tracepoint(librbd, update_unwatch_enter, ictx, wctx->handle);
+ int r = ictx->state->unregister_update_watcher(wctx->handle);
+ delete wctx;
+ tracepoint(librbd, update_unwatch_exit, r);
+ return r;
+}
+
extern "C" int rbd_aio_is_complete(rbd_completion_t c)
{
librbd::RBD::AioCompletion *comp = (librbd::RBD::AioCompletion *)c;
#include "gtest/gtest.h"
+#include <chrono>
#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include "test/librados/test.h"
#include "test/librbd/test_support.h"
+#include "common/Cond.h"
+#include "common/Mutex.h"
#include "common/errno.h"
#include "common/event_socket.h"
#include "include/interval_set.h"
using namespace std;
+using std::chrono::seconds;
+
#define ASSERT_PASSED(x, args...) \
do { \
bool passed = false; \
ioctx.close();
}
+TEST_F(TestLibRBD, UpdateWatchAndResize)
+{
+ rados_ioctx_t ioctx;
+ rados_ioctx_create(_cluster, m_pool_name.c_str(), &ioctx);
+
+ rbd_image_t image;
+ int order = 0;
+ std::string name = get_temp_image_name();
+ uint64_t size = 2 << 20;
+ struct Watcher {
+ static void cb(void *arg) {
+ Watcher *watcher = (Watcher *)arg;
+ watcher->handle_notify();
+ }
+ Watcher(rbd_image_t &image) : m_image(image), m_lock("lock") {}
+ void handle_notify() {
+ rbd_image_info_t info;
+ ASSERT_EQ(0, rbd_stat(m_image, &info, sizeof(info)));
+ Mutex::Locker locker(m_lock);
+ m_size = info.size;
+ m_cond.Signal();
+ }
+ void wait_for_size(size_t size) {
+ Mutex::Locker locker(m_lock);
+ while (m_size != size) {
+ CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+ ASSERT_EQ(0, m_cond.WaitInterval(cct, m_lock, seconds(5)));
+ }
+ }
+ rbd_image_t &m_image;
+ Mutex m_lock;
+ Cond m_cond;
+ size_t m_size = 0;
+ } watcher(image);
+ uint64_t handle;
+
+ ASSERT_EQ(0, create_image(ioctx, name.c_str(), size, &order));
+ ASSERT_EQ(0, rbd_open(ioctx, name.c_str(), &image, NULL));
+
+ ASSERT_EQ(0, rbd_update_watch(image, &handle, Watcher::cb, &watcher));
+
+ ASSERT_EQ(0, rbd_resize(image, size * 4));
+ watcher.wait_for_size(size * 4);
+
+ ASSERT_EQ(0, rbd_resize(image, size / 2));
+ watcher.wait_for_size(size / 2);
+
+ ASSERT_EQ(0, rbd_update_unwatch(image, handle));
+
+ ASSERT_EQ(0, rbd_close(image));
+ rados_ioctx_destroy(ioctx);
+}
+
+TEST_F(TestLibRBD, UpdateWatchAndResizePP)
+{
+ librados::IoCtx ioctx;
+ ASSERT_EQ(0, _rados.ioctx_create(m_pool_name.c_str(), ioctx));
+
+ {
+ librbd::RBD rbd;
+ librbd::Image image;
+ int order = 0;
+ std::string name = get_temp_image_name();
+ uint64_t size = 2 << 20;
+ struct Watcher : public librbd::UpdateWatchCtx {
+ Watcher(librbd::Image &image) : m_image(image), m_lock("lock") {
+ }
+ void handle_notify() {
+ librbd::image_info_t info;
+ ASSERT_EQ(0, m_image.stat(info, sizeof(info)));
+ Mutex::Locker locker(m_lock);
+ m_size = info.size;
+ m_cond.Signal();
+ }
+ void wait_for_size(size_t size) {
+ Mutex::Locker locker(m_lock);
+ while (m_size != size) {
+ CephContext* cct = reinterpret_cast<CephContext*>(_rados.cct());
+ ASSERT_EQ(0, m_cond.WaitInterval(cct, m_lock, seconds(5)));
+ }
+ }
+ librbd::Image &m_image;
+ Mutex m_lock;
+ Cond m_cond;
+ size_t m_size = 0;
+ } watcher(image);
+ uint64_t handle;
+
+ ASSERT_EQ(0, create_image_pp(rbd, ioctx, name.c_str(), size, &order));
+ ASSERT_EQ(0, rbd.open(ioctx, image, name.c_str(), NULL));
+
+ ASSERT_EQ(0, image.update_watch(&watcher, &handle));
+
+ ASSERT_EQ(0, image.resize(size * 4));
+ watcher.wait_for_size(size * 4);
+
+ ASSERT_EQ(0, image.resize(size / 2));
+ watcher.wait_for_size(size / 2);
+
+ ASSERT_EQ(0, image.update_unwatch(handle));
+ }
+
+ ioctx.close();
+}
+
int test_ls(rados_ioctx_t io_ctx, size_t num_expected, ...)
{
int num_images, i;
ctf_integer(int, retval, retval)
)
)
+
+TRACEPOINT_EVENT(librbd, update_watch_enter,
+ TP_ARGS(
+ void*, imagectx,
+ void*, watchctx),
+ TP_FIELDS(
+ ctf_integer_hex(void*, imagctx, imagectx)
+ ctf_integer_hex(void*, watchctx, watchctx)
+ )
+)
+
+TRACEPOINT_EVENT(librbd, update_watch_exit,
+ TP_ARGS(
+ int, retval,
+ uint64_t, handle),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ ctf_integer(uint64_t, handle, handle)
+ )
+)
+
+TRACEPOINT_EVENT(librbd, update_unwatch_enter,
+ TP_ARGS(
+ void*, imagectx,
+ uint64_t, handle),
+ TP_FIELDS(
+ ctf_integer_hex(void*, imagctx, imagectx)
+ ctf_integer(uint64_t, handle, handle)
+ )
+)
+
+TRACEPOINT_EVENT(librbd, update_unwatch_exit,
+ TP_ARGS(
+ int, retval),
+ TP_FIELDS(
+ ctf_integer(int, retval, retval)
+ )
+)