// we don't need encoded payload anymore
op->clear_payload();
-
- // pg must be active.
- if (pg->is_replay()) {
- if (op->get_version().version > 0) {
- dout(7) << *pg << " queueing replay at " << op->get_version()
- << " for " << *op << dendl;
- pg->replay_queue[op->get_version()] = op;
+ if (op->may_write()) {
+ // misdirected?
+ if (!pg->is_primary() ||
+ !pg->same_for_modify_since(op->get_map_epoch())) {
+ handle_misdirected_op(pg, op);
pg->unlock();
return;
}
- }
-
- if (!pg->is_active()) {
- dout(7) << *pg << " not active (yet)" << dendl;
- pg->waiting_for_active.push_back(op);
- pg->unlock();
- return;
- }
- // pg must be same-ish...
- if (op->may_write()) {
// full?
if (osdmap->test_flag(CEPH_OSDMAP_FULL) &&
!op->get_source().is_mds()) { // FIXME: we'll exclude mds writes for now.
return;
}
+ // invalid?
if (op->get_snapid() != CEPH_NOSNAP) {
reply_op_error(op, -EINVAL);
pg->unlock();
return;
}
- // modify
- if (!pg->is_primary() ||
- !pg->same_for_modify_since(op->get_map_epoch())) {
- handle_misdirected_op(pg, op);
- pg->unlock();
- return;
- }
-
+ // too big?
if (g_conf.osd_max_write_size &&
op->get_data_len() > g_conf.osd_max_write_size << 20) {
// journal can't hold commit!
pg->unlock();
return;
}
-
} else {
- // read
+ // misdirected?
if (!pg->same_for_read_since(op->get_map_epoch())) {
handle_misdirected_op(pg, op);
pg->unlock();
}
}
+ // pg must be active.
+ if (!pg->is_active()) {
+ dout(7) << *pg << " not active (yet)" << dendl;
+ pg->waiting_for_active.push_back(op);
+ pg->unlock();
+ return;
+ }
+ if (pg->is_replay()) {
+ if (op->get_version().version > 0) {
+ dout(7) << *pg << " queueing replay at " << op->get_version()
+ << " for " << *op << dendl;
+ pg->replay_queue[op->get_version()] = op;
+ pg->unlock();
+ return;
+ }
+ }
+
if ((op->get_flags() & CEPH_OSD_FLAG_PGOP) == 0) {
// missing object?
sobject_t head(op->get_oid(), CEPH_NOSNAP);