} else if (!is_active()) {
dout(20) << __func__ << " not active" << dendl;
} else {
- do_infos(ctx.info_map, curmap);
-
for (auto& [osd, ls] : ctx.message_map) {
if (!curmap->is_up(osd)) {
dout(20) << __func__ << " skipping down osd." << osd << dendl;
}
}
-void OSD::do_infos(map<int,vector<pg_notify_t>>& info_map,
- OSDMapRef curmap)
-{
- for (auto& [osd, notifies] : info_map) {
- if (!curmap->is_up(osd)) {
- dout(20) << __func__ << " skipping down osd." << osd << dendl;
- continue;
- }
- for (auto& i : notifies) {
- dout(20) << __func__ << " sending info " << i.info
- << " to osd " << osd << dendl;
- }
- ConnectionRef con = service.get_con_osd_cluster(
- osd, curmap->get_epoch());
- if (!con) {
- dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
- continue;
- }
- service.maybe_share_map(con.get(), curmap);
- MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch());
- m->pg_list = std::move(notifies);
- con->send_message(m);
- }
- info_map.clear();
-}
-
void OSD::handle_fast_pg_create(MOSDPGCreate2 *m)
{
dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
void discard_context(PeeringCtx &ctx);
- void do_infos(map<int,vector<pg_notify_t>>& info_map,
- OSDMapRef map);
bool require_mon_peer(const Message *m);
bool require_mon_or_mgr_peer(const Message *m);
#define dout_subsys ceph_subsys_osd
BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
- : info_map(std::move(ctx.info_map)),
- message_map(std::move(ctx.message_map))
+ : message_map(std::move(ctx.message_map))
{
- ctx.info_map.clear();
ctx.message_map.clear();
}
// [primary only] content recovery state
struct BufferedRecoveryMessages {
- map<int, vector<pg_notify_t>> info_map;
map<int, vector<MessageRef>> message_map;
BufferedRecoveryMessages() = default;
BufferedRecoveryMessages(PeeringCtx &);
void accept_buffered_messages(BufferedRecoveryMessages &m) {
- for (auto &[target, ivec] : m.info_map) {
- auto &ovec = info_map[target];
- ovec.reserve(ovec.size() + ivec.size());
- ovec.insert(ovec.end(), ivec.begin(), ivec.end());
- }
for (auto &[target, ls] : m.message_map) {
auto &ovec = message_map[target];
// put buffered messages in front
struct PeeringCtxWrapper {
utime_t start_time;
BufferedRecoveryMessages &msgs;
- map<int, vector<pg_notify_t>> &info_map;
ObjectStore::Transaction &transaction;
HBHandle * const handle = nullptr;
PeeringCtxWrapper(PeeringCtx &wrapped) :
msgs(wrapped),
- info_map(wrapped.info_map),
transaction(wrapped.transaction),
handle(wrapped.handle) {}
PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
: msgs(buf),
- info_map(buf.info_map),
transaction(wrapped.transaction),
handle(wrapped.handle) {}
return state->rctx->transaction;
}
- map<int, vector<pg_notify_t>> &get_info_map() {
- ceph_assert(state->rctx);
- return state->rctx->info_map;
- }
-
PeeringCtxWrapper &get_recovery_ctx() {
assert(state->rctx);
return *(state->rctx);