scrub_reserved(false), scrub_reserve_failed(false),
scrub_waiting_on(0),
active_rep_scrub(0),
+ scrub_is_chunky(false),
+ scrub_errors(0),
+ scrub_fixed(0),
+ active_pushes(0),
recovery_state(this)
{
}
dout(10) << " got osd." << from << " scrub map" << dendl;
bufferlist::iterator p = m->get_data().begin();
- if (scrub_received_maps.count(from)) {
- ScrubMap incoming;
- incoming.decode(p, info.pgid.pool());
- dout(10) << "from replica " << from << dendl;
- dout(10) << "map version is " << incoming.valid_through << dendl;
- scrub_received_maps[from].merge_incr(incoming);
- } else {
+
+ if (scrub_is_chunky) { // chunky scrub
scrub_received_maps[from].decode(p, info.pgid.pool());
+ dout(10) << "map version is " << scrub_received_maps[from].valid_through << dendl;
+ } else { // classic scrub
+ if (scrub_received_maps.count(from)) {
+ ScrubMap incoming;
+ incoming.decode(p, info.pgid.pool());
+ dout(10) << "from replica " << from << dendl;
+ dout(10) << "map version is " << incoming.valid_through << dendl;
+ scrub_received_maps[from].merge_incr(incoming);
+ } else {
+ scrub_received_maps[from].decode(p, info.pgid.pool());
+ }
}
--scrub_waiting_on;
scrub_waiting_on_whom.erase(from);
+
if (scrub_waiting_on == 0) {
- if (finalizing_scrub) { // incremental lists received
- osd->scrub_finalize_wq.queue(this);
- } else { // initial lists received
- scrub_block_writes = true;
- if (last_update_applied == info.last_update) {
- finalizing_scrub = true;
- scrub_gather_replica_maps();
- ++scrub_waiting_on;
- scrub_waiting_on_whom.insert(osd->whoami);
- osd->scrub_wq.queue(this);
+ if (scrub_is_chunky) { // chunky scrub
+ osd->scrub_wq.queue(this);
+ } else { // classic scrub
+ if (finalizing_scrub) { // incremental lists received
+ osd->scrub_finalize_wq.queue(this);
+ } else { // initial lists received
+ scrub_block_writes = true;
+ if (last_update_applied == info.last_update) {
+ finalizing_scrub = true;
+ scrub_gather_replica_maps();
+ ++scrub_waiting_on;
+ scrub_waiting_on_whom.insert(osd->whoami);
+ osd->scrub_wq.queue(this);
+ }
}
}
}
}
}
-void PG::_request_scrub_map(int replica, eversion_t version)
+// send scrub v2-compatible messages (classic scrub)
+void PG::_request_scrub_map_classic(int replica, eversion_t version)
{
assert(replica != osd->whoami);
dout(10) << "scrub requesting scrubmap from osd." << replica << dendl;
get_osdmap()->get_cluster_inst(replica));
}
+// send scrub v3 messages (chunky scrub)
+void PG::_request_scrub_map(int replica, eversion_t version, hobject_t start, hobject_t end)
+{
+ assert(replica != osd->whoami);
+ dout(10) << "scrub requesting scrubmap from osd." << replica << dendl;
+ MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
+ get_osdmap()->get_epoch(),
+ start, end);
+ osd->cluster_messenger->send_message(repscrubop,
+ get_osdmap()->get_cluster_inst(replica));
+}
+
void PG::sub_op_scrub_reserve(OpRequestRef op)
{
MOSDSubOp *m = (MOSDSubOp*)op->request;
}
}
+/*
+ * Build a scrub map over a chunk without releasing the PG lock.
+ * Only used by chunky scrub.
+ */
+int PG::build_scrub_map_chunk(ScrubMap &map, hobject_t start, hobject_t end)
+{
+ dout(10) << "build_scrub_map_chunk" << dendl;
+ dout(20) << "scrub_map_chunk [" << start << "," << end << ")" << dendl;
+
+ map.valid_through = info.last_update;
+
+ // objects
+ vector<hobject_t> ls;
+ int ret = osd->store->collection_list_range(coll, start, end, 0, &ls);
+ if (ret < 0) {
+ dout(5) << "collection_list_range error: " << ret << dendl;
+ return ret;
+ }
+
+ _scan_list(map, ls);
+
+ // pg attrs
+ osd->store->collection_getattrs(coll, map.attrs);
+
+ // log
+ osd->store->read(coll_t(), log_oid, 0, 0, map.logbl);
+ dout(10) << " done. pg log is " << map.logbl.length() << " bytes" << dendl;
+
+ return 0;
+}
+
/*
* build a (sorted) summary of pg content for purposes of scrubbing
* called while holding pg lock
}
/* replica_scrub
+ *
+ * Classic behavior:
*
* If msg->scrub_from is not set, replica_scrub calls build_scrubmap to
* build a complete map (with the pg lock dropped).
* replica_scrub returns to be requeued by sub_op_modify_applied.
* replica_scrub then builds an incremental scrub map with the
* pg lock held.
+ *
+ * Chunky behavior:
+ *
+ * Wait for last_update_applied to catch up to msg->scrub_to, as above. Wait
+ * for any active pushes to be applied, in case of recent recovery. Build a
+ * single scrub map of the objects in the range [msg->start, msg->end).
*/
void PG::replica_scrub(MOSDRepScrub *msg)
{
}
ScrubMap map;
- if (msg->scrub_from > eversion_t()) {
- if (finalizing_scrub) {
- assert(last_update_applied == info.last_update);
- assert(last_update_applied == msg->scrub_to);
- } else {
- finalizing_scrub = 1;
- if (last_update_applied != msg->scrub_to) {
- active_rep_scrub = msg;
- msg->get();
- return;
- }
+
+ if (msg->chunky) { // chunky scrub
+ if (last_update_applied < msg->scrub_to) {
+ dout(10) << "waiting for last_update_applied to catch up" << dendl;
+ active_rep_scrub = msg;
+ msg->get();
+ return;
}
- build_inc_scrub_map(map, msg->scrub_from);
- finalizing_scrub = 0;
+
+ if (active_pushes > 0) {
+ dout(10) << "waiting for active pushes to finish" << dendl;
+ active_rep_scrub = msg;
+ msg->get();
+ return;
+ }
+
+ build_scrub_map_chunk(map, msg->start, msg->end);
+
} else {
- build_scrub_map(map);
- }
+ if (msg->scrub_from > eversion_t()) {
+ if (finalizing_scrub) {
+ assert(last_update_applied == info.last_update);
+ assert(last_update_applied == msg->scrub_to);
+ } else {
+ finalizing_scrub = 1;
+ if (last_update_applied != msg->scrub_to) {
+ active_rep_scrub = msg;
+ msg->get();
+ return;
+ }
+ }
+ build_inc_scrub_map(map, msg->scrub_from);
+ finalizing_scrub = 0;
+ } else {
+ build_scrub_map(map);
+ }
- if (msg->map_epoch < info.history.same_interval_since) {
- dout(10) << "scrub pg changed, aborting" << dendl;
- return;
+ if (msg->map_epoch < info.history.same_interval_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ return;
+ }
}
vector<OSDOp> scrub(1);
/* Scrub:
* PG_STATE_SCRUBBING is set when the scrub is queued
*
- * Once the initial scrub has completed and the requests have gone out to
- * replicas for maps, we set scrub_active and wait for the replicas to
- * complete their maps. Once the maps are received, scrub_block_writes is set.
- * scrub_waiting_on is set to the number of maps outstanding (active.size()).
- *
- * If last_update_applied is behind the head of the log, scrub returns to be
- * requeued by op_applied.
- *
- * Once last_update_applied == info.last_update, scrub catches itself up and
- * decrements scrub_waiting_on.
- *
- * sub_op_scrub_map similarly decrements scrub_waiting_on for each map received.
- *
- * Once scrub_waiting_on hits 0 (either in scrub or sub_op_scrub_map)
- * scrub_finalize is queued.
- *
- * In scrub_finalize, if any replica maps are too old, new ones are requested,
- * scrub_waiting_on is reset, and scrub_finalize returns to be requeued by
- * sub_op_scrub_map. If all maps are up to date, scrub_finalize checks
- * the maps and performs repairs.
+ * The scrub will be chunky if all OSDs in the PG support chunky scrub;
+ * otherwise it falls back to classic scrub.
*/
void PG::scrub()
{
-
lock();
if (deleting) {
unlock();
return;
}
+ // when starting a new scrub, determine which type of scrub to do
+ if (!scrub_active) {
+ OSDMapRef curmap = osd->get_osdmap();
+ scrub_is_chunky = true;
+ for (unsigned i=1; i<acting.size(); i++) {
+ Connection *con = osd->cluster_messenger->get_connection(curmap->get_cluster_inst(acting[i]));
+ if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
+ dout(20) << "osd." << acting[i]
+ << " does not support chunky scrubs, falling back to classic"
+ << dendl;
+ scrub_is_chunky = false;
+ break;
+ }
+ }
+
+ dout(10) << "starting a new " << (scrub_is_chunky ? "chunky" : "classic") << " scrub" << dendl;
+ }
+
+ if (scrub_is_chunky) {
+ chunky_scrub();
+ } else {
+ classic_scrub();
+ }
+
+ unlock();
+}
+
+/*
+ * Classic scrub is a two stage scrub: an initial scrub with writes enabled
+ * followed by a finalize with writes blocked.
+ *
+ * A request is sent out to all replicas for initial scrub maps. Once they
+ * reply (sub_op_scrub_map), writes are blocked for all objects in the PG.
+ *
+ * Finalize: the primary and replicas wait for all writes in the log to be
+ * applied (op_applied), then build an incremental scrub map of all the
+ * changes since the beginning of the scrub.
+ *
+ * Once the primary has received all maps, it compares them and performs
+ * repairs.
+ *
+ * The initial stage of the scrub is handled by scrub_wq and the final stage by
+ * scrub_finalize_wq.
+ *
+ * Relevant variables:
+ *
+ * scrub_waiting_on (int)
+ * scrub_waiting_on_whom
+ * Number of OSDs that still need to build an initial/incremental scrub map.
+ * This is decremented in sub_op_scrub_map.
+ *
+ * last_update_applied
+ * The last update that's hit the disk. In the finalize stage, we block
+ * writes and wait for all writes to flush by checking:
+ *
+ * last_update_applied == info.last_update
+ *
+ * This is checked in op_applied.
+ *
+ * scrub_block_writes
+ * Flag to determine if writes are blocked.
+ *
+ * finalizing_scrub
+ * Flag set when we're in the finalize stage.
+ *
+ */
+void PG::classic_scrub()
+{
if (!scrub_active) {
dout(10) << "scrub start" << dendl;
scrub_active = true;
// request maps from replicas
for (unsigned i=1; i<acting.size(); i++) {
- _request_scrub_map(acting[i], eversion_t());
+ _request_scrub_map_classic(acting[i], eversion_t());
}
// Unlocks and relocks...
dout(10) << "scrub pg changed, aborting" << dendl;
scrub_clear_state();
scrub_unreserve_replicas();
- unlock();
return;
}
scrub_block_writes = true;
} else {
dout(10) << "wait for replicas to build initial scrub map" << dendl;
- unlock();
return;
}
if (last_update_applied != info.last_update) {
dout(10) << "wait for cleanup" << dendl;
- unlock();
return;
}
dout(10) << "scrub pg changed, aborting" << dendl;
scrub_clear_state();
scrub_unreserve_replicas();
- unlock();
return;
}
assert(last_update_applied == info.last_update);
osd->scrub_finalize_wq.queue(this);
}
-
- unlock();
+}
+
+/*
+ * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
+ * chunk.
+ *
+ * The object store is partitioned into chunks which end on hash boundaries. For
+ * each chunk, the following logic is performed:
+ *
+ * (1) Block writes on the chunk
+ * (2) Request maps from replicas
+ * (3) Wait for pushes to be applied (after recovery)
+ * (4) Wait for writes to flush on the chunk
+ * (5) Wait for maps from replicas
+ * (6) Compare / repair all scrub maps
+ *
+ * This logic is encoded in the very linear state machine:
+ *
+ * +---------------------+
+ * ____________v_____________ |
+ * | | |
+ * | SCRUB_INACTIVE | |
+ * |__________________________| |
+ * | |
+ * | +-------------+ |
+ * ____________v___v_________ | |
+ * | | | |
+ * | SCRUB_NEW_CHUNK | | |
+ * |__________________________| | |
+ * | | |
+ * ___________v_____________ | |
+ * | | | |
+ * | SCRUB_WAIT_PUSHES | | |
+ * |_________________________| | |
+ * | | |
+ * ____________v_____________ | |
+ * | | | |
+ * | SCRUB_WAIT_LAST_UPDATE | | |
+ * |__________________________| | |
+ * | | |
+ * ____________v_____________ | |
+ * | | | |
+ * | SCRUB_BUILD_MAP | | |
+ * |__________________________| | |
+ * | | |
+ * ____________v_____________ | |
+ * | | | |
+ * | SCRUB_WAIT_REPLICAS | | |
+ * |__________________________| | |
+ * | | |
+ * ____________v_____________ | |
+ * | | | |
+ * | SCRUB_COMPARE_MAPS | | |
+ * |__________________________| | |
+ * | | | |
+ * | +-------------+ |
+ * ____________v_____________ |
+ * | | |
+ * | SCRUB_FINISH | |
+ * |__________________________| |
+ * | |
+ * +---------------------+
+ *
+ * The primary determines the last update that affects the chunk by walking
+ * the log. If it sees a log entry pertaining to an object in the chunk, it
+ * tells the replicas to wait until that update is applied before building
+ * a scrub map. Both the primary and replicas will wait for any active
+ * pushes to be applied.
+ *
+ * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
+ *
+ * scrub_state encodes the current state of the scrub (refer to state diagram
+ * for details).
+ */
+void PG::chunky_scrub()
+{
+ // check for map changes
+ if (is_chunky_scrub_active()) {
+ if (scrub_epoch_start != info.history.same_interval_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ scrub_clear_state();
+ scrub_unreserve_replicas();
+ return;
+ }
+ }
+
+ bool done = false;
+ int ret;
+
+ while (!done) {
+ dout(20) << "scrub state " << scrub_string(scrub_state) << dendl;
+
+ switch (scrub_state) {
+ case SCRUB_INACTIVE:
+ dout(10) << "scrub start" << dendl;
+
+ update_stats();
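+ // remember the interval in which this scrub started so we can
+ // detect and abort on map changes (see the interval check at the
+ // top of this function)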
+ scrub_epoch_start = info.history.same_interval_since;
+ scrub_active = true;
+
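+ // move our scrub reservation from pending to active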
+ osd->sched_scrub_lock.Lock();
+ if (scrub_reserved) {
+ --(osd->scrubs_pending);
+ assert(osd->scrubs_pending >= 0);
+ scrub_reserved = false;
+ scrub_reserved_peers.clear();
+ }
+ ++(osd->scrubs_active);
+ osd->sched_scrub_lock.Unlock();
+
+ scrub_start = hobject_t();
+ scrub_state = SCRUB_NEW_CHUNK;
+
+ break;
+
+ case SCRUB_NEW_CHUNK:
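+ // start this chunk with fresh scrub maps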
+ primary_scrubmap = ScrubMap();
+ scrub_received_maps.clear();
+
+ {
+ // get the start and end of our scrub chunk
+ //
+ // start and end need to lie on a hash boundary. We test for this by
+ // requesting a list and searching backward from the end looking for a
+ // boundary. If there's no boundary, we request a list after the first
+ // list, and so forth.
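+ //
+ // e.g. if the listed objects have hash keys [a,a,b,b,c,c], scanning
+ // backward finds the b/c boundary, so the chunk ends just before the
+ // first object with key c and no hash value is split across chunks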
+
+ bool boundary_found = false;
+ hobject_t start = scrub_start;
+ while (!boundary_found) {
+ vector<hobject_t> objects;
+ ret = osd->store->collection_list_partial(coll, start,
+ 5, 5, 0,
+ &objects, &scrub_end);
+ assert(ret >= 0);
+
+ // in case we don't find a boundary: start again at the end
+ start = scrub_end;
+
+ // special case: reached end of file store, implicitly a boundary
+ if (objects.size() == 0) {
+ break;
+ }
+
+ // search backward from the end looking for a boundary
+ objects.push_back(scrub_end);
+ while (!boundary_found && objects.size() > 1) {
+ hobject_t end = objects.back();
+ objects.pop_back();
+
+ if (objects.back().get_filestore_key() != end.get_filestore_key()) {
+ scrub_end = end;
+ boundary_found = true;
+ }
+ }
+ }
+ }
+
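+ // writes to the chunk [scrub_start, scrub_end) will now block in do_op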
+ scrub_block_writes = true;
+
+ // walk the log to find the latest update that affects our chunk
+ scrub_subset_last_update = eversion_t();
+ for (list<pg_log_entry_t>::iterator p = log.log.begin();
+ p != log.log.end();
+ ++p) {
+ if (p->soid >= scrub_start && p->soid < scrub_end)
+ scrub_subset_last_update = p->version;
+ }
+
+ // ask replicas to wait until last_update_applied >= scrub_subset_last_update and then scan
+ scrub_waiting_on_whom.insert(osd->whoami);
+ ++scrub_waiting_on;
+
+ // request maps from replicas
+ for (unsigned i=1; i<acting.size(); i++) {
+ _request_scrub_map(acting[i], scrub_subset_last_update,
+ scrub_start, scrub_end);
+ scrub_waiting_on_whom.insert(acting[i]);
+ ++scrub_waiting_on;
+ }
+
+ scrub_state = SCRUB_WAIT_PUSHES;
+
+ break;
+
+ case SCRUB_WAIT_PUSHES:
+ if (active_pushes == 0) {
+ scrub_state = SCRUB_WAIT_LAST_UPDATE;
+ } else {
+ dout(15) << "wait for pushes to apply" << dendl;
+ done = true;
+ }
+ break;
+
+ case SCRUB_WAIT_LAST_UPDATE:
+ if (last_update_applied >= scrub_subset_last_update) {
+ scrub_state = SCRUB_BUILD_MAP;
+ } else {
+ // will be requeued by op_applied
+ dout(15) << "wait for writes to flush" << dendl;
+ done = true;
+ }
+ break;
+
+ case SCRUB_BUILD_MAP:
+ assert(last_update_applied >= scrub_subset_last_update);
+
+ // build my own scrub map
+ ret = build_scrub_map_chunk(primary_scrubmap, scrub_start, scrub_end);
+ if (ret < 0) {
+ dout(5) << "error building scrub map: " << ret << ", aborting" << dendl;
+ scrub_clear_state();
+ scrub_unreserve_replicas();
+ return;
+ }
+
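+ // our own map is done; replica maps are accounted for in sub_op_scrub_map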
+ --scrub_waiting_on;
+ scrub_waiting_on_whom.erase(osd->whoami);
+
+ scrub_state = SCRUB_WAIT_REPLICAS;
+ break;
+
+ case SCRUB_WAIT_REPLICAS:
+ if (scrub_waiting_on > 0) {
+ // will be requeued by sub_op_scrub_map
+ dout(10) << "wait for replicas to build scrub map" << dendl;
+ done = true;
+ } else {
+ scrub_state = SCRUB_COMPARE_MAPS;
+ }
+ break;
+
+ case SCRUB_COMPARE_MAPS:
+ assert(last_update_applied >= scrub_subset_last_update);
+ assert(scrub_waiting_on == 0);
+
+ scrub_compare_maps();
+ scrub_block_writes = false;
+
+ // requeue the writes from the chunk that just finished
+ requeue_ops(waiting_for_active);
+
+ if (scrub_end < hobject_t::get_max()) {
+ // schedule another leg of the scrub
+ scrub_start = scrub_end;
+
+ scrub_state = SCRUB_NEW_CHUNK;
+ osd->scrub_wq.queue(this);
+ done = true;
+ } else {
+ scrub_state = SCRUB_FINISH;
+ }
+
+ break;
+
+ case SCRUB_FINISH:
+ scrub_finish();
+ scrub_state = SCRUB_INACTIVE;
+ done = true;
+
+ break;
+
+ default:
+ assert(0);
+ }
+ }
}
void PG::scrub_clear_state()
active_rep_scrub = NULL;
}
scrub_received_maps.clear();
+
+ scrub_state = SCRUB_INACTIVE;
+ scrub_start = hobject_t();
+ scrub_end = hobject_t();
+ scrub_subset_last_update = eversion_t();
+ scrub_errors = 0;
+ scrub_fixed = 0;
+
+ // type-specific state clear
+ _scrub_clear_state();
}
bool PG::scrub_gather_replica_maps() {
scrub_waiting_on++;
scrub_waiting_on_whom.insert(p->first);
// Need to request another incremental map
- _request_scrub_map(p->first, p->second.valid_through);
+ _request_scrub_map_classic(p->first, p->second.valid_through);
}
}
}
}
-void PG::scrub_finalize() {
- lock();
- if (deleting) {
- unlock();
- return;
- }
-
- assert(last_update_applied == info.last_update);
-
- if (scrub_epoch_start != info.history.same_interval_since) {
- dout(10) << "scrub pg changed, aborting" << dendl;
- scrub_clear_state();
- scrub_unreserve_replicas();
- unlock();
- return;
- }
-
- if (!scrub_gather_replica_maps()) {
- dout(10) << "maps not yet up to date, sent out new requests" << dendl;
- unlock();
- return;
- }
-
- dout(10) << "scrub_finalize has maps, analyzing" << dendl;
- int errors = 0, fixed = 0;
+void PG::scrub_compare_maps() {
+ dout(10) << "scrub_compare_maps has maps, analyzing" << dendl;
bool repair = state_test(PG_STATE_REPAIR);
const char *mode = repair ? "repair":"scrub";
if (acting.size() > 1) {
}
// ok, do the pg-type specific scrubbing
- _scrub(primary_scrubmap, &errors, &fixed);
+ _scrub(primary_scrubmap);
+}
+
+void PG::scrub_finalize() {
+ lock();
+ if (deleting) {
+ unlock();
+ return;
+ }
+
+ assert(last_update_applied == info.last_update);
+
+ if (scrub_epoch_start != info.history.same_interval_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ scrub_clear_state();
+ scrub_unreserve_replicas();
+ unlock();
+ return;
+ }
+
+ if (!scrub_gather_replica_maps()) {
+ dout(10) << "maps not yet up to date, sent out new requests" << dendl;
+ unlock();
+ return;
+ }
+
+ scrub_compare_maps();
+
+ scrub_finish();
+
+ dout(10) << "scrub done" << dendl;
+ unlock();
+}
+
+// the part that actually finalizes a scrub
+void PG::scrub_finish() {
+ bool repair = state_test(PG_STATE_REPAIR);
+ const char *mode = repair ? "repair":"scrub";
+
+ // type-specific finish (can tally more errors)
+ _scrub_finish();
+
+ if (scrub_errors == 0 || (repair && (scrub_errors - scrub_fixed) == 0))
+ state_clear(PG_STATE_INCONSISTENT);
{
stringstream oss;
oss << info.pgid << " " << mode << " ";
- if (errors)
- oss << errors << " errors";
+ if (scrub_errors)
+ oss << scrub_errors << " errors";
else
oss << "ok";
if (repair)
- oss << ", " << fixed << " fixed";
+ oss << ", " << scrub_fixed << " fixed";
oss << "\n";
- if (errors)
+ if (scrub_errors)
osd->clog.error(oss);
else
osd->clog.info(oss);
}
- if (errors == 0 || (repair && (errors - fixed) == 0))
- state_clear(PG_STATE_INCONSISTENT);
-
// finish up
osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
info.history.last_scrub = info.last_update;
if (is_active() && is_primary()) {
share_pg_info();
}
-
- dout(10) << "scrub done" << dendl;
- unlock();
}
void PG::share_pg_info()
dout(10) << "do_op " << *m << (m->may_write() ? " may_write" : "") << dendl;
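+ // resolve the head object up front so the scrub range check below can use it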
+ hobject_t head(m->get_oid(), m->get_object_locator().key,
+ CEPH_NOSNAP, m->get_pg().ps(),
+ info.pgid.pool());
+
if (scrub_block_writes && m->may_write()) {
- dout(20) << __func__ << ": waiting for scrub" << dendl;
- waiting_for_active.push_back(op);
- op->mark_delayed();
- return;
+ // classic (non-chunky) scrubs block all writes;
+ // chunky scrubs only block writes to the chunk being scrubbed
+ if (!scrub_is_chunky || (head >= scrub_start && head < scrub_end)) {
+ dout(20) << __func__ << ": waiting for scrub" << dendl;
+ waiting_for_active.push_back(op);
+ op->mark_delayed();
+ return;
+ }
}
// missing object?
- hobject_t head(m->get_oid(), m->get_object_locator().key,
- CEPH_NOSNAP, m->get_pg().ps(),
- info.pgid.pool());
if (is_missing_object(head)) {
wait_for_missing_object(head, op);
return;
dout(10) << "snap_trimmer entry" << dendl;
if (is_primary()) {
entity_inst_t nobody;
- if (!mode.try_write(nobody) || scrub_block_writes) {
+ if (!mode.try_write(nobody) || scrub_active) {
dout(10) << " can't write, requeueing" << dendl;
queue_snap_trim();
unlock();
pending_backfill_updates[soid].stats.add(ctx->delta_stats, ctx->obc->obs.oi.category);
}
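+ // writes never land in the chunk currently being scrubbed (see do_op),
+ // but stat changes to already-scrubbed objects must be tracked so the
+ // stat comparison in _scrub_finish stays accurate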
+ if (scrub_active && scrub_is_chunky) {
+ assert(soid < scrub_start || soid >= scrub_end);
+ if (soid < scrub_start)
+ scrub_cstat.add(ctx->delta_stats, ctx->obc->obs.oi.category);
+ }
+
return result;
}
assert(info.last_update >= repop->v);
assert(last_update_applied < repop->v);
last_update_applied = repop->v;
- if (last_update_applied == info.last_update && scrub_block_writes) {
+
+ // chunky scrub
+ if (scrub_active && scrub_is_chunky) {
+ if (last_update_applied == scrub_subset_last_update) {
+ osd->scrub_wq.queue(this);
+ }
+
+ // classic scrub
+ } else if (last_update_applied == info.last_update && scrub_block_writes) {
dout(10) << "requeueing scrub for cleanup" << dendl;
finalizing_scrub = true;
scrub_gather_replica_maps();
assert(info.last_update >= m->version);
assert(last_update_applied < m->version);
last_update_applied = m->version;
- if (finalizing_scrub) {
- assert(active_rep_scrub);
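+ // a replica scrub may be waiting for this update to apply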
+ if (active_rep_scrub) {
if (last_update_applied == active_rep_scrub->scrub_to) {
osd->rep_scrub_wq.queue(active_rep_scrub);
active_rep_scrub = 0;
bool complete = m->recovery_progress.data_complete &&
m->recovery_progress.omap_complete;
ObjectStore::Transaction *t = new ObjectStore::Transaction;
- Context *onreadable = new ObjectStore::C_DeleteTransaction(t);
+ Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
Context *onreadable_sync = 0;
submit_push_data(m->recovery_info,
first,
lock();
dout(10) << "_applied_recovered_object " << *obc << dendl;
put_object_context(obc);
+
+ assert(active_pushes >= 1);
+ --active_pushes;
+
+ // requeue an active chunky scrub waiting on recovery ops
+ if (active_pushes == 0 && is_chunky_scrub_active()) {
+ osd->scrub_wq.queue(this);
+ }
+
+ unlock();
+ delete t;
+}
+
+void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
+{
+ lock();
+ dout(10) << "_applied_recovered_object_replica" << dendl;
+
+ assert(active_pushes >= 1);
+ --active_pushes;
+
+ // requeue an active chunky scrub waiting on recovery ops
+ if (active_pushes == 0 &&
+ active_rep_scrub && active_rep_scrub->chunky) {
+ osd->rep_scrub_wq.queue(active_rep_scrub);
+ active_rep_scrub = 0;
+ }
+
unlock();
delete t;
}
void ReplicatedPG::sub_op_push(OpRequestRef op)
{
op->mark_started();
+
+ // keep track of active pushes for scrub
+ ++active_pushes;
+
if (is_primary()) {
handle_pull_response(op);
} else {
recover_got(soid, latest->version);
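+ // count this push so an active chunky scrub will wait for it to apply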
+ ++active_pushes;
+
osd->store->queue_transaction(osr.get(), t,
new C_OSD_AppliedRecoveredObject(this, t, obc),
new C_OSD_CommittedPushedObject(this, OpRequestRef(),
// SCRUB
-int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
+void ReplicatedPG::_scrub(ScrubMap& scrubmap)
{
dout(10) << "_scrub" << dendl;
SnapSet snapset;
vector<snapid_t>::reverse_iterator curclone;
- object_stat_collection_t cstat;
-
bufferlist last_data;
for (map<hobject_t,ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin();
if (p->second.attrs.count(SS_ATTR) == 0) {
osd->clog.error() << mode << " " << info.pgid << " " << soid
<< " no '" << SS_ATTR << "' attr";
- ++(*errors);
+ ++scrub_errors;
continue;
}
bufferlist bl;
if (head != hobject_t()) {
osd->clog.error() << mode << " " << info.pgid << " " << head
<< " missing clones";
- ++(*errors);
+ ++scrub_errors;
}
// what will be next?
}
if (soid.snap == CEPH_SNAPDIR) {
string cat;
- cstat.add(stat, cat);
+ scrub_cstat.add(stat, cat);
continue;
}
if (p->second.attrs.count(OI_ATTR) == 0) {
osd->clog.error() << mode << " " << info.pgid << " " << soid
<< " no '" << OI_ATTR << "' attr";
- ++(*errors);
+ ++scrub_errors;
continue;
}
bufferlist bv;
osd->clog.error() << mode << " " << info.pgid << " " << soid
<< " on disk size (" << p->second.size
<< ") does not match object info size (" << oi.size << ")";
- ++(*errors);
+ ++scrub_errors;
}
dout(20) << mode << " " << soid << " " << oi << dendl;
if (!snapset.head_exists) {
osd->clog.error() << mode << " " << info.pgid << " " << soid
<< " snapset.head_exists=false, but object exists";
- ++(*errors);
+ ++scrub_errors;
continue;
}
} else if (soid.snap) {
if (soid.snap != *curclone) {
osd->clog.error() << mode << " " << info.pgid << " " << soid
<< " expected clone " << *curclone;
- ++(*errors);
+ ++scrub_errors;
assert(soid.snap == *curclone);
}
}
string cat; // fixme
- cstat.add(stat, cat);
- }
+ scrub_cstat.add(stat, cat);
+ }
+ dout(10) << "_scrub (" << mode << ") finish" << dendl;
+}
+
+void ReplicatedPG::_scrub_clear_state()
+{
+ scrub_cstat = object_stat_collection_t();
+}
+
+void ReplicatedPG::_scrub_finish()
+{
+ bool repair = state_test(PG_STATE_REPAIR);
+ const char *mode = repair ? "repair":"scrub";
+
dout(10) << mode << " got "
- << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
- << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
- << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
+ << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
+ << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
+ << scrub_cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
<< dendl;
- if (cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
- cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
- cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
+ if (scrub_cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
+ scrub_cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
+ scrub_cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
osd->clog.error() << info.pgid << " " << mode
<< " stat mismatch, got "
- << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
- << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
- << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
- ++(*errors);
+ << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
+ << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
+ << scrub_cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
+ ++scrub_errors;
if (repair) {
- ++(*fixed);
- info.stats.stats = cstat;
+ ++scrub_fixed;
+ info.stats.stats = scrub_cstat;
update_stats();
share_pg_info();
}
}
-
- dout(10) << "_scrub (" << mode << ") finish" << dendl;
- return (*errors);
}
/*---SnapTrimmer Logging---*/
} else if (!pg->is_primary() || !pg->is_active() || !pg->is_clean()) {
dout(10) << "NotTrimming not primary, active, clean" << dendl;
return discard_event();
- } else if (pg->scrub_block_writes) {
+ } else if (pg->scrub_active) {
- dout(10) << "NotTrimming finalizing scrub" << dendl;
+ dout(10) << "NotTrimming, scrub active" << dendl;
pg->queue_snap_trim();
return discard_event();