]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: track pending requests by osd, not pg
authorSage Weil <sage@newdream.net>
Wed, 15 Dec 2010 00:15:04 +0000 (16:15 -0800)
committerSage Weil <sage@newdream.net>
Wed, 15 Dec 2010 19:02:09 +0000 (11:02 -0800)
This is a big cleanup.  Also
 - switch to keeping per-osd Connection *'s
 - make requests time out independently (not very efficiently yet)

Signed-off-by: Sage Weil <sage@newdream.net>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 349762699b881ee05c5d43809a973440302cb1c7..5fcf3ba97efcfa6b59216f1bbef98e5de1f15a12 100644 (file)
@@ -58,8 +58,10 @@ void Objecter::shutdown()
 {
 }
 
-tid_t Objecter::resend_linger(LingerOp *info)
+tid_t Objecter::send_linger(LingerOp *info)
 {
+  dout(15) << "send_linger " << info->linger_id << dendl;
+  
   vector<OSDOp> ops = info->ops; // need to pass a copy to ops
   Context *onack = NULL;
   Context *oncommit = NULL;
@@ -71,7 +73,7 @@ tid_t Objecter::resend_linger(LingerOp *info)
   }
   Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ,
                 onack, oncommit,
-                info->pobjver, true);
+                info->pobjver);
   o->snapid = info->snap;
   return op_submit(o, info);
 }
@@ -106,7 +108,7 @@ void Objecter::unregister_linger(uint64_t linger_id)
   map<uint64_t, LingerOp*>::iterator iter = op_linger_info.find(linger_id);
   if (iter != op_linger_info.end()) {
     LingerOp *info = iter->second;
-    info->pg_item.remove_myself();
+    info->session_item.remove_myself();
     op_linger_info.erase(iter);
     delete info;
   }
@@ -133,7 +135,7 @@ tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc,
   info->linger_id = ++max_linger_id;
   op_linger_info[info->linger_id] = info;
 
-  resend_linger(info);
+  send_linger(info);
 
   return info->linger_id;
 }
@@ -177,6 +179,10 @@ void Objecter::handle_osd_map(MOSDMap *m)
     return;
   }
 
+  bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
+  bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
+  bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL);
+
   if (m->get_last() <= osdmap->get_epoch()) {
     dout(3) << "handle_osd_map ignoring epochs [" 
             << m->get_first() << "," << m->get_last() 
@@ -188,18 +194,12 @@ void Objecter::handle_osd_map(MOSDMap *m)
             << "] > " << osdmap->get_epoch()
             << dendl;
 
-    set<pg_t> changed_pgs;
-
     if (osdmap->get_epoch()) {
       // we want incrementals
       for (epoch_t e = osdmap->get_epoch() + 1;
           e <= m->get_last();
           e++) {
-
-       bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
-       bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
-       bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL);
-  
        if (m->incremental_maps.count(e)) {
          dout(3) << "handle_osd_map decoding incremental epoch " << e << dendl;
          OSDMap::Incremental inc(m->incremental_maps[e]);
@@ -222,26 +222,6 @@ void Objecter::handle_osd_map(MOSDMap *m)
          break;
        }
 
-       if (was_pauserd || was_pausewr || was_full)
-         maybe_request_map();
-       
-       // scan pgs for changes
-       scan_pgs(changed_pgs);
-
-       // kick paused
-       if ((was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) ||
-           (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) ||
-           (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL))) {
-         for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
-              p != op_osd.end();
-              p++) {
-           if (p->second->paused) {
-             p->second->paused = false;
-             op_submit(p->second);
-           }
-         }
-       }
-        dump_active();
        assert(e == osdmap->get_epoch());
       }
       
@@ -250,25 +230,103 @@ void Objecter::handle_osd_map(MOSDMap *m)
       if (m->maps.count(m->get_last())) {
        dout(3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl;
        osdmap->decode(m->maps[m->get_last()]);
-
-       scan_pgs(changed_pgs);
       } else {
        dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
        monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
        monc->renew_subs();
       }
     }
