#include "include/stringify.h"
#include "cls/rbd/cls_rbd_client.h"
#include "librbd/ObjectWatcher.h"
+#include "librbd/internal.h"
#include "Replayer.h"
#include "Threads.h"
using std::unique_ptr;
using std::vector;
+using librbd::cls_client::dir_get_name;
+
namespace rbd {
namespace mirror {
Watcher *m_watcher;
};
-Replayer::Replayer(Threads *threads, RadosRef local_cluster,
- const peer_t &peer, const std::vector<const char*> &args) :
+Replayer::Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ RadosRef local_cluster, const peer_t &peer,
+ const std::vector<const char*> &args) :
m_threads(threads),
+ m_image_deleter(image_deleter),
m_lock(stringify("rbd::mirror::Replayer ") + stringify(peer)),
m_peer(peer),
m_args(args),
{
dout(20) << "replaying for " << m_peer << dendl;
- m_image_deleter.reset(new ImageDeleter(m_peer.cluster_name, m_local));
-
// NOTE: manually bootstrap a CephContext here instead of via
// the librados API to avoid mixing global singletons between
// the librados shared library and the daemon
dout(20) << "connected to " << m_peer << dendl;
+ // Bootstrap existing mirroring images
+ init_local_mirroring_images();
+
// TODO: make interval configurable
m_pool_watcher.reset(new PoolWatcher(m_remote, 30, m_lock, m_cond));
m_pool_watcher->refresh_images();
return 0;
}
+void Replayer::init_local_mirroring_images() {
+ list<pair<int64_t, string> > pools;
+ int r = m_local->pool_list2(pools);
+ if (r < 0) {
+ derr << "error listing pools: " << cpp_strerror(r) << dendl;
+ return;
+ }
+
+ for (auto kv : pools) {
+ int64_t pool_id = kv.first;
+ string pool_name = kv.second;
+ int64_t base_tier;
+ r = m_local->pool_get_base_tier(pool_id, &base_tier);
+ if (r == -ENOENT) {
+ dout(10) << "pool " << pool_name << " no longer exists" << dendl;
+ continue;
+ } else if (r < 0) {
+ derr << "Error retrieving base tier for pool " << pool_name << dendl;
+ continue;
+ }
+ if (pool_id != base_tier) {
+ // pool is a cache; skip it
+ continue;
+ }
+
+ librados::IoCtx ioctx;
+ r = m_local->ioctx_create2(pool_id, ioctx);
+ if (r == -ENOENT) {
+ dout(10) << "pool " << pool_name << " no longer exists" << dendl;
+ continue;
+ } else if (r < 0) {
+ derr << "Error accessing pool " << pool_name << cpp_strerror(r) << dendl;
+ continue;
+ }
+
+ rbd_mirror_mode_t mirror_mode;
+ r = librbd::mirror_mode_get(ioctx, &mirror_mode);
+ if (r < 0) {
+ derr << "could not tell whether mirroring was enabled for " << pool_name
+ << " : " << cpp_strerror(r) << dendl;
+ continue;
+ }
+ if (mirror_mode == RBD_MIRROR_MODE_DISABLED) {
+ dout(20) << "pool " << pool_name << " has mirroring disabled" << dendl;
+ continue;
+ }
+
+ librados::IoCtx remote_ioctx;
+ r = m_remote->ioctx_create(ioctx.get_pool_name().c_str(), remote_ioctx);
+ if (r < 0 && r != -ENOENT) {
+ dout(10) << "Error connecting to remote pool " << ioctx.get_pool_name()
+ << ": " << cpp_strerror(r) << dendl;
+ continue;
+ } else if (r == -ENOENT) {
+ // remote pool does not exist anymore, we are going to add the images
+ // with local pool id
+ pool_id = ioctx.get_id();
+ }
+ else {
+ pool_id = remote_ioctx.get_id();
+ }
+
+ std::set<InitImageInfo> images;
+
+ std::string last_read = "";
+ int max_read = 1024;
+ do {
+ std::map<std::string, std::string> mirror_images;
+ r = librbd::cls_client::mirror_image_list(&ioctx, last_read, max_read,
+ &mirror_images);
+ if (r < 0) {
+ derr << "error listing mirrored image directory: "
+ << cpp_strerror(r) << dendl;
+ continue;
+ }
+ for (auto it = mirror_images.begin(); it != mirror_images.end(); ++it) {
+ std::string image_name;
+ r = dir_get_name(&ioctx, RBD_DIRECTORY, it->first, &image_name);
+ if (r < 0) {
+ derr << "error retrieving local image name: " << cpp_strerror(r)
+ << dendl;
+ continue;
+ }
+ images.insert(InitImageInfo(it->second, ioctx.get_id(), it->first,
+ image_name));
+ }
+ if (!mirror_images.empty()) {
+ last_read = mirror_images.rbegin()->first;
+ }
+ r = mirror_images.size();
+ } while (r == max_read);
+
+ if (!images.empty()) {
+ m_init_images[pool_id] = std::move(images);
+ }
+ }
+}
+
void Replayer::run()
{
dout(20) << "enter" << dendl;
m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
}
- // Stopping
m_image_deleter.reset();
PoolImageIds empty_sources;
}
}
- if (f) {
- f->close_section();
- f->open_object_section("image_deleter");
- }
-
- m_image_deleter->print_status(f, ss);
-
if (f) {
f->close_section();
f->close_section();
dout(20) << "enter" << dendl;
assert(m_lock.is_locked());
+
+ if (!m_init_images.empty()) {
+ dout(20) << "m_init_images has images!" << dendl;
+ for (auto it = m_init_images.begin(); it != m_init_images.end(); ++it) {
+ int64_t pool_id = it->first;
+ std::set<InitImageInfo>& images = it->second;
+ auto remote_pool_it = pool_image_ids.find(pool_id);
+ if (remote_pool_it != pool_image_ids.end()) {
+ const std::set<ImageIds>& remote_images = remote_pool_it->second;
+ for (const auto& remote_image : remote_images) {
+ auto image = images.find(InitImageInfo(remote_image.global_id));
+ if (image != images.end()) {
+ images.erase(image);
+ }
+ }
+ }
+ }
+ // the remaining images in m_init_images must be deleted
+ for (auto it = m_init_images.begin(); it != m_init_images.end(); ++it) {
+ for (const auto& image : it->second) {
+ dout(20) << "scheduling the deletion of init image: "
+ << image.name << dendl;
+ m_image_deleter->schedule_image_delete(image.pool_id, image.id,
+ image.name, image.global_id);
+ }
+ }
+ m_init_images.clear();
+ } else {
+ dout(20) << "m_init_images is empty!" << dendl;
+ }
+
for (auto it = m_images.begin(); it != m_images.end();) {
int64_t pool_id = it->first;
auto &pool_images = it->second;
);
image_replayer->stop(ctx);
} else {
- // TODO: check how long it is stopping and alert if it is too long.
+ // TODO: checkhow long it is stopping and alert if it is too long.
}
return false;
*/
class Replayer {
public:
- Replayer(Threads *threads, RadosRef local_cluster, const peer_t &peer,
- const std::vector<const char*> &args);
+ Replayer(Threads *threads, std::shared_ptr<ImageDeleter> image_deleter,
+ RadosRef local_cluster, const peer_t &peer,
+ const std::vector<const char*> &args);
~Replayer();
Replayer(const Replayer&) = delete;
Replayer& operator=(const Replayer&) = delete;
typedef PoolWatcher::ImageIds ImageIds;
typedef PoolWatcher::PoolImageIds PoolImageIds;
+ void init_local_mirroring_images();
void set_sources(const PoolImageIds &pool_image_ids);
void start_image_replayer(unique_ptr<ImageReplayer<> > &image_replayer,
void mirror_image_status_shut_down(int64_t pool_id);
Threads *m_threads;
+ std::shared_ptr<ImageDeleter> m_image_deleter;
Mutex m_lock;
Cond m_cond;
atomic_t m_stopping;
std::unique_ptr<ImageReplayer<> > > > m_images;
std::map<int64_t, std::unique_ptr<MirrorStatusWatchCtx> > m_status_watchers;
ReplayerAdminSocketHook *m_asok_hook;
- std::unique_ptr<ImageDeleter> m_image_deleter;
+ struct InitImageInfo {
+ std::string global_id;
+ int64_t pool_id;
+ std::string id;
+ std::string name;
+
+ InitImageInfo(const std::string& global_id, int64_t pool_id = 0,
+ const std::string &id = "", const std::string &name = "")
+ : global_id(global_id), pool_id(pool_id), id(id), name(name) {
+ }
+
+ inline bool operator==(const InitImageInfo &rhs) const {
+ return (global_id == rhs.global_id && pool_id == rhs.pool_id &&
+ id == rhs.id && name == rhs.name);
+ }
+ inline bool operator<(const InitImageInfo &rhs) const {
+ return global_id < rhs.global_id;
+ }
+ };
+
+ std::map<int64_t, std::set<InitImageInfo> > m_init_images;
class ReplayerThread : public Thread {
Replayer *m_replayer;