#include "PGBackend.h"
#include "osd_types.h"
+struct C_ReplicatedBackend_OnPullComplete;
class ReplicatedBackend : public PGBackend {
struct RPGHandle : public PGBackend::RecoveryHandle { // recovery handle type used by ReplicatedBackend; returned by _open_recovery_op()
map<int, vector<PushOp> > pushes; // PushOps grouped under an int key (presumably the target peer id -- TODO confirm)
map<int, vector<PullOp> > pulls; // PullOps grouped under an int key (presumably the source peer id -- TODO confirm)
};
+ friend struct C_ReplicatedBackend_OnPullComplete;
private:
bool temp_created;
const coll_t temp_coll;
ReplicatedBackend(PGBackend::Listener *pg, coll_t coll, OSDService *osd);
/// @see PGBackend::open_recovery_op
- PGBackend::RecoveryHandle *open_recovery_op() {
+ RPGHandle *_open_recovery_op() { // same allocation as open_recovery_op(), but returns the concrete RPGHandle type
return new RPGHandle(); // heap-allocated with new; nothing visible here frees it (presumably run_recovery_op consumes it -- TODO confirm)
}
+ PGBackend::RecoveryHandle *open_recovery_op() { // entry point returning the base-class handle pointer (see PGBackend::open_recovery_op)
+ return _open_recovery_op(); // delegates to the typed helper; RPGHandle* converts to its public base pointer
+ }
/// @see PGBackend::run_recovery_op
void run_recovery_op(
bool handle_push_reply(int peer, PushReplyOp &op, PushOp *reply);
void handle_pull(int peer, PullOp &op, PushOp *reply);
- bool handle_pull_response(int from, PushOp &op, PullOp *response,
- ObjectStore::Transaction *t);
+ bool handle_pull_response(
+ int from, PushOp &op, PullOp *response,
+ list<pair<hobject_t, ObjectContextRef> > *to_continue,
+ ObjectStore::Transaction *t);
void handle_push(int from, PushOp &op, PushReplyOp *response,
ObjectStore::Transaction *t);
get_parent()->queue_transaction(t);
}
+struct C_ReplicatedBackend_OnPullComplete : Context { // completion callback: continues recovery for objects whose pull has finished
+ ReplicatedBackend *bc; // backend the callback operates on; raw pointer, not owned by this struct
+ list<pair<hobject_t, ObjectContextRef> > to_continue; // (object id, object context) pairs to process in finish(); filled by the creator via swap()
+ int priority; // priority forwarded unchanged to run_recovery_op()
+ C_ReplicatedBackend_OnPullComplete(ReplicatedBackend *bc, int priority)
+ : bc(bc), priority(priority) {} // to_continue starts out empty
+
+ void finish(int) { // the int argument is unnamed and ignored
+ ReplicatedBackend::RPGHandle *h = bc->_open_recovery_op(); // fresh handle passed to every start_pushes() call below
+ for (list<pair<hobject_t, ObjectContextRef> >::iterator i =
+ to_continue.begin();
+ i != to_continue.end();
+ ++i) {
+ if (!bc->start_pushes(i->first, i->second, h)) { // false presumably means no pushes were started for this object -- TODO confirm
+ bc->get_parent()->on_global_recover( // in that case report the object to the parent listener right away
+ i->first);
+ }
+ }
+ bc->run_recovery_op(h, priority); // hand the handle (h is not used again here) to run_recovery_op at the stored priority
+ }
+};
+
void ReplicatedBackend::_do_pull_response(OpRequestRef op)
{
MOSDPGPush *m = static_cast<MOSDPGPush *>(op->request);
vector<PullOp> replies(1);
ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ list<pair<hobject_t, ObjectContextRef> > to_continue;
for (vector<PushOp>::iterator i = m->pushes.begin();
i != m->pushes.end();
++i) {
- bool more = handle_pull_response(from, *i, &(replies.back()), t);
+ bool more = handle_pull_response(from, *i, &(replies.back()), &to_continue, t);
if (more)
replies.push_back(PullOp());
}
+ if (!to_continue.empty()) {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ m->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ get_parent()->bless_context(c));
+ }
replies.erase(replies.end() - 1);
if (replies.size()) {
bool ReplicatedBackend::handle_pull_response(
int from, PushOp &pop, PullOp *response,
- ObjectStore::Transaction *t)
+ list<pair<hobject_t, ObjectContextRef> > *to_continue,
+ ObjectStore::Transaction *t
+ )
{
interval_set<uint64_t> data_included = pop.data_included;
bufferlist data;
pi.stat.num_keys_recovered += pop.omap_entries.size();
if (complete) {
- pulling.erase(hoid);
- pull_from_peer[from].erase(hoid);
+ to_continue->push_back(make_pair(hoid, pi.obc));
pi.stat.num_objects_recovered++;
get_parent()->on_local_recover(
hoid, pi.stat, pi.recovery_info, pi.obc, t);
+ pulling.erase(hoid);
+ pull_from_peer[from].erase(hoid);
+ if (pull_from_peer[from].empty())
+ pull_from_peer.erase(from);
return false;
} else {
response->soid = pop.soid;
if (is_primary()) {
PullOp resp;
- bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
+ RPGHandle *h = _open_recovery_op();
+ list<pair<hobject_t, ObjectContextRef> > to_continue;
+ bool more = handle_pull_response(
+ m->get_source().num(), pop, &resp,
+ &to_continue, t);
if (more) {
send_pull_legacy(
m->get_priority(),
m->get_source().num(),
resp.recovery_info,
resp.recovery_progress);
- }
+ } else {
+ C_ReplicatedBackend_OnPullComplete *c =
+ new C_ReplicatedBackend_OnPullComplete(
+ this,
+ op->request->get_priority());
+ c->to_continue.swap(to_continue);
+ t->register_on_complete(
+ get_parent()->bless_context(c));
+ }
+ run_recovery_op(h, op->request->get_priority());
} else {
PushReplyOp resp;
MOSDSubOpReply *reply = new MOSDSubOpReply(