+  }
 
-    // kick requests who might be timing out on the wrong osds
-    if (!changed_pgs.empty())
-      kick_requests(changed_pgs);
+  if (was_pauserd || was_pausewr || was_full)
+    maybe_request_map();
+  
+  bool kick_paused =
+    (was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) ||
+    (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) ||
+    (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL));
+  
+  // osd addr changes?
+  for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+       p != osd_sessions.end();
+       p++) {
+    OSDSession *s = p->second;
+    if (osdmap->is_up(s->osd)) {
+      entity_inst_t inst = osdmap->get_inst(s->osd);
+      if (s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr) {
+       dout(10) << " reopening osd" << s->osd << " session, addr now " << inst << dendl;
+       messenger->mark_down(s->con);
+       s->con->put();
+       s->con = messenger->get_connection(inst);
+       s->incarnation++;
+      }
+    }
+  }
+
+  // active requests
+  for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
+       p != op_osd.end();
+       p++) {
+    Op *op = p->second;
+
+    // calc target
+    vector<int> acting;
+    if (op->oid.name.length())
+      op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
+    osdmap->pg_to_acting_osds(op->pgid, acting);
+    OSDSession *s = 0;
+
+    if (acting.size())
+      s = get_session(acting[0]);
+    if (op->session != s) {
+      dout(10) << " redirecting tid " << op->tid << " to osd" << (s ? s->osd : -1) << dendl;
+      if (!op->session)
+       num_homeless_ops--;
+      op->session_item.remove_myself();
+      op->session = s;
+      if (s) {
+       op->session->ops.push_back(&op->session_item);
+       send_op(op);
+      } else {
+       num_homeless_ops++;
+      }
+    } else if (s) {
+      if (op->incarnation != s->incarnation) {
+       dout(10) << " resending tid " << op->tid << " to (reopened) osd" << s->osd << dendl;
+       send_op(op);
+      } else if (op->paused && kick_paused) {
+       dout(10) << " kicking paused tid " << op->tid << " on osd" << s->osd << dendl;
+       send_op(op);
+      }
+    }
   }
 
-  //now check if the map is full -- we want to subscribe if it is!
+  // lingers
+  for (map<uint64_t,LingerOp*>::iterator p = op_linger_info.begin();
+       p != op_linger_info.end();
+       p++) {
+    LingerOp *op = p->second;
+    vector<int> acting;
+    pg_t pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
+    osdmap->pg_to_acting_osds(pgid, acting);
+    OSDSession *s = acting.size() ? get_session(acting[0]) : NULL;
+    if (op->session != s) {
+      dout(10) << " redirecting linger id " << op->linger_id << " to osd" << (s ? s->osd : -1) << dendl;
+      op->session_item.remove_myself();
+      op->session = s;
+      if (s) {
+       send_linger(op);
+      }
+    }
+  }
+  
+  dump_active();
+
+  // now check if the map is full -- we want to subscribe if it is!
   if (osdmap->test_flag(CEPH_OSDMAP_FULL) & CEPH_OSDMAP_FULL)
     maybe_request_map();
   
-  //finish any Contexts that were waiting on a map update
+  // finish any Contexts that were waiting on a map update
   map<epoch_t,list< pair< Context*, int > > >::iterator p =
     waiting_for_map.begin();
   while (p != waiting_for_map.end() &&
@@ -316,123 +374,17 @@ void Objecter::maybe_request_map()
 }
 
 
