void handle_pull(int peer, PullOp &op, PushOp *reply);
bool handle_pull_response(
int from, PushOp &op, PullOp *response,
- list<ObjectContextRef> *to_continue,
+ list<hobject_t> *to_continue,
ObjectStore::Transaction *t);
void handle_push(int from, PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
struct C_ReplicatedBackend_OnPullComplete : GenContext<ThreadPool::TPHandle&> {
ReplicatedBackend *bc;
- list<ObjectContextRef> to_continue;
+ list<hobject_t> to_continue;
int priority;
C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
: bc(bc), priority(priority) {}
void finish(ThreadPool::TPHandle &handle) {
ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op();
- for (list<ObjectContextRef>::iterator i =
+ for (list<hobject_t>::iterator i =
to_continue.begin();
i != to_continue.end();
++i) {
- if (!bc->start_pushes((*i)->obs.oi.soid, *i, h)) {
+ map<hobject_t, ReplicatedBackend::PullInfo>::iterator j =
+ bc->pulling.find(*i);
+ assert(j != bc->pulling.end());
+ if (!bc->start_pushes(*i, j->second.obc, h)) {
bc->get_parent()->on_global_recover(
- (*i)->obs.oi.soid);
+ *i);
}
+ bc->pulling.erase(*i);
handle.reset_tp_timeout();
}
bc->run_recovery_op(h, priority);
vector<PullOp> replies(1);
ObjectStore::Transaction *t = new ObjectStore::Transaction;
- list<ObjectContextRef> to_continue;
+ list<hobject_t> to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
bool ReplicatedBackend::handle_pull_response(
int from, PushOp &pop, PullOp *response,
- list<ObjectContextRef> *to_continue,
+ list<hobject_t> *to_continue,
ObjectStore::Transaction *t
)
{
pi.stat.num_keys_recovered += pop.omap_entries.size();
if (complete) {
- to_continue->push_back(pi.obc);
+ to_continue->push_back(hoid);
pi.stat.num_objects_recovered++;
get_parent()->on_local_recover(
hoid, pi.stat, pi.recovery_info, pi.obc, t);
- pulling.erase(hoid);
pull_from_peer[from].erase(hoid);
if (pull_from_peer[from].empty())
pull_from_peer.erase(from);
if (is_primary()) {
PullOp resp;
RPGHandle *h = _open_recovery_op();
- list<ObjectContextRef> to_continue;
+ list<hobject_t> to_continue;
bool more = handle_pull_response(
m->get_source().num(), pop, &resp,
&to_continue, t);