.set_description("number of failed attempts to acquire lock after missing heartbeats before breaking lock"),
Option("rbd_mirror_image_policy_type", Option::TYPE_STR, Option::LEVEL_ADVANCED)
- .set_default("simple")
- .set_enum_allowed({"simple"})
- .set_description("policy type for mapping images to instances"),
+ .set_default("none")
+ .set_enum_allowed({"none", "simple"})
+ .set_description("active/active policy type for mapping images to instances"),
Option("rbd_mirror_image_policy_migration_throttle", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(300)
CephContext *cct = reinterpret_cast<CephContext *>(m_local_io_ctx.cct());
std::string policy_type = cct->_conf->get_val<string>("rbd_mirror_image_policy_type");
- if (policy_type == "simple") {
+ if (policy_type == "none" || policy_type == "simple") {
m_policy = image_map::SimplePolicy::create(m_local_io_ctx);
} else {
assert(false);
void SetUp() override {
TestFixture::SetUp();
+ m_local_instance_id = stringify(m_local_io_ctx.get_instance_id());
+
EXPECT_EQ(0, _rados->conf_set("rbd_mirror_image_policy_migration_throttle",
"0"));
+ EXPECT_EQ(0, _rados->conf_set("rbd_mirror_image_policy_type", "simple"));
+ }
+
+ void TearDown() override {
+ EXPECT_EQ(0, _rados->conf_set("rbd_mirror_image_policy_type", "none"));
+
+ TestFixture::TearDown();
}
void expect_work_queue(MockThreads &mock_threads) {
Cond m_cond;
uint32_t m_notify_update_count;
uint32_t m_map_update_count;
+ std::string m_local_instance_id;
};
TEST_F(TestMockImageMap, SetLocalImages) {
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
&peer_ack_ctxs);
wait_for_scheduled_task();
- auto local_instance_id = stringify(m_local_io_ctx.get_instance_id());
- mock_image_map->update_instances_added({local_instance_id});
+ mock_image_map->update_instances_added({m_local_instance_id});
std::set<std::string> shuffled_global_image_ids;
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
&peer_ack_ctxs);
wait_for_scheduled_task();
- auto local_instance_id = stringify(m_local_io_ctx.get_instance_id());
- mock_image_map->update_instances_added({local_instance_id});
+ mock_image_map->update_instances_added({m_local_instance_id});
std::set<std::string> shuffled_global_image_ids;
"global id 11", "global id 12", "global id 13", "global id 14"
};
- auto local_instance_id = stringify(m_local_io_ctx.get_instance_id());
std::map<std::string, cls::rbd::MirrorImageMap> image_mapping;
for (auto& global_image_id : global_image_ids) {
- image_mapping[global_image_id] = {local_instance_id, {}, {}};
+ image_mapping[global_image_id] = {m_local_instance_id, {}, {}};
}
// ACQUIRE
&peer_ack_ctxs);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
ASSERT_EQ(0, cond.wait());
- mock_image_map->update_instances_added({local_instance_id});
+ mock_image_map->update_instances_added({m_local_instance_id});
std::set<std::string> global_image_ids_ack(global_image_ids);
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
&peer_ack_ctxs);
wait_for_scheduled_task();
- auto local_instance_id = stringify(m_local_io_ctx.get_instance_id());
- mock_image_map->update_instances_added({local_instance_id});
+ mock_image_map->update_instances_added({m_local_instance_id});
std::set<std::string> shuffled_global_image_ids;
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
ASSERT_EQ(0, cond.wait());
- auto local_instance_id = stringify(m_local_io_ctx.get_instance_id());
- mock_image_map->update_instances_added({local_instance_id});
+ mock_image_map->update_instances_added({m_local_instance_id});
std::set<std::string> global_image_ids{
"global id 1", "global id 2", "global id 3", "remote id 4",
MockListener mock_listener(this);
std::unique_ptr<MockImageMap> mock_image_map{
- MockImageMap::create(m_local_io_ctx, &mock_threads, mock_listener)};
+ MockImageMap::create(m_local_io_ctx, &mock_threads, m_local_instance_id,
+ mock_listener)};
C_SaferCond cond;
mock_image_map->init(&cond);
};
template <typename I>
-ImageMap<I>::ImageMap(librados::IoCtx &ioctx, Threads<I> *threads, image_map::Listener &listener)
- : m_ioctx(ioctx),
- m_threads(threads),
+ImageMap<I>::ImageMap(librados::IoCtx &ioctx, Threads<I> *threads,
+ const std::string& instance_id,
+ image_map::Listener &listener)
+ : m_ioctx(ioctx), m_threads(threads), m_instance_id(instance_id),
m_listener(listener),
m_lock(unique_lock_name("rbd::mirror::ImageMap::m_lock", this)) {
}
image_map::ActionType action_type =
m_policy->start_action(global_image_id);
image_map::LookupInfo info = m_policy->lookup(global_image_id);
- assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
+ dout(15) << "global_image_id=" << global_image_id << ", "
+ << "action=" << action_type << ", "
+ << "instance=" << info.instance_id << dendl;
switch (action_type) {
case image_map::ACTION_TYPE_NONE:
continue;
case image_map::ACTION_TYPE_MAP_UPDATE:
+ assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
map_updates.emplace_back(global_image_id, info.instance_id,
info.mapped_time);
break;
map_removals.emplace(global_image_id);
break;
case image_map::ACTION_TYPE_ACQUIRE:
+ assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
acquire_updates.emplace_back(global_image_id, info.instance_id);
break;
case image_map::ACTION_TYPE_RELEASE:
+ assert(info.instance_id != image_map::UNMAPPED_INSTANCE_ID);
release_updates.emplace_back(global_image_id, info.instance_id);
break;
}
template <typename I>
void ImageMap<I>::update_instances_added(
const std::vector<std::string> &instance_ids) {
- dout(20) << dendl;
-
{
Mutex::Locker locker(m_lock);
if (m_shutting_down) {
return;
}
+ std::vector<std::string> filtered_instance_ids;
+ filter_instance_ids(instance_ids, &filtered_instance_ids, false);
+ if (filtered_instance_ids.empty()) {
+ return;
+ }
+
+ dout(20) << "instance_ids=" << filtered_instance_ids << dendl;
+
std::set<std::string> remap_global_image_ids;
- m_policy->add_instances(instance_ids, &remap_global_image_ids);
+ m_policy->add_instances(filtered_instance_ids, &remap_global_image_ids);
for (auto const &global_image_id : remap_global_image_ids) {
schedule_action(global_image_id);
template <typename I>
void ImageMap<I>::update_instances_removed(
const std::vector<std::string> &instance_ids) {
- dout(20) << dendl;
-
{
Mutex::Locker locker(m_lock);
if (m_shutting_down) {
return;
}
+ std::vector<std::string> filtered_instance_ids;
+ filter_instance_ids(instance_ids, &filtered_instance_ids, true);
+ if (filtered_instance_ids.empty()) {
+ return;
+ }
+
+ dout(20) << "instance_ids=" << filtered_instance_ids << dendl;
+
std::set<std::string> remap_global_image_ids;
- m_policy->remove_instances(instance_ids, &remap_global_image_ids);
+ m_policy->remove_instances(filtered_instance_ids, &remap_global_image_ids);
for (auto const &global_image_id : remap_global_image_ids) {
schedule_action(global_image_id);
CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
std::string policy_type = cct->_conf->get_val<string>("rbd_mirror_image_policy_type");
- if (policy_type == "simple") {
+ if (policy_type == "none" || policy_type == "simple") {
m_policy.reset(image_map::SimplePolicy::create(m_ioctx));
} else {
assert(false); // not really needed as such, but catch it.
wait_for_async_ops(on_finish);
}
+template <typename I>
+void ImageMap<I>::filter_instance_ids(
+ const std::vector<std::string> &instance_ids,
+ std::vector<std::string> *filtered_instance_ids, bool removal) const {
+ CephContext *cct = reinterpret_cast<CephContext *>(m_ioctx.cct());
+ std::string policy_type = cct->_conf->get_val<string>("rbd_mirror_image_policy_type");
+
+ if (policy_type != "none") {
+ *filtered_instance_ids = instance_ids;
+ return;
+ }
+
+ if (removal) {
+ // propagate removals for external instances
+ for (auto& instance_id : instance_ids) {
+ if (instance_id != m_instance_id) {
+ filtered_instance_ids->push_back(instance_id);
+ }
+ }
+ } else if (std::find(instance_ids.begin(), instance_ids.end(),
+ m_instance_id) != instance_ids.end()) {
+ // propagate addition only for local instance
+ filtered_instance_ids->push_back(m_instance_id);
+ }
+}
+
} // namespace mirror
} // namespace rbd
class ImageMap {
public:
static ImageMap *create(librados::IoCtx &ioctx, Threads<ImageCtxT> *threads,
+ const std::string& instance_id,
image_map::Listener &listener) {
- return new ImageMap(ioctx, threads, listener);
+ return new ImageMap(ioctx, threads, instance_id, listener);
}
~ImageMap();
struct C_NotifyInstance;
ImageMap(librados::IoCtx &ioctx, Threads<ImageCtxT> *threads,
- image_map::Listener &listener);
+ const std::string& instance_id, image_map::Listener &listener);
struct Update {
std::string global_image_id;
librados::IoCtx &m_ioctx;
Threads<ImageCtxT> *m_threads;
+ std::string m_instance_id;
image_map::Listener &m_listener;
std::unique_ptr<image_map::Policy> m_policy; // our mapping policy
const std::set<std::string> &global_image_ids);
void update_images_removed(const std::string &peer_uuid,
const std::set<std::string> &global_image_ids);
+
+ void filter_instance_ids(const std::vector<std::string> &instance_ids,
+ std::vector<std::string> *filtered_instance_ids,
+ bool removal) const;
+
};
} // namespace mirror
Mutex::Locker locker(m_lock);
assert(!m_image_map);
m_image_map.reset(ImageMap<I>::create(m_local_io_ctx, m_threads,
+ m_instance_watcher->get_instance_id(),
m_image_map_listener));
auto ctx = new FunctionContext([this, on_finish](int r) {
void PoolReplayer<I>::handle_instances_added(const InstanceIds &instance_ids) {
dout(5) << "instance_ids=" << instance_ids << dendl;
- // TODO only register ourselves for now
- auto instance_id = m_instance_watcher->get_instance_id();
- m_image_map->update_instances_added({instance_id});
+ m_image_map->update_instances_added(instance_ids);
}
template <typename I>
void PoolReplayer<I>::handle_instances_removed(
const InstanceIds &instance_ids) {
dout(5) << "instance_ids=" << instance_ids << dendl;
- // TODO
+
+ m_image_map->update_instances_removed(instance_ids);
}
} // namespace mirror
RWLock::WLocker map_lock(m_map_lock);
for (auto& it : image_mapping) {
+ assert(!it.second.instance_id.empty());
auto map_result = m_map[it.second.instance_id].emplace(it.first);
assert(map_result.second);
RWLock::WLocker map_lock(m_map_lock);
for (auto& instance : instance_ids) {
+ assert(!instance.empty());
m_map.emplace(instance, std::set<std::string>{});
}
}
instance_id = do_map(m_map, global_image_id);
+ assert(!instance_id.empty());
dout(5) << "global_image_id=" << global_image_id << ", "
<< "instance_id=" << instance_id << dendl;
dout(5) << "global_image_id=" << global_image_id << ", "
<< "instance_id=" << instance_id << dendl;
+ assert(!instance_id.empty());
m_map[instance_id].erase(global_image_id);
image_state->instance_id = UNMAPPED_INSTANCE_ID;
image_state->mapped_time = {};