-Objecter::PG &Objecter::get_pg(pg_t pgid)
-{
-  if (!pg_map.count(pgid)) {
-    osdmap->pg_to_acting_osds(pgid, pg_map[pgid].acting);
-    dout(10) << "get_pg " << pgid << " is new, " << pg_map[pgid].acting << dendl;
-  } else {
-    dout(10) << "get_pg " << pgid << " is old, " << pg_map[pgid].acting << dendl;
-  }
-  return pg_map[pgid];
-}
-
-
-void Objecter::scan_pgs_for(set<pg_t>& pgs, int osd)
-{
-  dout(10) << "scan_pgs_for osd" << osd << dendl;
-
-  for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
-       i != pg_map.end();
-       i++) {
-    pg_t pgid = i->first;
-    PG& pg = i->second;
-    if (pg.acting.size() && pg.acting[0] == osd)
-      pgs.insert(pgid);
-  }
-}
-
-void Objecter::scan_pgs(set<pg_t>& changed_pgs)
+void Objecter::kick_requests(OSDSession *session)
 {
-  dout(10) << "scan_pgs" << dendl;
-
-  for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
-       i != pg_map.end();
-       i++) {
-    pg_t pgid = i->first;
-    PG& pg = i->second;
-    
-    // calc new.
-    vector<int> other;
-    osdmap->pg_to_acting_osds(pgid, other);
-
-    if (other == pg.acting) 
-      continue; // no change.
+  dout(10) << "kick_requests for osd" << session->osd << dendl;
 
-    dout(10) << "scan_pgs " << pgid << " " << pg.acting << " -> " << other << dendl;
-    
-    other.swap(pg.acting);
-
-    if (other.size() && pg.acting.size() &&
-       other[0] == pg.acting[0])
-      continue;  // same primary.
-
-    // changed significantly.
-    dout(10) << "scan_pgs pg " << pgid 
-             << " (" << pg.active_tids << ")"
-             << " " << other << " -> " << pg.acting
-             << dendl;
-    changed_pgs.insert(pgid);
-  }
-}
+  // resend ops
+  for (xlist<Op*>::iterator p = session->ops.begin(); !p.end(); ++p)
+    send_op(*p);
 
-void Objecter::kick_requests(set<pg_t>& changed_pgs) 
-{
-  dout(10) << "kick_requests in pgs " << changed_pgs << dendl;
-  dout(0) << "*** kick_requests in pgs " << changed_pgs << dendl;
-
-  for (set<pg_t>::iterator i = changed_pgs.begin();
-       i != changed_pgs.end();
-       i++) {
-    pg_t pgid = *i;
-    PG& pg = pg_map[pgid];
-
-    // resubmit ops!
-    set<tid_t> tids;
-    tids.swap( pg.active_tids );
-
-    // resend lingers
-    for (xlist<LingerOp*>::iterator j = pg.linger_ops.begin(); !j.end(); ++j)
-      resend_linger(*j);
-
-    if (pg.linger_ops.empty())
-      close_pg( pgid );  // will pbly reopen, unless it's just commits we're missing
-
-    dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl;
-    for (set<tid_t>::iterator p = tids.begin();
-         p != tids.end();
-         p++) {
-      tid_t tid = *p;
-      
-      hash_map<tid_t, Op*>::iterator p = op_osd.find(tid);
-      if (p != op_osd.end()) {
-       Op *op = p->second;
-       put_op_budget(op);
-       op_osd.erase(p);
-
-       if (op->onack)
-         num_unacked--;
-       if (op->oncommit)
-         num_uncommitted--;
-       
-        dout(0) << "kick_requests tid=" << tid << " linger=" << op->linger << dendl;
-        // WRITE
-       if (op->onack) {
-          dout(3) << "kick_requests missing ack, resub " << tid << dendl;
-          op_submit(op, false);
-        } else {
-          if (!op->linger) {
-           assert(op->oncommit);
-           dout(3) << "kick_requests missing commit, resub " << tid << dendl;
-           dout(0) << "kick_requests missing commit, resub " << tid << dendl;
-           op_submit(op, false);
-          }
-        }
-      } else
-          assert(0);
-    }
-  }
-  dout(0) << "*** kick_requests done" << dendl;
+  // resend lingers
+  for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j)
+    send_linger(*j);
 }
 
 
