From: Mike Ryan
Date: Mon, 16 Jul 2012 22:58:26 +0000 (-0700)
Subject: osd: chunky scrub, scrub PGs a chunk of objects at a time
X-Git-Tag: v0.53~180^2~1^2~5
X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=f5046798f4e5437f804f526f6fe8a862b6f3e3b8;p=ceph.git

osd: chunky scrub, scrub PGs a chunk of objects at a time

Chunky scrub is a more efficient scrub. It blocks writes on a subset of
objects and scrubs those, allowing writes through to the rest of the PG.
The scrub takes longer to complete than a classic scrub, but it improves
overall write throughput.

This feature is backward-compatible with classic scrub. If the primary
detects that any replica does not have the chunky scrub feature, it
falls back to the less efficient classic scrub.

Signed-off-by: Mike Ryan
---
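A standalone sketch (not part of the patch) of the fallback decision described
above: the primary scrubs chunky only if every replica advertises the
CEPH_FEATURE_CHUNKY_SCRUB bit. The helper name use_chunky_scrub and the plain
vector of feature masks are illustrative stand-ins for the per-connection
features check done in PG::scrub() below.

#include <cstdint>
#include <cstdio>
#include <vector>

#define CEPH_FEATURE_CHUNKY_SCRUB (1<<19)

// Decide the scrub type the way PG::scrub() does below: chunky only if
// every replica advertises the feature bit, classic otherwise.
bool use_chunky_scrub(const std::vector<uint64_t> &replica_features) {
  for (uint64_t f : replica_features)
    if (!(f & CEPH_FEATURE_CHUNKY_SCRUB))
      return false;   // one old replica forces classic scrub
  return true;
}

int main() {
  std::vector<uint64_t> all_new = {CEPH_FEATURE_CHUNKY_SCRUB, CEPH_FEATURE_CHUNKY_SCRUB};
  std::vector<uint64_t> mixed   = {CEPH_FEATURE_CHUNKY_SCRUB, 0};
  printf("all new: %s\n", use_chunky_scrub(all_new) ? "chunky" : "classic");
  printf("mixed:   %s\n", use_chunky_scrub(mixed) ? "chunky" : "classic");
}

A single old replica is enough to force classic scrub, which keeps
mixed-version clusters correct during upgrades.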
diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h
index 71654a35446e..d00ae7c938b0 100644
--- a/src/include/ceph_features.h
+++ b/src/include/ceph_features.h
@@ -23,6 +23,7 @@
 #define CEPH_FEATURE_QUERY_T      (1<<16)
 #define CEPH_FEATURE_INDEP_PG_MAP (1<<17)
 #define CEPH_FEATURE_CRUSH_TUNABLES (1<<18)
+#define CEPH_FEATURE_CHUNKY_SCRUB (1<<19)
 
 /*
  * Features supported. Should be everything above.
@@ -46,7 +47,8 @@
          CEPH_FEATURE_QUERY_T | \
          CEPH_FEATURE_MONENC | \
          CEPH_FEATURE_INDEP_PG_MAP | \
-         CEPH_FEATURE_CRUSH_TUNABLES)
+         CEPH_FEATURE_CRUSH_TUNABLES | \
+         CEPH_FEATURE_CHUNKY_SCRUB)
 
 #define CEPH_FEATURES_SUPPORTED_DEFAULT CEPH_FEATURES_ALL
diff --git a/src/messages/MOSDRepScrub.h b/src/messages/MOSDRepScrub.h
index af3569c0a320..184d153bcc99 100644
--- a/src/messages/MOSDRepScrub.h
+++ b/src/messages/MOSDRepScrub.h
@@ -24,22 +24,38 @@
 
 struct MOSDRepScrub : public Message {
 
-  static const int HEAD_VERSION = 2;
+  static const int HEAD_VERSION = 3;
+  static const int COMPAT_VERSION = 2;
 
   pg_t pgid;             // PG to scrub
   eversion_t scrub_from; // only scrub log entries after scrub_from
   eversion_t scrub_to;   // last_update_applied when message sent
   epoch_t map_epoch;
+  bool chunky;           // true for chunky scrubs
+  hobject_t start;       // lower bound of scrub, inclusive
+  hobject_t end;         // upper bound of scrub, exclusive
 
-  MOSDRepScrub() : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION) { }
+  MOSDRepScrub() : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION) { }
 
   MOSDRepScrub(pg_t pgid, eversion_t scrub_from, eversion_t scrub_to,
                epoch_t map_epoch)
-    : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION),
+    : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
       pgid(pgid),
       scrub_from(scrub_from),
       scrub_to(scrub_to),
-      map_epoch(map_epoch) { }
-
+      map_epoch(map_epoch),
+      chunky(false) { }
+
+  MOSDRepScrub(pg_t pgid, eversion_t scrub_to, epoch_t map_epoch,
+               hobject_t start, hobject_t end)
+    : Message(MSG_OSD_REP_SCRUB, HEAD_VERSION, COMPAT_VERSION),
+      pgid(pgid),
+      scrub_to(scrub_to),
+      map_epoch(map_epoch),
+      chunky(true),
+      start(start),
+      end(end) { }
+
 private:
   ~MOSDRepScrub() {}
 
@@ -48,7 +64,9 @@ public:
   void print(ostream& out) const {
     out << "replica scrub(pg: ";
     out << pgid << ",from:" << scrub_from << ",to:" << scrub_to
-        << "epoch:" << map_epoch;
+        << ",epoch:" << map_epoch << ",start:" << start << ",end:" << end
+        << ",chunky:" << chunky
+        << ",version:" << header.version;
     out << ")";
   }
 
@@ -57,6 +75,9 @@ public:
     ::encode(scrub_from, payload);
     ::encode(scrub_to, payload);
     ::encode(map_epoch, payload);
+    ::encode(chunky, payload);
+    ::encode(start, payload);
+    ::encode(end, payload);
   }
   void decode_payload() {
     bufferlist::iterator p = payload.begin();
@@ -64,6 +85,14 @@ public:
     ::decode(scrub_from, p);
     ::decode(scrub_to, p);
     ::decode(map_epoch, p);
+
+    if (header.version >= 3) {
+      ::decode(chunky, p);
+      ::decode(start, p);
+      ::decode(end, p);
+    } else { // v2 scrub: non-chunky
+      chunky = false;
+    }
   }
 };
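The HEAD_VERSION/COMPAT_VERSION bump above follows the usual versioned-encoding
pattern: new fields are appended, and the decoder branches on the header
version so a v2 message from an old peer yields chunky = false. A toy,
self-contained illustration of that pattern (Buf and RepScrubLite are
simplified stand-ins for Ceph's bufferlist and MOSDRepScrub, not real APIs):

#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

// Minimal byte buffer standing in for Ceph's bufferlist.
struct Buf {
  std::string data;
  size_t off = 0;
  void put(const void *p, size_t n) { data.append((const char *)p, n); }
  void get(void *p, size_t n) { memcpy(p, data.data() + off, n); off += n; }
};

struct RepScrubLite {
  uint32_t map_epoch = 0;
  bool chunky = false;
  uint64_t start = 0, end = 0;   // stand-ins for the hobject_t bounds

  // A v3 encoder appends the chunky fields; the header version travels
  // with the message so the decoder knows what to expect.
  void encode(Buf &b, uint8_t header_version) const {
    b.put(&header_version, sizeof(header_version));
    b.put(&map_epoch, sizeof(map_epoch));
    if (header_version >= 3) {
      b.put(&chunky, sizeof(chunky));
      b.put(&start, sizeof(start));
      b.put(&end, sizeof(end));
    }
  }
  void decode(Buf &b) {
    uint8_t header_version;
    b.get(&header_version, sizeof(header_version));
    b.get(&map_epoch, sizeof(map_epoch));
    if (header_version >= 3) {
      b.get(&chunky, sizeof(chunky));
      b.get(&start, sizeof(start));
      b.get(&end, sizeof(end));
    } else {  // v2 scrub from an old peer: non-chunky
      chunky = false;
    }
  }
};

int main() {
  // v3 peer: chunk bounds survive the round trip
  Buf b3;
  RepScrubLite m3{42, true, 100, 200};
  m3.encode(b3, 3);
  RepScrubLite r3;
  r3.decode(b3);
  assert(r3.chunky && r3.start == 100 && r3.end == 200);

  // v2 peer: decoder falls back to a classic (non-chunky) scrub
  Buf b2;
  RepScrubLite m2{42, false, 0, 0};
  m2.encode(b2, 2);
  RepScrubLite r2;
  r2.decode(b2);
  assert(!r2.chunky);
}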
diff --git a/src/osd/PG.cc b/src/osd/PG.cc
index 61476ccfebcd..7fea23ba5a39 100644
--- a/src/osd/PG.cc
+++ b/src/osd/PG.cc
@@ -84,6 +84,10 @@ PG::PG(OSDService *o, OSDMapRef curmap,
   scrub_reserved(false), scrub_reserve_failed(false),
   scrub_waiting_on(0),
   active_rep_scrub(0),
+  scrub_is_chunky(false),
+  scrub_errors(0),
+  scrub_fixed(0),
+  active_pushes(0),
   recovery_state(this)
 {
 }
@@ -2767,29 +2771,40 @@ void PG::sub_op_scrub_map(OpRequestRef op)
   dout(10) << " got osd." << from << " scrub map" << dendl;
   bufferlist::iterator p = m->get_data().begin();
-  if (scrub_received_maps.count(from)) {
-    ScrubMap incoming;
-    incoming.decode(p, info.pgid.pool());
-    dout(10) << "from replica " << from << dendl;
-    dout(10) << "map version is " << incoming.valid_through << dendl;
-    scrub_received_maps[from].merge_incr(incoming);
-  } else {
+
+  if (scrub_is_chunky) { // chunky scrub
     scrub_received_maps[from].decode(p, info.pgid.pool());
+    dout(10) << "map version is " << scrub_received_maps[from].valid_through << dendl;
+  } else { // classic scrub
+    if (scrub_received_maps.count(from)) {
+      ScrubMap incoming;
+      incoming.decode(p, info.pgid.pool());
+      dout(10) << "from replica " << from << dendl;
+      dout(10) << "map version is " << incoming.valid_through << dendl;
+      scrub_received_maps[from].merge_incr(incoming);
+    } else {
+      scrub_received_maps[from].decode(p, info.pgid.pool());
+    }
   }
 
   --scrub_waiting_on;
   scrub_waiting_on_whom.erase(from);
+
   if (scrub_waiting_on == 0) {
-    if (finalizing_scrub) { // incremental lists received
-      osd->scrub_finalize_wq.queue(this);
-    } else { // initial lists received
-      scrub_block_writes = true;
-      if (last_update_applied == info.last_update) {
-        finalizing_scrub = true;
-        scrub_gather_replica_maps();
-        ++scrub_waiting_on;
-        scrub_waiting_on_whom.insert(osd->whoami);
-        osd->scrub_wq.queue(this);
+    if (scrub_is_chunky) { // chunky scrub
+      osd->scrub_wq.queue(this);
+    } else { // classic scrub
+      if (finalizing_scrub) { // incremental lists received
+        osd->scrub_finalize_wq.queue(this);
+      } else { // initial lists received
+        scrub_block_writes = true;
+        if (last_update_applied == info.last_update) {
+          finalizing_scrub = true;
+          scrub_gather_replica_maps();
+          ++scrub_waiting_on;
+          scrub_waiting_on_whom.insert(osd->whoami);
+          osd->scrub_wq.queue(this);
+        }
       }
     }
   }
@@ -2821,7 +2836,8 @@ void PG::_scan_list(ScrubMap &map, vector<hobject_t> &ls)
   }
 }
 
-void PG::_request_scrub_map(int replica, eversion_t version)
+// send scrub v2-compatible messages (classic scrub)
+void PG::_request_scrub_map_classic(int replica, eversion_t version)
 {
   assert(replica != osd->whoami);
   dout(10) << "scrub requesting scrubmap from osd." << replica << dendl;
@@ -2832,6 +2848,18 @@ void PG::_request_scrub_map(int replica, eversion_t version)
                                        get_osdmap()->get_cluster_inst(replica));
 }
 
+// send scrub v3 messages (chunky scrub)
+void PG::_request_scrub_map(int replica, eversion_t version, hobject_t start, hobject_t end)
+{
+  assert(replica != osd->whoami);
+  dout(10) << "scrub requesting scrubmap from osd." << replica << dendl;
+  MOSDRepScrub *repscrubop = new MOSDRepScrub(info.pgid, version,
+                                              get_osdmap()->get_epoch(),
+                                              start, end);
+  osd->cluster_messenger->send_message(repscrubop,
+                                       get_osdmap()->get_cluster_inst(replica));
+}
+
 void PG::sub_op_scrub_reserve(OpRequestRef op)
 {
   MOSDSubOp *m = (MOSDSubOp*)op->request;
@@ -2954,6 +2982,37 @@ void PG::scrub_unreserve_replicas()
   }
 }
 
+/*
+ * build a scrub map over a chunk without releasing the lock
+ * only used by chunky scrub
+ */
+int PG::build_scrub_map_chunk(ScrubMap &map, hobject_t start, hobject_t end)
+{
+  dout(10) << "build_scrub_map_chunk" << dendl;
+  dout(20) << "scrub_map_chunk [" << start << "," << end << ")" << dendl;
+
+  map.valid_through = info.last_update;
+
+  // objects
+  vector<hobject_t> ls;
+  int ret = osd->store->collection_list_range(coll, start, end, 0, &ls);
+  if (ret < 0) {
+    dout(5) << "collection_list_range error: " << ret << dendl;
+    return ret;
+  }
+
+  _scan_list(map, ls);
+
+  // pg attrs
+  osd->store->collection_getattrs(coll, map.attrs);
+
+  // log
+  osd->store->read(coll_t(), log_oid, 0, 0, map.logbl);
+  dout(10) << " done.  pg log is " << map.logbl.length() << " bytes" << dendl;
+
+  return 0;
+}
+
 /*
  * build a (sorted) summary of pg content for purposes of scrubbing
  * called while holding pg lock
@@ -3053,6 +3112,8 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer
 }
 
 /* replica_scrub
+ *
+ * Classic behavior:
  *
  * If msg->scrub_from is not set, replica_scrub calls build_scrubmap to
  * build a complete map (with the pg lock dropped).
@@ -3062,6 +3123,12 @@ void PG::repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer
  * replica_scrub returns to be requeued by sub_op_modify_applied.
  * replica_scrub then builds an incremental scrub map with the
  * pg lock held.
+ *
+ * Chunky behavior:
+ *
+ * Wait for last_update_applied to match msg->scrub_to as above. Wait
+ * for pushes to complete in case of recent recovery. Build a single
+ * scrub map of objects that are in the range [msg->start, msg->end).
 */
 void PG::replica_scrub(MOSDRepScrub *msg)
 {
@@ -3081,27 +3148,47 @@ void PG::replica_scrub(MOSDRepScrub *msg)
   }
 
   ScrubMap map;
-  if (msg->scrub_from > eversion_t()) {
-    if (finalizing_scrub) {
-      assert(last_update_applied == info.last_update);
-      assert(last_update_applied == msg->scrub_to);
-    } else {
-      finalizing_scrub = 1;
-      if (last_update_applied != msg->scrub_to) {
-        active_rep_scrub = msg;
-        msg->get();
-        return;
-      }
+
+  if (msg->chunky) { // chunky scrub
+    if (last_update_applied < msg->scrub_to) {
+      dout(10) << "waiting for last_update_applied to catch up" << dendl;
+      active_rep_scrub = msg;
+      msg->get();
+      return;
     }
-    build_inc_scrub_map(map, msg->scrub_from);
-    finalizing_scrub = 0;
+
+    if (active_pushes > 0) {
+      dout(10) << "waiting for active pushes to finish" << dendl;
+      active_rep_scrub = msg;
+      msg->get();
+      return;
+    }
+
+    build_scrub_map_chunk(map, msg->start, msg->end);
+
   } else {
-    build_scrub_map(map);
-  }
+    if (msg->scrub_from > eversion_t()) {
+      if (finalizing_scrub) {
+        assert(last_update_applied == info.last_update);
+        assert(last_update_applied == msg->scrub_to);
+      } else {
+        finalizing_scrub = 1;
+        if (last_update_applied != msg->scrub_to) {
+          active_rep_scrub = msg;
+          msg->get();
+          return;
+        }
+      }
+      build_inc_scrub_map(map, msg->scrub_from);
+      finalizing_scrub = 0;
+    } else {
+      build_scrub_map(map);
+    }
 
-  if (msg->map_epoch < info.history.same_interval_since) {
-    dout(10) << "scrub  pg changed, aborting" << dendl;
-    return;
+    if (msg->map_epoch < info.history.same_interval_since) {
+      dout(10) << "scrub  pg changed, aborting" << dendl;
+      return;
+    }
   }
 
   vector<OSDOp> scrub(1);
@@ -3120,30 +3207,11 @@ void PG::replica_scrub(MOSDRepScrub *msg)
 
 /* Scrub:
  * PG_STATE_SCRUBBING is set when the scrub is queued
  *
- * Once the initial scrub has completed and the requests have gone out to
- * replicas for maps, we set scrub_active and wait for the replicas to
- * complete their maps. Once the maps are received, scrub_block_writes is set.
- * scrub_waiting_on is set to the number of maps outstanding (active.size()).
- *
- * If last_update_applied is behind the head of the log, scrub returns to be
- * requeued by op_applied.
- *
- * Once last_update_applied == info.last_update, scrub catches itself up and
- * decrements scrub_waiting_on.
- *
- * sub_op_scrub_map similarly decrements scrub_waiting_on for each map received.
- *
- * Once scrub_waiting_on hits 0 (either in scrub or sub_op_scrub_map)
- * scrub_finalize is queued.
- *
- * In scrub_finalize, if any replica maps are too old, new ones are requested,
- * scrub_waiting_on is reset, and scrub_finalize returns to be requeued by
- * sub_op_scrub_map. If all maps are up to date, scrub_finalize checks
- * the maps and performs repairs.
+ * scrub will be chunky if all OSDs in the PG support chunky scrub;
+ * it falls back to classic scrub in any other case
 */
 void PG::scrub()
 {
-
   lock();
   if (deleting) {
     unlock();
@@ -3157,6 +3225,74 @@ void PG::scrub()
     return;
   }
 
+  // when we're starting a scrub, we need to determine which type of scrub to do
+  if (!scrub_active) {
+    OSDMapRef curmap = osd->get_osdmap();
+    scrub_is_chunky = true;
+    for (unsigned i=1; i<acting.size(); i++) {
+      Connection *con = osd->cluster_messenger->get_connection(curmap->get_cluster_inst(acting[i]));
+      if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) {
+        dout(20) << "OSD " << acting[i]
+                 << " does not support chunky scrubs, falling back to classic"
+                 << dendl;
+        scrub_is_chunky = false;
+        break;
+      }
+    }
+
+    dout(10) << "starting a new " << (scrub_is_chunky ? "chunky" : "classic")
+             << " scrub" << dendl;
+  }
+
+  if (scrub_is_chunky) {
+    chunky_scrub();
+  } else {
+    classic_scrub();
+  }
+
+  unlock();
+}
+
+/*
+ * Classic scrub is a two stage scrub: an initial scrub with writes enabled,
+ * followed by a finalize with writes blocked.
+ *
+ * A request is sent out to all replicas for initial scrub maps. Once they
+ * reply (sub_op_scrub_map), writes are blocked for all objects in the PG.
+ *
+ * Finalize: the primary and replicas wait for all writes in the log to be
+ * applied (op_applied), then build an incremental scrub map of all the
+ * changes since the beginning of the scrub.
+ *
+ * Once the primary has received all maps, it compares them and performs
+ * repairs.
+ *
+ * The initial stage of the scrub is handled by scrub_wq and the final stage
+ * by scrub_finalize_wq.
+ *
+ * Relevant variables:
+ *
+ * scrub_waiting_on (int)
+ * scrub_waiting_on_whom
+ *   The number of OSDs that still need to build an initial/incremental
+ *   scrub map, and which ones. Decremented in sub_op_scrub_map.
+ *
+ * last_update_applied
+ *   The last update that's hit the disk. In the finalize stage, we block
+ *   writes and wait for all writes to flush by checking:
+ *
+ *     last_update_applied == info.last_update
+ *
+ *   This is checked in op_applied.
+ *
+ * scrub_block_writes
+ *   Flag that determines whether writes are blocked.
+ *
+ * finalizing_scrub
+ *   Flag set when we're in the finalize stage.
+ */
+void PG::classic_scrub()
+{
   if (!scrub_active) {
     dout(10) << "scrub start" << dendl;
     scrub_active = true;
@@ -3184,7 +3320,7 @@ void PG::scrub()
 
     // request maps from replicas
     for (unsigned i=1; i<acting.size(); i++) {
-      _request_scrub_map(acting[i], eversion_t());
+      _request_scrub_map_classic(acting[i], eversion_t());
     }
@@ -3205,8 +3341,8 @@ void PG::scrub()
     osd->scrub_finalize_wq.queue(this);
   }
-
-  unlock();
+}
+
+/*
+ * Chunky scrub scrubs objects one chunk at a time with writes blocked for that
+ * chunk.
+ *
+ * The object store is partitioned into chunks which end on hash boundaries. For
+ * each chunk, the following logic is performed:
+ *
+ *  (1) Block writes on the chunk
+ *  (2) Request maps from replicas
+ *  (3) Wait for pushes to be applied (after recovery)
+ *  (4) Wait for writes to flush on the chunk
+ *  (5) Wait for maps from replicas
+ *  (6) Compare / repair all scrub maps
+ *
+ * This logic is encoded in the very linear state machine:
+ *
+ *           +------------------------+
+ *  _________v__________________      |
+ * |                            |     |
+ * |      SCRUB_INACTIVE        |     |
+ * |____________________________|     |
+ *           |                        |
+ *           |   +----------------+   |
+ *  _________v___v______________  |   |
+ * |                            | |   |
+ * |      SCRUB_NEW_CHUNK       | |   |
+ * |____________________________| |   |
+ *           |                    |   |
+ *  _________v__________________  |   |
+ * |                            | |   |
+ * |     SCRUB_WAIT_PUSHES      | |   |
+ * |____________________________| |   |
+ *           |                    |   |
+ *  _________v__________________  |   |
+ * |                            | |   |
+ * |  SCRUB_WAIT_LAST_UPDATE    | |   |
+ * |____________________________| |   |
+ *           |                    |   |
+ *  _________v__________________  |   |
+ * |                            | |   |
+ * |      SCRUB_BUILD_MAP       | |   |
+ * |____________________________| |   |
+ *           |                    |   |
+ *  _________v__________________  |   |
+ * |                            | |   |
+ * |    SCRUB_WAIT_REPLICAS     | |   |
+ * |____________________________| |   |
+ *           |                    |   |
+ *  _________v__________________  |   |
+ * |                            | |   |
+ * |    SCRUB_COMPARE_MAPS      | |   |
+ * |____________________________| |   |
+ *           |                  | |   |
+ *           |                  +-+   |
+ *  _________v__________________      |
+ * |                            |     |
+ * |       SCRUB_FINISH         |     |
+ * |____________________________|     |
+ *           |                        |
+ *           +------------------------+
+ *
+ * The primary determines the last update from the subset by walking the log. If
+ * it sees a log entry pertaining to an object in the chunk, it tells the
+ * replicas to wait until that update is applied before building a scrub map.
+ * Both the primary and replicas will wait for any active pushes to be applied.
+ *
+ * In contrast to classic_scrub, chunky_scrub is entirely handled by scrub_wq.
+ *
+ * scrub_state encodes the current state of the scrub (refer to state diagram
+ * for details).
+ */
+void PG::chunky_scrub() {
+  // check for map changes
+  if (is_chunky_scrub_active()) {
+    if (scrub_epoch_start != info.history.same_interval_since) {
+      dout(10) << "scrub  pg changed, aborting" << dendl;
+      scrub_clear_state();
+      scrub_unreserve_replicas();
+      return;
+    }
+  }
+
+  bool done = false;
+  int ret;
+
+  while (!done) {
+    dout(20) << "scrub state " << scrub_string(scrub_state) << dendl;
+
+    switch (scrub_state) {
+      case SCRUB_INACTIVE:
+        dout(10) << "scrub start" << dendl;
+
+        update_stats();
+        scrub_epoch_start = info.history.same_interval_since;
+        scrub_active = true;
+
+        osd->sched_scrub_lock.Lock();
+        if (scrub_reserved) {
+          --(osd->scrubs_pending);
+          assert(osd->scrubs_pending >= 0);
+          scrub_reserved = false;
+          scrub_reserved_peers.clear();
+        }
+        ++(osd->scrubs_active);
+        osd->sched_scrub_lock.Unlock();
+
+        scrub_start = hobject_t();
+        scrub_state = SCRUB_NEW_CHUNK;
+
+        break;
+
+      case SCRUB_NEW_CHUNK:
+        primary_scrubmap = ScrubMap();
+        scrub_received_maps.clear();
+
+        {
+          // get the start and end of our scrub chunk
+          //
+          // start and end need to lie on a hash boundary. We test for this by
+          // requesting a list and searching backward from the end looking for a
+          // boundary. If there's no boundary, we request a list after the first
+          // list, and so forth.
+
+          bool boundary_found = false;
+          hobject_t start = scrub_start;
+          while (!boundary_found) {
+            vector<hobject_t> objects;
+            ret = osd->store->collection_list_partial(coll, start,
+                                                      5, 5, 0,
+                                                      &objects, &scrub_end);
+            assert(ret >= 0);
+
+            // in case we don't find a boundary: start again at the end
+            start = scrub_end;
+
+            // special case: reached end of file store, implicitly a boundary
+            if (objects.size() == 0) {
+              break;
+            }
+
+            // search backward from the end looking for a boundary
+            objects.push_back(scrub_end);
+            while (!boundary_found && objects.size() > 1) {
+              hobject_t end = objects.back();
+              objects.pop_back();
+
+              if (objects.back().get_filestore_key() != end.get_filestore_key()) {
+                scrub_end = end;
+                boundary_found = true;
+              }
+            }
+          }
+        }
+
+        scrub_block_writes = true;
+
+        // walk the log to find the latest update that affects our chunk
+        scrub_subset_last_update = eversion_t();
+        for (list<pg_log_entry_t>::iterator p = log.log.begin();
+             p != log.log.end();
+             ++p) {
+          if (p->soid >= scrub_start && p->soid < scrub_end)
+            scrub_subset_last_update = p->version;
+        }
+
+        // ask replicas to wait until last_update_applied >= scrub_subset_last_update and then scan
+        scrub_waiting_on_whom.insert(osd->whoami);
+        ++scrub_waiting_on;
+
+        // request maps from replicas
+        for (unsigned i=1; i<acting.size(); i++) {
+          _request_scrub_map(acting[i], scrub_subset_last_update,
+                             scrub_start, scrub_end);
+          scrub_waiting_on_whom.insert(acting[i]);
+          ++scrub_waiting_on;
+        }
+
+        scrub_state = SCRUB_WAIT_PUSHES;
+
+        break;
+
+      case SCRUB_WAIT_PUSHES:
+        if (active_pushes == 0) {
+          scrub_state = SCRUB_WAIT_LAST_UPDATE;
+        } else {
+          dout(15) << "wait for pushes to apply" << dendl;
+          done = true;
+        }
+        break;
+
+      case SCRUB_WAIT_LAST_UPDATE:
+        if (last_update_applied >= scrub_subset_last_update) {
+          scrub_state = SCRUB_BUILD_MAP;
+        } else {
+          // will be requeued by op_applied
+          dout(15) << "wait for writes to flush" << dendl;
+          done = true;
+        }
+        break;
+
+      case SCRUB_BUILD_MAP:
+        assert(last_update_applied >= scrub_subset_last_update);
+
+        // build my own scrub map
+        ret = build_scrub_map_chunk(primary_scrubmap, scrub_start, scrub_end);
+        if (ret < 0) {
+          dout(5) << "error building scrub map: " << ret << ", aborting" << dendl;
+          scrub_clear_state();
+          scrub_unreserve_replicas();
+          return;
+        }
+
+        --scrub_waiting_on;
+        scrub_waiting_on_whom.erase(osd->whoami);
+
+        scrub_state = SCRUB_WAIT_REPLICAS;
+        break;
+
+      case SCRUB_WAIT_REPLICAS:
+        if (scrub_waiting_on > 0) {
+          // will be requeued by sub_op_scrub_map
+          dout(10) << "wait for replicas to build scrub map" << dendl;
+          done = true;
+        } else {
+          scrub_state = SCRUB_COMPARE_MAPS;
+        }
+        break;
+
+      case SCRUB_COMPARE_MAPS:
+        assert(last_update_applied >= scrub_subset_last_update);
+        assert(scrub_waiting_on == 0);
+
+        scrub_compare_maps();
+        scrub_block_writes = false;
+
+        // requeue the writes from the chunk that just finished
+        requeue_ops(waiting_for_active);
+
+        if (scrub_end < hobject_t::get_max()) {
+          // schedule another leg of the scrub
+          scrub_start = scrub_end;
+
+          scrub_state = SCRUB_NEW_CHUNK;
+          osd->scrub_wq.queue(this);
+          done = true;
+        } else {
+          scrub_state = SCRUB_FINISH;
+        }
+
+        break;
+
+      case SCRUB_FINISH:
+        scrub_finish();
+        scrub_state = SCRUB_INACTIVE;
+        done = true;
+
+        break;
+
+      default:
+        assert(0);
+    }
+  }
+}
 
 void PG::scrub_clear_state()
@@ -3276,6 +3671,16 @@ void PG::scrub_clear_state()
     active_rep_scrub = NULL;
   }
   scrub_received_maps.clear();
+
+  scrub_state = SCRUB_INACTIVE;
+  scrub_start = hobject_t();
+  scrub_end = hobject_t();
+  scrub_subset_last_update = eversion_t();
+  scrub_errors = 0;
+  scrub_fixed = 0;
+
+  // type-specific state clear
+  _scrub_clear_state();
 }
 
 bool PG::scrub_gather_replica_maps() {
@@ -3290,7 +3695,7 @@ bool PG::scrub_gather_replica_maps() {
       scrub_waiting_on++;
       scrub_waiting_on_whom.insert(p->first);
       // Need to request another incremental map
-      _request_scrub_map(p->first, p->second.valid_through);
+      _request_scrub_map_classic(p->first, p->second.valid_through);
     }
   }
@@ -3398,31 +3803,8 @@ void PG::_compare_scrubmaps(const map<int,ScrubMap*> &maps,
   }
 }
 
-void PG::scrub_finalize() {
-  lock();
-  if (deleting) {
-    unlock();
-    return;
-  }
-
-  assert(last_update_applied == info.last_update);
-
-  if (scrub_epoch_start != info.history.same_interval_since) {
-    dout(10) << "scrub  pg changed, aborting" << dendl;
-    scrub_clear_state();
-    scrub_unreserve_replicas();
-    unlock();
-    return;
-  }
-
-  if (!scrub_gather_replica_maps()) {
-    dout(10) << "maps not yet up to date, sent out new requests" << dendl;
-    unlock();
-    return;
-  }
-
-  dout(10) << "scrub_finalize has maps, analyzing" << dendl;
-  int errors = 0, fixed = 0;
+void PG::scrub_compare_maps() {
+  dout(10) << "scrub_compare_maps has maps, analyzing" << dendl;
   bool repair = state_test(PG_STATE_REPAIR);
   const char *mode = repair ? "repair":"scrub";
   if (acting.size() > 1) {
@@ -3489,27 +3871,67 @@ void PG::scrub_finalize() {
   }
 
   // ok, do the pg-type specific scrubbing
-  _scrub(primary_scrubmap, &errors, &fixed);
+  _scrub(primary_scrubmap);
+}
+
+void PG::scrub_finalize() {
+  lock();
+  if (deleting) {
+    unlock();
+    return;
+  }
+
+  assert(last_update_applied == info.last_update);
+
+  if (scrub_epoch_start != info.history.same_interval_since) {
+    dout(10) << "scrub  pg changed, aborting" << dendl;
+    scrub_clear_state();
+    scrub_unreserve_replicas();
+    unlock();
+    return;
+  }
+
+  if (!scrub_gather_replica_maps()) {
+    dout(10) << "maps not yet up to date, sent out new requests" << dendl;
+    unlock();
+    return;
+  }
+
+  scrub_compare_maps();
+
+  scrub_finish();
+
+  dout(10) << "scrub done" << dendl;
+  unlock();
+}
+
+// the part that actually finalizes a scrub
+void PG::scrub_finish() {
+  bool repair = state_test(PG_STATE_REPAIR);
+  const char *mode = repair ? "repair":"scrub";
+
+  // type-specific finish (can tally more errors)
+  _scrub_finish();
+
+  if (scrub_errors == 0 || (repair && (scrub_errors - scrub_fixed) == 0))
+    state_clear(PG_STATE_INCONSISTENT);
 
   {
     stringstream oss;
     oss << info.pgid << " " << mode << " ";
-    if (errors)
-      oss << errors << " errors";
+    if (scrub_errors)
+      oss << scrub_errors << " errors";
     else
       oss << "ok";
     if (repair)
-      oss << ", " << fixed << " fixed";
+      oss << ", " << scrub_fixed << " fixed";
    oss << "\n";
-    if (errors)
+    if (scrub_errors)
      osd->clog.error(oss);
    else
      osd->clog.info(oss);
  }
 
-  if (errors == 0 || (repair && (errors - fixed) == 0))
-    state_clear(PG_STATE_INCONSISTENT);
-
   // finish up
   osd->unreg_last_pg_scrub(info.pgid, info.history.last_scrub_stamp);
   info.history.last_scrub = info.last_update;
@@ -3529,9 +3951,6 @@ void PG::scrub_finalize() {
   if (is_active() && is_primary()) {
     share_pg_info();
   }
-
-  dout(10) << "scrub done" << dendl;
-  unlock();
 }
 
 void PG::share_pg_info()
diff --git a/src/osd/PG.h b/src/osd/PG.h
index 72b5c594cc06..e2f240f7ded4 100644
--- a/src/osd/PG.h
+++ b/src/osd/PG.h
@@ -785,6 +785,43 @@ public:
   ScrubMap primary_scrubmap;
   MOSDRepScrub *active_rep_scrub;
 
+  bool scrub_is_chunky;
+
+  hobject_t scrub_start;
+  hobject_t scrub_end;
+  eversion_t scrub_subset_last_update;
+  int scrub_errors;
+  int scrub_fixed;
+
+  int active_pushes;
+
+  enum ScrubState {
+    SCRUB_INACTIVE,
+    SCRUB_NEW_CHUNK,
+    SCRUB_WAIT_PUSHES,
+    SCRUB_WAIT_LAST_UPDATE,
+    SCRUB_BUILD_MAP,
+    SCRUB_WAIT_REPLICAS,
+    SCRUB_COMPARE_MAPS,
+    SCRUB_FINISH,
+  } scrub_state;
+
+  static const char *scrub_string(const PG::ScrubState& state) {
+    const char *ret = NULL;
+    switch (state) {
+      case SCRUB_INACTIVE: ret = "SCRUB_INACTIVE"; break;
+      case SCRUB_NEW_CHUNK: ret = "SCRUB_NEW_CHUNK"; break;
+      case SCRUB_WAIT_PUSHES: ret = "SCRUB_WAIT_PUSHES"; break;
+      case SCRUB_WAIT_LAST_UPDATE: ret = "SCRUB_WAIT_LAST_UPDATE"; break;
+      case SCRUB_BUILD_MAP: ret = "SCRUB_BUILD_MAP"; break;
+      case SCRUB_WAIT_REPLICAS: ret = "SCRUB_WAIT_REPLICAS"; break;
+      case SCRUB_COMPARE_MAPS: ret = "SCRUB_COMPARE_MAPS"; break;
+      case SCRUB_FINISH: ret = "SCRUB_FINISH"; break;
+    }
+    return ret;
+  }
+
   void repair_object(const hobject_t& soid, ScrubMap::object *po, int bad_peer, int ok_peer);
   bool _compare_scrub_objects(ScrubMap::object &auth,
                               ScrubMap::object &candidate,
@@ -795,14 +832,22 @@ public:
                           map<hobject_t, set<int> > &authoritative,
                           ostream &errorstream);
   void scrub();
+  void classic_scrub();
+  void chunky_scrub();
+  void scrub_compare_maps();
   void scrub_finalize();
+  void scrub_finish();
   void scrub_clear_state();
   bool scrub_gather_replica_maps();
   void _scan_list(ScrubMap &map, vector<hobject_t> &ls);
-  void _request_scrub_map(int replica, eversion_t version);
+  void _request_scrub_map_classic(int replica, eversion_t version);
+  void _request_scrub_map(int replica, eversion_t version, hobject_t start, hobject_t end);
+  int build_scrub_map_chunk(ScrubMap &map, hobject_t start, hobject_t end);
   void build_scrub_map(ScrubMap &map);
   void build_inc_scrub_map(ScrubMap &map, eversion_t v);
-  virtual int _scrub(ScrubMap &map, int* errors, int* fixed) { return 0; }
+  virtual void _scrub(ScrubMap &map) { }
+  virtual void _scrub_clear_state() { }
+  virtual void _scrub_finish() { }
   virtual coll_t get_temp_coll() = 0;
   virtual bool have_temp_coll() = 0;
   void clear_scrub_reserved();
@@ -1436,6 +1481,8 @@ public:
   bool is_scrubbing() const { return state_test(PG_STATE_SCRUBBING); }
 
+  bool is_chunky_scrub_active() const { return scrub_state != SCRUB_INACTIVE; }
+
   bool is_empty() const { return info.last_update == eversion_t(0,0); }
 
   void init(int role, vector<int>& up, vector<int>& acting, pg_history_t& history,
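The chunky state machine declared above runs entirely inside scrub_wq
callbacks: each pass loops until it must wait for an external event, sets done
and yields, and op_applied / sub_op_scrub_map / push completion requeue the PG.
A minimal sketch (not part of the patch) of that requeue-driven loop pattern,
with illustrative states and a single generic event standing in for Ceph's:

#include <cstdio>

enum State { NEW_CHUNK, WAIT_EVENT, COMPARE, FINISH, INACTIVE };

struct Scrubber {
  State state = NEW_CHUNK;
  bool event_arrived = false;
  int chunks_left = 2;

  // one work-queue pass; returns once the machine must wait
  void run() {
    bool done = false;
    while (!done) {
      switch (state) {
      case NEW_CHUNK:
        printf("pick chunk, block writes, request maps\n");
        state = WAIT_EVENT;
        break;
      case WAIT_EVENT:
        if (!event_arrived) {   // yield; on_event() will requeue us
          printf("waiting...\n");
          done = true;
        } else {
          event_arrived = false;
          state = COMPARE;
        }
        break;
      case COMPARE:
        printf("compare maps, unblock writes\n");
        state = (--chunks_left > 0) ? NEW_CHUNK : FINISH;
        break;
      case FINISH:
        printf("scrub finished\n");
        state = INACTIVE;
        done = true;
        break;
      case INACTIVE:
        done = true;
        break;
      }
    }
  }
  void on_event() { event_arrived = true; run(); }  // stands in for op_applied etc.
};

int main() {
  Scrubber s;
  s.run();        // picks first chunk, blocks on event
  s.on_event();   // maps arrived: compares, starts second chunk, blocks
  s.on_event();   // second round: compares and finishes
}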
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 7be408437936..11a75b6ab6ed 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -615,17 +615,22 @@ void ReplicatedPG::do_op(OpRequestRef op)
   dout(10) << "do_op " << *m
            << (m->may_write() ? " may_write" : "")
            << dendl;
 
+  hobject_t head(m->get_oid(), m->get_object_locator().key,
+                 CEPH_NOSNAP, m->get_pg().ps(),
+                 info.pgid.pool());
+
   if (scrub_block_writes && m->may_write()) {
-    dout(20) << __func__ << ": waiting for scrub" << dendl;
-    waiting_for_active.push_back(op);
-    op->mark_delayed();
-    return;
+    // classic (non chunk) scrubs block all writes
+    // chunky scrubs only block writes to a range
+    if (!scrub_is_chunky || (head >= scrub_start && head < scrub_end)) {
+      dout(20) << __func__ << ": waiting for scrub" << dendl;
+      waiting_for_active.push_back(op);
+      op->mark_delayed();
+      return;
+    }
   }
 
   // missing object?
-  hobject_t head(m->get_oid(), m->get_object_locator().key,
-                 CEPH_NOSNAP, m->get_pg().ps(),
-                 info.pgid.pool());
   if (is_missing_object(head)) {
     wait_for_missing_object(head, op);
     return;
@@ -1439,7 +1444,7 @@ void ReplicatedPG::snap_trimmer()
   dout(10) << "snap_trimmer entry" << dendl;
   if (is_primary()) {
     entity_inst_t nobody;
-    if (!mode.try_write(nobody) || scrub_block_writes) {
+    if (!mode.try_write(nobody) || scrub_active) {
       dout(10) << " can't write, requeueing" << dendl;
       queue_snap_trim();
       unlock();
@@ -3348,6 +3353,12 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     pending_backfill_updates[soid].stats.add(ctx->delta_stats,
                                              ctx->obc->obs.oi.category);
   }
 
+  if (scrub_active && scrub_is_chunky) {
+    assert(soid < scrub_start || soid >= scrub_end);
+    if (soid < scrub_start)
+      scrub_cstat.add(ctx->delta_stats, ctx->obc->obs.oi.category);
+  }
+
   return result;
 }
@@ -3456,7 +3467,15 @@ void ReplicatedPG::op_applied(RepGather *repop)
   assert(info.last_update >= repop->v);
   assert(last_update_applied < repop->v);
   last_update_applied = repop->v;
-  if (last_update_applied == info.last_update && scrub_block_writes) {
+
+  // chunky scrub
+  if (scrub_active && scrub_is_chunky) {
+    if (last_update_applied == scrub_subset_last_update) {
+      osd->scrub_wq.queue(this);
+    }
+
+  // classic scrub
+  } else if (last_update_applied == info.last_update && scrub_block_writes) {
     dout(10) << "requeueing scrub for cleanup" << dendl;
     finalizing_scrub = true;
     scrub_gather_replica_maps();
@@ -4360,8 +4379,7 @@ void ReplicatedPG::sub_op_modify_applied(RepModify *rm)
   assert(info.last_update >= m->version);
   assert(last_update_applied < m->version);
   last_update_applied = m->version;
-  if (finalizing_scrub) {
-    assert(active_rep_scrub);
+  if (active_rep_scrub) {
     if (last_update_applied == active_rep_scrub->scrub_to) {
       osd->rep_scrub_wq.queue(active_rep_scrub);
       active_rep_scrub = 0;
@@ -5078,7 +5096,7 @@ void ReplicatedPG::handle_push(OpRequestRef op)
   bool complete = m->recovery_progress.data_complete &&
                   m->recovery_progress.omap_complete;
   ObjectStore::Transaction *t = new ObjectStore::Transaction;
-  Context *onreadable = new ObjectStore::C_DeleteTransaction(t);
+  Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
   Context *onreadable_sync = 0;
   submit_push_data(m->recovery_info,
                    first,
@@ -5387,6 +5405,34 @@ void ReplicatedPG::_applied_recovered_object(ObjectStore::Transaction *t, Object
   lock();
   dout(10) << "_applied_recovered_object " << *obc << dendl;
   put_object_context(obc);
+
+  assert(active_pushes >= 1);
+  --active_pushes;
+
+  // requeue an active chunky scrub waiting on recovery ops
+  if (active_pushes == 0 && is_chunky_scrub_active()) {
+    osd->scrub_wq.queue(this);
+  }
+
+  unlock();
+  delete t;
+}
+
+void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
+{
+  lock();
+  dout(10) << "_applied_recovered_object_replica" << dendl;
+
+  assert(active_pushes >= 1);
+  --active_pushes;
+
+  // requeue an active chunky scrub waiting on recovery ops
+  if (active_pushes == 0 &&
+      active_rep_scrub && active_rep_scrub->chunky) {
+    osd->rep_scrub_wq.queue(active_rep_scrub);
+    active_rep_scrub = 0;
+  }
+
   unlock();
   delete t;
 }
@@ -5476,6 +5522,10 @@ void ReplicatedPG::trim_pushed_data(
 void ReplicatedPG::sub_op_push(OpRequestRef op)
 {
   op->mark_started();
+
+  // keep track of active pushes for scrub
+  ++active_pushes;
+
   if (is_primary()) {
     handle_pull_response(op);
   } else {
@@ -6074,6 +6124,8 @@ int ReplicatedPG::recover_primary(int max)
 
           recover_got(soid, latest->version);
 
+          ++active_pushes;
+
           osd->store->queue_transaction(osr.get(), t,
                                         new C_OSD_AppliedRecoveredObject(this, t, obc),
                                         new C_OSD_CommittedPushedObject(this, OpRequestRef(),
@@ -6538,7 +6590,7 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t)
 
 // SCRUB
 
-int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
+void ReplicatedPG::_scrub(ScrubMap& scrubmap)
 {
   dout(10) << "_scrub" << dendl;
 
@@ -6551,8 +6603,6 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
   SnapSet snapset;
   vector<snapid_t>::reverse_iterator curclone;
 
-  object_stat_collection_t cstat;
-
   bufferlist last_data;
 
   for (map<hobject_t,ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin();
@@ -6569,7 +6619,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
       if (p->second.attrs.count(SS_ATTR) == 0) {
        osd->clog.error() << mode << " " << info.pgid << " " << soid
                          << " no '" << SS_ATTR << "' attr";
-       ++(*errors);
+       ++scrub_errors;
        continue;
      }
      bufferlist bl;
@@ -6581,7 +6631,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
      if (head != hobject_t()) {
        osd->clog.error() << mode << " " << info.pgid << " " << head
                          << " missing clones";
-       ++(*errors);
+       ++scrub_errors;
      }
 
      // what will be next?
@@ -6606,7 +6656,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
      }
      if (soid.snap == CEPH_SNAPDIR) {
        string cat;
-       cstat.add(stat, cat);
+       scrub_cstat.add(stat, cat);
        continue;
      }
 
@@ -6614,7 +6664,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
      if (p->second.attrs.count(OI_ATTR) == 0) {
        osd->clog.error() << mode << " " << info.pgid << " " << soid
                          << " no '" << OI_ATTR << "' attr";
-       ++(*errors);
+       ++scrub_errors;
        continue;
      }
      bufferlist bv;
@@ -6625,7 +6675,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
        osd->clog.error() << mode << " " << info.pgid << " " << soid
                          << " on disk size (" << p->second.size
                          << ") does not match object info size (" << oi.size << ")";
-       ++(*errors);
+       ++scrub_errors;
      }
 
      dout(20) << mode << "  " << soid << " " << oi << dendl;
@@ -6640,7 +6690,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
        if (!snapset.head_exists) {
          osd->clog.error() << mode << " " << info.pgid << " " << soid
                            << " snapset.head_exists=false, but object exists";
-         ++(*errors);
+         ++scrub_errors;
          continue;
        }
      } else if (soid.snap) {
@@ -6652,7 +6702,7 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
        if (soid.snap != *curclone) {
          osd->clog.error() << mode << " " << info.pgid << " " << soid
                            << " expected clone " << *curclone;
-         ++(*errors);
+         ++scrub_errors;
          assert(soid.snap == *curclone);
        }
 
@@ -6673,35 +6723,45 @@ int ReplicatedPG::_scrub(ScrubMap& scrubmap, int* errors, int* fixed)
      }
 
      string cat; // fixme
-     cstat.add(stat, cat);
-  }
+     scrub_cstat.add(stat, cat);
+  }
+
+  dout(10) << "_scrub (" << mode << ") finish" << dendl;
+}
+
+void ReplicatedPG::_scrub_clear_state()
+{
+  scrub_cstat = object_stat_collection_t();
+}
+
+void ReplicatedPG::_scrub_finish()
+{
+  bool repair = state_test(PG_STATE_REPAIR);
+  const char *mode = repair ? "repair":"scrub";
 
   dout(10) << mode << " got "
-          << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
-          << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
-          << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
+          << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
+          << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
+          << scrub_cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes."
           << dendl;
 
-  if (cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
-      cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
-      cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
+  if (scrub_cstat.sum.num_objects != info.stats.stats.sum.num_objects ||
+      scrub_cstat.sum.num_object_clones != info.stats.stats.sum.num_object_clones ||
+      scrub_cstat.sum.num_bytes != info.stats.stats.sum.num_bytes) {
     osd->clog.error() << info.pgid << " " << mode
                      << " stat mismatch, got "
-                     << cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
-                     << cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
-                     << cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
-    ++(*errors);
+                     << scrub_cstat.sum.num_objects << "/" << info.stats.stats.sum.num_objects << " objects, "
+                     << scrub_cstat.sum.num_object_clones << "/" << info.stats.stats.sum.num_object_clones << " clones, "
+                     << scrub_cstat.sum.num_bytes << "/" << info.stats.stats.sum.num_bytes << " bytes.\n";
+    ++scrub_errors;
     if (repair) {
-      ++(*fixed);
-      info.stats.stats = cstat;
+      ++scrub_fixed;
+      info.stats.stats = scrub_cstat;
       update_stats();
       share_pg_info();
     }
   }
-
-  dout(10) << "_scrub (" << mode << ") finish" << dendl;
-  return (*errors);
 }
 
 /*---SnapTrimmer Logging---*/
@@ -6749,7 +6809,7 @@ boost::statechart::result ReplicatedPG::NotTrimming::react(const SnapTrim&)
   } else if (!pg->is_primary() || !pg->is_active() || !pg->is_clean()) {
     dout(10) << "NotTrimming not primary, active, clean" << dendl;
     return discard_event();
-  } else if (pg->scrub_block_writes) {
+  } else if (pg->scrub_active) {
     dout(10) << "NotTrimming finalizing scrub" << dendl;
     pg->queue_snap_trim();
     return discard_event();
diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h
index 2cd54301f3b7..9c2836e7e090 100644
--- a/src/osd/ReplicatedPG.h
+++ b/src/osd/ReplicatedPG.h
@@ -885,6 +885,15 @@ protected:
       pg->put();
     }
   };
+  struct C_OSD_AppliedRecoveredObjectReplica : public Context {
+    boost::intrusive_ptr<ReplicatedPG> pg;
+    ObjectStore::Transaction *t;
+    C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p, ObjectStore::Transaction *tt) :
+      pg(p), t(tt) {}
+    void finish(int r) {
+      pg->_applied_recovered_object_replica(t);
+    }
+  };
 
   void sub_op_remove(OpRequestRef op);
 
@@ -894,6 +903,7 @@ protected:
   void sub_op_modify_reply(OpRequestRef op);
   void _applied_recovered_object(ObjectStore::Transaction *t, ObjectContext *obc);
+  void _applied_recovered_object_replica(ObjectStore::Transaction *t);
   void _committed_pushed_object(OpRequestRef op, epoch_t same_since, eversion_t lc);
   void recover_got(hobject_t oid, eversion_t v);
   void sub_op_push(OpRequestRef op);
@@ -905,7 +915,10 @@ protected:
 
   // -- scrub --
-  virtual int _scrub(ScrubMap& map, int* errors, int* fixed);
+  virtual void _scrub(ScrubMap& map);
+  virtual void _scrub_clear_state();
+  virtual void _scrub_finish();
+  object_stat_collection_t scrub_cstat;
 
   void apply_and_flush_repops(bool requeue);
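Finally, the do_op() change earlier in this patch gates writes on a half-open
chunk: a write waits only if chunky scrub currently covers the target's head
object, i.e. it falls in [scrub_start, scrub_end). A small sketch (not part of
the patch) of that check under simplified assumptions, with std::string
standing in for hobject_t's sort order:

#include <cassert>
#include <string>

struct ScrubWindow {
  bool scrub_block_writes = false;
  bool scrub_is_chunky = false;
  std::string scrub_start, scrub_end;

  bool must_wait(const std::string &head) const {
    if (!scrub_block_writes)
      return false;
    // classic (non-chunky) scrub blocks all writes in the PG
    if (!scrub_is_chunky)
      return true;
    // chunky scrub only blocks the chunk currently being scrubbed
    return head >= scrub_start && head < scrub_end;
  }
};

int main() {
  ScrubWindow w{true, true, "b", "d"};
  assert(w.must_wait("b"));    // inside chunk: inclusive lower bound
  assert(w.must_wait("c"));
  assert(!w.must_wait("d"));   // exclusive upper bound
  assert(!w.must_wait("a"));   // outside chunk: write proceeds
}

Writes outside the window proceed, which is exactly where chunky scrub recovers
the write throughput that classic scrub gives up.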