op_queue_cond.Wait(osd_lock);
}
+ // require same or newer map
+ if (!require_same_or_newer_map(op, op->get_map_epoch()))
+ return;
+
+ // blacklisted?
+ if (osdmap->is_blacklisted(op->get_source_addr())) {
+ dout(4) << "handle_op " << op->get_source_addr() << " is blacklisted" << dendl;
+ reply_op_error(op, -EBLACKLISTED);
+ return;
+ }
+
+ // share our map with sender, if they're old
+ _share_map_incoming(op->get_source_inst(), op->get_map_epoch());
+
+
// calc actual pgid
pg_t pgid = osdmap->raw_pg_to_pg(op->get_pg());
// get and lock *pg.
PG *pg = _have_pg(pgid) ? _lookup_lock_pg(pgid):0;
+
logger->set("buf", buffer_total_alloc.test());
utime_t now = g_clock.now();
// modify
if ((pg->get_primary() != whoami ||
!pg->same_for_modify_since(op->get_map_epoch()))) {
- dout(7) << "handle_rep_op pg changed " << pg->info.history
+ dout(7) << "handle_op pg changed " << pg->info.history
<< " after " << op->get_map_epoch()
<< ", dropping" << dendl;
assert(op->get_map_epoch() < osdmap->get_epoch());
delete op;
return;
}
+
+ // scrubbing?
+ if (pg->state_test(PG_STATE_SCRUBBING)) {
+ dout(10) << *pg << " is scrubbing, deferring op " << *op << dendl;
+ pg->waiting_for_active.push_back(op);
+ pg->unlock();
+ return;
+ }
}
// pg must be active.
osd->map_lock.get_read();
lock();
+
+ epoch_t epoch = info.history.same_since;
+
if (!is_primary()) {
dout(10) << "scrub -- not primary" << dendl;
unlock();
// request maps from replicas
for (unsigned i=1; i<acting.size(); i++) {
- dout(10) << " requesting scrubmap from osd" << acting[i] << dendl;
+ dout(10) << "scrub requesting scrubmap from osd" << acting[i] << dendl;
osd->messenger->send_message(new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
osd->osdmap->get_inst(acting[i]));
}
osd->map_lock.put_read();
- dout(10) << " building my scrub map" << dendl;
+ // wait for any ops in progress
+ while (is_write_in_progress()) {
+ dout(10) << "scrub write(s) in progress, waiting" << dendl;
+ wait();
+ }
+
+ unlock();
+
+ dout(10) << "scrub building my map" << dendl;
ScrubMap scrubmap;
build_scrub_map(scrubmap);
+ lock();
+ if (epoch != info.history.same_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ unlock();
+ return;
+ }
+
while (peer_scrub_map.size() < acting.size() - 1) {
dout(10) << " have " << (peer_scrub_map.size()+1) << " / " << acting.size()
<< " scrub maps, waiting" << dendl;
wait();
+
+ if (epoch != info.history.same_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ unlock();
+ return;
+ }
}
+ unlock();
+
// first, compare scrub maps
vector<ScrubMap*> m(acting.size());
m[0] = &scrubmap;
}
}
-
if (ok)
dout(10) << "scrub " << po->poid << " size " << po->size << " ok" << dendl;
osd->get_logclient()->log(LOG_ERROR, s);
}
+ lock();
+ if (epoch != info.history.same_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ unlock();
+ return;
+ }
+
// discard peer scrub info.
peer_scrub_map.clear();
+ unlock();
+
// ok, do the pg-type specific scrubbing
_scrub(scrubmap);
-
+
+ lock();
+ if (epoch != info.history.same_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ unlock();
+ return;
+ }
+
+ // finish up
info.stats.last_scrub = info.last_update;
info.stats.last_scrub_stamp = g_clock.now();
state_clear(PG_STATE_SCRUBBING);