@@ -440,31 +392,34 @@ void Objecter::tick()
 {
   dout(10) << "tick" << dendl;
 
-  set<int> ping;
+  set<OSDSession*> toping;
 
-  // look for laggy pgs
+  // look for laggy requests
   utime_t cutoff = g_clock.now();
   cutoff -= g_conf.objecter_timeout;  // timeout
-  for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
-       i != pg_map.end();
-       i++) {
-    if (!i->second.active_tids.empty() &&
-       i->second.last < cutoff) {
-      dout(1) << " pg " << i->first << " on " << i->second.acting
-             << " is laggy: " << i->second.active_tids << dendl;
-      maybe_request_map();
-      //break;
-
-      // send a ping to this osd, to ensure we detect any session resets
-      // (osd reply message policy is lossy)
-      if (i->second.acting.size())
-       ping.insert(i->second.acting[0]);
+
+  for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
+       p != op_osd.end();
+       p++) {
+    Op *op = p->second;
+    if (op->session && op->stamp < cutoff) {
+      dout(2) << " tid " << p->first << " on osd" << op->session->osd << " is laggy" << dendl;
+      toping.insert(op->session);
     }
   }
 
-  for (set<int>::iterator p = ping.begin(); p != ping.end(); p++)
-    messenger->send_message(new MPing, osdmap->get_inst(*p));
+  if (num_homeless_ops || !toping.empty())
+    maybe_request_map();
 
+  if (!toping.empty()) {
+    // send a ping to these osds, to ensure we detect any session resets
+    // (osd reply message policy is lossy)
+    for (set<OSDSession*>::iterator i = toping.begin();
+        i != toping.end();
+        i++)
+      messenger->send_message(new MPing, osdmap->get_inst((*i)->osd));
+  }
+    
   // reschedule
   timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
 }
@@ -501,19 +456,26 @@ tid_t Objecter::op_submit(Op *op, LingerOp *linger_op)
   // take_op_budget() may drop our lock while it blocks.
   take_op_budget(op);
 
-  if (op->oid.name.length())
-    op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
-
-  // find
-  PG &pg = get_pg(op->pgid);
-
-  if (linger_op)
-    pg.linger_ops.push_back(&linger_op->pg_item);
-    
   // pick tid
   op->tid = ++last_tid;
   assert(client_inc >= 0);
 
+  // pick target
+  vector<int> acting;
+  if (op->oid.name.length())
+    op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
+  osdmap->pg_to_acting_osds(op->pgid, acting);
+
+  if (acting.size()) {
+    op->session = get_session(acting[0]);
+    op->session->ops.push_back(&op->session_item);
+    if (linger_op)
+      op->session->linger_ops.push_back(&linger_op->session_item);
+  } else {
+    op->session = NULL;
+    num_homeless_ops++;
+  }
+    
   // add to gather set(s)
   int flags = op->flags;
   if (op->onack) {
@@ -530,19 +492,11 @@ tid_t Objecter::op_submit(Op *op, LingerOp *linger_op)
   }
   op_osd[op->tid] = op;
 
-  pg.active_tids.insert(op->tid);
-  pg.last = g_clock.now();
-
   // send?
-  dout(0) << "op_submit oid " << op->oid
-           << " " << op->oloc 
-          << " " << op->ops << " tid " << op->tid
-           << " osd" << pg.primary()
-           << dendl;
   dout(10) << "op_submit oid " << op->oid
            << " " << op->oloc 
           << " " << op->ops << " tid " << op->tid
-           << " osd" << pg.primary()
+           << " osd" << (op->session ? op->session->osd : -1)
            << dendl;
 
   assert(op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
@@ -562,56 +516,69 @@ tid_t Objecter::op_submit(Op *op, LingerOp *linger_op)
     dout(10) << " FULL, paused modify " << op << " tid " << last_tid << dendl;
     op->paused = true;
     maybe_request_map();
-  } else if (pg.primary() >= 0) {
-    int flags = op->flags;
-    if (op->oncommit)
-      flags |= CEPH_OSD_FLAG_ONDISK;
-    if (op->onack)
-      flags |= CEPH_OSD_FLAG_ACK;
+  } else if (op->session) {
+    send_op(op);
+  } else 
+    maybe_request_map();
+  
+  dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
+  
+  return op->tid;
+}
 
