last_update_ondisk = info.last_update;
min_last_complete_ondisk = eversion_t(0,0); // we don't know (yet)!
}
+ last_update_applied = info.last_update;
assert(info.last_complete >= log.tail || log.backlog);
}
ScrubMap map;
- build_scrub_map(map);
+ if (op->version > eversion_t()) {
+ epoch_t epoch = info.history.same_acting_since;
+ while (last_update_applied != info.last_update) {
+ wait();
+ if (epoch != info.history.same_acting_since ||
+ osd->is_stopping()) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ return;
+ }
+ }
+ build_inc_scrub_map(map, op->version);
+ } else {
+ build_scrub_map(map);
+ }
MOSDSubOpReply *reply = new MOSDSubOpReply(op, 0, osd->osdmap->get_epoch(), CEPH_OSD_FLAG_ACK);
::encode(map, reply->get_data());
op->put();
}
+/*
+ * Stat every object named in ls and record what was found in map.objects.
+ *
+ * map: scrub map to fill in; one entry is created (or refreshed) per
+ *      object that could be stat'ed, keyed by the object id
+ * ls:  object ids to examine
+ *
+ * Objects for which stat fails are skipped, not treated as fatal.
+ *
+ * pg lock may or may not be held
+ */
+void PG::_scan_list(ScrubMap &map, vector<sobject_t> &ls)
+{
+  dout(10) << " scanning " << ls.size() << " objects" << dendl;
+  for (vector<sobject_t>::iterator p = ls.begin();
+       p != ls.end();
+       ++p) {
+    sobject_t poid = *p;
+
+    struct stat st;
+    int r = osd->store->stat(coll, poid, &st);
+    if (r == 0) {
+      ScrubMap::object &o = map.objects[poid];
+      // the scrub comparison code identifies entries through o.poid,
+      // so it must be set for scanned objects as well
+      o.poid = poid;
+      // the object exists in the store: drop any "deleted" marker left
+      // by an earlier log entry for the same object
+      o.negative = 0;
+      o.size = st.st_size;
+      osd->store->getattrs(coll, poid, o.attrs);
+      dout(25) << " " << poid << dendl;
+    } else {
+      dout(25) << " " << poid << " got " << r << ", skipping" << dendl;
+    }
+  }
+}
+
+/*
+ * Send a CEPH_OSD_OP_SCRUB sub-op for this pg to osd `replica`.
+ * `version` is passed along in the sub-op.
+ */
+void PG::_request_scrub_map(int replica, eversion_t version)
+{
+  dout(10) << "scrub requesting scrubmap from osd" << replica << dendl;
+
+  // the sub-op is built with a default-constructed object id and request id
+  sobject_t no_oid;
+  osd_reqid_t no_reqid;
+  MOSDSubOp *req = new MOSDSubOp(no_reqid, info.pgid, no_oid, false, 0,
+                                 osd->osdmap->get_epoch(), osd->get_tid(), version);
+
+  // exactly one op: the scrub request itself
+  vector<OSDOp> scrub_ops(1);
+  scrub_ops[0].op.op = CEPH_OSD_OP_SCRUB;
+  req->ops = scrub_ops;
+
+  osd->cluster_messenger->send_message(req,
+                                       osd->osdmap->get_cluster_inst(replica));
+}
/*
* build a (sorted) summary of pg content for purposes of scrubbing
+ * called while holding pg lock
+ *
+ * the pg lock is dropped while the collection is scanned and retaken
+ * afterwards.  if info.history.same_acting_since changed in the meantime
+ * the function returns early, before the catch-up pass and before the
+ * pg attrs are read.
*/
void PG::build_scrub_map(ScrubMap &map)
{
vector<sobject_t> ls;
osd->store->collection_list(coll, ls);
- // sort
- dout(10) << "sorting " << ls.size() << " objects" << dendl;
- vector< pair<sobject_t,int> > tab(ls.size());
- vector< pair<sobject_t,int> >::iterator q = tab.begin();
- int i = 0;
- for (vector<sobject_t>::iterator p = ls.begin();
- p != ls.end();
- p++, i++, q++) {
- q->first = *p;
- q->second = i;
- }
- sort(tab.begin(), tab.end());
- // tab is now sorted, with ->second indicating object's original position
- vector<int> pos(ls.size());
- i = 0;
- for (vector< pair<sobject_t,int> >::iterator p = tab.begin();
- p != tab.end();
- p++, i++)
- pos[p->second] = i;
- // now, pos[orig pos] = sorted pos
+ map.valid_through = last_update_applied; // the catch-up pass below starts after this version
+ epoch_t epoch = info.history.same_acting_since; // remembered to detect a change while unlocked
- dout(10) << " scanning " << ls.size() << " objects" << dendl;
- map.objects.resize(ls.size());
- i = 0;
- for (vector<sobject_t>::iterator p = ls.begin();
- p != ls.end();
- p++, i++) {
- sobject_t poid = *p;
+ unlock(); // the scan itself runs without the pg lock
+ _scan_list(map, ls);
+ lock();
- ScrubMap::object& o = map.objects[pos[i]];
- o.poid = *p;
+ if (epoch != info.history.same_acting_since) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ return; // map is left as filled in by the scan so far
+ }
- struct stat st;
- int r = osd->store->stat(coll, poid, &st);
- assert(r == 0);
- o.size = st.st_size;
- osd->store->getattrs(coll, poid, o.attrs);
+ dout(10) << "PG relocked, finalizing" << dendl;
- dout(25) << " " << poid << dendl;
- }
+ // Catch up
+ ScrubMap incr;
+ build_inc_scrub_map(incr, map.valid_through); // changes logged after the version recorded above
+ map.merge_incr(incr);
// pg attrs
osd->store->collection_getattrs(coll, map.attrs);
}
+/*
+ * build a summary of pg content changed starting after v
+ * called while holding pg lock
+ *
+ * walks the pg log entries that follow v: objects touched by an update
+ * entry are scanned from the store, objects named by a delete entry are
+ * recorded as negative entries.  v older than log.tail trips an assert.
+ */
+void PG::build_inc_scrub_map(ScrubMap &map, eversion_t v)
+{
+  map.valid_through = last_update_applied;
+  map.incr_since = v;
+
+  // position on the first log entry after v
+  list<Log::Entry>::iterator ent;
+  if (v > log.tail) {
+    ent = log.find_entry(v);
+    ++ent;
+  } else if (v == log.tail) {
+    ent = log.log.begin();
+  } else {
+    assert(0);
+  }
+
+  // updates are collected for scanning; deletes become negative entries
+  vector<sobject_t> updated;
+  while (ent != log.log.end()) {
+    if (ent->is_update()) {
+      updated.push_back(ent->soid);
+    } else if (ent->is_delete()) {
+      ScrubMap::object &gone = map.objects[ent->soid];
+      gone.poid = ent->soid;
+      gone.negative = 1;
+    }
+    ++ent;
+  }
+
+  _scan_list(map, updated);
+
+  // pg attrs
+  osd->store->collection_getattrs(coll, map.attrs);
+
+  // log
+  osd->store->read(coll_t(), log_oid, 0, 0, map.logbl);
+}
void PG::repair_object(ScrubMap::object *po, int bad_peer, int ok_peer)
{
int errors = 0, fixed = 0;
bool repair = state_test(PG_STATE_REPAIR);
const char *mode = repair ? "repair":"scrub";
+ map<int,ScrubMap> received_maps;
+ int waiting_on = 0;
osd->map_lock.get_read();
lock();
// request maps from replicas
for (unsigned i=1; i<acting.size(); i++) {
- dout(10) << "scrub requesting scrubmap from osd" << acting[i] << dendl;
- vector<OSDOp> scrub(1);
- scrub[0].op.op = CEPH_OSD_OP_SCRUB;
- sobject_t poid;
- eversion_t v;
- osd_reqid_t reqid;
- MOSDSubOp *subop = new MOSDSubOp(reqid, info.pgid, poid, false, 0,
- osd->osdmap->get_epoch(), osd->get_tid(), v);
- subop->ops = scrub;
- osd->cluster_messenger->send_message(subop, //new MOSDPGScrub(info.pgid, osd->osdmap->get_epoch()),
- osd->osdmap->get_cluster_inst(acting[i]));
+ _request_scrub_map(acting[i], eversion_t());
}
osd->map_lock.put_read();
+ build_scrub_map(scrubmap);
- // wait for any ops in progress
- while (is_write_in_progress()) {
- dout(10) << "scrub write(s) in progress, waiting" << dendl;
+ finalizing_scrub = true;
+ while (last_update_applied != info.last_update) {
wait();
- if (osd->is_stopping())
+ if (epoch != info.history.same_acting_since ||
+ osd->is_stopping()) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
goto out;
+ }
}
- //unlock();
+ waiting_on = acting.size() - 1;
+ while (waiting_on > 0) {
+ while (peer_scrub_map.size() == 0) {
+ wait();
+ if (epoch != info.history.same_acting_since ||
+ osd->is_stopping()) {
+ dout(10) << "scrub pg changed, aborting" << dendl;
+ goto out;
+ }
+ }
- dout(10) << "scrub building my map" << dendl;
- build_scrub_map(scrubmap);
- /*
- lock();
- if (epoch != info.history.same_acting_since) {
- dout(10) << "scrub pg changed, aborting" << dendl;
- goto out;
- }
- */
+ for (map<int,ScrubMap>::iterator p = peer_scrub_map.begin();
+ p != peer_scrub_map.end();
+ peer_scrub_map.erase(p++)) {
- while (peer_scrub_map.size() < acting.size() - 1) {
- dout(10) << "scrub has " << (peer_scrub_map.size()+1) << " / " << acting.size()
- << " maps, waiting" << dendl;
- wait();
+ if (received_maps.count(p->first)) {
+ received_maps[p->first].merge_incr(p->second);
+ } else {
+ received_maps[p->first] = p->second;
+ }
- if (epoch != info.history.same_acting_since ||
- osd->is_stopping()) {
- dout(10) << "scrub pg changed, aborting" << dendl;
- goto out;
+ if (received_maps[p->first].valid_through == log.head) {
+ waiting_on--;
+ } else {
+ // Need to request another incremental map
+ _request_scrub_map(p->first, p->second.valid_through);
+ }
}
- }
- /*
- unlock();
- */
+ if (scrubmap.valid_through != log.head) {
+ ScrubMap incr;
+ build_inc_scrub_map(incr, scrubmap.valid_through);
+ scrubmap.merge_incr(incr);
+ }
+ }
if (acting.size() > 1) {
dout(10) << "scrub comparing replica scrub maps" << dendl;
vector<ScrubMap*> m(acting.size());
m[0] = &scrubmap;
for (unsigned i=1; i<acting.size(); i++)
- m[i] = &peer_scrub_map[acting[i]];
- vector<ScrubMap::object>::iterator p[acting.size()];
+ m[i] = &received_maps[acting[i]];
+ map<sobject_t,ScrubMap::object>::iterator p[acting.size()];
for (unsigned i=0; i<acting.size(); i++)
p[i] = m[i]->objects.begin();
continue;
}
if (!po) {
- po = &(*p[i]);
+ po = &(p[i]->second);
pi = i;
}
- else if (po->poid != p[i]->poid) {
+ else if (po->poid != p[i]->second.poid) {
anymissing = true;
- if (po->poid > p[i]->poid) {
- po = &(*p[i]);
+ if (po->poid > p[i]->second.poid) {
+ po = &(p[i]->second);
pi = i;
}
}
}
- if (!po)
+ if (!po) {
break;
+ }
if (anymissing) {
for (unsigned i=0; i<acting.size(); i++) {
- if (p[i] == m[i]->objects.end() || po->poid != p[i]->poid) {
+ if (p[i] == m[i]->objects.end() || po->poid != p[i]->second.poid) {
ss << info.pgid << " " << mode << " osd" << acting[i] << " missing " << po->poid;
osd->get_logclient()->log(LOG_ERROR, ss);
num_missing++;
bool ok = true;
for (unsigned i=1; i<acting.size(); i++) {
bool peerok = true;
- if (po->size != p[i]->size) {
+ if (po->size != p[i]->second.size) {
dout(0) << "scrub osd" << acting[i] << " " << po->poid
- << " size " << p[i]->size << " != " << po->size << dendl;
+ << " size " << p[i]->second.size << " != " << po->size << dendl;
ss << info.pgid << " " << mode << " osd" << acting[i] << " " << po->poid
- << " size " << p[i]->size << " != " << po->size;
+ << " size " << p[i]->second.size << " != " << po->size;
osd->get_logclient()->log(LOG_ERROR, ss);
peerok = ok = false;
num_bad++;
}
- if (po->attrs.size() != p[i]->attrs.size()) {
+ if (po->attrs.size() != p[i]->second.attrs.size()) {
dout(0) << "scrub osd" << acting[i] << " " << po->poid
- << " attr count " << p[i]->attrs.size() << " != " << po->attrs.size() << dendl;
+ << " attr count " << p[i]->second.attrs.size() << " != " << po->attrs.size() << dendl;
ss << info.pgid << " " << mode << " osd" << acting[i] << " " << po->poid
- << " attr count " << p[i]->attrs.size() << " != " << po->attrs.size();
+ << " attr count " << p[i]->second.attrs.size() << " != " << po->attrs.size();
osd->get_logclient()->log(LOG_ERROR, ss);
peerok = ok = false;
num_bad++;
}
for (map<string,bufferptr>::iterator q = po->attrs.begin(); q != po->attrs.end(); q++) {
- if (p[i]->attrs.count(q->first)) {
- if (q->second.cmp(p[i]->attrs[q->first])) {
+ if (p[i]->second.attrs.count(q->first)) {
+ if (q->second.cmp(p[i]->second.attrs[q->first])) {
dout(0) << "scrub osd" << acting[i] << " " << po->poid
<< " attr " << q->first << " value mismatch" << dendl;
ss << info.pgid << " " << mode << " osd" << acting[i] << " " << po->poid
osd->take_waiters(waiting_for_active);
+ finalizing_scrub = false;
unlock();
}
void ReplicatedPG::calc_trim_to()
{
- if (!is_degraded() &&
+ if (!is_degraded() && !is_scrubbing() &&
(is_clean() ||
log.head.version - log.tail.version > info.stats.num_objects)) {
if (min_last_complete_ondisk != eversion_t() &&
return do_pg_op(op);
dout(10) << "do_op " << *op << dendl;
+ if (finalizing_scrub && op->may_write()) {
+ waiting_for_active.push_back(op);
+ return;
+ }
entity_inst_t client = op->get_source_inst();
put_object_context(repop->obc);
repop->obc = 0;
+ last_update_applied = repop->v;
+ if (last_update_applied == info.last_update && finalizing_scrub) {
+ kick();
+ }
update_stats();
#if 0
rm->applied = true;
bool done = rm->applied && rm->committed;
+ last_update_applied = rm->op->version;
+ if (last_update_applied == info.last_update && finalizing_scrub) {
+ kick();
+ }
+
unlock();
if (done) {
delete rm->ctx;
bufferlist last_data;
- for (vector<ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin();
+ for (map<sobject_t,ScrubMap::object>::reverse_iterator p = scrubmap.objects.rbegin();
p != scrubmap.objects.rend();
p++) {
- const sobject_t& soid = p->poid;
+ const sobject_t& soid = p->second.poid;
stat.num_objects++;
// new snapset?
if (soid.snap == CEPH_SNAPDIR ||
soid.snap == CEPH_NOSNAP) {
- if (p->attrs.count(SS_ATTR) == 0) {
+ if (p->second.attrs.count(SS_ATTR) == 0) {
dout(0) << mode << " no '" << SS_ATTR << "' attr on " << soid << dendl;
errors++;
continue;
}
bufferlist bl;
- bl.push_back(p->attrs[SS_ATTR]);
+ bl.push_back(p->second.attrs[SS_ATTR]);
bufferlist::iterator blp = bl.begin();
::decode(snapset, blp);
continue;
// basic checks.
- if (p->attrs.count(OI_ATTR) == 0) {
+ if (p->second.attrs.count(OI_ATTR) == 0) {
dout(0) << mode << " no '" << OI_ATTR << "' attr on " << soid << dendl;
errors++;
continue;
}
bufferlist bv;
- bv.push_back(p->attrs[OI_ATTR]);
+ bv.push_back(p->second.attrs[OI_ATTR]);
object_info_t oi(bv);
dout(20) << mode << " " << soid << " " << oi << dendl;
- stat.num_bytes += p->size;
- stat.num_kb += SHIFT_ROUND_UP(p->size, 10);
+ stat.num_bytes += p->second.size;
+ stat.num_kb += SHIFT_ROUND_UP(p->second.size, 10);
//bufferlist data;
//osd->store->read(c, poid, 0, 0, data);
assert(soid.snap == snapset.clones[curclone]);
- assert(p->size == snapset.clone_size[curclone]);
+ assert(p->second.size == snapset.clone_size[curclone]);
// verify overlap?
// ...