uuid != nullptr ? uuid : &gen_uuid,
peer.cluster_name,
peer.client_name));
- m_peer_configs[peer].insert(pool_id);
+ m_pool_peers[pool_id].insert(peer);
m_mirrored_pools.insert(pool_name);
}
if (name != nullptr) {
void delete_pool(const string &name, const peer_t &peer) {
int64_t pool_id = m_cluster->pool_lookup(name.c_str());
ASSERT_GE(pool_id, 0);
- if (m_peer_configs.find(peer) != m_peer_configs.end()) {
- m_peer_configs[peer].erase(pool_id);
+ if (m_pool_peers.find(pool_id) != m_pool_peers.end()) {
+ m_pool_peers[pool_id].erase(peer);
m_mirrored_pools.erase(name);
- if (m_peer_configs[peer].empty()) {
- m_peer_configs.erase(peer);
+ if (m_pool_peers[pool_id].empty()) {
+ m_pool_peers.erase(pool_id);
}
}
m_pools.erase(name);
void check_peers() {
m_cluster_watcher->refresh_pools();
Mutex::Locker l(m_lock);
- ASSERT_EQ(m_peer_configs, m_cluster_watcher->get_peer_configs());
+ ASSERT_EQ(m_pool_peers, m_cluster_watcher->get_pool_peers());
ASSERT_EQ(m_mirrored_pools, m_cluster_watcher->get_pool_names());
}
set<string> m_pools;
set<string> m_mirrored_pools;
- map<peer_t, set<int64_t> > m_peer_configs;
+ ClusterWatcher::PoolPeers m_pool_peers;
};
TEST_F(TestClusterWatcher, NoPools) {
{
}
-const map<peer_t, set<int64_t> >& ClusterWatcher::get_peer_configs() const
+const ClusterWatcher::PoolPeers& ClusterWatcher::get_pool_peers() const
{
assert(m_lock.is_locked());
- return m_peer_configs;
+ return m_pool_peers;
}
-const std::set<std::string>& ClusterWatcher::get_pool_names() const
+const ClusterWatcher::PoolNames& ClusterWatcher::get_pool_names() const
{
assert(m_lock.is_locked());
return m_pool_names;
void ClusterWatcher::refresh_pools()
{
dout(20) << "enter" << dendl;
- map<peer_t, set<int64_t> > peer_configs;
- set<string> pool_names;
- read_configs(&peer_configs, &pool_names);
+
+ PoolPeers pool_peers;
+ PoolNames pool_names;
+ read_pool_peers(&pool_peers, &pool_names);
Mutex::Locker l(m_lock);
- m_peer_configs = peer_configs;
+ m_pool_peers = pool_peers;
m_pool_names = pool_names;
// TODO: perhaps use a workqueue instead, once we get notifications
// about config changes for existing pools
}
-void ClusterWatcher::read_configs(map<peer_t, set<int64_t> > *peer_configs,
- set<string> *pool_names)
+void ClusterWatcher::read_pool_peers(PoolPeers *pool_peers,
+ PoolNames *pool_names)
{
list<pair<int64_t, string> > pools;
int r = m_cluster->pool_list2(pools);
continue;
}
- for (peer_t peer : configs) {
- dout(20) << "pool " << pool_name << " has mirroring enabled for peer "
- << peer << dendl;
- (*peer_configs)[peer].insert(pool_id);
- }
-
+ pool_peers->insert({pool_id, Peers{configs.begin(), configs.end()}});
pool_names->insert(pool_name);
}
}
*/
class ClusterWatcher {
public:
+ typedef std::set<peer_t> Peers;
+ typedef std::map<int64_t, Peers> PoolPeers;
+ typedef std::set<std::string> PoolNames;
+
ClusterWatcher(RadosRef cluster, Mutex &lock);
~ClusterWatcher() = default;
ClusterWatcher(const ClusterWatcher&) = delete;
ClusterWatcher& operator=(const ClusterWatcher&) = delete;
+
// Caller controls frequency of calls
void refresh_pools();
- const std::map<peer_t, std::set<int64_t> >& get_peer_configs() const;
- const std::set<std::string>& get_pool_names() const;
+ const PoolPeers& get_pool_peers() const;
+ const PoolNames& get_pool_names() const;
private:
- void read_configs(std::map<peer_t, std::set<int64_t> > *peer_configs,
- std::set<std::string> *pool_names);
-
Mutex &m_lock;
RadosRef m_cluster;
- std::map<peer_t, std::set<int64_t> > m_peer_configs;
- std::set<std::string> m_pool_names;
+ PoolPeers m_pool_peers;
+ PoolNames m_pool_names;
+
+ void read_pool_peers(PoolPeers *pool_peers, PoolNames *pool_names);
};
} // namespace mirror
m_local_cluster_watcher->refresh_pools();
Mutex::Locker l(m_lock);
if (!m_manual_stop) {
- update_replayers(m_local_cluster_watcher->get_peer_configs());
+ update_replayers(m_local_cluster_watcher->get_pool_peers());
}
// TODO: make interval configurable
m_cond.WaitInterval(g_ceph_context, m_lock, seconds(30));
}
}
-void Mirror::update_replayers(const map<peer_t, set<int64_t> > &peer_configs)
+void Mirror::update_replayers(const PoolPeers &pool_peers)
{
dout(20) << "enter" << dendl;
assert(m_lock.is_locked());
// remove stale replayers before creating new replayers
for (auto it = m_replayers.begin(); it != m_replayers.end();) {
- peer_t peer = it->first;
- if (peer_configs.find(peer) == peer_configs.end()) {
+ auto next_it(it);
+ ++next_it;
+
+ auto &peer = it->first.second;
+ auto pool_peer_it = pool_peers.find(it->first.first);
+ if (pool_peer_it == pool_peers.end() ||
+ pool_peer_it->second.find(peer) == pool_peer_it->second.end()) {
dout(20) << "removing replayer for " << peer << dendl;
// TODO: make async
- m_replayers.erase(it++);
- } else {
- ++it;
+ m_replayers.erase(it);
}
+ it = next_it;
}
- for (auto &kv : peer_configs) {
- const peer_t &peer = kv.first;
- if (m_replayers.find(peer) == m_replayers.end()) {
- dout(20) << "starting replayer for " << peer << dendl;
- unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
- m_local, peer, m_args));
- // TODO: make async, and retry connecting within replayer
- int r = replayer->init();
- if (r < 0) {
- continue;
+ for (auto &kv : pool_peers) {
+ for (auto &peer : kv.second) {
+ PoolPeer pool_peer(kv.first, peer);
+ if (m_replayers.find(pool_peer) == m_replayers.end()) {
+ dout(20) << "starting replayer for " << peer << dendl;
+ unique_ptr<Replayer> replayer(new Replayer(m_threads, m_image_deleter,
+ m_local, peer, m_args));
+ // TODO: make async, and retry connecting within replayer
+ int r = replayer->init();
+ if (r < 0) {
+ continue;
+ }
+ m_replayers.insert(std::make_pair(pool_peer, std::move(replayer)));
}
- m_replayers.insert(std::make_pair(peer, std::move(replayer)));
}
}
}
void flush();
private:
- void refresh_peers(const set<peer_t> &peers);
- void update_replayers(const map<peer_t, set<int64_t> > &peer_configs);
+ typedef ClusterWatcher::PoolPeers PoolPeers;
+ typedef std::pair<int64_t, peer_t> PoolPeer;
+
+ void update_replayers(const PoolPeers &pool_peers);
CephContext *m_cct;
std::vector<const char*> m_args;
// monitor local cluster for config changes in peers
std::unique_ptr<ClusterWatcher> m_local_cluster_watcher;
- std::map<peer_t, std::unique_ptr<Replayer> > m_replayers;
std::shared_ptr<ImageDeleter> m_image_deleter;
+ std::map<PoolPeer, std::unique_ptr<Replayer> > m_replayers;
atomic_t m_stopping;
bool m_manual_stop = false;
MirrorAdminSocketHook *m_asok_hook;