-    if (op->con) {
-      if (op->outbl && op->outbl->length()) {
-       dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
-       op->con->revoke_rx_buffer(op->tid);
-      }
-      op->con->put();
-    }
-    op->con = messenger->get_connection(osdmap->get_inst(pg.primary()));
-    assert(op->con);
-    if (op->outbl && op->outbl->length()) {
-      dout(20) << " posting rx buffer for " << op->tid << " on " << op->con << dendl;
-      op->con->post_rx_buffer(op->tid, *op->outbl);
-    }
+void Objecter::send_op(Op *op)
+{
+  dout(15) << "send_op " << op->tid << " to osd" << op->session->osd << dendl;
+
+  int flags = op->flags;
+  if (op->oncommit)
+    flags |= CEPH_OSD_FLAG_ONDISK;
+  if (op->onack)
+    flags |= CEPH_OSD_FLAG_ACK;
+
+  if (!op->session->con)
+    op->session->con = messenger->get_connection(osdmap->get_inst(op->session->osd));
+  assert(op->session->con);
 
-    ceph_object_layout ol;
-    ol.ol_pgid = op->pgid.v;
-    ol.ol_stripe_unit = 0;
+  // preallocated rx buffer?
+  if (op->con) {
+    dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
+    op->con->revoke_rx_buffer(op->tid);
+    op->con->put();
+  }
+  if (op->outbl && op->outbl->length()) {
+    dout(20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl;
+    op->con = op->session->con->get();
+    op->con->post_rx_buffer(op->tid, *op->outbl);
+  }
 
-    MOSDOp *m = new MOSDOp(client_inc, op->tid,
-                          op->oid, ol, osdmap->get_epoch(),
-                          flags);
+  op->paused = false;
+  op->incarnation = op->session->incarnation;
+  op->stamp = g_clock.now();
 
-    m->set_snapid(op->snapid);
-    m->set_snap_seq(op->snapc.seq);
-    m->get_snaps() = op->snapc.snaps;
+  ceph_object_layout ol;
+  ol.ol_pgid = op->pgid.v;
+  ol.ol_stripe_unit = 0;
 
-    m->ops = op->ops;
-    m->set_mtime(op->mtime);
-    m->set_retry_attempt(op->attempts++);
+  MOSDOp *m = new MOSDOp(client_inc, op->tid,
+                        op->oid, ol, osdmap->get_epoch(),
+                        flags);
 
-    if (op->version != eversion_t())
-      m->set_version(op->version);  // we're replaying this op!
+  m->set_snapid(op->snapid);
+  m->set_snap_seq(op->snapc.seq);
+  m->get_snaps() = op->snapc.snaps;
 
-    if (op->priority)
-      m->set_priority(op->priority);
+  m->ops = op->ops;
+  m->set_mtime(op->mtime);
+  m->set_retry_attempt(op->attempts++);
 
-    messenger->send_message(m, op->con);
-  } else 
-    maybe_request_map();
-  
-  dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
-  
-  return op->tid;
+  if (op->version != eversion_t())
+    m->set_version(op->version);  // we're replaying this op!
+
+  if (op->priority)
+    m->set_priority(op->priority);
+
+  messenger->send_message(m, op->session->con);
 }
 
 int Objecter::calc_op_budget(Op *op)
@@ -664,11 +631,11 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
          << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
          << " v " << m->get_version() << " in " << m->get_pg()
          << dendl;
-  Op *op = op_osd[ tid ];
+  Op *op = op_osd[tid];
 
