interval_set<uint64_t> data_subset;
map<sobject_t, interval_set<uint64_t> > clone_subsets;
- virtual void decode_payload() {
+ bool first, complete;
+
+ virtual void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(map_epoch, p);
::decode(reqid, p);
::decode(attrset, p);
::decode(data_subset, p);
::decode(clone_subsets, p);
+
+ if (header.version >= 2) {
+ ::decode(first, p);
+ ::decode(complete, p);
+ }
}
virtual void encode_payload() {
+ header.version = 2;
+
::encode(map_epoch, payload);
::encode(reqid, payload);
::encode(pgid, payload);
header.data_off = ops[0].op.extent.offset;
else
header.data_off = 0;
+ ::encode(first, payload);
+ ::encode(complete, payload);
}
acks_wanted(aw),
noop(noop_),
old_exists(false), old_size(0),
- version(v)
+ version(v),
+ first(false), complete(false)
{
memset(&peer_stat, 0, sizeof(peer_stat));
set_tid(rtid);
<< " " << ops;
if (noop)
out << " (NOOP)";
+ if (first)
+ out << " first";
+ if (complete)
+ out << " complete";
out << " v " << version
<< " snapset=" << snapset << " snapc=" << snapc;
if (!data_subset.empty()) out << " subset " << data_subset;
{
// take note.
push_info_t *pi = &pushing[soid][peer];
+ pi->size = size;
pi->version = version;
pi->data_subset = data_subset;
pi->clone_subsets = clone_subsets;
- //pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk);
+
+ pi->data_subset_pushing.span_of(pi->data_subset, 0, g_conf.osd_recovery_max_chunk);
+ bool complete = pi->data_subset_pushing == pi->data_subset;
dout(10) << "push_start " << soid << " size " << size << " data " << data_subset
<< " cloning " << clone_subsets << dendl;
- send_push_op(soid, peer, size, pi->data_subset, pi->clone_subsets);
+ send_push_op(soid, peer, size, true, complete, pi->data_subset_pushing, pi->clone_subsets);
}
*/
void ReplicatedPG::send_push_op(const sobject_t& soid, int peer,
- uint64_t size,
+ uint64_t size, bool first, bool complete,
interval_set<uint64_t> &data_subset,
map<sobject_t, interval_set<uint64_t> >& clone_subsets)
{
//subop->ops[0].op.extent.offset = 0;
//subop->ops[0].op.extent.length = size;
subop->ops[0].data = bl;
- subop->data_subset.swap(data_subset);
- subop->clone_subsets.swap(clone_subsets);
+ subop->data_subset = data_subset;
+ subop->clone_subsets = clone_subsets;
subop->attrset.swap(attrset);
subop->old_size = size;
+ subop->first = first;
+ subop->complete = complete;
osd->messenger->send_message(subop, osd->osdmap->get_inst(peer));
}
} else {
push_info_t *pi = &pushing[soid][peer];
- peer_missing[peer].got(soid, pi->version);
- if (peer_missing[peer].num_missing() == 0)
- uptodate_set.insert(peer);
-
- pushing[soid].erase(peer);
- pi = NULL;
-
- update_stats();
-
- if (pushing[soid].empty()) {
- pushing.erase(soid);
- dout(10) << "pushed " << soid << " to all replicas" << dendl;
- finish_recovery_op(soid);
- if (waiting_for_degraded_object.count(soid)) {
- osd->take_waiters(waiting_for_degraded_object[soid]);
- waiting_for_degraded_object.erase(soid);
- }
+ bool complete = false;
+ if (pi->data_subset.empty() ||
+ pi->data_subset.end() == pi->data_subset_pushing.end())
+ complete = true;
+
+ if (!complete) {
+ // push more
+ uint64_t from = pi->data_subset_pushing.end();
+ dout(10) << " pushing more, " << pi->data_subset << " from " << from << dendl;
+ pi->data_subset_pushing.span_of(pi->data_subset, from, g_conf.osd_recovery_max_chunk);
+ complete = pi->data_subset.end() == pi->data_subset_pushing.end();
+ send_push_op(soid, peer, pi->size, false, complete, pi->data_subset_pushing, pi->clone_subsets);
} else {
- dout(10) << "pushed " << soid << ", still waiting for push ack from "
- << pushing[soid].size() << " others" << dendl;
+ // done!
+ peer_missing[peer].got(soid, pi->version);
+ if (peer_missing[peer].num_missing() == 0)
+ uptodate_set.insert(peer);
+
+ pushing[soid].erase(peer);
+ pi = NULL;
+
+ update_stats();
+
+ if (pushing[soid].empty()) {
+ pushing.erase(soid);
+ dout(10) << "pushed " << soid << " to all replicas" << dendl;
+ finish_recovery_op(soid);
+ if (waiting_for_degraded_object.count(soid)) {
+ osd->take_waiters(waiting_for_degraded_object[soid]);
+ waiting_for_degraded_object.erase(soid);
+ }
+ } else {
+ dout(10) << "pushed " << soid << ", still waiting for push ack from "
+ << pushing[soid].size() << " others" << dendl;
+ }
}
}
reply->put();
assert(r == 0);
uint64_t size = st.st_size;
- send_push_op(soid, op->get_source().num(), size, op->data_subset, op->clone_subsets);
+ send_push_op(soid, op->get_source().num(), size, op->first, op->complete, op->data_subset, op->clone_subsets);
op->put();
}
clone_subsets = op->clone_subsets;
pull_info_t *pi = 0;
- bool first = true;
- bool complete = true;
+ bool first = op->first;
+ bool complete = op->complete;
if (is_primary()) {
if (pulling.count(soid) == 0) {
dout(10) << " not pulling, ignoring" << dendl;
}
if (pi->data_subset.empty()) {
- first = complete = true;
+ complete = true;
} else {
- first = pi->data_subset.start() == data_subset.start();
complete = pi->data_subset.end() == data_subset.end();
}
}
dout(10) << " log.complete_to = " << log.complete_to->version << dendl;
}
+ // update pg
+ write_info(*t);
+
+
// track ObjectContext
if (is_primary()) {
dout(10) << " setting up obc for " << soid << dendl;
} else {
onreadable = new ObjectStore::C_DeleteTransaction(t);
}
+
} else {
onreadable = new ObjectStore::C_DeleteTransaction(t);
}
// apply to disk!
- write_info(*t);
int r = osd->store->queue_transaction(&osr, t,
onreadable,
new C_OSD_Commit(this, info.history.same_acting_since,