]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Merge branch 'msgr' into unstable
authorSage Weil <sage@newdream.net>
Fri, 23 Apr 2010 21:22:04 +0000 (14:22 -0700)
committerSage Weil <sage@newdream.net>
Fri, 23 Apr 2010 21:22:04 +0000 (14:22 -0700)
Conflicts:
src/TODO
src/mds/Server.cc
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.cc

27 files changed:
1  2 
src/Makefile.am
src/TODO
src/ceph.cc
src/mds/Locker.cc
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/MDS.cc
src/mds/MDS.h
src/mds/MDSTableClient.cc
src/mds/Migrator.cc
src/mds/Server.cc
src/messages/MGetPoolStats.h
src/messages/MOSDOp.h
src/mon/AuthMonitor.cc
src/mon/MDSMonitor.cc
src/mon/MonClient.cc
src/mon/Monitor.cc
src/mon/OSDMonitor.cc
src/mon/PGMonitor.cc
src/msg/Message.h
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h
src/osd/OSD.cc
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osdc/Objecter.cc

diff --cc src/Makefile.am
Simple merge
diff --cc src/TODO
Simple merge
diff --cc src/ceph.cc
Simple merge
Simple merge
Simple merge
Simple merge
diff --cc src/mds/MDS.cc
index 54979c614d5a15c818353e8285708cf3ad154eee,a4c748758ca6fae8bde63aca85314a72efe2f06a..d928739ef28685f75aceff369767db965782ad22
@@@ -580,12 -581,9 +590,12 @@@ void MDS::handle_mds_beacon(MMDSBeacon 
      }
      
      reset_beacon_killer();
 +  } else {
 +    dout(10) << "handle_mds_beacon " << ceph_mds_state_name(m->get_state())
 +           << " seq " << m->get_seq() << " dne" << dendl;
    }
  
-   delete m;
+   m->put();
  }
  
  void MDS::reset_beacon_killer()
diff --cc src/mds/MDS.h
Simple merge
Simple merge
index 2864bad7731da17469e7647c30fc8b9a7632bb94,48f533011864066e1788cea846c473b8283f265b..8b194feb4a748fb3571167253416d7d9a6b4bf59
@@@ -918,9 -918,9 +918,9 @@@ void Migrator::finish_export_inode_caps
                                     cap->get_cap_id(), cap->get_last_seq(), 
                                     cap->pending(), cap->wanted(), 0,
                                     cap->get_mseq());
-     mds->send_message_client(m, it->first);
+     mds->send_message_client_counted(m, it->first);
    }
 -  in->clear_client_caps();
 +  in->clear_client_caps_after_export();
  }
  
  void Migrator::finish_export_inode(CInode *in, utime_t now, list<Context*>& finished)
