From: John Spray Date: Thu, 30 Mar 2017 14:06:12 +0000 (-0400) Subject: client: enable using external Objecter X-Git-Tag: v12.0.3~79^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=bff87c555508addcf73af92e1d1a3752e802af57;p=ceph.git client: enable using external Objecter To enable places that already have an objecter (such as ceph-mgr) to avoid spinning up another one inside Client. This will also be a path to re-using the librados objecter in multiple instances of libcephfs in the same process, a la nfs-ganesha. Signed-off-by: John Spray --- diff --git a/src/ceph_fuse.cc b/src/ceph_fuse.cc index c8018256246f..faa949f9609f 100644 --- a/src/ceph_fuse.cc +++ b/src/ceph_fuse.cc @@ -194,7 +194,7 @@ int main(int argc, const char **argv, const char *envp[]) { // get monmap Messenger *messenger = NULL; - Client *client; + StandaloneClient *client; CephFuse *cfuse; UserPerm perms; int tester_r = 0; @@ -213,7 +213,7 @@ int main(int argc, const char **argv, const char *envp[]) { messenger->set_policy(entity_name_t::TYPE_MDS, Messenger::Policy::lossless_client(0)); - client = new Client(messenger, mc); + client = new StandaloneClient(messenger, mc); if (filer_flags) { client->set_filer_flags(filer_flags); } diff --git a/src/ceph_syn.cc b/src/ceph_syn.cc index d3ed16a1295a..a8cbba4de2ee 100644 --- a/src/ceph_syn.cc +++ b/src/ceph_syn.cc @@ -67,7 +67,7 @@ int main(int argc, const char **argv, char *envp[]) messengers[i]->bind(g_conf->public_addr); mclients[i] = new MonClient(g_ceph_context); mclients[i]->build_initial_monmap(); - Client *client = new Client(messengers[i], mclients[i]); + auto client = new StandaloneClient(messengers[i], mclients[i]); client->set_filer_flags(syn_filer_flags); SyntheticClient *syn = new SyntheticClient(client); clients.push_back(client); diff --git a/src/client/Client.cc b/src/client/Client.cc index 0cd9d66d2843..9273812b836e 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -226,9 +226,8 @@ vinodeno_t Client::map_faked_ino(ino_t ino) // cons/des -Client::Client(Messenger *m, MonClient *mc) +Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_) : Dispatcher(m->cct), - logger(NULL), m_command_hook(this), timer(m->cct, client_lock), callback_handle(NULL), @@ -246,17 +245,16 @@ Client::Client(Messenger *m, MonClient *mc) remount_finisher(m->cct), objecter_finisher(m->cct), tick_event(NULL), - monclient(mc), messenger(m), whoami(mc->get_global_id()), - cap_epoch_barrier(0), + messenger(m), monclient(mc), + objecter(objecter_), + whoami(mc->get_global_id()), cap_epoch_barrier(0), last_tid(0), oldest_tid(0), last_flush_tid(1), - initialized(false), authenticated(false), + initialized(false), mounted(false), unmounting(false), local_osd(-1), local_osd_epoch(0), unsafe_sync_write(0), client_lock("Client::client_lock") { - monclient->set_messenger(m); - _reset_faked_inos(); // root = 0; @@ -279,14 +277,12 @@ Client::Client(Messenger *m, MonClient *mc) // file handles free_fd_set.insert(10, 1<<30); - // osd interfaces mdsmap.reset(new MDSMap); - objecter = new Objecter(cct, messenger, monclient, NULL, - 0, 0); - objecter->set_client_incarnation(0); // client always 0, for now. - writeback_handler = new ObjecterWriteback(objecter, &objecter_finisher, - &client_lock); - objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock, + + // osd interfaces + writeback_handler.reset(new ObjecterWriteback(objecter, &objecter_finisher, + &client_lock)); + objectcacher.reset(new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock, client_flush_set_callback, // all commit callback (void*)this, cct->_conf->client_oc_size, @@ -294,9 +290,9 @@ Client::Client(Messenger *m, MonClient *mc) cct->_conf->client_oc_max_dirty, cct->_conf->client_oc_target_dirty, cct->_conf->client_oc_max_dirty_age, - true); + true)); objecter_finisher.start(); - filer = new Filer(objecter, &objecter_finisher); + filer.reset(new Filer(objecter, &objecter_finisher)); } @@ -305,14 +301,6 @@ Client::~Client() assert(!client_lock.is_locked()); tear_down_cache(); - - delete objectcacher; - delete writeback_handler; - - delete filer; - delete objecter; - - delete logger; } void Client::tear_down_cache() @@ -467,35 +455,27 @@ int Client::init() { timer.init(); objectcacher->start(); - objecter->init(); client_lock.Lock(); assert(!initialized); - // ok! - messenger->add_dispatcher_tail(objecter); messenger->add_dispatcher_tail(this); + client_lock.Unlock(); - monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD); - int r = monclient->init(); - if (r < 0) { - // need to do cleanup because we're in an intermediate init state - timer.shutdown(); - client_lock.Unlock(); - objecter->shutdown(); - objectcacher->stop(); - monclient->shutdown(); - return r; - } - objecter->start(); + _finish_init(); + return 0; +} +void Client::_finish_init() +{ + client_lock.Lock(); // logger PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last); plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request"); plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request"); plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation"); - logger = plb.create_perf_counters(); - cct->get_perfcounters_collection()->add(logger); + logger.reset(plb.create_perf_counters()); + cct->get_perfcounters_collection()->add(logger.get()); client_lock.Unlock(); @@ -546,7 +526,6 @@ int Client::init() client_lock.Lock(); initialized = true; client_lock.Unlock(); - return r; } void Client::shutdown() @@ -600,22 +579,16 @@ void Client::shutdown() timer.shutdown(); client_lock.Unlock(); - objecter->shutdown(); objecter_finisher.wait_for_empty(); objecter_finisher.stop(); - monclient->shutdown(); - if (logger) { - cct->get_perfcounters_collection()->remove(logger); - delete logger; - logger = NULL; + cct->get_perfcounters_collection()->remove(logger.get()); + logger.reset(); } } - - // =================== // metadata cache stuff @@ -5436,7 +5409,7 @@ int Client::authenticate() { assert(client_lock.is_locked_by_me()); - if (authenticated) { + if (monclient->is_authenticated()) { return 0; } @@ -5449,7 +5422,6 @@ int Client::authenticate() whoami = monclient->get_global_id(); messenger->set_myname(entity_name_t::CLIENT(whoami.v)); - authenticated = true; return 0; } @@ -13257,3 +13229,55 @@ mds_rank_t Client::_get_random_up_mds() const return *p; } + +StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc) + : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0)) +{ + monclient->set_messenger(m); + objecter->set_client_incarnation(0); +} + +StandaloneClient::~StandaloneClient() +{ + delete objecter; + objecter = nullptr; +} + +int StandaloneClient::init() +{ + timer.init(); + objectcacher->start(); + objecter->init(); + + client_lock.Lock(); + assert(!initialized); + + messenger->add_dispatcher_tail(objecter); + messenger->add_dispatcher_tail(this); + + monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD); + int r = monclient->init(); + if (r < 0) { + // need to do cleanup because we're in an intermediate init state + timer.shutdown(); + client_lock.Unlock(); + objecter->shutdown(); + objectcacher->stop(); + monclient->shutdown(); + return r; + } + objecter->start(); + + client_lock.Unlock(); + _finish_init(); + + return 0; +} + +void StandaloneClient::shutdown() +{ + Client::shutdown(); + objecter->shutdown(); + monclient->shutdown(); +} + diff --git a/src/client/Client.h b/src/client/Client.h index 236ca90d995e..7d2b73eb35d5 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -252,7 +252,7 @@ class Client : public Dispatcher, public md_config_obs_t { public: using Dispatcher::cct; - PerfCounters *logger; + std::unique_ptr logger; class CommandHook : public AdminSocketHook { Client *m_client; @@ -304,8 +304,10 @@ public: return UserPerm(uid, gid); } protected: - MonClient *monclient; Messenger *messenger; + MonClient *monclient; + Objecter *objecter; + client_t whoami; int user_id, group_id; @@ -384,7 +386,6 @@ protected: bool is_dir_operation(MetaRequest *request); bool initialized; - bool authenticated; bool mounted; bool unmounting; @@ -404,10 +405,9 @@ public: void _sync_write_commit(Inode *in); protected: - Filer *filer; - ObjectCacher *objectcacher; - Objecter *objecter; // (non-blocking) osd interface - WritebackHandler *writeback_handler; + std::unique_ptr filer; + std::unique_ptr objectcacher; + std::unique_ptr writeback_handler; // cache ceph::unordered_map inode_map; @@ -586,11 +586,18 @@ protected: void _close_sessions(); + /** + * The basic housekeeping parts of init (perf counters, admin socket) + * that is independent of how objecters/monclient/messengers are + * being set up. + */ + void _finish_init(); + public: void set_filer_flags(int flags); void clear_filer_flags(int flags); - Client(Messenger *m, MonClient *mc); + Client(Messenger *m, MonClient *mc, Objecter *objecter_); ~Client() override; void tear_down_cache(); @@ -601,8 +608,8 @@ protected: inodeno_t get_root_ino(); Inode *get_root(); - int init() WARN_UNUSED_RESULT; - void shutdown(); + virtual int init(); + virtual void shutdown(); // messaging void handle_mds_map(class MMDSMap *m); @@ -1225,4 +1232,19 @@ public: const std::set &changed) override; }; +/** + * Specialization of Client that manages its own Objecter instance + * and handles init/shutdown of messenger/monclient + */ +class StandaloneClient : public Client +{ + public: + StandaloneClient(Messenger *m, MonClient *mc); + + ~StandaloneClient() override; + + int init() override; + void shutdown() override; +}; + #endif diff --git a/src/client/SyntheticClient.cc b/src/client/SyntheticClient.cc index 1c1b33aa3a29..54c3ddbff44e 100644 --- a/src/client/SyntheticClient.cc +++ b/src/client/SyntheticClient.cc @@ -3354,7 +3354,6 @@ int SyntheticClient::chunk_file(string &filename) uint64_t size = st.st_size; dout(0) << "file " << filename << " size is " << size << dendl; - Filer *filer = client->filer; inode_t inode; memset(&inode, 0, sizeof(inode)); @@ -3374,7 +3373,8 @@ int SyntheticClient::chunk_file(string &filename) flock.Lock(); Context *onfinish = new C_SafeCond(&flock, &cond, &done); - filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0, onfinish); + client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0, + onfinish); while (!done) cond.Wait(flock); flock.Unlock(); diff --git a/src/libcephfs.cc b/src/libcephfs.cc index a196dc669d96..bfacd1bae51b 100644 --- a/src/libcephfs.cc +++ b/src/libcephfs.cc @@ -33,6 +33,7 @@ #include "include/cephfs/libcephfs.h" + struct ceph_mount_info { public: @@ -86,7 +87,7 @@ public: //at last the client ret = -CEPHFS_ERROR_NEW_CLIENT; //defined in libcephfs.h; - client = new Client(messenger, monclient); + client = new StandaloneClient(messenger, monclient); if (!client) goto fail; @@ -248,7 +249,7 @@ public: private: bool mounted; bool inited; - Client *client; + StandaloneClient *client; MonClient *monclient; Messenger *messenger; CephContext *cct; diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 5d00487a0107..da843269ac99 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -463,6 +463,9 @@ int MonClient::authenticate(double timeout) if (active_con) { ldout(cct, 5) << __func__ << " success, global_id " << active_con->get_global_id() << dendl; + // active_con should not have been set if there was an error + assert(authenticate_err == 0); + authenticated = true; } if (authenticate_err < 0 && no_keyring_disabled_cephx) { diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index 126032b7b2f8..9656db0ee71a 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -194,6 +194,7 @@ private: uint64_t global_id = 0; Cond auth_cond; int authenticate_err = 0; + bool authenticated = false; list waiting_for_session; utime_t last_rotating_renew_sent; @@ -219,6 +220,7 @@ public: int wait_auth_rotating(double timeout); int authenticate(double timeout=0.0); + bool is_authenticated() const {return authenticated;} /** * Try to flush as many log messages as we can in a single