OSD: refactor handle_op error handling cases
author    Greg Farnum <greg@inktank.com>
          Wed, 26 Mar 2014 20:14:12 +0000 (13:14 -0700)
committer Greg Farnum <greg@inktank.com>
          Mon, 5 May 2014 22:29:18 +0000 (15:29 -0700)
We move our map version-checking code earlier (to dispatch_op) and refactor
our other fail-to-dispatch cases. This is friendlier for the no-lock
message processing we'll use with fast dispatch.

Signed-off-by: Samuel Just <sam.just@inktank.com>
Signed-off-by: Greg Farnum <greg@inktank.com>
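
As context for the diff below, here is a minimal, self-contained sketch of the lookup-or-queue pattern the new OSD::get_pg_or_queue_for_pg() helper uses: check pg_map under a read lock, then re-check under the write lock and queue the op in waiting_for_pg if the PG still is not instantiated locally. The sketch uses std::shared_mutex and placeholder types in place of Ceph's RWLock, spg_t, PG, and OpRequestRef; every name in it is illustrative, not Ceph API.

// Illustrative only: standalone approximation of the double-checked
// read/write locking done by OSD::get_pg_or_queue_for_pg(), with
// std::shared_mutex standing in for RWLock and stand-in types for
// spg_t / PG / OpRequestRef.
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>

struct PG {};                                   // stand-in for a placement group
using OpRequestRef = std::shared_ptr<int>;      // stand-in for an op request

std::shared_mutex pg_map_lock;                  // orders above individual PG locks
std::map<int, PG*> pg_map;                      // pgid -> PG, protected by pg_map_lock
std::map<int, std::list<OpRequestRef>> waiting_for_pg;  // ops queued per missing pgid

PG* get_pg_or_queue_for_pg(int pgid, OpRequestRef op)
{
  {
    // Fast path: most lookups hit an existing PG under the shared (read) lock.
    std::shared_lock<std::shared_mutex> r(pg_map_lock);
    auto it = pg_map.find(pgid);
    if (it != pg_map.end())
      return it->second;
  }
  // Slow path: re-check under the exclusive (write) lock, since another
  // thread may have created the PG in the meantime; otherwise queue the op
  // until the PG is instantiated locally.
  std::unique_lock<std::shared_mutex> w(pg_map_lock);
  auto it = pg_map.find(pgid);
  if (it != pg_map.end())
    return it->second;
  waiting_for_pg[pgid].push_back(op);
  return nullptr;
}

int main()
{
  auto op = std::make_shared<int>(42);
  if (!get_pg_or_queue_for_pg(7, op))
    std::cout << "pg 7 not present; op queued for later requeue\n";
}

Doing the re-check and the queuing under a single write lock avoids the race where a PG is created between the lookup and the enqueue, which matters once dispatch no longer runs under osd_lock.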
src/osd/OSD.cc
src/osd/OSD.h

diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc
index 6e3f3a96f0db3b00593d5310343cb7377171bf11..3f1e23d53226b072f157c145d349bb43727fa3cf 100644
@@ -2013,6 +2013,21 @@ PG *OSD::_create_lock_pg(
   return pg;
 }
 
+PG *OSD::get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op)
+{
+  {
+    RWLock::RLocker l(pg_map_lock);
+    if (pg_map.count(pgid))
+      return pg_map[pgid];
+  }
+  RWLock::WLocker l(pg_map_lock);
+  if (pg_map.count(pgid)) {
+    return pg_map[pgid];
+  } else {
+    waiting_for_pg[pgid].push_back(op);
+    return NULL;
+  }
+}
 
 bool OSD::_have_pg(spg_t pgid)
 {
@@ -4914,7 +4929,6 @@ bool OSD::ms_verify_authorizer(Connection *con, int peer_type,
   return true;
 };
 
-
 void OSD::do_waiters()
 {
   assert(osd_lock.is_locked());
@@ -4932,6 +4946,54 @@ void OSD::do_waiters()
   dout(10) << "do_waiters -- finish" << dendl;
 }
 
+template<typename T, int MSGTYPE>
+epoch_t replica_op_required_epoch(OpRequestRef op)
+{
+  T *m = static_cast<T *>(op->get_req());
+  assert(m->get_header().type == MSGTYPE);
+  return m->map_epoch;
+}
+
+epoch_t op_required_epoch(OpRequestRef op)
+{
+  switch (op->get_req()->get_type()) {
+  case CEPH_MSG_OSD_OP: {
+    MOSDOp *m = static_cast<MOSDOp*>(op->get_req());
+    return m->get_map_epoch();
+  }
+  case MSG_OSD_SUBOP:
+    return replica_op_required_epoch<MOSDSubOp, MSG_OSD_SUBOP>(op);
+  case MSG_OSD_SUBOPREPLY:
+    return replica_op_required_epoch<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(
+      op);
+  case MSG_OSD_PG_PUSH:
+    return replica_op_required_epoch<MOSDPGPush, MSG_OSD_PG_PUSH>(
+      op);
+  case MSG_OSD_PG_PULL:
+    return replica_op_required_epoch<MOSDPGPull, MSG_OSD_PG_PULL>(
+      op);
+  case MSG_OSD_PG_PUSH_REPLY:
+    return replica_op_required_epoch<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(
+      op);
+  case MSG_OSD_PG_SCAN:
+    return replica_op_required_epoch<MOSDPGScan, MSG_OSD_PG_SCAN>(op);
+  case MSG_OSD_PG_BACKFILL:
+    return replica_op_required_epoch<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(
+      op);
+  case MSG_OSD_EC_WRITE:
+    return replica_op_required_epoch<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op);
+  case MSG_OSD_EC_WRITE_REPLY:
+    return replica_op_required_epoch<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op);
+  case MSG_OSD_EC_READ:
+    return replica_op_required_epoch<MOSDECSubOpRead, MSG_OSD_EC_READ>(op);
+  case MSG_OSD_EC_READ_REPLY:
+    return replica_op_required_epoch<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op);
+  default:
+    assert(0);
+    return 0;
+  }
+}
+
 void OSD::dispatch_op(OpRequestRef op)
 {
   switch (op->get_req()->get_type()) {
@@ -4969,45 +5031,54 @@ void OSD::dispatch_op(OpRequestRef op)
   case MSG_OSD_RECOVERY_RESERVE:
     handle_pg_recovery_reserve(op);
     break;
+  default:
+    epoch_t msg_epoch(op_required_epoch(op));
+    if (msg_epoch > osdmap->get_epoch())
+      return;
 
+    switch(op->get_req()->get_type()) {
     // client ops
-  case CEPH_MSG_OSD_OP:
-    handle_op(op, osdmap);
-    break;
+    case CEPH_MSG_OSD_OP:
+      handle_op(op, osdmap);
+      break;
 
-    // for replication etc.
-  case MSG_OSD_SUBOP:
-    handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
-    break;
-  case MSG_OSD_SUBOPREPLY:
-    handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
-    break;
-  case MSG_OSD_PG_PUSH:
-    handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
-    break;
-  case MSG_OSD_PG_PULL:
-    handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
-    break;
-  case MSG_OSD_PG_PUSH_REPLY:
-    handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
-    break;
-  case MSG_OSD_PG_SCAN:
-    handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
-    break;
-  case MSG_OSD_PG_BACKFILL:
-    handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
-    break;
-  case MSG_OSD_EC_WRITE:
-    handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
-    break;
-  case MSG_OSD_EC_WRITE_REPLY:
-    handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
-    break;
-  case MSG_OSD_EC_READ:
-    handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
-    break;
-  case MSG_OSD_EC_READ_REPLY:
-    handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
+      // for replication etc.
+    case MSG_OSD_SUBOP:
+      handle_replica_op<MOSDSubOp, MSG_OSD_SUBOP>(op, osdmap);
+      break;
+    case MSG_OSD_SUBOPREPLY:
+      handle_replica_op<MOSDSubOpReply, MSG_OSD_SUBOPREPLY>(op, osdmap);
+      break;
+    case MSG_OSD_PG_PUSH:
+      handle_replica_op<MOSDPGPush, MSG_OSD_PG_PUSH>(op, osdmap);
+      break;
+    case MSG_OSD_PG_PULL:
+      handle_replica_op<MOSDPGPull, MSG_OSD_PG_PULL>(op, osdmap);
+      break;
+    case MSG_OSD_PG_PUSH_REPLY:
+      handle_replica_op<MOSDPGPushReply, MSG_OSD_PG_PUSH_REPLY>(op, osdmap);
+      break;
+    case MSG_OSD_PG_SCAN:
+      handle_replica_op<MOSDPGScan, MSG_OSD_PG_SCAN>(op, osdmap);
+      break;
+    case MSG_OSD_PG_BACKFILL:
+      handle_replica_op<MOSDPGBackfill, MSG_OSD_PG_BACKFILL>(op, osdmap);
+      break;
+    case MSG_OSD_EC_WRITE:
+      handle_replica_op<MOSDECSubOpWrite, MSG_OSD_EC_WRITE>(op, osdmap);
+      break;
+    case MSG_OSD_EC_WRITE_REPLY:
+      handle_replica_op<MOSDECSubOpWriteReply, MSG_OSD_EC_WRITE_REPLY>(op, osdmap);
+      break;
+    case MSG_OSD_EC_READ:
+      handle_replica_op<MOSDECSubOpRead, MSG_OSD_EC_READ>(op, osdmap);
+      break;
+    case MSG_OSD_EC_READ_REPLY:
+      handle_replica_op<MOSDECSubOpReadReply, MSG_OSD_EC_READ_REPLY>(op, osdmap);
+      break;
+    default:
+      assert(0);
+    }
     break;
   }
 }
@@ -7407,10 +7478,6 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
   // we don't need encoded payload anymore
   m->clear_payload();
 
-  // require same or newer map
-  if (!require_same_or_newer_map(op, m->get_map_epoch()))
-    return;
-
   // object name too long?
   if (m->get_oid().name.size() > MAX_CEPH_OBJECT_NAME_LEN) {
     dout(4) << "handle_op '" << m->get_oid().name << "' is longer than "
@@ -7500,52 +7567,40 @@ void OSD::handle_op(OpRequestRef op, OSDMapRef osdmap)
     return;
   }
 
-  // get and lock *pg.
-  PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
-  if (!pg) {
-    dout(7) << "hit non-existent pg " << pgid << dendl;
-
-    if (osdmap->get_pg_acting_role(pgid.pgid, whoami) >= 0) {
-      dout(7) << "we are valid target for op, waiting" << dendl;
-      waiting_for_pg[pgid].push_back(op);
-      op->mark_delayed("waiting for pg to exist locally");
-      return;
-    }
+  OSDMapRef send_map = service.try_get_map(m->get_map_epoch());
+  // check send epoch
+  if (!send_map) {
 
-    // okay, we aren't valid now; check send epoch
-    if (m->get_map_epoch() < superblock.oldest_map) {
-      dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
-      return;
-    }
-    OSDMapRef send_map = get_map(m->get_map_epoch());
-
-    if (send_map->get_pg_acting_role(pgid.pgid, whoami) >= 0) {
-      dout(7) << "dropping request; client will resend when they get new map" << dendl;
-    } else if (!send_map->have_pg_pool(pgid.pool())) {
-      dout(7) << "dropping request; pool did not exist" << dendl;
-      clog.warn() << m->get_source_inst() << " invalid " << m->get_reqid()
-                 << " pg " << m->get_pg()
-                 << " to osd." << whoami
-                 << " in e" << osdmap->get_epoch()
-                 << ", client e" << m->get_map_epoch()
-                 << " when pool " << m->get_pg().pool() << " did not exist"
-                 << "\n";
-    } else {
-      dout(7) << "we are invalid target" << dendl;
-      clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
-                 << " pg " << m->get_pg()
-                 << " to osd." << whoami
-                 << " in e" << osdmap->get_epoch()
-                 << ", client e" << m->get_map_epoch()
-                 << " pg " << pgid
-                 << " features " << m->get_connection()->get_features()
-                 << "\n";
-      service.reply_op_error(op, -ENXIO);
-    }
+    dout(7) << "don't have sender's osdmap; assuming it was valid and that client will resend" << dendl;
+    return;
+  }
+  if (!send_map->have_pg_pool(pgid.pool())) {
+    dout(7) << "dropping request; pool did not exist" << dendl;
+    clog.warn() << m->get_source_inst() << " invalid " << m->get_reqid()
+                     << " pg " << m->get_pg()
+                     << " to osd." << whoami
+                     << " in e" << osdmap->get_epoch()
+                     << ", client e" << m->get_map_epoch()
+                     << " when pool " << m->get_pg().pool() << " did not exist"
+                     << "\n";
+    return;
+  } else if (send_map->get_pg_acting_role(pgid.pgid, whoami) < 0) {
+    dout(7) << "we are invalid target" << dendl;
+    clog.warn() << m->get_source_inst() << " misdirected " << m->get_reqid()
+                     << " pg " << m->get_pg()
+                     << " to osd." << whoami
+                     << " in e" << osdmap->get_epoch()
+                     << ", client e" << m->get_map_epoch()
+                     << " pg " << pgid
+                     << " features " << m->get_connection()->get_features()
+                     << "\n";
+    service.reply_op_error(op, -ENXIO);
     return;
   }
 
-  enqueue_op(pg, op);
+  PG *pg = get_pg_or_queue_for_pg(pgid, op);
+  if (pg)
+    enqueue_op(pg, op);
 }
 
 template<typename T, int MSGTYPE>
@@ -7569,10 +7624,6 @@ void OSD::handle_replica_op(OpRequestRef op, OSDMapRef osdmap)
   // must be a rep op.
   assert(m->get_source().is_osd());
   
-  // require same or newer map
-  if (!require_same_or_newer_map(op, m->map_epoch))
-    return;
-
   // share our map with sender, if they're old
   Session *peer_session =
       static_cast<Session*>(m->get_connection()->get_priv());
@@ -7588,18 +7639,9 @@ void OSD::handle_replica_op(OpRequestRef op, OSDMapRef osdmap)
     peer_session->put();
   }
 
-  // make sure we have the pg
-  const spg_t pgid = m->pgid;
-  if (service.splitting(pgid)) {
-    waiting_for_pg[pgid].push_back(op);
-    return;
-  }
-
-  PG *pg = _have_pg(pgid) ? _lookup_pg(pgid) : NULL;
-  if (!pg) {
-    return;
-  }
-  enqueue_op(pg, op);
+  PG *pg = get_pg_or_queue_for_pg(m->pgid, op);
+  if (pg)
+    enqueue_op(pg, op);
 }
 
 bool OSD::op_is_discardable(MOSDOp *op)
diff --git a/src/osd/OSD.h b/src/osd/OSD.h
index 7fec20d662271e23017479711b55116d29edc614..808020bcadc69dcc332fccfdb6f1391041eb8bb8 100644
@@ -1399,11 +1399,13 @@ protected:
   RWLock pg_map_lock; // this lock orders *above* individual PG _locks
   ceph::unordered_map<spg_t, PG*> pg_map; // protected by pg_map lock
   map<spg_t, list<OpRequestRef> > waiting_for_pg; // protected by pg_map lock
+
   map<spg_t, list<PG::CephPeeringEvtRef> > peering_wait_for_split;
   PGRecoveryStats pg_recovery_stats;
 
   PGPool _get_pool(int id, OSDMapRef createmap);
 
+  PG *get_pg_or_queue_for_pg(spg_t pgid, OpRequestRef op);
   bool  _have_pg(spg_t pgid);
   PG   *_lookup_lock_pg_with_map_lock_held(spg_t pgid);
   PG   *_lookup_lock_pg(spg_t pgid);