#ifndef CEPH_MOSDPGLOG_H
#define CEPH_MOSDPGLOG_H
-#include "msg/Message.h"
+#include "messages/MOSDPeeringOp.h"
-class MOSDPGLog : public Message {
+class MOSDPGLog : public MOSDPeeringOp {
static const int HEAD_VERSION = 5;
static const int COMPAT_VERSION = 5;
spg_t get_pgid() const { return spg_t(info.pgid.pgid, to); }
epoch_t get_query_epoch() const { return query_epoch; }
- MOSDPGLog() : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION) {
+ spg_t get_spg() const override {
+ return spg_t(info.pgid.pgid, to);
+ }
+ epoch_t get_map_epoch() const override {
+ return epoch;
+ }
+ epoch_t get_min_epoch() const override {
+ return query_epoch;
+ }
+
+ PGPeeringEvent *get_event() override {
+ return new PGPeeringEvent(
+ epoch, query_epoch,
+ MLogRec(pg_shard_t(get_source().num(), from),
+ this),
+ true,
+ new PGCreateInfo(
+ get_spg(),
+ epoch,
+ info.history,
+ past_intervals));
+ }
+
+ MOSDPGLog() : MOSDPeeringOp(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION) {
set_priority(CEPH_MSG_PRIO_HIGH);
}
MOSDPGLog(shard_id_t to, shard_id_t from, version_t mv, pg_info_t& i)
- : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDPeeringOp(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
epoch(mv), query_epoch(mv),
to(to), from(from),
info(i) {
}
MOSDPGLog(shard_id_t to, shard_id_t from,
version_t mv, pg_info_t& i, epoch_t query_epoch)
- : Message(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
+ : MOSDPeeringOp(MSG_OSD_PG_LOG, HEAD_VERSION, COMPAT_VERSION),
epoch(mv), query_epoch(query_epoch),
to(to), from(from),
info(i) {
public:
const char *get_type_name() const override { return "PGlog"; }
- void print(ostream& out) const override {
+ void inner_print(ostream& out) const override {
// NOTE: log is not const, but operator<< doesn't touch fields
// swapped out by OSD code.
- out << "pg_log(" << info.pgid << " epoch " << epoch
- << " log " << log
- << " pi " << past_intervals
- << " query_epoch " << query_epoch << ")";
+ out << "log " << log
+ << " pi " << past_intervals;
}
void encode_payload(uint64_t features) override {
// peering event?
switch (m->get_type()) {
+ case MSG_OSD_PG_QUERY:
+ return handle_fast_pg_query(static_cast<MOSDPGQuery*>(m));
+ case MSG_OSD_PG_NOTIFY:
+ return handle_fast_pg_notify(static_cast<MOSDPGNotify*>(m));
+ case MSG_OSD_PG_INFO:
+ return handle_fast_pg_info(static_cast<MOSDPGInfo*>(m));
+ case MSG_OSD_PG_REMOVE:
+ return handle_fast_pg_remove(static_cast<MOSDPGRemove*>(m));
+
+ // these are single-pg messages that handle themselves
+ case MSG_OSD_PG_LOG:
case MSG_OSD_PG_TRIM:
case MSG_OSD_BACKFILL_RESERVE:
case MSG_OSD_RECOVERY_RESERVE:
case MSG_OSD_PG_CREATE:
handle_pg_create(op);
break;
- case MSG_OSD_PG_NOTIFY:
- handle_pg_notify(op);
- break;
- case MSG_OSD_PG_QUERY:
- handle_pg_query(op);
- break;
- case MSG_OSD_PG_LOG:
- handle_pg_log(op);
- break;
- case MSG_OSD_PG_REMOVE:
- handle_pg_remove(op);
- break;
- case MSG_OSD_PG_INFO:
- handle_pg_info(op);
- break;
}
}
}
-/** PGNotify
- * from non-primary to primary
- * includes pg_info_t.
- * NOTE: called with opqueue active.
- */
-void OSD::handle_pg_notify(OpRequestRef op)
+void OSD::handle_fast_pg_query(MOSDPGQuery *m)
{
- const MOSDPGNotify *m = static_cast<const MOSDPGNotify*>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_NOTIFY);
-
- dout(7) << "handle_pg_notify from " << m->get_source() << dendl;
+ dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
int from = m->get_source().num();
-
- if (!require_osd_peer(op->get_req()))
- return;
-
- if (!require_same_or_newer_map(op, m->get_epoch(), false))
- return;
-
- op->mark_started();
-
- for (auto it = m->get_pg_list().begin();
- it != m->get_pg_list().end();
- ++it) {
- handle_pg_peering_evt(
- spg_t(it->first.info.pgid.pgid, it->first.to),
- it->first.info.history, it->second,
- it->first.query_epoch,
+ for (auto& p : m->pg_list) {
+ enqueue_peering_evt(
+ p.first,
PGPeeringEventRef(
- new PGPeeringEvent(
- it->first.epoch_sent, it->first.query_epoch,
- MNotifyRec(pg_shard_t(from, it->first.from), it->first,
- op->get_req()->get_connection()->get_features())))
+ std::make_shared<PGPeeringEvent>(
+ p.second.epoch_sent, p.second.epoch_sent,
+ MQuery(
+ p.first,
+ pg_shard_t(from, p.second.from),
+ p.second,
+ p.second.epoch_sent),
+ false))
);
}
}
-void OSD::handle_pg_log(OpRequestRef op)
+void OSD::handle_fast_pg_notify(MOSDPGNotify* m)
{
- MOSDPGLog *m = static_cast<MOSDPGLog*>(op->get_nonconst_req());
- assert(m->get_type() == MSG_OSD_PG_LOG);
- dout(7) << "handle_pg_log " << *m << " from " << m->get_source() << dendl;
-
- if (!require_osd_peer(op->get_req()))
- return;
-
+ dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
int from = m->get_source().num();
- if (!require_same_or_newer_map(op, m->get_epoch(), false))
- return;
-
- op->mark_started();
- handle_pg_peering_evt(
- spg_t(m->get_pgid().pgid, m->to),
- m->info.history, m->past_intervals, m->get_epoch(),
- PGPeeringEventRef(
- new PGPeeringEvent(
- m->get_epoch(), m->get_query_epoch(),
- MLogRec(pg_shard_t(from, m->from), m)))
- );
-}
-
-void OSD::handle_pg_info(OpRequestRef op)
-{
- const MOSDPGInfo *m = static_cast<const MOSDPGInfo *>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_INFO);
- dout(7) << "handle_pg_info " << *m << " from " << m->get_source() << dendl;
-
- if (!require_osd_peer(op->get_req()))
- return;
-
+ for (auto& p : m->get_pg_list()) {
+ spg_t pgid(p.first.info.pgid.pgid, p.first.to);
+ enqueue_peering_evt(
+ pgid,
+ PGPeeringEventRef(
+ std::make_shared<PGPeeringEvent>(
+ p.first.epoch_sent,
+ p.first.query_epoch,
+ MNotifyRec(
+ pgid, pg_shard_t(from, p.first.from),
+ p.first,
+ m->get_connection()->get_features(),
+ p.second),
+ true,
+ new PGCreateInfo(
+ pgid,
+ p.first.query_epoch,
+ p.first.info.history,
+ p.second)
+ )));
+ }
+}
+
+void OSD::handle_fast_pg_info(MOSDPGInfo* m)
+{
+ dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
int from = m->get_source().num();
- if (!require_same_or_newer_map(op, m->get_epoch(), false))
- return;
-
- op->mark_started();
-
- for (auto p = m->pg_list.begin();
- p != m->pg_list.end();
- ++p) {
- handle_pg_peering_evt(
- spg_t(p->first.info.pgid.pgid, p->first.to),
- p->first.info.history, p->second, p->first.epoch_sent,
+ for (auto& p : m->pg_list) {
+ enqueue_peering_evt(
+ spg_t(p.first.info.pgid.pgid, p.first.to),
PGPeeringEventRef(
- new PGPeeringEvent(
- p->first.epoch_sent, p->first.query_epoch,
+ std::make_shared<PGPeeringEvent>(
+ p.first.epoch_sent, p.first.query_epoch,
MInfoRec(
- pg_shard_t(
- from, p->first.from), p->first.info, p->first.epoch_sent)))
+ pg_shard_t(from, p.first.from),
+ p.first.info,
+ p.first.epoch_sent)))
);
}
}
+void OSD::handle_fast_pg_remove(MOSDPGRemove *m)
+{
+ dout(7) << __func__ << " " << *m << " from " << m->get_source() << dendl;
+ for (auto& pgid : m->pg_list) {
+ enqueue_peering_evt(
+ pgid,
+ PGPeeringEventRef(
+ std::make_shared<PGPeeringEvent>(
+ m->get_epoch(), m->get_epoch(),
+ PG::DeleteStart())));
+ }
+}
+
void OSD::handle_force_recovery(Message *m)
{
MOSDForceRecovery *msg = static_cast<MOSDForceRecovery*>(m);
msg->put();
}
-/** PGQuery
- * from primary to replica | stray
- * NOTE: called with opqueue active.
- */
-void OSD::handle_pg_query(OpRequestRef op)
+void OSD::handle_pg_query_nopg(const MQuery& q)
{
- assert(osd_lock.is_locked());
-
- const MOSDPGQuery *m = static_cast<const MOSDPGQuery*>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_QUERY);
+ spg_t pgid = q.pgid;
+ dout(10) << __func__ << " " << pgid << dendl;
- if (!require_osd_peer(op->get_req()))
+ OSDMapRef osdmap = get_osdmap();
+ if (!osdmap->have_pg_pool(pgid.pool()))
return;
- dout(7) << "handle_pg_query from " << m->get_source() << " epoch " << m->get_epoch() << dendl;
- int from = m->get_source().num();
+ // get active crush mapping
+ int up_primary, acting_primary;
+ vector<int> up, acting;
+ osdmap->pg_to_up_acting_osds(
+ pgid.pgid, &up, &up_primary, &acting, &acting_primary);
- if (!require_same_or_newer_map(op, m->get_epoch(), false))
+ // same primary?
+ pg_history_t history = q.query.history;
+ bool valid_history = project_pg_history(
+ pgid, history, q.query.epoch_sent,
+ up, up_primary, acting, acting_primary);
+
+ if (!valid_history ||
+ q.query.epoch_sent < history.same_interval_since) {
+ dout(10) << " pg " << pgid << " dne, and pg has changed in "
+ << history.same_interval_since
+ << " (msg from " << q.query.epoch_sent << ")" << dendl;
return;
+ }
- op->mark_started();
-
- map< int, vector<pair<pg_notify_t, PastIntervals> > > notify_list;
-
- for (auto it = m->pg_list.begin();
- it != m->pg_list.end();
- ++it) {
- spg_t pgid = it->first;
-
- if (service.splitting(pgid)) {
- peering_wait_for_split[pgid].push_back(
- PGPeeringEventRef(
- new PGPeeringEvent(
- it->second.epoch_sent, it->second.epoch_sent,
- MQuery(pg_shard_t(from, it->second.from),
- it->second, it->second.epoch_sent))));
- continue;
- }
-
- {
- RWLock::RLocker l(pg_map_lock);
- if (pg_map.count(pgid)) {
- PG *pg = 0;
- pg = _lookup_lock_pg_with_map_lock_held(pgid);
- pg->queue_query(
- it->second.epoch_sent, it->second.epoch_sent,
- pg_shard_t(from, it->second.from), it->second);
- pg->unlock();
- continue;
- }
- }
-
- if (!osdmap->have_pg_pool(pgid.pool()))
- continue;
-
- // get active crush mapping
- int up_primary, acting_primary;
- vector<int> up, acting;
- osdmap->pg_to_up_acting_osds(
- pgid.pgid, &up, &up_primary, &acting, &acting_primary);
-
- // same primary?
- pg_history_t history = it->second.history;
- bool valid_history = project_pg_history(
- pgid, history, it->second.epoch_sent,
- up, up_primary, acting, acting_primary);
-
- if (!valid_history ||
- it->second.epoch_sent < history.same_interval_since) {
- dout(10) << " pg " << pgid << " dne, and pg has changed in "
- << history.same_interval_since
- << " (msg from " << it->second.epoch_sent << ")" << dendl;
- continue;
- }
-
- dout(10) << " pg " << pgid << " dne" << dendl;
- pg_info_t empty(spg_t(pgid.pgid, it->second.to));
- if (it->second.type == pg_query_t::LOG ||
- it->second.type == pg_query_t::FULLLOG) {
- ConnectionRef con = service.get_con_osd_cluster(from, osdmap->get_epoch());
- if (con) {
- MOSDPGLog *mlog = new MOSDPGLog(
- it->second.from, it->second.to,
- osdmap->get_epoch(), empty,
- it->second.epoch_sent);
- service.share_map_peer(from, con.get(), osdmap);
- con->send_message(mlog);
- }
+ dout(10) << " pg " << pgid << " dne" << dendl;
+ pg_info_t empty(spg_t(pgid.pgid, q.query.to));
+ ConnectionRef con = service.get_con_osd_cluster(q.from.osd, osdmap->get_epoch());
+ if (con) {
+ Message *m;
+ if (q.query.type == pg_query_t::LOG ||
+ q.query.type == pg_query_t::FULLLOG) {
+ m = new MOSDPGLog(
+ q.query.from, q.query.to,
+ osdmap->get_epoch(), empty,
+ q.query.epoch_sent);
} else {
- notify_list[from].push_back(
+ vector<pair<pg_notify_t,PastIntervals>> ls;
+ ls.push_back(
make_pair(
pg_notify_t(
- it->second.from, it->second.to,
- it->second.epoch_sent,
+ q.query.from, q.query.to,
+ q.query.epoch_sent,
osdmap->get_epoch(),
empty),
PastIntervals()));
+ m = new MOSDPGNotify(osdmap->get_epoch(), ls);
}
+ service.share_map_peer(q.from.osd, con.get(), osdmap);
+ con->send_message(m);
}
- do_notifies(notify_list, osdmap);
}
-void OSD::handle_pg_remove(OpRequestRef op)
-{
- const MOSDPGRemove *m = static_cast<const MOSDPGRemove *>(op->get_req());
- assert(m->get_type() == MSG_OSD_PG_REMOVE);
- assert(osd_lock.is_locked());
-
- if (!require_osd_peer(op->get_req()))
- return;
-
- dout(7) << "handle_pg_remove from " << m->get_source() << " on "
- << m->pg_list.size() << " pgs" << dendl;
-
- if (!require_same_or_newer_map(op, m->get_epoch(), false))
- return;
-
- op->mark_started();
-
- for (auto it = m->pg_list.begin();
- it != m->pg_list.end();
- ++it) {
- spg_t pgid = *it;
- enqueue_peering_evt(
- pgid,
- PGPeeringEventRef(
- new PGPeeringEvent(
- m->get_epoch(), m->get_epoch(),
- PG::DeleteStart())));
- }
-}
-
// =========================================================
// RECOVERY
dequeue_peering_evt(
pg,
PGPeeringEventRef(
- new PGPeeringEvent(
+ std::make_shared<PGPeeringEvent>(
e, e,
PG::DeleteSome())),
handle);