From: Samuel Just Date: Tue, 24 Apr 2012 00:40:58 +0000 (-0700) Subject: OSD,PG: Move Op,SubOp queueing into PG X-Git-Tag: v0.50~109^2~2^2~69 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ddef446dc115a85ec40e985e5a4235a68b95b99a;p=ceph.git OSD,PG: Move Op,SubOp queueing into PG PG now handles delaying/discarding messages since pg map epoch may not be the same as the OSD map. Signed-off-by: Samuel Just --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 48c2053728e2..5d2cdb2f6000 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -4567,20 +4567,6 @@ void OSD::handle_pg_scan(OpRequestRef op) pg->put(); } -bool OSD::scan_is_queueable(PG *pg, OpRequestRef op) -{ - MOSDPGScan *m = (MOSDPGScan *)op->request; - assert(m->get_header().type == MSG_OSD_PG_SCAN); - assert(pg->is_locked()); - - if (m->query_epoch < pg->info.history.same_interval_since) { - dout(10) << *pg << " got old scan, ignoring" << dendl; - return false; - } - - return true; -} - void OSD::handle_pg_backfill(OpRequestRef op) { MOSDPGBackfill *m = (MOSDPGBackfill*)op->request; @@ -4612,19 +4598,6 @@ void OSD::handle_pg_backfill(OpRequestRef op) pg->put(); } -bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op) -{ - MOSDPGBackfill *m = (MOSDPGBackfill *)op->request; - assert(m->get_header().type == MSG_OSD_PG_BACKFILL); - assert(pg->is_locked()); - - if (m->query_epoch < pg->info.history.same_interval_since) { - dout(10) << *pg << " got old backfill, ignoring" << dendl; - return false; - } - - return true; -} /** PGQuery * from primary to replica | stray @@ -5247,8 +5220,23 @@ void OSD::handle_op(OpRequestRef op) handle_misdirected_op(NULL, op); } return; + } else if (m->may_write() && + (!pg->is_primary() || + !pg->same_for_modify_since(m->get_map_epoch()))) { + handle_misdirected_op(pg, op); + pg->unlock(); + return; + } else if (m->may_read() && + !pg->same_for_read_since(m->get_map_epoch())) { + handle_misdirected_op(pg, op); + pg->unlock(); + return; + } else if (!op_has_sufficient_caps(pg, m)) { + pg->unlock(); + return; } + pg->get(); enqueue_op(pg, op); pg->unlock(); @@ -5363,94 +5351,11 @@ bool OSD::op_is_discardable(MOSDOp *op) // want to do what we can to apply it. if (!op->get_connection()->is_connected() && op->get_version().version == 0) { - dout(10) << " sender " << op->get_connection()->get_peer_addr() - << " not connected, dropping " << *op << dendl; return true; } return false; } -/* - * Determine if we can queue the op right now; if not this deals with it. - * If it's not queueable, we deal with it in one of a few ways: - * dropping the request, putting it into a wait list for later, or - * telling the sender that the request was misdirected. - * - * @return true if the op is queueable; false otherwise. - */ -bool OSD::op_is_queueable(PG *pg, OpRequestRef op) -{ - assert(pg->is_locked()); - MOSDOp *m = (MOSDOp*)op->request; - assert(m->get_header().type == CEPH_MSG_OSD_OP); - - if (!op_has_sufficient_caps(pg, m)) { - reply_op_error(op, -EPERM); - return false; - } - - if (op_is_discardable(m)) { - return false; - } - - // misdirected? - if (m->may_write()) { - if (!pg->is_primary() || - !pg->same_for_modify_since(m->get_map_epoch())) { - handle_misdirected_op(pg, op); - return false; - } - } else { - if (!pg->same_for_read_since(m->get_map_epoch())) { - handle_misdirected_op(pg, op); - return false; - } - } - - if (!pg->is_active()) { - dout(7) << *pg << " not active (yet)" << dendl; - pg->waiting_for_active.push_back(op); - op->mark_delayed(); - return false; - } - - if (pg->is_replay()) { - if (m->get_version().version > 0) { - dout(7) << *pg << " queueing replay at " << m->get_version() - << " for " << *m << dendl; - pg->replay_queue[m->get_version()] = op; - op->mark_delayed(); - } else { - dout(7) << *pg << " waiting until after replay for " << *m << dendl; - pg->waiting_for_active.push_back(op); - } - return false; - } - - return true; -} - -/* - * discard operation, or return true. no side-effects. - */ -bool OSD::subop_is_queueable(PG *pg, OpRequestRef op) -{ - MOSDSubOp *m = (MOSDSubOp *)op->request; - assert(m->get_header().type == MSG_OSD_SUBOP); - assert(pg->is_locked()); - - // same pg? - // if pg changes _at all_, we reset and repeer! - if (m->map_epoch < pg->info.history.same_interval_since) { - dout(10) << "handle_sub_op pg changed " << pg->info.history - << " after " << m->map_epoch - << ", dropping" << dendl; - return false; - } - - return true; -} - /* * enqueue called with osd_lock held */ @@ -5459,42 +5364,7 @@ void OSD::enqueue_op(PG *pg, OpRequestRef op) dout(15) << *pg << " enqueue_op " << op->request << " " << *(op->request) << dendl; assert(pg->is_locked()); - - switch (op->request->get_type()) { - case CEPH_MSG_OSD_OP: - if (!op_is_queueable(pg, op)) - return; - break; - - case MSG_OSD_SUBOP: - if (!subop_is_queueable(pg, op)) - return; - break; - - case MSG_OSD_SUBOPREPLY: - // don't care. - break; - - case MSG_OSD_PG_SCAN: - if (!scan_is_queueable(pg, op)) - return; - break; - - case MSG_OSD_PG_BACKFILL: - if (!backfill_is_queueable(pg, op)) - return; - break; - - default: - assert(0 == "enqueued an illegal message type"); - } - - // add to pg's op_queue - pg->op_queue.push_back(op); - - op_wq.queue(pg); - - op->mark_queued_for_pg(); + pg->queue_op(op); } bool OSD::OpWQ::_enqueue(PG *pg) diff --git a/src/osd/OSD.h b/src/osd/OSD.h index e88c5554f452..9890dceeccf1 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -677,10 +677,8 @@ protected: void handle_pg_trim(OpRequestRef op); void handle_pg_scan(OpRequestRef op); - bool scan_is_queueable(PG *pg, OpRequestRef op); void handle_pg_backfill(OpRequestRef op); - bool backfill_is_queueable(PG *pg, OpRequestRef op); void handle_pg_remove(OpRequestRef op); void queue_pg_for_deletion(PG *pg); @@ -1136,16 +1134,11 @@ public: void handle_sub_op(OpRequestRef op); void handle_sub_op_reply(OpRequestRef op); -private: + static bool op_is_discardable(class MOSDOp *m); /// check if we can throw out op from a disconnected client - bool op_is_discardable(class MOSDOp *m); /// check if op has sufficient caps bool op_has_sufficient_caps(PG *pg, class MOSDOp *m); /// check if op should be (re)queued for processing - bool op_is_queueable(PG *pg, OpRequestRef op); - /// check if subop should be (re)queued for processing - bool subop_is_queueable(PG *pg, OpRequestRef op); - public: void force_remount(); diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 11a61cdd16f2..f5b9329a6059 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -25,6 +25,8 @@ #include "messages/MOSDPGRemove.h" #include "messages/MOSDPGInfo.h" #include "messages/MOSDPGTrim.h" +#include "messages/MOSDPGScan.h" +#include "messages/MOSDPGBackfill.h" #include "messages/MOSDSubOp.h" #include "messages/MOSDSubOpReply.h" @@ -1461,11 +1463,22 @@ void PG::do_request(OpRequestRef op) { // do any pending flush do_pending_flush(); + if (can_discard_request(op)) { + return; + } else if (must_delay_request(op)) { + op_waiters.push_back(op); + return; + } else if (!is_active()) { + waiting_for_active.push_back(op); + return; + } else if (is_replay()) { + waiting_for_active.push_back(op); + return; + } switch (op->request->get_type()) { case CEPH_MSG_OSD_OP: - if (!osd->op_is_discardable((MOSDOp*)op->request)) - do_op(op); // do it now + do_op(op); // do it now break; case MSG_OSD_SUBOP: @@ -3882,6 +3895,132 @@ ostream& operator<<(ostream& out, const PG& pg) return out; } +bool PG::can_discard_op(OpRequestRef op) +{ + MOSDOp *m = (MOSDOp*)op->request; + if (OSD::op_is_discardable(m)) { + return true; + } else if (m->may_write() && + (!is_primary() || + !same_for_modify_since(m->get_map_epoch()))) { + return true; + } else if (m->may_read() && + !same_for_read_since(m->get_map_epoch())) { + return true; + } else if (is_replay()) { + if (m->get_version().version > 0) { + dout(7) << " queueing replay at " << m->get_version() + << " for " << *m << dendl; + replay_queue[m->get_version()] = op; + op->mark_delayed(); + return true; + } + } + return false; +} + +bool PG::can_discard_subop(OpRequestRef op) +{ + MOSDSubOp *m = (MOSDSubOp *)op->request; + assert(m->get_header().type == MSG_OSD_SUBOP); + + // same pg? + // if pg changes _at all_, we reset and repeer! + if (old_peering_msg(m->map_epoch, m->map_epoch)) { + dout(10) << "handle_sub_op pg changed " << info.history + << " after " << m->map_epoch + << ", dropping" << dendl; + return true; + } + return false; +} + +bool PG::can_discard_scan(OpRequestRef op) +{ + MOSDPGScan *m = (MOSDPGScan *)op->request; + assert(m->get_header().type == MSG_OSD_PG_SCAN); + + if (old_peering_msg(m->map_epoch, m->query_epoch)) { + dout(10) << " got old scan, ignoring" << dendl; + return true; + } + return false; +} + +bool PG::can_discard_backfill(OpRequestRef op) +{ + MOSDPGBackfill *m = (MOSDPGBackfill *)op->request; + assert(m->get_header().type == MSG_OSD_PG_BACKFILL); + + if (old_peering_msg(m->map_epoch, m->query_epoch)) { + dout(10) << " got old backfill, ignoring" << dendl; + return true; + } + + return false; + +} + +bool PG::can_discard_request(OpRequestRef op) +{ + switch (op->request->get_type()) { + case CEPH_MSG_OSD_OP: + return can_discard_op(op); + case MSG_OSD_SUBOP: + return can_discard_subop(op); + case MSG_OSD_SUBOPREPLY: + return false; + case MSG_OSD_PG_SCAN: + return can_discard_scan(op); + + case MSG_OSD_PG_BACKFILL: + return can_discard_backfill(op); + } + return true; +} + +bool PG::must_delay_request(OpRequestRef op) +{ + switch (op->request->get_type()) { + case CEPH_MSG_OSD_OP: + return !require_same_or_newer_map( + static_cast(op->request)->get_map_epoch()); + + case MSG_OSD_SUBOP: + return !require_same_or_newer_map( + static_cast(op->request)->map_epoch); + + case MSG_OSD_SUBOPREPLY: + return !require_same_or_newer_map( + static_cast(op->request)->map_epoch); + + case MSG_OSD_PG_SCAN: + return !require_same_or_newer_map( + static_cast(op->request)->map_epoch); + + case MSG_OSD_PG_BACKFILL: + return !require_same_or_newer_map( + static_cast(op->request)->map_epoch); + } + assert(0); + return false; +} + +void PG::queue_op(OpRequestRef op) +{ + if (can_discard_op(op)) + return; + // TODO: deal with osd queueing + op_queue.push_back(op); +} + +void PG::take_waiters() +{ + list ls; + ls.swap(op_waiters); + ls.splice(ls.end(), op_queue); +} + void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx) { if (old_peering_evt(evt)) @@ -4120,6 +4259,7 @@ boost::statechart::result PG::RecoveryState::Reset::react(const ActMap&) } pg->update_heartbeat_peers(); + pg->take_waiters(); return transit< Started >(); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 14e5495a838e..58ecd5aff399 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -1363,12 +1363,24 @@ public: pair ¬ify_info); void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch); bool acting_up_affected(const vector& newup, const vector& newacting); + + // OpRequest queueing + bool can_discard_op(OpRequestRef op); + bool can_discard_scan(OpRequestRef op); + bool can_discard_subop(OpRequestRef op); + bool can_discard_backfill(OpRequestRef op); + bool can_discard_request(OpRequestRef op); + + bool must_delay_request(OpRequestRef op); + void queue_op(OpRequestRef op); + bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch); bool old_peering_evt(CephPeeringEvtRef evt) { return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested()); } // recovery bits + void take_waiters(); void queue_peering_event(CephPeeringEvtRef evt); void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx); void queue_notify(epoch_t msg_epoch, epoch_t query_epoch,