reply_op_error(op, -ENXIO);
}
+struct send_map_on_destruct {
+ OSD *osd;
+ Message *m;
+ ConnectionRef con;
+ OSDMapRef osdmap;
+ epoch_t map_epoch;
+ bool should_send;
+ send_map_on_destruct(OSD *osd, Message *m,
+ OSDMapRef& osdmap, epoch_t map_epoch) :
+ osd(osd), m(m), con(m->get_connection()),
+ osdmap(osdmap), map_epoch(map_epoch),
+ should_send(true) {}
+ ~send_map_on_destruct() {
+ if (!should_send)
+ return;
+ OSD::Session *client_session = static_cast<OSD::Session *>(
+ con->get_priv());
+ if (client_session) {
+ client_session->sent_epoch_lock.Lock();
+ }
+ osd->_share_map_incoming(
+ m->get_source(),
+ con.get(),
+ map_epoch,
+ osdmap,
+ client_session);
+ if (client_session) {
+ client_session->sent_epoch_lock.Unlock();
+ client_session->put();
+ }
+ }
+};
+
void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
{
MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
return;
}
- // share our map with sender, if they're old
- Session *client_session = static_cast<Session *>(m->get_connection()->get_priv());
- if (client_session) {
- client_session->sent_epoch_lock.Lock();
- }
- _share_map_incoming(
- m->get_source(),
- m->get_connection().get(),
- m->get_map_epoch(),
- osdmap,
- client_session
- );
- if (client_session) {
- client_session->sent_epoch_lock.Unlock();
- client_session->put();
- }
+ // set up a map send if the Op gets blocked for some reason
+ send_map_on_destruct share_map(this, m, osdmap, m->get_map_epoch());
if (op->rmw_flags == 0) {
int r = init_op_flags(op);
}
PG *pg = get_pg_or_queue_for_pg(pgid, op);
- if (pg)
+ if (pg) {
+ op->send_map_update = true;
+ op->sent_epoch = m->get_map_epoch();
enqueue_op(pg, op);
+ share_map.should_send = false;
+ }
}
template<typename T, int MSGTYPE>
<< " latency " << latency
<< " " << *(op->get_req())
<< " pg " << *pg << dendl;
+
+ // share our map with sender, if they're old
+ if (op->send_map_update) {
+ Message *m = op->get_req();
+ Session *session = static_cast<Session *>(m->get_connection()->get_priv());
+ if (session) {
+ session->sent_epoch_lock.Lock();
+ }
+ _share_map_incoming(
+ m->get_source(),
+ m->get_connection().get(),
+ op->sent_epoch,
+ osdmap,
+ session
+ );
+ if (session) {
+ session->sent_epoch_lock.Unlock();
+ session->put();
+ }
+ }
+
if (pg->deleting)
return;
epoch_t note_peer_epoch(int p, epoch_t e);
void forget_peer_epoch(int p, epoch_t e);
+ friend struct send_map_on_destruct;
bool _should_share_map(entity_name_t name, Connection *con, epoch_t epoch,
OSDMapRef& osdmap, Session *session);
void _share_map_incoming(entity_name_t name, Connection *con, epoch_t epoch,
OpRequest::OpRequest(Message *req, OpTracker *tracker) :
TrackedOp(req, tracker),
rmw_flags(0),
- hit_flag_points(0), latest_flag_point(0) {
+ hit_flag_points(0), latest_flag_point(0),
+ send_map_update(false), sent_epoch(0) {
if (req->get_priority() < tracker->cct->_conf->osd_client_op_priority) {
// don't warn as quickly for low priority ops
warn_interval_multiplier = tracker->cct->_conf->osd_recovery_op_warn_multiple;