}
};
-void ReplicatedPG::handle_push(OpRequestRef op)
+void ReplicatedPG::handle_push(
+ int from, PushOp &pop, PushReplyOp *response,
+ ObjectStore::Transaction *t)
{
- MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
dout(10) << "handle_push "
- << m->recovery_info
- << m->recovery_progress
+ << pop.recovery_info
+ << pop.after_progress
<< dendl;
bufferlist data;
- m->claim_data(data);
- bool first = m->current_progress.first;
- bool complete = m->recovery_progress.data_complete &&
- m->recovery_progress.omap_complete;
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ data.claim(pop.data);
+ bool first = pop.before_progress.first;
+ bool complete = pop.after_progress.data_complete &&
+ pop.after_progress.omap_complete;
// keep track of active pushes for scrub
++active_pushes;
- MOSDSubOpReply *reply = new MOSDSubOpReply(
- m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
-
- Context *oncomplete = new C_OSD_CompletedPushedObjectReplica(
- osd, reply, m->get_connection());
- Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
- Context *onreadable_sync = 0;
- submit_push_data(m->recovery_info,
+ response->soid = pop.recovery_info.soid;
+ t->register_on_applied(
+ new C_OSD_AppliedRecoveredObjectReplica(this));
+ submit_push_data(pop.recovery_info,
first,
complete,
- m->data_included,
+ pop.data_included,
data,
- m->omap_header,
- m->attrset,
- m->omap_entries,
+ pop.omap_header,
+ pop.attrset,
+ pop.omap_entries,
t);
- t->register_on_commit(new C_OnPushCommit(this, op));
- int r = osd->store->
- queue_transaction(
- osr.get(), t,
- onreadable,
- new C_OSD_CommittedPushedObject(
- this,
- get_osdmap()->get_epoch(),
- info.last_complete),
- onreadable_sync,
- oncomplete,
- OpRequestRef()
- );
- assert(r == 0);
-
- osd->logger->inc(l_osd_push_in);
- osd->logger->inc(l_osd_push_inb, m->ops[0].indata.length());
-
+ t->register_on_commit(
+ new C_OSD_CommittedPushedObject(
+ this,
+ get_osdmap()->get_epoch(),
+ info.last_complete));
}
int ReplicatedPG::send_push(int prio, int peer,
unlock();
}
-void ReplicatedPG::_applied_recovered_object_replica(ObjectStore::Transaction *t)
+void ReplicatedPG::_applied_recovered_object_replica()
{
lock();
dout(10) << "_applied_recovered_object_replica" << dendl;
}
unlock();
- delete t;
}
void ReplicatedPG::recover_got(hobject_t oid, eversion_t v)
op->mark_started();
MOSDSubOp *m = static_cast<MOSDSubOp *>(op->request);
- if (is_primary()) {
- PushOp pop;
- pop.soid = m->recovery_info.soid;
- pop.version = m->version;
- m->claim_data(pop.data);
- pop.data_included.swap(m->data_included);
- pop.omap_header.swap(m->omap_header);
- pop.omap_entries.swap(m->omap_entries);
- pop.attrset.swap(m->attrset);
- pop.recovery_info = m->recovery_info;
- pop.before_progress = m->current_progress;
- pop.after_progress = m->recovery_progress;
+ PushOp pop;
+ pop.soid = m->recovery_info.soid;
+ pop.version = m->version;
+ m->claim_data(pop.data);
+ pop.data_included.swap(m->data_included);
+ pop.omap_header.swap(m->omap_header);
+ pop.omap_entries.swap(m->omap_entries);
+ pop.attrset.swap(m->attrset);
+ pop.recovery_info = m->recovery_info;
+ pop.before_progress = m->current_progress;
+ pop.after_progress = m->recovery_progress;
+ ObjectStore::Transaction *t = new ObjectStore::Transaction;
+ if (is_primary()) {
PullOp resp;
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- t->register_on_applied(new ObjectStore::C_DeleteTransaction(t));
bool more = handle_pull_response(m->get_source().num(), pop, &resp, t);
- t->register_on_commit(new C_OnPushCommit(this, op));
- osd->store->queue_transaction(osr.get(), t);
if (more) {
send_pull(
- m->get_source().num(),
m->get_priority(),
+ m->get_source().num(),
resp.recovery_info,
resp.recovery_progress);
}
} else {
- handle_push(op);
+ PushReplyOp resp;
+ MOSDSubOpReply *reply = new MOSDSubOpReply(
+ m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ reply->set_priority(m->get_priority());
+ assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+ handle_push(m->get_source().num(), pop, &resp, t);
+ t->register_on_complete(new C_OSD_SendMessageOnConn(
+ osd, reply, m->get_connection()));
}
+ t->register_on_commit(new C_OnPushCommit(this, op));
+ osd->store->queue_transaction(osr.get(), t);
return;
}
bool handle_pull_response(
int from, PushOp &op, PullOp *response,
ObjectStore::Transaction *t);
- void handle_push(OpRequestRef op);
+ void handle_push(
+ int from, PushOp &op, PushReplyOp *response,
+ ObjectStore::Transaction *t);
int send_push(int priority, int peer,
const ObjectRecoveryInfo& recovery_info,
const ObjectRecoveryProgress &progress,
pg->_committed_pushed_object(epoch, last_complete);
}
};
- struct C_OSD_CompletedPushedObjectReplica : public Context {
+ struct C_OSD_SendMessageOnConn: public Context {
OSDService *osd;
Message *reply;
ConnectionRef conn;
- C_OSD_CompletedPushedObjectReplica (
+ C_OSD_SendMessageOnConn(
OSDService *osd,
Message *reply,
ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {}
friend class C_OSD_CompletedPull;
struct C_OSD_AppliedRecoveredObjectReplica : public Context {
ReplicatedPGRef pg;
- ObjectStore::Transaction *t;
- C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p, ObjectStore::Transaction *tt) :
- pg(p), t(tt) {}
+ C_OSD_AppliedRecoveredObjectReplica(ReplicatedPG *p) :
+ pg(p) {}
void finish(int r) {
- pg->_applied_recovered_object_replica(t);
+ pg->_applied_recovered_object_replica();
}
};
void sub_op_modify_reply(OpRequestRef op);
void _applied_recovered_object(ObjectContext *obc);
- void _applied_recovered_object_replica(ObjectStore::Transaction *t);
+ void _applied_recovered_object_replica();
void _committed_pushed_object(epoch_t epoch, eversion_t lc);
void recover_got(hobject_t oid, eversion_t v);
void sub_op_push(OpRequestRef op);