From: Sage Weil Date: Mon, 21 Jul 2014 03:50:00 +0000 (-0700) Subject: client: let Objecter dispatch directly X-Git-Tag: v0.86~213^2~60 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=7262da3af4adcb9b8984e5e54e3b9d50539cb524;p=ceph.git client: let Objecter dispatch directly Add Objecter as a direct dispatcher. Drop all of the callbacks and messages we were passing along. Wrap the IO completions in client_lock (via C_Lock) and shunt them to the objecter_finisher. Signed-off-by: Sage Weil --- diff --git a/src/client/Client.cc b/src/client/Client.cc index b6285dd400da..659a6323be87 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -196,7 +196,8 @@ Client::Client(Messenger *m, MonClient *mc) objecter = new Objecter(cct, messenger, monclient, 0, 0); objecter->set_client_incarnation(0); // client always 0, for now. - writeback_handler = new ObjecterWriteback(objecter); + writeback_handler = new ObjecterWriteback(objecter, &objecter_finisher, + &client_lock); objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock, client_flush_set_callback, // all commit callback (void*)this, @@ -351,7 +352,9 @@ int Client::init() objectcacher->start(); objecter->init(); + // ok! + messenger->add_dispatcher_head(objecter); messenger->add_dispatcher_head(this); int r = monclient->init(); @@ -1928,18 +1931,6 @@ bool Client::ms_dispatch(Message *m) } switch (m->get_type()) { - // osd - case CEPH_MSG_OSD_OPREPLY: - objecter->handle_osd_op_reply((MOSDOpReply*)m); - break; - case CEPH_MSG_OSD_MAP: - objecter->handle_osd_map((class MOSDMap*)m); - break; - case CEPH_MSG_STATFS_REPLY: - objecter->handle_fs_stats_reply((MStatfsReply*)m); - break; - - // mounting and mds sessions case CEPH_MSG_MDS_MAP: handle_mds_map(static_cast(m)); @@ -9029,15 +9020,11 @@ int Client::get_local_osd() void Client::ms_handle_connect(Connection *con) { ldout(cct, 10) << "ms_handle_connect on " << con->get_peer_addr() << dendl; - Mutex::Locker l(client_lock); - objecter->ms_handle_connect(con); } bool Client::ms_handle_reset(Connection *con) { ldout(cct, 0) << "ms_handle_reset on " << con->get_peer_addr() << dendl; - Mutex::Locker l(client_lock); - objecter->ms_handle_reset(con); return false; } @@ -9046,10 +9033,6 @@ void Client::ms_handle_remote_reset(Connection *con) ldout(cct, 0) << "ms_handle_remote_reset on " << con->get_peer_addr() << dendl; Mutex::Locker l(client_lock); switch (con->get_peer_type()) { - case CEPH_ENTITY_TYPE_OSD: - objecter->ms_handle_remote_reset(con); - break; - case CEPH_ENTITY_TYPE_MDS: { // kludge to figure out which mds this is; fixme with a Connection* state diff --git a/src/client/ObjecterWriteback.h b/src/client/ObjecterWriteback.h index f2d903a59b71..b7e4bd257128 100644 --- a/src/client/ObjecterWriteback.h +++ b/src/client/ObjecterWriteback.h @@ -8,7 +8,10 @@ class ObjecterWriteback : public WritebackHandler { public: - ObjecterWriteback(Objecter *o) : m_objecter(o) {} + ObjecterWriteback(Objecter *o, Finisher *fin, Mutex *lock) + : m_objecter(o), + m_finisher(fin), + m_lock(lock) { } virtual ~ObjecterWriteback() {} virtual void read(const object_t& oid, const object_locator_t& oloc, @@ -16,10 +19,13 @@ class ObjecterWriteback : public WritebackHandler { bufferlist *pbl, uint64_t trunc_size, __u32 trunc_seq, Context *onfinish) { m_objecter->read_trunc(oid, oloc, off, len, snapid, pbl, 0, - trunc_size, trunc_seq, onfinish); + trunc_size, trunc_seq, + new C_OnFinisher(new C_Lock(m_lock, onfinish), + m_finisher)); } - virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, uint64_t read_len, snapid_t snapid) { + virtual bool may_copy_on_write(const object_t& oid, uint64_t read_off, + uint64_t read_len, snapid_t snapid) { return false; } @@ -28,16 +34,22 @@ class ObjecterWriteback : public WritebackHandler { const bufferlist &bl, utime_t mtime, uint64_t trunc_size, __u32 trunc_seq, Context *oncommit) { return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0, - trunc_size, trunc_seq, NULL, oncommit); + trunc_size, trunc_seq, NULL, + new C_OnFinisher(new C_Lock(m_lock, oncommit), + m_finisher)); } virtual ceph_tid_t lock(const object_t& oid, const object_locator_t& oloc, int op, int flags, Context *onack, Context *oncommit) { - return m_objecter->lock(oid, oloc, op, flags, onack, oncommit); + return m_objecter->lock(oid, oloc, op, flags, onack, + new C_OnFinisher(new C_Lock(m_lock, oncommit), + m_finisher)); } private: Objecter *m_objecter; + Finisher *m_finisher; + Mutex *m_lock; }; #endif