]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: let Objecter dispatch directly
authorSage Weil <sage@redhat.com>
Mon, 21 Jul 2014 03:50:00 +0000 (20:50 -0700)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:34:02 +0000 (01:34 +0100)
Add Objecter as a direct dispatcher.  Drop all of the callbacks and
messages we were passing along.  Wrap the IO completions in client_lock
(via C_Lock) and shunt them to the objecter_finisher.

Signed-off-by: Sage Weil <sage@redhat.com>
src/client/Client.cc
src/client/ObjecterWriteback.h

index b6285dd400da513f84bb46efc6d0caed6703a23e..659a6323be8764872b995ac0aa4c0c9b134f9dc4 100644 (file)
@@ -196,7 +196,8 @@ Client::Client(Messenger *m, MonClient *mc)
   objecter = new Objecter(cct, messenger, monclient,
                          0, 0);
   objecter->set_client_incarnation(0);  // client always 0, for now.
-  writeback_handler = new ObjecterWriteback(objecter);
+  writeback_handler = new ObjecterWriteback(objecter, &objecter_finisher,
+                                           &client_lock);
   objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock,
                                  client_flush_set_callback,    // all commit callback
                                  (void*)this,
@@ -351,7 +352,9 @@ int Client::init()
   objectcacher->start();
 
   objecter->init();
+
   // ok!
+  messenger->add_dispatcher_head(objecter);
   messenger->add_dispatcher_head(this);
 
   int r = monclient->init();
@@ -1928,18 +1931,6 @@ bool Client::ms_dispatch(Message *m)
   }
 
   switch (m->get_type()) {
-    // osd
-  case CEPH_MSG_OSD_OPREPLY:
-    objecter->handle_osd_op_reply((MOSDOpReply*)m);
-    break;
-  case CEPH_MSG_OSD_MAP:
-    objecter->handle_osd_map((class MOSDMap*)m);
-    break;
-  case CEPH_MSG_STATFS_REPLY:
-    objecter->handle_fs_stats_reply((MStatfsReply*)m);
-    break;
-
-    
     // mounting and mds sessions
   case CEPH_MSG_MDS_MAP:
     handle_mds_map(static_cast<MMDSMap*>(m));
@@ -9029,15 +9020,11 @@ int Client::get_local_osd()
 void Client::ms_handle_connect(Connection *con)
 {
   ldout(cct, 10) << "ms_handle_connect on " << con->get_peer_addr() << dendl;
-  Mutex::Locker l(client_lock);
-  objecter->ms_handle_connect(con);
 }
 
 bool Client::ms_handle_reset(Connection *con)
 {
   ldout(cct, 0) << "ms_handle_reset on " << con->get_peer_addr() << dendl;
-  Mutex::Locker l(client_lock);
-  objecter->ms_handle_reset(con);
   return false;
 }
 
@@ -9046,10 +9033,6 @@ void Client::ms_handle_remote_reset(Connection *con)
   ldout(cct, 0) << "ms_handle_remote_reset on " << con->get_peer_addr() << dendl;
   Mutex::Locker l(client_lock);
   switch (con->get_peer_type()) {
-  case CEPH_ENTITY_TYPE_OSD:
-    objecter->ms_handle_remote_reset(con);
-    break;
-
   case CEPH_ENTITY_TYPE_MDS:
     {
       // kludge to figure out which mds this is; fixme with a Connection* state
index f2d903a59b71f820c7185b4298156e586d48eb3c..b7e4bd2571285c0f206090764b9c1a2974a5d07b 100644 (file)
@@ -8,7 +8,10 @@
 
 class ObjecterWriteback : public WritebackHandler {
  public:
-  ObjecterWriteback(Objecter *o) : m_objecter(o) {}
+  ObjecterWriteback(Objecter *o, Finisher *fin, Mutex *lock)
+    : m_objecter(o),
+      m_finisher(fin),
+      m_lock(lock) { }
   virtual ~ObjecterWriteback() {}
 
   virtual void read(const object_t& oid, const object_locator_t& oloc,
@@ -16,10 +19,13 @@ class ObjecterWriteback : public WritebackHandler {
                    bufferlist *pbl, uint64_t trunc_size,  __u32 trunc_seq,
                    Context *onfinish) {
     m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0,
-                          trunc_size, trunc_seq, onfinish);
+                          trunc_size, trunc_seq,
+                          new C_OnFinisher(new C_Lock(m_lock, onfinish),
+                                           m_finisher));
   }
 
-  virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) {
+  virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off,
+                                uint64_t read_len, snapid_t snapid) {
     return false;
   }
 
@@ -28,16 +34,22 @@ class ObjecterWriteback : public WritebackHandler {
                      const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
                      __u32 trunc_seq, Context *oncommit) {
     return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0,
-                                  trunc_size, trunc_seq, NULL, oncommit);
+                                  trunc_size, trunc_seq, NULL,
+                                  new C_OnFinisher(new C_Lock(m_lock, oncommit),
+                                                   m_finisher));
   }
 
   virtual ceph_tid_t lock(const object_t& oid, const object_locator_t& oloc, int op,
                     int flags, Context *onack, Context *oncommit) {
-    return m_objecter->lock(oid, oloc, op, flags, onack, oncommit);
+    return m_objecter->lock(oid, oloc, op, flags, onack,
+                           new C_OnFinisher(new C_Lock(m_lock, oncommit),
+                                            m_finisher));
   }
 
  private:
   Objecter *m_objecter;
+  Finisher *m_finisher;
+  Mutex *m_lock;
 };
 
 #endif