return;
}
+ // we don't need encoded payload anymore
+ op->clear_payload();
+
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch()))
return;
return;
}
+ // full?
+ if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
+ !op->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now.
+ reply_op_error(op, -ENOSPC);
+ return;
+ }
+
+ // invalid?
+ if (op->get_snapid() != CEPH_NOSNAP) {
+ reply_op_error(op, -EINVAL);
+ return;
+ }
+
+ // too big?
+ if (g_conf->osd_max_write_size &&
+ op->get_data_len() > g_conf->osd_max_write_size << 20) {
+ // journal can't hold commit!
+ reply_op_error(op, -OSD_WRITETOOBIG);
+ return;
+ }
+
// share our map with sender, if they're old
_share_map_incoming(op->get_source_inst(), op->get_map_epoch(),
(Session *)op->get_connection()->get_priv());
}
pg->get();
- _handle_op(pg, op);
+ enqueue_op(pg, op);
pg->unlock();
pg->put();
}
return true;
}
-/*
- * queue an operation, or discard it. avoid side-effects or any "real" work.
- */
-void OSD::_handle_op(PG *pg, MOSDOp *op)
-{
- dout(10) << *pg << " _handle_op " << op << " " << *op << dendl;
- assert(pg->is_locked());
-
- if (!op_has_sufficient_caps(pg, op)) {
- reply_op_error(op, -EPERM);
- return;
- }
-
- // we don't need encoded payload anymore
- op->clear_payload();
-
- if (op->may_write()) {
- // misdirected?
- if (!pg->is_primary() ||
- !pg->same_for_modify_since(op->get_map_epoch())) {
- handle_misdirected_op(pg, op);
- return;
- }
-
- // full?
- if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
- !op->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now.
- reply_op_error(op, -ENOSPC);
- return;
- }
-
- // invalid?
- if (op->get_snapid() != CEPH_NOSNAP) {
- reply_op_error(op, -EINVAL);
- return;
- }
-
- // too big?
- if (g_conf->osd_max_write_size &&
- op->get_data_len() > g_conf->osd_max_write_size << 20) {
- // journal can't hold commit!
- reply_op_error(op, -OSD_WRITETOOBIG);
- return;
- }
- } else {
- // misdirected?
- if (!pg->same_for_read_since(op->get_map_epoch())) {
- handle_misdirected_op(pg, op);
- return;
- }
- }
-
- // pg must be active.
- if (!pg->is_active()) {
- dout(7) << *pg << " not active (yet)" << dendl;
- pg->waiting_for_active.push_back(op);
- return;
- }
- if (pg->is_replay()) {
- if (op->get_version().version > 0) {
- dout(7) << *pg << " queueing replay at " << op->get_version()
- << " for " << *op << dendl;
- pg->replay_queue[op->get_version()] = op;
- return;
- }
- }
-
- enqueue_op(pg, op);
-}
-
-
void OSD::handle_sub_op(MOSDSubOp *op)
{
dout(10) << "handle_sub_op " << *op << " epoch " << op->map_epoch << dendl;
return;
}
pg->get();
- _handle_sub_op(pg, op);
+ enqueue_op(pg, op);
pg->unlock();
pg->put();
}
-/*
- * queue an operation, or discard it. avoid side-effects or any "real" work.
- */
-void OSD::_handle_sub_op(PG *pg, MOSDSubOp *op)
-{
- dout(10) << *pg << " _handle_sub_op " << op << " " << *op << dendl;
- assert(pg->is_locked());
-
- // same pg?
- // if pg changes _at all_, we reset and repeer!
- if (op->map_epoch < pg->info.history.same_interval_since) {
- dout(10) << "handle_sub_op pg changed " << pg->info.history
- << " after " << op->map_epoch
- << ", dropping" << dendl;
- op->put();
- return;
- }
-
- enqueue_op(pg, op); // queue for worker threads
-}
-
void OSD::handle_sub_op_reply(MOSDSubOpReply *op)
{
if (op->get_map_epoch() < up_epoch) {
return;
}
pg->get();
- _handle_sub_op_reply(pg, op);
+ enqueue_op(pg, op);
pg->unlock();
pg->put();
}
/*
- * queue an operation, or discard it. avoid side-effects or any "real" work.
+ * decide whether a client op can go onto the pg's op queue right now.
+ *
+ * returns true if the caller should queue the op.  returns false if the
+ * op was consumed here instead: answered with -EPERM (insufficient caps),
+ * passed to handle_misdirected_op(), parked on pg->waiting_for_active
+ * (pg not active), or parked in pg->replay_queue (pg in replay and the
+ * op carries a nonzero version).
+ *
+ * pg must be locked.
*/
-void OSD::_handle_sub_op_reply(PG *pg, MOSDSubOpReply *op)
+bool OSD::op_is_queueable(PG *pg, MOSDOp *op)
{
- dout(10) << *pg << " _handle_sub_op_reply " << op << " " << *op << dendl;
assert(pg->is_locked());
- enqueue_op(pg, op); // queue for worker threads
+
+ if (!op_has_sufficient_caps(pg, op)) {
+ reply_op_error(op, -EPERM);
+ return false;
+ }
+
+ // misdirected?
+ if (op->may_write()) {
+ if (!pg->is_primary() ||
+ !pg->same_for_modify_since(op->get_map_epoch())) {
+ handle_misdirected_op(pg, op);
+ return false;
+ }
+ } else {
+ if (!pg->same_for_read_since(op->get_map_epoch())) {
+ handle_misdirected_op(pg, op);
+ return false;
+ }
+ }
+
+ if (!pg->is_active()) {
+ dout(7) << *pg << " not active (yet)" << dendl;
+ pg->waiting_for_active.push_back(op);
+ return false;
+ }
+
+ if (pg->is_replay()) {
+ if (op->get_version().version > 0) {
+ dout(7) << *pg << " queueing replay at " << op->get_version()
+ << " for " << *op << dendl;
+ pg->replay_queue[op->get_version()] = op;
+ return false;
+ }
+ }
+
+ return true;
}
+/*
+ * decide whether a sub op can go onto the pg's op queue.
+ *
+ * returns true if the caller should queue it.  if op->map_epoch is older
+ * than pg->info.history.same_interval_since, op->put() is called on the
+ * op (it is dropped, not queued) and false is returned.
+ *
+ * pg must be locked.
+ */
+bool OSD::subop_is_queueable(PG *pg, MOSDSubOp *op)
+{
+ assert(pg->is_locked());
+
+ // same pg?
+ // if pg changes _at all_, we reset and repeer!
+ if (op->map_epoch < pg->info.history.same_interval_since) {
+ dout(10) << "handle_sub_op pg changed " << pg->info.history
+ << " after " << op->map_epoch
+ << ", dropping" << dendl;
+ op->put();
+ return false;
+ }
+
+ return true;
+}
/*
* enqueue called with osd_lock held
{
dout(15) << *pg << " enqueue_op " << op << " " << *op << dendl;
assert(pg->is_locked());
+
+ switch (op->get_type()) {
+ case CEPH_MSG_OSD_OP:
+ if (!op_is_queueable(pg, (MOSDOp*)op))
+ return;
+ break;
+
+ case MSG_OSD_SUBOP:
+ if (!subop_is_queueable(pg, (MOSDSubOp*)op))
+ return;
+ break;
+
+ case MSG_OSD_SUBOPREPLY:
+ // don't care.
+ break;
+
+ default:
+ assert(0 == "enqueued an illegal message type");
+ }
+
// add to pg's op_queue
pg->op_queue.push_back(op);
pending_ops++;
while (!q.empty()) {
Message *op = q.front();
q.pop_front();
-
- switch (op->get_type()) {
- case CEPH_MSG_OSD_OP:
- _handle_op(pg, (MOSDOp*)op);
- break;
- case MSG_OSD_SUBOP:
- _handle_sub_op(pg, (MOSDSubOp*)op);
- break;
- case MSG_OSD_SUBOPREPLY:
- _handle_sub_op_reply(pg, (MOSDSubOpReply*)op);
- break;
- default:
- assert(0 == "requeued an illegal type");
- }
+ enqueue_op(pg, op);
}
// put orig queue contents back in line, after the stuff we requeued.