index 3555bf7698c5f21a362b7a9a924cf572e6e6d1b1,9195a845df99050a607e2f62d4c997721925832e..f26e8953281a599ffe095bc55d68a5e5a9048675
@@@ -527,15 -501,24 +527,15 @@@ void Server::handle_client_reconnect(MC
    delay -= reconnect_start;
    dout(10) << " reconnect_start " << reconnect_start << " delay " << delay << dendl;
  
 -  if (!mds->is_reconnect() || session->is_closed()) {
 -    if (!mds->is_reconnect()) {
 -      // XXX maybe in the future we can do better than this?
 -      dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
 -      ss << "denied reconnect attempt (mds is " << ceph_mds_state_name(mds->get_state())
 -       << ") from " << m->get_source_inst();
 -    } else if (!session) {
 -      dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
 -      ss << "denied reconnect attempt from " << m->get_source_inst() << " (no session)";
 -    } else if (session->is_closed()) {
 -      dout(1) << " no session for " << m->get_source() << ", ignoring reconnect, sending close" << dendl;
 -      ss << "denied reconnect attempt from " << m->get_source_inst() << " (session closed)";
 -    } else
 -      assert(0);
 -    ss << " after " << delay << " (allowed interval " << g_conf.mds_reconnect_timeout << ")";
 +  if (!mds->is_reconnect()) {
 +    // XXX maybe in the future we can do better than this?
 +    dout(1) << " no longer in reconnect state, ignoring reconnect, sending close" << dendl;
 +    ss << "denied reconnect attempt (mds is " << ceph_mds_state_name(mds->get_state())
 +       << ") from " << m->get_source_inst()
 +       << " after " << delay << " (allowed interval " << g_conf.mds_reconnect_timeout << ")";
      mds->logclient.log(LOG_INFO, ss);
-     mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_source_inst());
-     delete m;
+     mds->messenger->send_message(new MClientSession(CEPH_SESSION_CLOSE), m->get_connection());
+     m->put();
      return;
    }
  
      // no need to respond to client: they're telling us they have no session
    } else {
      // notify client of success with an OPEN
-     mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), m->get_source_inst());
+     mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), m->get_connection());
      
 -    if (session->is_new()) {
 -      dout(10) << " session is new, will make best effort to reconnect " 
 +    if (session->is_closed()) {
 +      dout(10) << " session is closed, will make best effort to reconnect " 
               << m->get_source_inst() << dendl;
        mds->sessionmap.set_state(session, Session::STATE_OPENING);
        version_t pv = ++mds->sessionmap.projected;
@@@ -1007,14 -990,13 +1007,14 @@@ void Server::handle_client_request(MCli
      session = get_session(req);
      if (!session) {
        dout(5) << "no session for " << req->get_source() << ", dropping" << dendl;
-       delete req;
+       req->put();
        return;
      }
 -    if (session->is_closed() || session->is_closing() || session->is_stale_purging() ||
 -      session->is_stale_closing()) {
 -      dout(5) << "session closed|closing|stale_purging|stale_closing, dropping" << dendl;
 +    if (session->is_closed() ||
 +      session->is_closing() ||
 +      session->is_killing()) {
 +      dout(5) << "session closed|closing|killing, dropping" << dendl;
-       delete req;
+       req->put();
        return;
      }
    }
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
Simple merge
index 97bca7172f3983f2d151b520bf89573d8b419cf6,70cfdfab9b5bebe6352e8473a838e60eef9834c5..e166b027685f3e6504110603dd25b22e2574f196
@@@ -249,8 -244,9 +249,9 @@@ public
      header.data_off = 0;
      memset(&footer, 0, sizeof(footer));
    }
+ protected:
    virtual ~Message() { 
 -    assert(nref.test() == 0);
 +    assert(nref.read() == 0);
      if (connection)
        connection->put();
    }
Simple merge
Simple merge
index d0a520dcad411efa9a67ae7e8b601b849ed685e7,2a01336f3c3d291cf258e40029447dab0e0af044..36237f93f1ef7e6d84de6ecb51825382d492d109
@@@ -265,7 -268,10 +268,12 @@@ private
      
      void queue_received(Message *m) {
        m->set_recv_stamp(g_clock.now());
-       assert(m->nref.read() == 0);
 -      assert(m->nref.test() == 1); //this is just to make sure that a changeset
 -      //is working properly; if you start using the refcounting more and have multiple
 -      //people hanging on to a message, ditch the assert!
++
++      // this is just to make sure that a changeset is working
++      // properly; if you start using the refcounting more and have
++      // multiple people hanging on to a message, ditch the assert!
++      assert(m->nref.read() == 1); 
        queue_received(m, m->get_priority());
      }
  
diff --cc src/osd/OSD.cc
index cb317ef3283f5712e5bc9744e5ac98a61af01c6a,30e5f701f719c32b3a82e46eda2734828d7cffef..8dcf70b90384e832b6425fff81f740b849e93fff
@@@ -1765,8 -1764,9 +1765,8 @@@ void OSD::_dispatch(Message *m
  
      // -- don't need lock -- 
    case CEPH_MSG_PING:
 -    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_ANY);
      dout(10) << "ping from " << m->get_source() << dendl;
-     delete m;
+     m->put();
      break;
  
      // -- don't need OSDMap --
  
      // osd
    case CEPH_MSG_SHUTDOWN:
 -    ALLOW_MESSAGES_FROM(CEPH_ENTITY_TYPE_MON);
 -    shutdown();
 +    session = (Session *)m->get_connection()->get_priv();
 +    if (!session ||
 +      session->caps.is_mon() ||
 +      session->caps.is_osd()) shutdown();
 +    else dout(0) << "shutdown message from connection with insufficient privs!"
 +               << m->get_connection() << dendl;
-     delete m;
-     if (session) session->put();
+     m->put();
++    if (session)
++      session->put();
      break;
  
    case MSG_PGSTATSACK:
  void OSD::handle_scrub(MOSDScrub *m)
  {
    dout(10) << "handle_scrub " << *m << dendl;
 -  
 +  if (!require_mon_peer(m))
 +    return;
    if (ceph_fsid_compare(&m->fsid, &monc->get_fsid())) {
      dout(0) << "handle_scrub fsid " << m->fsid << " != " << monc->get_fsid() << dendl;
-     delete m;
+     m->put();
      return;
    }
  
@@@ -1966,15 -1977,6 +1967,15 @@@ void OSD::handle_osd_map(MOSDMap *m
      return;
    }
  
-     delete m;
 +  Session *session = (Session *)m->get_connection()->get_priv();
 +  if (session && !(session->caps.is_mon() || session->caps.is_osd())) {
 +    //not enough perms!
++    m->put();
 +    session->put();
 +    return;
 +  }
 +  if(session) session->put();
 +
    if (osdmap) {
      dout(3) << "handle_osd_map epochs [" 
              << m->get_first() << "," << m->get_last() 
@@@ -2657,27 -2655,7 +2658,27 @@@ bool OSD::get_inc_map(epoch_t e, OSDMap
  
  
  
-     delete m;
 +bool OSD::require_mon_peer(Message *m)
 +{
 +  if (!m->get_connection()->peer_is_mon()) {
 +    dout(0) << "require_mon_peer received from non-mon " << m->get_connection()->get_peer_addr()
 +          << " " << *m << dendl;
++    m->put();
 +    return false;
 +  }
 +  return true;
 +}
  
-     delete m;
 +bool OSD::require_osd_peer(Message *m)
 +{
 +  if (!m->get_connection()->peer_is_osd()) {
 +    dout(0) << "require_osd_peer received from non-osd " << m->get_connection()->get_peer_addr()
 +          << " " << *m << dendl;
++    m->put();
 +    return false;
 +  }
 +  return true;
 +}
  
  bool OSD::require_current_map(Message *m, epoch_t ep) 
  {
@@@ -3057,9 -3036,9 +3058,9 @@@ void OSD::handle_pg_create(MOSDPGCreat
    do_infos(info_map);
  
    kick_pg_split_queue();
 -  if (to_peer.size())
 +  if (num_created)
      update_heartbeat_peers();
-   delete m;
+   m->put();
  }
  
  
Simple merge
Simple merge
Simple merge