#include "messages/MOSDMarkMeDown.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDBackoff.h"
#include "messages/MOSDRepOp.h"
#include "messages/MOSDRepOpReply.h"
#include "messages/MOSDSubOp.h"
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
return m->get_map_epoch();
}
+ case CEPH_MSG_OSD_BACKOFF: {
+ MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
+ return m->osd_epoch;
+ }
case MSG_OSD_SUBOP:
return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
case MSG_OSD_REPOP:
case CEPH_MSG_OSD_OP:
handle_op(op, osdmap);
break;
+ case CEPH_MSG_OSD_BACKOFF:
+ handle_backoff(op, osdmap);
+ break;
// for replication etc.
case MSG_OSD_SUBOP:
handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
}
}
+void OSD::handle_backoff(OpRequestRef& op, OSDMapRef& osdmap)
+{
+ MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
+ Session *s = static_cast<Session*>(m->get_connection()->get_priv());
+ dout(10) << __func__ << " " << *m << " session " << s << dendl;
+ assert(s);
+ s->put();
+
+ if (m->op != CEPH_OSD_BACKOFF_OP_ACK_BLOCK) {
+ dout(10) << __func__ << " unrecognized op, ignoring" << dendl;
+ return;
+ }
+
+ // map hobject range to PG(s)
+ bool queued = false;
+ hobject_t pos = m->begin;
+ do {
+ pg_t _pgid(pos.get_hash(), pos.pool);
+ if (osdmap->have_pg_pool(pos.pool)) {
+ _pgid = osdmap->raw_pg_to_pg(_pgid);
+ }
+ if (!osdmap->have_pg_pool(_pgid.pool())) {
+ // missing pool -- drop
+ return;
+ }
+ spg_t pgid;
+ if (osdmap->get_primary_shard(_pgid, &pgid)) {
+ dout(10) << __func__ << " pos " << pos << " pgid " << pgid << dendl;
+ PGRef pg = get_pg_or_queue_for_pg(pgid, op, s);
+ if (pg) {
+ if (!queued) {
+ enqueue_op(pg, op);
+ queued = true;
+ } else {
+ // use a fresh OpRequest
+ m->get(); // take a ref for the new OpRequest
+ OpRequestRef newop(op_tracker.create_request<OpRequest, Message*>(m));
+ newop->mark_event("duplicated original op for another pg");
+ enqueue_op(pg, newop);
+ }
+ }
+ }
+ // advance
+ pos = _pgid.get_hobj_end(osdmap->get_pg_pool(pos.pool)->get_pg_num());
+ dout(20) << __func__ << " next pg " << pos << dendl;
+ } while (cmp_bitwise(pos, m->end) < 0);
+}
+
template<typename T, int MSGTYPE>
void OSD::handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap)
{
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
case CEPH_MSG_OSD_OP:
+ case CEPH_MSG_OSD_BACKOFF:
case MSG_OSD_SUBOP:
case MSG_OSD_REPOP:
case MSG_OSD_SUBOPREPLY:
void handle_scrub(struct MOSDScrub *m);
void handle_osd_ping(class MOSDPing *m);
void handle_op(OpRequestRef& op, OSDMapRef& osdmap);
+ void handle_backoff(OpRequestRef& op, OSDMapRef& osdmap);
template <typename T, int MSGTYPE>
void handle_replica_op(OpRequestRef& op, OSDMapRef& osdmap);
#include "common/perf_counters.h"
#include "messages/MOSDOp.h"
+#include "messages/MOSDBackoff.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
#include "messages/MOSDPGTrim.h"
src_oloc.key = oid.name;
}
+void PrimaryLogPG::handle_backoff(OpRequestRef& op)
+{
+ MOSDBackoff *m = static_cast<MOSDBackoff*>(op->get_req());
+ SessionRef session((Session *)m->get_connection()->get_priv());
+ if (!session)
+ return; // drop it.
+ session->put(); // get_priv takes a ref, and so does the SessionRef
+ hobject_t begin = info.pgid.pgid.get_hobj_start();
+ hobject_t end = info.pgid.pgid.get_hobj_end(pool.info.get_pg_num());
+ if (cmp_bitwise(begin, m->begin) < 0) {
+ begin = m->begin;
+ }
+ if (cmp_bitwise(end, m->end) > 0) {
+ end = m->end;
+ }
+ dout(10) << __func__ << " backoff ack id " << m->id
+ << " [" << begin << "," << end << ")" << dendl;
+ session->ack_backoff(cct, m->id, begin, end);
+}
+
void PrimaryLogPG::do_request(
OpRequestRef& op,
ThreadPool::TPHandle &handle)