-  if (op->con != m->get_connection()) {
+  if (op->session->con != m->get_connection()) {
     dout(7) << " ignoring reply from " << m->get_source_inst()
-           << ", i last sent to " << op->con->get_peer_addr() << dendl;
+           << ", i last sent to " << op->session->con->get_peer_addr() << dendl;
     m->put();
     return;
   }
@@ -694,7 +661,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // got data?
   if (op->outbl) {
-    if (op->outbl->length())
+    if (op->con)
       op->con->revoke_rx_buffer(op->tid);
     m->claim_data(*op->outbl);
     op->outbl = 0;
@@ -717,15 +684,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // done with this tid?
   if (!op->onack && !op->oncommit) {
-    PG &pg = get_pg( m->get_pg() );
-    assert(pg.active_tids.count(tid));
-    pg.active_tids.erase(tid);
-    dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg()
-            << " still has " << pg.active_tids << dendl;
-    if (pg.active_tids.empty() && pg.linger_ops.empty()) 
-      close_pg( m->get_pg() );
+    op->session_item.remove_myself();
+    dout(15) << "handle_osd_op_reply completed tid " << tid << dendl;
     put_op_budget(op);
-    op_osd.erase( tid );
+    op_osd.erase(tid);
     if (op->con)
       op->con->put();
     delete op;
@@ -796,7 +758,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) {
   object_locator_t oloc(list_context->pool_id);
 
   // 
-  Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL, false);
+  Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL);
   o->priority = op.priority;
   o->snapid = list_context->pool_snap_seq;
   o->outbl = bl;
@@ -1266,10 +1228,15 @@ void Objecter::ms_handle_reset(Connection *con)
     int osd = osdmap->identify_osd(con->get_peer_addr());
     if (osd >= 0) {
       dout(1) << "ms_handle_reset on osd" << osd << dendl;
-      set<pg_t> changed_pgs;
-      scan_pgs_for(changed_pgs, osd);
-      kick_requests(changed_pgs);
-      maybe_request_map();
+      map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
+      if (p != osd_sessions.end()) {
+       OSDSession *session = p->second;
+       messenger->mark_down(session->con->get_peer_addr());
+       session->con->put();
+       session->con = NULL;
+       kick_requests(session);
+       maybe_request_map();
+      }
     } else {
       dout(10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl;
     }
index fa2da08c22aea5a2cf2f3d1fa454b944b6b3b935..262c9386837233cbae3370375e26910df3667e8b 100644 (file)
@@ -244,8 +244,12 @@ public:
   // read
  public:
 
+  class OSDSession;
+
   struct Op {
+    OSDSession *session;
     xlist<Op*>::item session_item;
+    int incarnation;
 
     object_t oid;
     object_locator_t oloc;
@@ -272,16 +276,16 @@ public:
 
     eversion_t *objver;
 
-    bool linger;
+    utime_t stamp;
 
     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
-       int f, Context *ac, Context *co, eversion_t *ov, bool ln = false) :
-      session_item(this),
+       int f, Context *ac, Context *co, eversion_t *ov) :
+      session(NULL), session_item(this), incarnation(0),
       oid(o), oloc(ol),
       con(NULL),
       snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), 
       tid(0), attempts(0),
-      paused(false), objver(ov), linger(ln) {
+      paused(false), objver(ov) {
       ops.swap(op);
     }
   };
@@ -394,55 +398,8 @@ public:
               auid(0), crush_rule(0), snapid(0), blp(NULL) {}
   };
 
-  // -- osd sessions --
-  struct Session {
-    xlist<Op*> ops;
-    int osd;
-    int incarnation;
-  };
-  map<int,Session*> sessions;
-
-
- private:
-  // pending ops
-  hash_map<tid_t,Op*>       op_osd;
-  map<tid_t,PoolStatOp*>    op_poolstat;
-  map<tid_t,StatfsOp*>      op_statfs;
-  map<tid_t,PoolOp*>        op_pool;
-
-  map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
-
-  /**
-   * track pending ops by pg
-   *  ...so we can cope with failures, map changes
-   */
-  class LingerOp;
-
-  class PG {
-  public:
-    vector<int> acting;
-    set<tid_t>  active_tids; // active ops
-    utime_t last;
-    xlist<LingerOp*> linger_ops;
-
-    PG() {}
-    
-    // primary - where i write
-    int primary() {
-      if (acting.empty()) return -1;
-      return acting[0];
-    }
-    // acker - where i read, and receive acks from
-    int acker() {
-      if (acting.empty()) return -1;
-      return acting[0];
-    }
-  };
-
-  hash_map<pg_t,PG> pg_map;
 
