pg->put();
}
-bool OSD::scan_is_queueable(PG *pg, OpRequestRef op)
-{
- MOSDPGScan *m = (MOSDPGScan *)op->request;
- assert(m->get_header().type == MSG_OSD_PG_SCAN);
- assert(pg->is_locked());
-
- if (m->query_epoch < pg->info.history.same_interval_since) {
- dout(10) << *pg << " got old scan, ignoring" << dendl;
- return false;
- }
-
- return true;
-}
-
void OSD::handle_pg_backfill(OpRequestRef op)
{
MOSDPGBackfill *m = (MOSDPGBackfill*)op->request;
pg->put();
}
-bool OSD::backfill_is_queueable(PG *pg, OpRequestRef op)
-{
- MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
- assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
- assert(pg->is_locked());
-
- if (m->query_epoch < pg->info.history.same_interval_since) {
- dout(10) << *pg << " got old backfill, ignoring" << dendl;
- return false;
- }
-
- return true;
-}
/** PGQuery
* from primary to replica | stray
handle_misdirected_op(NULL, op);
}
return;
+ } else if (m->may_write() &&
+ (!pg->is_primary() ||
+ !pg->same_for_modify_since(m->get_map_epoch()))) {
+ handle_misdirected_op(pg, op);
+ pg->unlock();
+ return;
+ } else if (m->may_read() &&
+ !pg->same_for_read_since(m->get_map_epoch())) {
+ handle_misdirected_op(pg, op);
+ pg->unlock();
+ return;
+ } else if (!op_has_sufficient_caps(pg, m)) {
+ pg->unlock();
+ return;
}
+
pg->get();
enqueue_op(pg, op);
pg->unlock();
// want to do what we can to apply it.
if (!op->get_connection()->is_connected() &&
op->get_version().version == 0) {
- dout(10) << " sender " << op->get_connection()->get_peer_addr()
- << " not connected, dropping " << *op << dendl;
return true;
}
return false;
}
-/*
- * Determine if we can queue the op right now; if not this deals with it.
- * If it's not queueable, we deal with it in one of a few ways:
- * dropping the request, putting it into a wait list for later, or
- * telling the sender that the request was misdirected.
- *
- * @return true if the op is queueable; false otherwise.
- */
-bool OSD::op_is_queueable(PG *pg, OpRequestRef op)
-{
- assert(pg->is_locked());
- MOSDOp *m = (MOSDOp*)op->request;
- assert(m->get_header().type == CEPH_MSG_OSD_OP);
-
- if (!op_has_sufficient_caps(pg, m)) {
- reply_op_error(op, -EPERM);
- return false;
- }
-
- if (op_is_discardable(m)) {
- return false;
- }
-
- // misdirected?
- if (m->may_write()) {
- if (!pg->is_primary() ||
- !pg->same_for_modify_since(m->get_map_epoch())) {
- handle_misdirected_op(pg, op);
- return false;
- }
- } else {
- if (!pg->same_for_read_since(m->get_map_epoch())) {
- handle_misdirected_op(pg, op);
- return false;
- }
- }
-
- if (!pg->is_active()) {
- dout(7) << *pg << " not active (yet)" << dendl;
- pg->waiting_for_active.push_back(op);
- op->mark_delayed();
- return false;
- }
-
- if (pg->is_replay()) {
- if (m->get_version().version > 0) {
- dout(7) << *pg << " queueing replay at " << m->get_version()
- << " for " << *m << dendl;
- pg->replay_queue[m->get_version()] = op;
- op->mark_delayed();
- } else {
- dout(7) << *pg << " waiting until after replay for " << *m << dendl;
- pg->waiting_for_active.push_back(op);
- }
- return false;
- }
-
- return true;
-}
-
-/*
- * discard operation, or return true. no side-effects.
- */
-bool OSD::subop_is_queueable(PG *pg, OpRequestRef op)
-{
- MOSDSubOp *m = (MOSDSubOp *)op->request;
- assert(m->get_header().type == MSG_OSD_SUBOP);
- assert(pg->is_locked());
-
- // same pg?
- // if pg changes _at all_, we reset and repeer!
- if (m->map_epoch < pg->info.history.same_interval_since) {
- dout(10) << "handle_sub_op pg changed " << pg->info.history
- << " after " << m->map_epoch
- << ", dropping" << dendl;
- return false;
- }
-
- return true;
-}
-
/*
* enqueue called with osd_lock held
*/
dout(15) << *pg << " enqueue_op " << op->request << " "
<< *(op->request) << dendl;
assert(pg->is_locked());
-
- switch (op->request->get_type()) {
- case CEPH_MSG_OSD_OP:
- if (!op_is_queueable(pg, op))
- return;
- break;
-
- case MSG_OSD_SUBOP:
- if (!subop_is_queueable(pg, op))
- return;
- break;
-
- case MSG_OSD_SUBOPREPLY:
- // don't care.
- break;
-
- case MSG_OSD_PG_SCAN:
- if (!scan_is_queueable(pg, op))
- return;
- break;
-
- case MSG_OSD_PG_BACKFILL:
- if (!backfill_is_queueable(pg, op))
- return;
- break;
-
- default:
- assert(0 == "enqueued an illegal message type");
- }
-
- // add to pg's op_queue
- pg->op_queue.push_back(op);
-
- op_wq.queue(pg);
-
- op->mark_queued_for_pg();
+ pg->queue_op(op);
}
bool OSD::OpWQ::_enqueue(PG *pg)
void handle_pg_trim(OpRequestRef op);
void handle_pg_scan(OpRequestRef op);
- bool scan_is_queueable(PG *pg, OpRequestRef op);
void handle_pg_backfill(OpRequestRef op);
- bool backfill_is_queueable(PG *pg, OpRequestRef op);
void handle_pg_remove(OpRequestRef op);
void queue_pg_for_deletion(PG *pg);
void handle_sub_op(OpRequestRef op);
void handle_sub_op_reply(OpRequestRef op);
-private:
+ static bool op_is_discardable(class MOSDOp *m);
/// check if we can throw out op from a disconnected client
- bool op_is_discardable(class MOSDOp *m);
/// check if op has sufficient caps
bool op_has_sufficient_caps(PG *pg, class MOSDOp *m);
/// check if op should be (re)queued for processing
- bool op_is_queueable(PG *pg, OpRequestRef op);
- /// check if subop should be (re)queued for processing
- bool subop_is_queueable(PG *pg, OpRequestRef op);
-
public:
void force_remount();
#include "messages/MOSDPGRemove.h"
#include "messages/MOSDPGInfo.h"
#include "messages/MOSDPGTrim.h"
+#include "messages/MOSDPGScan.h"
+#include "messages/MOSDPGBackfill.h"
#include "messages/MOSDSubOp.h"
#include "messages/MOSDSubOpReply.h"
{
// do any pending flush
do_pending_flush();
+ if (can_discard_request(op)) {
+ return;
+ } else if (must_delay_request(op)) {
+ op_waiters.push_back(op);
+ return;
+ } else if (!is_active()) {
+ waiting_for_active.push_back(op);
+ return;
+ } else if (is_replay()) {
+ waiting_for_active.push_back(op);
+ return;
+ }
switch (op->request->get_type()) {
case CEPH_MSG_OSD_OP:
- if (!osd->op_is_discardable((MOSDOp*)op->request))
- do_op(op); // do it now
+ do_op(op); // do it now
break;
case MSG_OSD_SUBOP:
return out;
}
+bool PG::can_discard_op(OpRequestRef op)
+{
+ MOSDOp *m = (MOSDOp*)op->request;
+ if (OSD::op_is_discardable(m)) {
+ return true;
+ } else if (m->may_write() &&
+ (!is_primary() ||
+ !same_for_modify_since(m->get_map_epoch()))) {
+ return true;
+ } else if (m->may_read() &&
+ !same_for_read_since(m->get_map_epoch())) {
+ return true;
+ } else if (is_replay()) {
+ if (m->get_version().version > 0) {
+ dout(7) << " queueing replay at " << m->get_version()
+ << " for " << *m << dendl;
+ replay_queue[m->get_version()] = op;
+ op->mark_delayed();
+ return true;
+ }
+ }
+ return false;
+}
+
+bool PG::can_discard_subop(OpRequestRef op)
+{
+ MOSDSubOp *m = (MOSDSubOp *)op->request;
+ assert(m->get_header().type == MSG_OSD_SUBOP);
+
+ // same pg?
+ // if pg changes _at all_, we reset and repeer!
+ if (old_peering_msg(m->map_epoch, m->map_epoch)) {
+ dout(10) << "handle_sub_op pg changed " << info.history
+ << " after " << m->map_epoch
+ << ", dropping" << dendl;
+ return true;
+ }
+ return false;
+}
+
+bool PG::can_discard_scan(OpRequestRef op)
+{
+ MOSDPGScan *m = (MOSDPGScan *)op->request;
+ assert(m->get_header().type == MSG_OSD_PG_SCAN);
+
+ if (old_peering_msg(m->map_epoch, m->query_epoch)) {
+ dout(10) << " got old scan, ignoring" << dendl;
+ return true;
+ }
+ return false;
+}
+
+bool PG::can_discard_backfill(OpRequestRef op)
+{
+ MOSDPGBackfill *m = (MOSDPGBackfill *)op->request;
+ assert(m->get_header().type == MSG_OSD_PG_BACKFILL);
+
+ if (old_peering_msg(m->map_epoch, m->query_epoch)) {
+ dout(10) << " got old backfill, ignoring" << dendl;
+ return true;
+ }
+
+ return false;
+
+}
+
+bool PG::can_discard_request(OpRequestRef op)
+{
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return can_discard_op(op);
+ case MSG_OSD_SUBOP:
+ return can_discard_subop(op);
+ case MSG_OSD_SUBOPREPLY:
+ return false;
+ case MSG_OSD_PG_SCAN:
+ return can_discard_scan(op);
+
+ case MSG_OSD_PG_BACKFILL:
+ return can_discard_backfill(op);
+ }
+ return true;
+}
+
+bool PG::must_delay_request(OpRequestRef op)
+{
+ switch (op->request->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ return !require_same_or_newer_map(
+ static_cast<MOSDOp*>(op->request)->get_map_epoch());
+
+ case MSG_OSD_SUBOP:
+ return !require_same_or_newer_map(
+ static_cast<MOSDSubOp*>(op->request)->map_epoch);
+
+ case MSG_OSD_SUBOPREPLY:
+ return !require_same_or_newer_map(
+ static_cast<MOSDSubOpReply*>(op->request)->map_epoch);
+
+ case MSG_OSD_PG_SCAN:
+ return !require_same_or_newer_map(
+ static_cast<MOSDPGScan*>(op->request)->map_epoch);
+
+ case MSG_OSD_PG_BACKFILL:
+ return !require_same_or_newer_map(
+ static_cast<MOSDPGBackfill*>(op->request)->map_epoch);
+ }
+ assert(0);
+ return false;
+}
+
+void PG::queue_op(OpRequestRef op)
+{
+ if (can_discard_op(op))
+ return;
+ // TODO: deal with osd queueing
+ op_queue.push_back(op);
+}
+
+void PG::take_waiters()
+{
+ list<OpRequestRef> ls;
+ ls.swap(op_waiters);
+ ls.splice(ls.end(), op_queue);
+}
+
void PG::handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx)
{
if (old_peering_evt(evt))
}
pg->update_heartbeat_peers();
+ pg->take_waiters();
return transit< Started >();
}
pair<int, pg_info_t> ¬ify_info);
void fulfill_log(int from, const pg_query_t &query, epoch_t query_epoch);
bool acting_up_affected(const vector<int>& newup, const vector<int>& newacting);
+
+ // OpRequest queueing
+ bool can_discard_op(OpRequestRef op);
+ bool can_discard_scan(OpRequestRef op);
+ bool can_discard_subop(OpRequestRef op);
+ bool can_discard_backfill(OpRequestRef op);
+ bool can_discard_request(OpRequestRef op);
+
+ bool must_delay_request(OpRequestRef op);
+ void queue_op(OpRequestRef op);
+
bool old_peering_msg(epoch_t reply_epoch, epoch_t query_epoch);
bool old_peering_evt(CephPeeringEvtRef evt) {
return old_peering_msg(evt->get_epoch_sent(), evt->get_epoch_requested());
}
// recovery bits
+ void take_waiters();
void queue_peering_event(CephPeeringEvtRef evt);
void handle_peering_event(CephPeeringEvtRef evt, RecoveryCtx *rctx);
void queue_notify(epoch_t msg_epoch, epoch_t query_epoch,