]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: enable using external Objecter
authorJohn Spray <john.spray@redhat.com>
Thu, 30 Mar 2017 14:06:12 +0000 (10:06 -0400)
committerKefu Chai <kchai@redhat.com>
Wed, 3 May 2017 05:37:52 +0000 (13:37 +0800)
To enable places that already have an objecter
(such as ceph-mgr) to avoid spinning up another
one inside Client.

This will also be a path to re-using the librados
objecter in multiple instances of libcephfs in the
same process, a la nfs-ganesha.

Signed-off-by: John Spray <john.spray@redhat.com>
src/ceph_fuse.cc
src/ceph_syn.cc
src/client/Client.cc
src/client/Client.h
src/client/SyntheticClient.cc
src/libcephfs.cc
src/mon/MonClient.cc
src/mon/MonClient.h

index c8018256246f0b877b3768e70f2d3b9989c6a8b4..faa949f9609f1596d2fef0cf525c589d926dbf27 100644 (file)
@@ -194,7 +194,7 @@ int main(int argc, const char **argv, const char *envp[]) {
 
     // get monmap
     Messenger *messenger = NULL;
-    Client *client;
+    StandaloneClient *client;
     CephFuse *cfuse;
     UserPerm perms;
     int tester_r = 0;
@@ -213,7 +213,7 @@ int main(int argc, const char **argv, const char *envp[]) {
     messenger->set_policy(entity_name_t::TYPE_MDS,
                          Messenger::Policy::lossless_client(0));
 
-    client = new Client(messenger, mc);
+    client = new StandaloneClient(messenger, mc);
     if (filer_flags) {
       client->set_filer_flags(filer_flags);
     }
index d3ed16a1295a3cb6058fa25accfeeb6803f27de6..a8cbba4de2ee1b235a603c1abf71d30297bd37a6 100644 (file)
@@ -67,7 +67,7 @@ int main(int argc, const char **argv, char *envp[])
     messengers[i]->bind(g_conf->public_addr);
     mclients[i] = new MonClient(g_ceph_context);
     mclients[i]->build_initial_monmap();
-    Client *client = new Client(messengers[i], mclients[i]);
+    auto client = new StandaloneClient(messengers[i], mclients[i]);
     client->set_filer_flags(syn_filer_flags);
     SyntheticClient *syn = new SyntheticClient(client);
     clients.push_back(client);
index 0cd9d66d284385c1532a29e377ded357ecc183b1..9273812b836ecaf70b0f4744a6e8bbbf63ea2253 100644 (file)
@@ -226,9 +226,8 @@ vinodeno_t Client::map_faked_ino(ino_t ino)
 
 // cons/des
 
-Client::Client(Messenger *m, MonClient *mc)
+Client::Client(Messenger *m, MonClient *mc, Objecter *objecter_)
   : Dispatcher(m->cct),
-    logger(NULL),
     m_command_hook(this),
     timer(m->cct, client_lock),
     callback_handle(NULL),
@@ -246,17 +245,16 @@ Client::Client(Messenger *m, MonClient *mc)
     remount_finisher(m->cct),
     objecter_finisher(m->cct),
     tick_event(NULL),
-    monclient(mc), messenger(m), whoami(mc->get_global_id()),
-    cap_epoch_barrier(0),
+    messenger(m), monclient(mc),
+    objecter(objecter_),
+    whoami(mc->get_global_id()), cap_epoch_barrier(0),
     last_tid(0), oldest_tid(0), last_flush_tid(1),
-    initialized(false), authenticated(false),
+    initialized(false),
     mounted(false), unmounting(false),
     local_osd(-1), local_osd_epoch(0),
     unsafe_sync_write(0),
     client_lock("Client::client_lock")
 {
-  monclient->set_messenger(m);
-
   _reset_faked_inos();
   //
   root = 0;
@@ -279,14 +277,12 @@ Client::Client(Messenger *m, MonClient *mc)
   // file handles
   free_fd_set.insert(10, 1<<30);
 
-  // osd interfaces
   mdsmap.reset(new MDSMap);
-  objecter = new Objecter(cct, messenger, monclient, NULL,
-                         0, 0);
-  objecter->set_client_incarnation(0);  // client always 0, for now.
-  writeback_handler = new ObjecterWriteback(objecter, &objecter_finisher,
-                                           &client_lock);
-  objectcacher = new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock,
+
+  // osd interfaces
+  writeback_handler.reset(new ObjecterWriteback(objecter, &objecter_finisher,
+                                           &client_lock));
+  objectcacher.reset(new ObjectCacher(cct, "libcephfs", *writeback_handler, client_lock,
                                  client_flush_set_callback,    // all commit callback
                                  (void*)this,
                                  cct->_conf->client_oc_size,
@@ -294,9 +290,9 @@ Client::Client(Messenger *m, MonClient *mc)
                                  cct->_conf->client_oc_max_dirty,
                                  cct->_conf->client_oc_target_dirty,
                                  cct->_conf->client_oc_max_dirty_age,
-                                 true);
+                                 true));
   objecter_finisher.start();
-  filer = new Filer(objecter, &objecter_finisher);
+  filer.reset(new Filer(objecter, &objecter_finisher));
 }
 
 
@@ -305,14 +301,6 @@ Client::~Client()
   assert(!client_lock.is_locked());
 
   tear_down_cache();
-
-  delete objectcacher;
-  delete writeback_handler;
-
-  delete filer;
-  delete objecter;
-
-  delete logger;
 }
 
 void Client::tear_down_cache()
