} else if (!is_active()) {
dout(20) << __func__ << " not active" << dendl;
} else {
- do_notifies(ctx.notify_list, curmap);
do_queries(ctx.query_map, curmap);
do_infos(ctx.info_map, curmap);
}
}
-/** do_notifies
- * Send an MOSDPGNotify to a primary, with a list of PGs that I have
- * content for, and they are primary for.
- */
-
-void OSD::do_notifies(
- map<int,vector<pg_notify_t>>& notify_list,
- OSDMapRef curmap)
-{
- for (auto& [osd, notifies] : notify_list) {
- if (!curmap->is_up(osd)) {
- dout(20) << __func__ << " skipping down osd." << osd << dendl;
- continue;
- }
- ConnectionRef con = service.get_con_osd_cluster(
- osd, curmap->get_epoch());
- if (!con) {
- dout(20) << __func__ << " skipping osd." << osd << " (NULL con)" << dendl;
- continue;
- }
- service.maybe_share_map(con.get(), curmap);
- dout(7) << __func__ << " osd." << osd
- << " on " << notifies.size() << " PGs" << dendl;
- MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(),
- std::move(notifies));
- con->send_message(m);
- }
-}
-
-
/** do_queries
* send out pending queries for info | summaries
*/
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGTrim.h"
#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGNotify.h"
#define dout_context cct
#define dout_subsys ceph_subsys_osd
BufferedRecoveryMessages::BufferedRecoveryMessages(PeeringCtx &ctx)
: query_map(std::move(ctx.query_map)),
info_map(std::move(ctx.info_map)),
- notify_list(std::move(ctx.notify_list)),
message_map(std::move(ctx.message_map))
{
ctx.query_map.clear();
ctx.info_map.clear();
- ctx.notify_list.clear();
ctx.message_map.clear();
}
+void PeeringCtxWrapper::send_notify(int to, const pg_notify_t &n)
+{
+ vector<pg_notify_t> notifies;
+ notifies.push_back(n);
+ message_map[to].push_back(
+ new MOSDPGNotify(n.epoch_sent, std::move(notifies))
+ );
+}
+
void PGPool::update(CephContext *cct, OSDMapRef map)
{
const pg_pool_t *pi = map->get_pg_pool(id);
update_history(query.query.history);
fulfill_info(query.from, query.query, notify_info);
rctx.send_notify(
- notify_info.first,
+ notify_info.first.osd,
pg_notify_t(
notify_info.first.shard, pg_whoami.shard,
query.query_epoch,
DECLARE_LOCALS;
if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
- ps->get_primary(),
+ ps->get_primary().osd,
pg_notify_t(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
DECLARE_LOCALS;
if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
- ps->get_primary(),
+ ps->get_primary().osd,
pg_notify_t(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
DECLARE_LOCALS;
if (ps->should_send_notify() && ps->get_primary().osd >= 0) {
context< PeeringMachine >().send_notify(
- ps->get_primary(),
+ ps->get_primary().osd,
pg_notify_t(
ps->get_primary().shard, ps->pg_whoami.shard,
ps->get_osdmap_epoch(),
struct BufferedRecoveryMessages {
map<int, map<spg_t, pg_query_t> > query_map;
map<int, vector<pg_notify_t>> info_map;
- map<int, vector<pg_notify_t>> notify_list;
map<int, vector<MessageRef>> message_map;
BufferedRecoveryMessages() = default;
ovec.reserve(ovec.size() + ivec.size());
ovec.insert(ovec.end(), ivec.begin(), ivec.end());
}
- for (auto &[target, nlist] : m.notify_list) {
- auto &ovec = notify_list[target];
- ovec.reserve(ovec.size() + nlist.size());
- ovec.insert(ovec.end(), nlist.begin(), nlist.end());
- }
for (auto &[target, ls] : m.message_map) {
auto &ovec = message_map[target];
// put buffered messages in front
}
};
+/**
+ * Wraps PeeringCtx to hide the difference between buffering messages to
+ * be sent after flush or immediately.
+ */
+struct PeeringCtxWrapper {
+ utime_t start_time;
+ map<int, map<spg_t, pg_query_t> > &query_map;
+ map<int, vector<pg_notify_t>> &info_map;
+ map<int, vector<MessageRef>> &message_map;
+ ObjectStore::Transaction &transaction;
+ HBHandle * const handle = nullptr;
+
+ PeeringCtxWrapper(PeeringCtx &wrapped) :
+ query_map(wrapped.query_map),
+ info_map(wrapped.info_map),
+ message_map(wrapped.message_map),
+ transaction(wrapped.transaction),
+ handle(wrapped.handle) {}
+
+ PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
+ : query_map(buf.query_map),
+ info_map(buf.info_map),
+ message_map(buf.message_map),
+ transaction(wrapped.transaction),
+ handle(wrapped.handle) {}
+
+ PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
+
+ void send_osd_message(int target, Message *m) {
+ message_map[target].push_back(m);
+ }
+ void send_notify(int to, const pg_notify_t &n);
+};
+
/* Encapsulates PG recovery process */
class PeeringState : public MissingLoc::MappingInfo {
public:
virtual ~PeeringListener() {}
};
-private:
- /**
- * Wraps PeeringCtx to hide the difference between buffering messages to
- * be sent after flush or immediately.
- */
- struct PeeringCtxWrapper {
- utime_t start_time;
- map<int, map<spg_t, pg_query_t> > &query_map;
- map<int, vector<pg_notify_t>> &info_map;
- map<int, vector<pg_notify_t>> ¬ify_list;
- map<int, vector<MessageRef>> &message_map;
- ObjectStore::Transaction &transaction;
- HBHandle * const handle = nullptr;
-
- PeeringCtxWrapper(PeeringCtx &wrapped) :
- query_map(wrapped.query_map),
- info_map(wrapped.info_map),
- notify_list(wrapped.notify_list),
- message_map(wrapped.message_map),
- transaction(wrapped.transaction),
- handle(wrapped.handle) {}
-
- PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
- : query_map(buf.query_map),
- info_map(buf.info_map),
- notify_list(buf.notify_list),
- message_map(buf.message_map),
- transaction(wrapped.transaction),
- handle(wrapped.handle) {}
-
- PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
-
- void send_osd_message(int target, Message *m) {
- message_map[target].push_back(m);
- }
- void send_notify(pg_shard_t to, const pg_notify_t &n) {
- notify_list[to.osd].emplace_back(n);
- }
- };
-public:
-
struct QueryState : boost::statechart::event< QueryState > {
Formatter *f;
explicit QueryState(Formatter *f) : f(f) {}
return *(state->rctx);
}
- void send_notify(pg_shard_t to, const pg_notify_t &n) {
+ void send_notify(int to, const pg_notify_t &n) {
ceph_assert(state->rctx);
state->rctx->send_notify(to, n);
}