do_notifies(ctx.notify_list, curmap);
do_queries(ctx.query_map, curmap);
do_infos(ctx.info_map, curmap);
+
+ for (auto& [osd, ls] : ctx.message_map) {
+ if (!curmap->is_up(osd)) {
+ dout(20) << __func__ << " skipping down osd." << osd << dendl;
+ continue;
+ }
+ ConnectionRef con = service.get_con_osd_cluster(
+ osd, curmap->get_epoch());
+ if (!con) {
+ dout(20) << __func__ << " skipping osd." << osd << " (NULL con)"
+ << dendl;
+ continue;
+ }
+ service.maybe_share_map(con.get(), curmap);
+ for (auto m : ls) {
+ con->send_message2(m);
+ }
+ ls.clear();
+ }
}
if ((!ctx.transaction.empty() || ctx.transaction.has_contexts()) && pg) {
int tr = store->queue_transaction(
BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
: query_map(std::move(ctx.query_map)),
info_map(std::move(ctx.info_map)),
- notify_list(std::move(ctx.notify_list))
+ notify_list(std::move(ctx.notify_list)),
+ message_map(std::move(ctx.message_map))
{
ctx.query_map.clear();
ctx.info_map.clear();
ctx.notify_list.clear();
+ ctx.message_map.clear();
}
void PGPool::update(CephContext *cct, OSDMapRef map)
map<int, map<spg_t, pg_query_t> > query_map;
map<int, vector<pg_notify_t>> info_map;
map<int, vector<pg_notify_t>> notify_list;
+ map<int, vector<MessageRef>> message_map;
BufferedRecoveryMessages() = default;
BufferedRecoveryMessages(PeeringCtx &);
ovec.reserve(ovec.size() + nlist.size());
ovec.insert(ovec.end(), nlist.begin(), nlist.end());
}
+ for (auto &[target, ls] : m.message_map) {
+ auto &ovec = message_map[target];
+ // put buffered messages in front
+ ls.reserve(ls.size() + ovec.size());
+ ls.insert(ls.end(), ovec.begin(), ovec.end());
+ ovec.clear();
+ ovec.swap(ls);
+ }
}
};
map<int, map<spg_t, pg_query_t> > &query_map;
map<int, vector<pg_notify_t>> &info_map;
map<int, vector<pg_notify_t>> ¬ify_list;
+ map<int, vector<MessageRef>> &message_map;
ObjectStore::Transaction &transaction;
HBHandle * const handle = nullptr;
query_map(wrapped.query_map),
info_map(wrapped.info_map),
notify_list(wrapped.notify_list),
+ message_map(wrapped.message_map),
transaction(wrapped.transaction),
handle(wrapped.handle) {}
: query_map(buf.query_map),
info_map(buf.info_map),
notify_list(buf.notify_list),
+ message_map(buf.message_map),
transaction(wrapped.transaction),
handle(wrapped.handle) {}
PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
+ void send_osd_message(int target, Message *m) {
+ message_map[target].push_back(m);
+ }
void send_notify(pg_shard_t to, const pg_notify_t &n) {
notify_list[to.osd].emplace_back(n);
}