if (op->onack)
flags |= CEPH_OSD_FLAG_ACK;
+ if (op->con) {
+ if (op->outbl->length()) {
+ dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
+ op->con->revoke_rx_buffer(op->tid);
+ }
+ op->con->put();
+ }
+ op->con = messenger->get_connection(osdmap->get_inst(pg.primary()));
+ assert(op->con);
+ if (op->outbl && op->outbl->length()) {
+ dout(20) << " posting rx buffer for " << op->tid << " on " << op->con << dendl;
+ op->con->post_rx_buffer(op->tid, *op->outbl);
+ }
+
ceph_object_layout ol;
ol.ol_pgid = op->pgid.v;
ol.ol_stripe_unit = 0;
if (op->priority)
m->set_priority(op->priority);
- messenger->send_message(m, osdmap->get_inst(pg.primary()));
+ messenger->send_message(m, op->con);
} else
maybe_request_map();
<< dendl;
Op *op = op_osd[ tid ];
- Context *onack = 0;
- Context *oncommit = 0;
-
- PG &pg = get_pg( m->get_pg() );
-
- // ignore?
- if (pg.acker() != m->get_source().num()) {
- dout(7) << " ignoring ack|commit from non-acker" << dendl;
+ if (op->con != m->get_connection()) {
+ dout(7) << " ignoring reply from " << m->get_source_inst()
+ << ", i last sent to " << op->con->get_peer_addr() << dendl;
m->put();
return;
}
-
+
+ Context *onack = 0;
+ Context *oncommit = 0;
+
int rc = m->get_result();
if (rc == -EAGAIN) {
// got data?
if (op->outbl) {
+ if (op->outbl->length())
+ op->con->revoke_rx_buffer(op->tid);
m->claim_data(*op->outbl);
op->outbl = 0;
}
// done with this tid?
if (!op->onack && !op->oncommit) {
+ PG &pg = get_pg( m->get_pg() );
assert(pg.active_tids.count(tid));
pg.active_tids.erase(tid);
dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg()
close_pg( m->get_pg() );
put_op_budget(op);
op_osd.erase( tid );
+ if (op->con)
+ op->con->put();
delete op;
}
object_locator_t oloc;
pg_t pgid;
+ Connection *con;
+
vector<OSDOp> ops;
snapid_t snapid;
int f, Context *ac, Context *co) :
session_item(this),
oid(o), oloc(ol),
+ con(NULL),
snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
paused(false) {
void ms_handle_connect(Connection *con);
void ms_handle_reset(Connection *con);
void ms_handle_remote_reset(Connection *con);
-
};
#endif