-  // ---
-  // lingering ops
+  // -- lingering ops --
 
   struct LingerOp {
     uint64_t linger_id;
@@ -458,20 +415,18 @@ public:
     bool registered;
     Context *on_reg_ack, *on_reg_commit;
 
-    PG *pg;
-    xlist<LingerOp*>::item pg_item;
+    OSDSession *session;
+    xlist<LingerOp*>::item session_item;
 
     LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL),
                 registered(false), on_reg_ack(NULL), on_reg_commit(NULL),
-                pg(NULL), pg_item(this) {}
+                session(NULL), session_item(this) {}
 
     // no copy!
     const LingerOp &operator=(const LingerOp& r);
     LingerOp(const LingerOp& o);
   };
 
-  map<uint64_t, LingerOp*>  op_linger_info;
-
   struct C_Linger_Ack : public Context {
     Objecter *objecter;
     LingerOp *info;
@@ -490,16 +445,41 @@ public:
     }
   };
 
-  
-  PG &get_pg(pg_t pgid);
-  void close_pg(pg_t pgid) {
-    assert(pg_map.count(pgid));
-    assert(pg_map[pgid].active_tids.empty());
-    pg_map.erase(pgid);
+
+  // -- osd sessions --
+  struct OSDSession {
+    xlist<Op*> ops;
+    xlist<LingerOp*> linger_ops;
+    int osd;
+    int incarnation;
+    Connection *con;
+
+    OSDSession(int o) : osd(o), incarnation(0), con(NULL) {}
+  };
+  map<int,OSDSession*> osd_sessions;
+
+  OSDSession *get_session(int osd) {
+    map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
+    if (p != osd_sessions.end())
+      return p->second;
+    OSDSession *s = new OSDSession(osd);
+    osd_sessions[osd] = s;
+    return s;
   }
-  void scan_pgs(set<pg_t>& changed_pgs);
-  void scan_pgs_for(set<pg_t>& changed_pgs, int osd);
-  void kick_requests(set<pg_t>& changed_pgs);
+
+ private:
+  // pending ops
+  hash_map<tid_t,Op*>       op_osd;
+  int                       num_homeless_ops;
+  map<uint64_t, LingerOp*>  op_linger_info;
+  map<tid_t,PoolStatOp*>    op_poolstat;
+  map<tid_t,StatfsOp*>      op_statfs;
+  map<tid_t,PoolOp*>        op_pool;
+
+  map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
+
+  void send_op(Op *op);
+  void kick_requests(OSDSession *session);
 
   void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish);
 
@@ -535,6 +515,7 @@ public:
     last_seen_osdmap_version(0),
     last_seen_pgmap_version(0),
     client_lock(l), timer(t),
+    num_homeless_ops(0),
     op_throttler(g_conf.objecter_inflight_op_bytes)
   { }
   ~Objecter() { }
@@ -566,7 +547,7 @@ private:
   // low-level
   tid_t op_submit(Op *op, LingerOp *linger_op = NULL);
 
-  tid_t resend_linger(LingerOp *info);
+  tid_t send_linger(LingerOp *info);
   void _linger_ack(LingerOp *info, int r);
   void _linger_commit(LingerOp *info, int r);
 
@@ -756,7 +737,7 @@ private:
                const SnapContext& snapc, int flags,
                Context *onack, Context *oncommit,
                eversion_t *objver = NULL) {
-    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, false);
+    Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return op_submit(o);