@@ -467,35 +455,27 @@ int Client::init()
 {
   timer.init();
   objectcacher->start();
-  objecter->init();
 
   client_lock.Lock();
   assert(!initialized);
 
-  // ok!
-  messenger->add_dispatcher_tail(objecter);
   messenger->add_dispatcher_tail(this);
+  client_lock.Unlock();
 
-  monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
-  int r = monclient->init();
-  if (r < 0) {
-    // need to do cleanup because we're in an intermediate init state
-    timer.shutdown();
-    client_lock.Unlock();
-    objecter->shutdown();
-    objectcacher->stop();
-    monclient->shutdown();
-    return r;
-  }
-  objecter->start();
+  _finish_init();
+  return 0;
+}
 
+void Client::_finish_init()
+{
+  client_lock.Lock();
   // logger
   PerfCountersBuilder plb(cct, "client", l_c_first, l_c_last);
   plb.add_time_avg(l_c_reply, "reply", "Latency of receiving a reply on metadata request");
   plb.add_time_avg(l_c_lat, "lat", "Latency of processing a metadata request");
   plb.add_time_avg(l_c_wrlat, "wrlat", "Latency of a file data write operation");
-  logger = plb.create_perf_counters();
-  cct->get_perfcounters_collection()->add(logger);
+  logger.reset(plb.create_perf_counters());
+  cct->get_perfcounters_collection()->add(logger.get());
 
   client_lock.Unlock();
 
@@ -546,7 +526,6 @@ int Client::init()
   client_lock.Lock();
   initialized = true;
   client_lock.Unlock();
-  return r;
 }
 
 void Client::shutdown() 
@@ -600,22 +579,16 @@ void Client::shutdown()
   timer.shutdown();
   client_lock.Unlock();
 
-  objecter->shutdown();
   objecter_finisher.wait_for_empty();
   objecter_finisher.stop();
 
-  monclient->shutdown();
-
   if (logger) {
-    cct->get_perfcounters_collection()->remove(logger);
-    delete logger;
-    logger = NULL;
+    cct->get_perfcounters_collection()->remove(logger.get());
+    logger.reset();
   }
 }
 
 
-
-
 // ===================
 // metadata cache stuff
 
