BackfillInterval bi;
osr->flush();
+ bi.begin = m->begin;
scan_range(
- m->begin, cct->_conf->osd_backfill_scan_min,
+ cct->_conf->osd_backfill_scan_min,
cct->_conf->osd_backfill_scan_max, &bi, handle);
MOSDPGScan *reply = new MOSDPGScan(MOSDPGScan::OP_SCAN_DIGEST,
get_osdmap()->get_epoch(), m->query_epoch,
<< " interval " << pbi.begin << "-" << pbi.end
<< " " << pbi.objects.size() << " objects" << dendl;
- int local_min = osd->store->get_ideal_list_min();
- int local_max = osd->store->get_ideal_list_max();
+ int local_min = cct->_conf->osd_backfill_scan_min;
+ int local_max = cct->_conf->osd_backfill_scan_max;
- // re-scan our local interval to cope with recent changes
- // FIXME: we could track the eversion_t when we last scanned, and invalidate
- // that way. or explicitly modify/invalidate when we actually change specific
- // objects.
- dout(10) << " rescanning local backfill_info from " << backfill_pos << dendl;
- backfill_info.clear();
- osr->flush();
- scan_range(backfill_pos, local_min, local_max, &backfill_info, handle);
+ // update our local interval to cope with recent changes
+ backfill_info.begin = backfill_pos;
+ update_range(&backfill_info, handle);
int ops = 0;
map<hobject_t, pair<eversion_t, eversion_t> > to_push;
if (backfill_info.begin <= pbi.begin &&
!backfill_info.extends_to_end() && backfill_info.empty()) {
osr->flush();
- scan_range(backfill_info.end, local_min, local_max, &backfill_info,
+ backfill_info.begin = backfill_info.end;
+ scan_range(local_min, local_max, &backfill_info,
handle);
backfill_info.trim();
}
start_recovery_op(oid);
recovering.insert(oid);
ObjectContextRef obc = get_object_context(oid, false);
+
+ // We need to take the read_lock here in order to flush in-progress writes
+ obc->ondisk_read_lock();
pgbackend->recover_object(
oid,
ObjectContextRef(),
obc,
h);
+ obc->ondisk_read_unlock();
+}
+
+ /**
+  * Bring a backfill interval up to date with the PG's current state.
+  *
+  * Three cases, chosen by comparing bi->version against this PG's info:
+  *  - bi->version >= info.last_update: the interval is already current
+  *    (asserted to be exactly equal); nothing is changed.
+  *  - bi->version >= info.log_tail: the PG log entries newer than
+  *    bi->version are replayed onto bi->objects for every object that lies
+  *    in [bi->begin, bi->end), then bi->version is set to info.last_update.
+  *  - otherwise: the interval cannot be patched from the log, so osr is
+  *    flushed and the interval is rebuilt with scan_range().
+  *
+  * @param bi      interval to refresh; modified in place
+  * @param handle  thread-pool handle forwarded to scan_range()
+  */
+ void ReplicatedPG::update_range(
+   BackfillInterval *bi,
+   ThreadPool::TPHandle &handle)
+ {
+   if (bi->version >= info.last_update) {
+     dout(10) << __func__<< ": bi is current " << dendl;
+     assert(bi->version == info.last_update);
+   } else if (bi->version >= info.log_tail) {
+     assert(!pg_log.get_log().empty());
+     dout(10) << __func__<< ": bi is old, (" << bi->version
+              << ") can be updated with log" << dendl;
+     // Walk backwards from the newest entry until we reach an entry that is
+     // not newer than bi->version (or run out of log).
+     list<pg_log_entry_t>::const_iterator i =
+       pg_log.get_log().log.end();
+     --i;
+     while (i != pg_log.get_log().log.begin() &&
+            i->version > bi->version) {
+       --i;
+     }
+     // An entry whose version equals bi->version is skipped: replay starts
+     // with the entry after it.
+     if (i->version == bi->version)
+       ++i;
+
+     assert(i != pg_log.get_log().log.end());
+     dout(10) << __func__ << ": updating from version " << i->version
+              << dendl;
+     for (; i != pg_log.get_log().log.end(); ++i) {
+       const hobject_t &soid = i->soid;
+       // Only objects inside the interval's range are tracked in bi->objects.
+       if (soid >= bi->begin && soid < bi->end) {
+         if (i->is_update()) {
+           dout(10) << __func__ << ": " << i->soid << " updated to version "
+                    << i->version << dendl;
+           // erase first so the insert below always records the new version
+           bi->objects.erase(i->soid);
+           bi->objects.insert(
+             make_pair(
+               i->soid,
+               i->version));
+         } else if (i->is_delete()) {
+           dout(10) << __func__ << ": " << i->soid << " removed" << dendl;
+           bi->objects.erase(i->soid);
+         }
+       }
+     }
+     bi->version = info.last_update;
+   } else {
+     dout(10) << __func__<< ": bi is old, rescanning local backfill_info"
+              << dendl;
+     // Scan sizes are only needed on this fallback path.
+     int local_min = cct->_conf->osd_backfill_scan_min;
+     int local_max = cct->_conf->osd_backfill_scan_max;
+     osr->flush();
+     // Rescan the interval we were handed (not unconditionally the member
+     // backfill_info), so the function honours its bi parameter.
+     scan_range(local_min, local_max, bi, handle);
+   }
}
void ReplicatedPG::scan_range(
- hobject_t begin, int min, int max, BackfillInterval *bi,
+ int min, int max, BackfillInterval *bi,
ThreadPool::TPHandle &handle)
{
assert(is_locked());
- dout(10) << "scan_range from " << begin << dendl;
- bi->begin = begin;
+ dout(10) << "scan_range from " << bi->begin << dendl;
+ bi->version = info.last_update;
bi->objects.clear(); // for good measure
vector<hobject_t> ls;
ls.reserve(max);
- int r = osd->store->collection_list_partial(coll, begin, min, max,
+ int r = osd->store->collection_list_partial(coll, bi->begin, min, max,
0, &ls, &bi->end);
assert(r >= 0);
dout(10) << " got " << ls.size() << " items, next " << bi->end << dendl;