recovery_info.version = v;
ObjectRecoveryProgress progress;
progress.data_complete = false;
+ progress.omap_complete = false;
progress.data_recovered_to = 0;
progress.first = true;
assert(!pulling.count(soid));
pi.recovery_progress.first = true;
pi.recovery_progress.data_recovered_to = 0;
pi.recovery_progress.data_complete = 0;
+ pi.recovery_progress.omap_complete = 0;
ObjectRecoveryProgress new_progress;
send_push(peer, pi.recovery_info, pi.recovery_progress, &new_progress);
const interval_set<uint64_t> &intervals_included,
bufferlist data_included,
map<string, bufferptr> &attrs,
+ map<string, bufferlist> &omap_entries,
ObjectStore::Transaction *t)
{
if (first) {
off += p.get_len();
}
+ t->omap_setkeys(coll_t::TEMP_COLL, recovery_info.soid,
+ omap_entries);
t->setattrs(coll_t::TEMP_COLL, recovery_info.soid,
attrs);
}
Context *onreadable_sync = 0;
submit_push_data(pi.recovery_info, first,
data_included, data, m->attrset,
+ m->omap_entries,
t);
if (complete) {
bufferlist data;
m->claim_data(data);
bool first = m->current_progress.first;
- bool complete = m->recovery_progress.data_complete;
+ bool complete = m->recovery_progress.data_complete &&
+ m->recovery_progress.omap_complete;
ObjectStore::Transaction *t = new ObjectStore::Transaction;
Context *onreadable = new ObjectStore::C_DeleteTransaction(t);
Context *onreadable_sync = 0;
m->data_included,
data,
m->attrset,
+ m->omap_entries,
t);
if (complete)
submit_push_complete(m->recovery_info,
new_progress.first = false;
}
+ uint64_t available = g_conf->osd_recovery_max_chunk;
+ if (!progress.omap_complete) {
+ ObjectMap::ObjectMapIterator iter =
+ osd->store->get_omap_iterator(coll,
+ recovery_info.soid);
+ for (iter->upper_bound(progress.omap_recovered_to);
+ iter->valid();
+ iter->next()) {
+ if (available < (iter->key().size() + iter->value().length()))
+ break;
+ subop->omap_entries.insert(make_pair(iter->key(), iter->value()));
+ available -= (iter->key().size() + iter->value().length());
+ }
+ if (!iter->valid())
+ new_progress.omap_complete = true;
+ else
+ new_progress.omap_recovered_to = iter->key();
+ }
subop->data_included.span_of(recovery_info.copy_subset,
progress.data_recovered_to,
- g_conf->osd_recovery_max_chunk);
+ available);
for (interval_set<uint64_t>::iterator p = subop->data_included.begin();
p != subop->data_included.end();