From: Sage Weil Date: Tue, 25 Oct 2011 05:13:59 +0000 (-0700) Subject: osd: move queue checks into enqueue_op, kill _handle_ helpers X-Git-Tag: v0.38~57^2~4^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fa722de6708d3e92037df6289cc29ece12c8ea66;p=ceph.git osd: move queue checks into enqueue_op, kill _handle_ helpers This simplifies things, and renames the checks to make it clear that we are doing validation checks only, with no side-effects allowed. Also move some checks into the parent handle_op() to further simplify the (re)queue checks. Signed-off-by: Sage Weil --- diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index 85a7358c1f23..829f3e79b2d7 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -5152,6 +5152,9 @@ void OSD::handle_op(MOSDOp *op) return; } + // we don't need encoded payload anymore + op->clear_payload(); + // require same or newer map if (!require_same_or_newer_map(op, op->get_map_epoch())) return; @@ -5171,6 +5174,27 @@ void OSD::handle_op(MOSDOp *op) return; } + // full? + if (osdmap->test_flag(CEPH_OSDMAP_FULL) && + !op->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now. + reply_op_error(op, -ENOSPC); + return; + } + + // invalid? + if (op->get_snapid() != CEPH_NOSNAP) { + reply_op_error(op, -EINVAL); + return; + } + + // too big? + if (g_conf->osd_max_write_size && + op->get_data_len() > g_conf->osd_max_write_size << 20) { + // journal can't hold commit! + reply_op_error(op, -OSD_WRITETOOBIG); + return; + } + // share our map with sender, if they're old _share_map_incoming(op->get_source_inst(), op->get_map_epoch(), (Session *)op->get_connection()->get_priv()); @@ -5198,7 +5222,7 @@ void OSD::handle_op(MOSDOp *op) } pg->get(); - _handle_op(pg, op); + enqueue_op(pg, op); pg->unlock(); pg->put(); } @@ -5234,77 +5258,6 @@ bool OSD::op_has_sufficient_caps(PG *pg, MOSDOp *op) return true; } -/* - * queue an operation, or discard it. avoid side-effects or any "real" work. - */ -void OSD::_handle_op(PG *pg, MOSDOp *op) -{ - dout(10) << *pg << " _handle_op " << op << " " << *op << dendl; - assert(pg->is_locked()); - - if (!op_has_sufficient_caps(pg, op)) { - reply_op_error(op, -EPERM); - return; - } - - // we don't need encoded payload anymore - op->clear_payload(); - - if (op->may_write()) { - // misdirected? - if (!pg->is_primary() || - !pg->same_for_modify_since(op->get_map_epoch())) { - handle_misdirected_op(pg, op); - return; - } - - // full? - if (osdmap->test_flag(CEPH_OSDMAP_FULL) && - !op->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now. - reply_op_error(op, -ENOSPC); - return; - } - - // invalid? - if (op->get_snapid() != CEPH_NOSNAP) { - reply_op_error(op, -EINVAL); - return; - } - - // too big? - if (g_conf->osd_max_write_size && - op->get_data_len() > g_conf->osd_max_write_size << 20) { - // journal can't hold commit! - reply_op_error(op, -OSD_WRITETOOBIG); - return; - } - } else { - // misdirected? - if (!pg->same_for_read_since(op->get_map_epoch())) { - handle_misdirected_op(pg, op); - return; - } - } - - // pg must be active. - if (!pg->is_active()) { - dout(7) << *pg << " not active (yet)" << dendl; - pg->waiting_for_active.push_back(op); - return; - } - if (pg->is_replay()) { - if (op->get_version().version > 0) { - dout(7) << *pg << " queueing replay at " << op->get_version() - << " for " << *op << dendl; - pg->replay_queue[op->get_version()] = op; - return; - } - } - - enqueue_op(pg, op); -} - - void OSD::handle_sub_op(MOSDSubOp *op) { dout(10) << "handle_sub_op " << *op << " epoch " << op->map_epoch << dendl; @@ -5337,32 +5290,11 @@ void OSD::handle_sub_op(MOSDSubOp *op) return; } pg->get(); - _handle_sub_op(pg, op); + enqueue_op(pg, op); pg->unlock(); pg->put(); } -/* - * queue an operation, or discard it. avoid side-effects or any "real" work. - */ -void OSD::_handle_sub_op(PG *pg, MOSDSubOp *op) -{ - dout(10) << *pg << " _handle_sub_op " << op << " " << *op << dendl; - assert(pg->is_locked()); - - // same pg? - // if pg changes _at all_, we reset and repeer! - if (op->map_epoch < pg->info.history.same_interval_since) { - dout(10) << "handle_sub_op pg changed " << pg->info.history - << " after " << op->map_epoch - << ", dropping" << dendl; - op->put(); - return; - } - - enqueue_op(pg, op); // queue for worker threads -} - void OSD::handle_sub_op_reply(MOSDSubOpReply *op) { if (op->get_map_epoch() < up_epoch) { @@ -5393,21 +5325,74 @@ void OSD::handle_sub_op_reply(MOSDSubOpReply *op) return; } pg->get(); - _handle_sub_op_reply(pg, op); + enqueue_op(pg, op); pg->unlock(); pg->put(); } /* - * queue an operation, or discard it. avoid side-effects or any "real" work. + * discard operation, or return true. no side-effects. */ -void OSD::_handle_sub_op_reply(PG *pg, MOSDSubOpReply *op) +bool OSD::op_is_queueable(PG *pg, MOSDOp *op) { - dout(10) << *pg << " _handle_sub_op_reply " << op << " " << *op << dendl; assert(pg->is_locked()); - enqueue_op(pg, op); // queue for worker threads + + if (!op_has_sufficient_caps(pg, op)) { + reply_op_error(op, -EPERM); + return false; + } + + // misdirected? + if (op->may_write()) { + if (!pg->is_primary() || + !pg->same_for_modify_since(op->get_map_epoch())) { + handle_misdirected_op(pg, op); + return false; + } + } else { + if (!pg->same_for_read_since(op->get_map_epoch())) { + handle_misdirected_op(pg, op); + return false; + } + } + + if (!pg->is_active()) { + dout(7) << *pg << " not active (yet)" << dendl; + pg->waiting_for_active.push_back(op); + return false; + } + + if (pg->is_replay()) { + if (op->get_version().version > 0) { + dout(7) << *pg << " queueing replay at " << op->get_version() + << " for " << *op << dendl; + pg->replay_queue[op->get_version()] = op; + return false; + } + } + + return true; } +/* + * discard operation, or return true. no side-effects. + */ +bool OSD::subop_is_queueable(PG *pg, MOSDSubOp *op) +{ + assert(pg->is_locked()); + + // same pg? + // if pg changes _at all_, we reset and repeer! + if (op->map_epoch < pg->info.history.same_interval_since) { + dout(10) << "handle_sub_op pg changed " << pg->info.history + << " after " << op->map_epoch + << ", dropping" << dendl; + op->put(); + return false; + } + + return true; +} /* * enqueue called with osd_lock held @@ -5416,6 +5401,26 @@ void OSD::enqueue_op(PG *pg, Message *op) { dout(15) << *pg << " enqueue_op " << op << " " << *op << dendl; assert(pg->is_locked()); + + switch (op->get_type()) { + case CEPH_MSG_OSD_OP: + if (!op_is_queueable(pg, (MOSDOp*)op)) + return; + break; + + case MSG_OSD_SUBOP: + if (!subop_is_queueable(pg, (MOSDSubOp*)op)) + return; + break; + + case MSG_OSD_SUBOPREPLY: + // don't care. + break; + + default: + assert(0 == "enqueued an illegal message type"); + } + // add to pg's op_queue pg->op_queue.push_back(op); pending_ops++; @@ -5448,20 +5453,7 @@ void OSD::requeue_ops(PG *pg, list& ls) while (!q.empty()) { Message *op = q.front(); q.pop_front(); - - switch (op->get_type()) { - case CEPH_MSG_OSD_OP: - _handle_op(pg, (MOSDOp*)op); - break; - case MSG_OSD_SUBOP: - _handle_sub_op(pg, (MOSDSubOp*)op); - break; - case MSG_OSD_SUBOPREPLY: - _handle_sub_op_reply(pg, (MOSDSubOpReply*)op); - break; - default: - assert(0 == "requeued an illegal type"); - } + enqueue_op(pg, op); } // put orig queue contents back in line, after the stuff we requeued. diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 3d2191cbdc31..1cd083bd6505 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -1094,14 +1094,8 @@ public: private: /// check if op has sufficient caps bool op_has_sufficient_caps(PG *pg, class MOSDOp *m); - - /* - * these locked helpers assume pg is locked, and will do fina - * checks before enqueuing the given operation. - */ - void _handle_op(PG *pg, class MOSDOp *m); - void _handle_sub_op(PG *pg, class MOSDSubOp *m); - void _handle_sub_op_reply(PG *pg, class MOSDSubOpReply *m); + bool op_is_queueable(PG *pg, class MOSDOp *m); + bool subop_is_queueable(PG *pg, class MOSDSubOp *m); public: void force_remount();