@@ -5436,7 +5409,7 @@ int Client::authenticate()
 {
   assert(client_lock.is_locked_by_me());
 
-  if (authenticated) {
+  if (monclient->is_authenticated()) {
     return 0;
   }
 
@@ -5449,7 +5422,6 @@ int Client::authenticate()
 
   whoami = monclient->get_global_id();
   messenger->set_myname(entity_name_t::CLIENT(whoami.v));
-  authenticated = true;
 
   return 0;
 }
@@ -13257,3 +13229,55 @@ mds_rank_t Client::_get_random_up_mds() const
   return *p;
 }
 
+
+StandaloneClient::StandaloneClient(Messenger *m, MonClient *mc)
+    : Client(m, mc, new Objecter(m->cct, m, mc, NULL, 0, 0))
+{
+  monclient->set_messenger(m);
+  objecter->set_client_incarnation(0);
+}
+
+StandaloneClient::~StandaloneClient()
+{
+  delete objecter;
+  objecter = nullptr;
+}
+
+int StandaloneClient::init()
+{
+  timer.init();
+  objectcacher->start();
+  objecter->init();
+
+  client_lock.Lock();
+  assert(!initialized);
+
+  messenger->add_dispatcher_tail(objecter);
+  messenger->add_dispatcher_tail(this);
+
+  monclient->set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
+  int r = monclient->init();
+  if (r < 0) {
+    // need to do cleanup because we're in an intermediate init state
+    timer.shutdown();
+    client_lock.Unlock();
+    objecter->shutdown();
+    objectcacher->stop();
+    monclient->shutdown();
+    return r;
+  }
+  objecter->start();
+
+  client_lock.Unlock();
+  _finish_init();
+
+  return 0;
+}
+
+void StandaloneClient::shutdown()
+{
+  Client::shutdown();
+  objecter->shutdown();
+  monclient->shutdown();
+}
+
index 236ca90d995e0cb4f0e2a5af572be512e575a775..7d2b73eb35d55d258ace5249fdd2fc2c76568cd5 100644 (file)
@@ -252,7 +252,7 @@ class Client : public Dispatcher, public md_config_obs_t {
  public:
   using Dispatcher::cct;
 
-  PerfCounters *logger;
+  std::unique_ptr<PerfCounters> logger;
 
   class CommandHook : public AdminSocketHook {
     Client *m_client;
@@ -304,8 +304,10 @@ public:
     return UserPerm(uid, gid);
   }
 protected:
-  MonClient *monclient;
   Messenger *messenger;  
+  MonClient *monclient;
+  Objecter  *objecter;
+
   client_t whoami;
 
   int user_id, group_id;
@@ -384,7 +386,6 @@ protected:
   bool is_dir_operation(MetaRequest *request);
 
   bool   initialized;
-  bool   authenticated;
   bool   mounted;
   bool   unmounting;
 
@@ -404,10 +405,9 @@ public:
   void _sync_write_commit(Inode *in);
 
 protected:
-  Filer                 *filer;     
-  ObjectCacher          *objectcacher;
-  Objecter              *objecter;     // (non-blocking) osd interface
-  WritebackHandler      *writeback_handler;
+  std::unique_ptr<Filer>             filer;
+  std::unique_ptr<ObjectCacher>      objectcacher;
+  std::unique_ptr<WritebackHandler>  writeback_handler;
 
   // cache
   ceph::unordered_map<vinodeno_t, Inode*> inode_map;
@@ -586,11 +586,18 @@ protected:
 
   void _close_sessions();
 
+  /**
+   * The basic housekeeping parts of init (perf counters, admin socket)
+   * that is independent of how objecters/monclient/messengers are
+   * being set up.
+   */
+  void _finish_init();
+
  public:
   void set_filer_flags(int flags);
   void clear_filer_flags(int flags);
 
-  Client(Messenger *m, MonClient *mc);
+  Client(Messenger *m, MonClient *mc, Objecter *objecter_);
   ~Client() override;
   void tear_down_cache();
 
@@ -601,8 +608,8 @@ protected:
   inodeno_t get_root_ino();
   Inode *get_root();
 
-  int init()  WARN_UNUSED_RESULT;
-  void shutdown();
+  virtual int init();
+  virtual void shutdown();
 
   // messaging
   void handle_mds_map(class MMDSMap *m);
@@ -1225,4 +1232,19 @@ public:
                                  const std::set <std::string> &changed) override;
 };
 
