// keep track of active pushes for scrub
++active_pushes;
+ MOSDSubOpReply *reply = new MOSDSubOpReply(
+ m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
+ assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
+
+ Context *oncomplete = new C_OSD_CompletedPushedObjectReplica(
+ osd, reply, m->get_connection());
Context *onreadable = new C_OSD_AppliedRecoveredObjectReplica(this, t);
Context *onreadable_sync = 0;
submit_push_data(m->recovery_info,
t);
int r = osd->store->
- queue_transaction(osr.get(), t,
- onreadable,
- new C_OSD_CommittedPushedObject(
- this, op,
- info.history.same_interval_since,
- info.last_complete),
- onreadable_sync);
+ queue_transaction(
+ osr.get(), t,
+ onreadable,
+ new C_OSD_CommittedPushedObject(
+ this, op,
+ info.history.same_interval_since,
+ info.last_complete),
+ onreadable_sync,
+ oncomplete,
+ OpRequestRef()
+ );
assert(r == 0);
osd->logger->inc(l_osd_push_in);
osd->logger->inc(l_osd_push_inb, m->ops[0].indata.length());
- MOSDSubOpReply *reply = new MOSDSubOpReply(
- m, 0, get_osdmap()->get_epoch(), CEPH_OSD_FLAG_ACK);
- assert(entity_name_t::TYPE_OSD == m->get_connection()->peer_type);
- osd->send_message_osd_cluster(reply, m->get_connection());
}
int ReplicatedPG::send_push(int prio, int peer,
pg->put();
}
};
+ struct C_OSD_CompletedPushedObjectReplica : public Context {
+ OSDService *osd;
+ Message *reply;
+ ConnectionRef conn;
+ C_OSD_CompletedPushedObjectReplica (
+ OSDService *osd,
+ Message *reply,
+ ConnectionRef conn) : osd(osd), reply(reply), conn(conn) {}
+ void finish(int) {
+ osd->send_message_osd_cluster(reply, conn.get());
+ }
+ };
struct C_OSD_AppliedRecoveredObjectReplica : public Context {
boost::intrusive_ptr<ReplicatedPG> pg;
ObjectStore::Transaction *t;