return;
}
- // pg must be same-ish...
- if (!op->may_write()) {
- // read
- if (!pg->same_for_read_since(op->get_map_epoch())) {
- dout(7) << "handle_rep_op pg changed " << pg->info.history
- << " after " << op->get_map_epoch()
- << ", dropping" << dendl;
- assert(op->get_map_epoch() < osdmap->get_epoch());
- pg->unlock();
- delete op;
- return;
- }
-
- if (op->get_snapid() > 0) {
- // snap read. hrm.
- // are we missing a revision that we might need?
- // let's get them all.
- for (unsigned i=0; i<op->get_snaps().size(); i++) {
- sobject_t soid(op->get_oid(), op->get_snaps()[i]);
- if (pg->is_missing_object(soid)) {
- dout(10) << "handle_op _may_ need missing rev " << soid << ", pulling" << dendl;
- pg->wait_for_missing_object(soid, op);
- pg->unlock();
- return;
- }
+ // pg must be active.
+ if (!pg->is_active()) {
+ // replay?
+ if (op->get_version().version > 0) {
+ if (op->get_version() > pg->info.last_update) {
+ dout(7) << *pg << " queueing replay at " << op->get_version()
+ << " for " << *op << dendl;
+ pg->replay_queue[op->get_version()] = op;
+ pg->unlock();
+ return;
+ } else {
+ dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update
+ << " for " << *op
+ << ", will queue for WRNOOP" << dendl;
}
}
- } else {
+ dout(7) << *pg << " not active (yet)" << dendl;
+ pg->waiting_for_active.push_back(op);
+ pg->unlock();
+ return;
+ }
+
+ // pg must be same-ish...
+ if (op->may_write()) {
+ if (op->get_snapid() != CEPH_NOSNAP) {
+ reply_op_error(op, -EINVAL);
+ return;
+ }
+
// modify
if ((!pg->is_primary() ||
!pg->same_for_modify_since(op->get_map_epoch()))) {
pg->unlock();
return;
}
- }
-
- // pg must be active.
- if (!pg->is_active()) {
- // replay?
- if (op->get_version().version > 0) {
- if (op->get_version() > pg->info.last_update) {
- dout(7) << *pg << " queueing replay at " << op->get_version()
- << " for " << *op << dendl;
- pg->replay_queue[op->get_version()] = op;
- pg->unlock();
- return;
- } else {
- dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update
- << " for " << *op
- << ", will queue for WRNOOP" << dendl;
+ } else {
+ // read
+ if (!pg->same_for_read_since(op->get_map_epoch())) {
+ dout(7) << "handle_op pg changed " << pg->info.history
+ << " after " << op->get_map_epoch()
+ << ", dropping" << dendl;
+ assert(op->get_map_epoch() < osdmap->get_epoch());
+ pg->unlock();
+ delete op;
+ return;
+ }
+
+ if (op->get_snapid() > 0) {
+ // snap read. hrm.
+ // are we missing a revision that we might need?
+ // let's get them all.
+ for (unsigned i=0; i<op->get_snaps().size(); i++) {
+ sobject_t soid(op->get_oid(), op->get_snaps()[i]);
+ if (pg->is_missing_object(soid)) {
+ dout(10) << "handle_op _may_ need missing rev " << soid << ", pulling" << dendl;
+ pg->wait_for_missing_object(soid, op);
+ pg->unlock();
+ return;
+ }
}
}
-
- dout(7) << *pg << " not active (yet)" << dendl;
- pg->waiting_for_active.push_back(op);
- pg->unlock();
- return;
}
-
+
// missing object?
sobject_t head(op->get_oid(), CEPH_NOSNAP);
if (pg->is_missing_object(head)) {
pg->unlock();
return;
}
+
dout(10) << "handle_op " << *op << " in " << *pg << dendl;
+ /* turn this off for now.
// proprocess op?
if (pg->preprocess_op(op, now)) {
pg->unlock();
return;
}
+ */
if (!op->may_write()) {
Mutex::Locker lock(peer_stat_lock);