} else if (!is_active()) {
dout(20) << __func__ << " not active" << dendl;
} else {
- do_queries(ctx.query_map, curmap);
do_infos(ctx.info_map, curmap);
for (auto& [osd, ls] : ctx.message_map) {
}
}
-/** do_queries
- * send out pending queries for info | summaries
- */
-void OSD::do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
- OSDMapRef curmap)
-{
- for (map<int, map<spg_t,pg_query_t> >::iterator pit = query_map.begin();
- pit != query_map.end();
- ++pit) {
- if (!curmap->is_up(pit->first)) {
- dout(20) << __func__ << " skipping down osd." << pit->first << dendl;
- continue;
- }
- int who = pit->first;
- ConnectionRef con = service.get_con_osd_cluster(who, curmap->get_epoch());
- if (!con) {
- dout(20) << __func__ << " skipping osd." << who
- << " (NULL con)" << dendl;
- continue;
- }
- service.maybe_share_map(con.get(), curmap);
- dout(7) << __func__ << " querying osd." << who
- << " on " << pit->second.size() << " PGs" << dendl;
- MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(),
- std::move(pit->second));
- con->send_message(m);
- }
-}
-
-
void OSD::do_infos(map<int,vector<pg_notify_t>>& info_map,
OSDMapRef curmap)
{
void dispatch_context(PeeringCtx &ctx, PG *pg, OSDMapRef curmap,
ThreadPool::TPHandle *handle = NULL);
void discard_context(PeeringCtx &ctx);
- void do_queries(map<int, map<spg_t,pg_query_t> >& query_map,
- OSDMapRef map);
void do_infos(map<int,vector<pg_notify_t>>& info_map,
OSDMapRef map);
#define dout_subsys ceph_subsys_osd
BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
- : query_map(std::move(ctx.query_map)),
- info_map(std::move(ctx.info_map)),
+ : info_map(std::move(ctx.info_map)),
message_map(std::move(ctx.message_map))
{
- ctx.query_map.clear();
ctx.info_map.clear();
ctx.message_map.clear();
}
void PeeringState::activate(
ObjectStore::Transaction& t,
epoch_t activation_epoch,
- map<int, map<spg_t,pg_query_t> >& query_map,
map<int,vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx)
{
ps->start_flush(context< PeeringMachine >().get_cur_transaction());
ps->activate(context< PeeringMachine >().get_cur_transaction(),
ps->get_osdmap_epoch(),
- context< PeeringMachine >().get_query_map(),
&context< PeeringMachine >().get_info_map(),
context< PeeringMachine >().get_recovery_ctx());
const Activate& actevt) {
DECLARE_LOCALS;
psdout(10) << "In ReplicaActive, about to call activate" << dendl;
- map<int, map<spg_t, pg_query_t> > query_map;
ps->activate(
context< PeeringMachine >().get_cur_transaction(),
actevt.activation_epoch,
- query_map,
NULL,
context< PeeringMachine >().get_recovery_ctx());
psdout(10) << "Activate Finished" << dendl;
// [primary only] content recovery state
struct BufferedRecoveryMessages {
- map<int, map<spg_t, pg_query_t> > query_map;
map<int, vector<pg_notify_t>> info_map;
map<int, vector<MessageRef>> message_map;
BufferedRecoveryMessages(PeeringCtx &);
void accept_buffered_messages(BufferedRecoveryMessages &m) {
- for (auto &[target, qmap] : m.query_map) {
- auto &omap = query_map[target];
- for (auto &[pg, query] : qmap) {
- omap[pg] = query;
- }
- }
for (auto &[target, ivec] : m.info_map) {
auto &ovec = info_map[target];
ovec.reserve(ovec.size() + ivec.size());
struct PeeringCtxWrapper {
utime_t start_time;
BufferedRecoveryMessages &msgs;
- map<int, map<spg_t, pg_query_t> > &query_map;
map<int, vector<pg_notify_t>> &info_map;
ObjectStore::Transaction &transaction;
HBHandle * const handle = nullptr;
PeeringCtxWrapper(PeeringCtx &wrapped) :
msgs(wrapped),
- query_map(wrapped.query_map),
info_map(wrapped.info_map),
transaction(wrapped.transaction),
handle(wrapped.handle) {}
PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
: msgs(buf),
- query_map(buf.query_map),
info_map(buf.info_map),
transaction(wrapped.transaction),
handle(wrapped.handle) {}
return state->rctx->transaction;
}
-
- map<int, map<spg_t, pg_query_t> > &get_query_map() {
- ceph_assert(state->rctx);
- return state->rctx->query_map;
- }
-
map<int, vector<pg_notify_t>> &get_info_map() {
ceph_assert(state->rctx);
return state->rctx->info_map;
void activate(
ObjectStore::Transaction& t,
epoch_t activation_epoch,
- map<int, map<spg_t,pg_query_t> >& query_map,
map<int, vector<pg_notify_t>> *activator_map,
PeeringCtxWrapper &ctx);