+/**
+ * Specialization of Client that manages its own Objecter instance
+ * and handles init/shutdown of messenger/monclient
+ */
+class StandaloneClient : public Client
+{
+  public:
+  StandaloneClient(Messenger *m, MonClient *mc);
+
+  ~StandaloneClient() override;
+
+  int init() override;
+  void shutdown() override;
+};
+
 #endif
index 1c1b33aa3a2900d40a83b3d4dce518bc8f369747..54c3ddbff44e1afa8f4c0e372ac89c03f89a48b9 100644 (file)
@@ -3354,7 +3354,6 @@ int SyntheticClient::chunk_file(string &filename)
   uint64_t size = st.st_size;
   dout(0) << "file " << filename << " size is " << size << dendl;
 
-  Filer *filer = client->filer;
 
   inode_t inode;
   memset(&inode, 0, sizeof(inode));
@@ -3374,7 +3373,8 @@ int SyntheticClient::chunk_file(string &filename)
     
     flock.Lock();
     Context *onfinish = new C_SafeCond(&flock, &cond, &done);
-    filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0, onfinish);
+    client->filer->read(inode.ino, &inode.layout, CEPH_NOSNAP, pos, get, &bl, 0,
+                       onfinish);
     while (!done)
       cond.Wait(flock);
     flock.Unlock();
index a196dc669d96e9b0bd3e41ce2d429fddfcb457c8..bfacd1bae51b1af157ece04f693b1efdcaa05336 100644 (file)
@@ -33,6 +33,7 @@
 
 #include "include/cephfs/libcephfs.h"
 
+
 struct ceph_mount_info
 {
 public:
@@ -86,7 +87,7 @@ public:
 
     //at last the client
     ret = -CEPHFS_ERROR_NEW_CLIENT; //defined in libcephfs.h;
-    client = new Client(messenger, monclient);
+    client = new StandaloneClient(messenger, monclient);
     if (!client)
       goto fail;
 
@@ -248,7 +249,7 @@ public:
 private:
   bool mounted;
   bool inited;
-  Client *client;
+  StandaloneClient *client;
   MonClient *monclient;
   Messenger *messenger;
   CephContext *cct;
index 5d00487a010795ba7bddae4afb74b5e9309dfdb6..da843269ac993ab392c80df2c4b67d763f2195df 100644 (file)
@@ -463,6 +463,9 @@ int MonClient::authenticate(double timeout)
   if (active_con) {
     ldout(cct, 5) << __func__ << " success, global_id "
                  << active_con->get_global_id() << dendl;
+    // active_con should not have been set if there was an error
+    assert(authenticate_err == 0);
+    authenticated = true;
   }
 
   if (authenticate_err < 0 && no_keyring_disabled_cephx) {
index 126032b7b2f8fb3b7cd9ab2a12d9c3ce725e5078..9656db0ee71a6d1fd59801204627ca1002e78c54 100644 (file)
@@ -194,6 +194,7 @@ private:
   uint64_t global_id = 0;
   Cond auth_cond;
   int authenticate_err = 0;
+  bool authenticated = false;
 
   list<Message*> waiting_for_session;
   utime_t last_rotating_renew_sent;
@@ -219,6 +220,7 @@ public:
   int wait_auth_rotating(double timeout);
 
   int authenticate(double timeout=0.0);
+  bool is_authenticated() const {return authenticated;}
 
   /**
    * Try to flush as many log messages as we can in a single