]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mgr: forward RADOS client instances for potential blacklist
authorPatrick Donnelly <pdonnell@redhat.com>
Tue, 3 Dec 2019 04:27:44 +0000 (20:27 -0800)
committerPatrick Donnelly <pdonnell@redhat.com>
Wed, 4 Dec 2019 01:53:14 +0000 (17:53 -0800)
The mgr creates a per-module RADOS client connection for modules which
interact with RADOS (e.g. the volumes module). These clients should also
be blacklisted when the active mgr is failed; we don't want the former
active mgr to continue interacting with RADOS when the new one takes
over. This is particularly impactful for avoiding extraneous
"unresponsive client" warnings from the MDS when the mgr switches
(especially in testing). The MDS will pickup the new OSD blacklists
which include's the old mgr's libcephfs instance and blacklist/evict
that session quietly.

Signed-off-by: Patrick Donnelly <pdonnell@redhat.com>
17 files changed:
src/include/rados/librados.h
src/libcephfs.cc
src/librados/RadosClient.cc
src/librados/RadosClient.h
src/librados/librados_c.cc
src/messages/MMgrBeacon.h
src/mgr/ActivePyModules.cc
src/mgr/ActivePyModules.h
src/mgr/BaseMgrModule.cc
src/mgr/MgrStandby.cc
src/mgr/PyModuleRegistry.h
src/mon/MgrMap.h
src/mon/MgrMonitor.cc
src/pybind/cephfs/cephfs.pyx
src/pybind/mgr/mgr_module.py
src/pybind/mgr/volumes/fs/volume.py
src/pybind/rados/rados.pyx

index d1fb2795c55a98ce4ecd41f97ca439657001d3f8..c6d5633fac7eb4e449d471090af375c8499a2bbd 100644 (file)
@@ -3666,6 +3666,15 @@ CEPH_RADOS_API int rados_blacklist_add(rados_t cluster,
                                       char *client_address,
                                       uint32_t expire_seconds);
 
+/**
+ * Gets addresses of the RADOS session, suitable for blacklisting.
+ *
+ * @param cluster cluster handle
+ * @param addrs the output string.
+ * @returns 0 on success, negative error code on failure
+ */
+CEPH_RADOS_API int rados_getaddrs(rados_t cluster, char** addrs);
+
 CEPH_RADOS_API void rados_set_osdmap_full_try(rados_ioctx_t io)
   __attribute__((deprecated));
 
index 8a6b14d1ed8dda06e322a32fce0348188eaeb1ec..0ace2c0f62001681922c2fb68866daf62e5c54eb 100644 (file)
@@ -228,6 +228,13 @@ public:
     return umask;
   }
 
+  std::string getaddrs()
+  {
+    CachedStackStringStream cos;
+    *cos << messenger->get_myaddrs();
+    return std::string(cos->strv());
+  }
+
   int conf_read_file(const char *path_list)
   {
     int ret = cct->_conf.parse_config_files(path_list, nullptr, 0);
@@ -427,6 +434,15 @@ extern "C" uint64_t ceph_get_instance_id(struct ceph_mount_info *cmount)
   return 0;
 }
 
+extern "C" int ceph_getaddrs(struct ceph_mount_info *cmount, char** addrs)
+{
+  if (!cmount->is_initialized())
+    return -ENOTCONN;
+  auto s = cmount->getaddrs();
+  *addrs = strdup(s.c_str());
+  return 0;
+}
+
 extern "C" int ceph_conf_read_file(struct ceph_mount_info *cmount, const char *path)
 {
   return cmount->conf_read_file(path);
index 372419816d8b34166d79185922aeac8ed79d6895..4de30e1e7dfe55c5db0174358ae5f7652b697697 100644 (file)
@@ -796,6 +796,12 @@ void librados::RadosClient::blacklist_self(bool set) {
   objecter->blacklist_self(set);
 }
 
+std::string librados::RadosClient::get_addrs() const {
+  CachedStackStringStream cos;
+  *cos << messenger->get_myaddrs();
+  return std::string(cos->strv());
+}
+
 int librados::RadosClient::blacklist_add(const string& client_address,
                                         uint32_t expire_seconds)
 {
index 0f2b153def4a7fd27f55182c61132f6954a98a4b..00520214a790b372f2e46a94f7bcccb37c066e9b 100644 (file)
@@ -167,6 +167,8 @@ public:
   bool put();
   void blacklist_self(bool set);
 
+  std::string get_addrs() const;
+
   int service_daemon_register(
     const std::string& service,  ///< service name (e.g., 'rgw')
     const std::string& name,     ///< daemon name (e.g., 'gwfoo')
index 21f366e74bc490db7b06563e8b9556f129dd7fb8..c918e571974f9147aceea98a0e6c9c1157eac0ae 100644 (file)
@@ -442,6 +442,15 @@ extern "C" int _rados_blacklist_add(rados_t cluster, char *client_address,
 }
 LIBRADOS_C_API_BASE_DEFAULT(rados_blacklist_add);
 
+extern "C" int _rados_getaddrs(rados_t cluster, char** addrs)
+{
+  librados::RadosClient *radosp = (librados::RadosClient *)cluster;
+  auto s = radosp->get_addrs();
+  *addrs = strdup(s.c_str());
+  return 0;
+}
+LIBRADOS_C_API_BASE_DEFAULT(rados_getaddrs);
+
 extern "C" void _rados_set_osdmap_full_try(rados_ioctx_t io)
 {
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
index 6b09fec9c9177a147b7bad4a31ef041c4b9d1c28..a3598922193adabd1519d57ad1a64e694964b709 100644 (file)
@@ -24,7 +24,7 @@
 
 class MMgrBeacon : public PaxosServiceMessage {
 private:
-  static constexpr int HEAD_VERSION = 9;
+  static constexpr int HEAD_VERSION = 10;
   static constexpr int COMPAT_VERSION = 8;
 
 protected:
@@ -45,6 +45,8 @@ protected:
 
   map<string,string> metadata; ///< misc metadata about this osd
 
+  std::vector<entity_addrvec_t> clients;
+
   uint64_t mgr_features = 0;   ///< reporting mgr's features
 
 public:
@@ -57,10 +59,12 @@ public:
              entity_addrvec_t server_addrs_, bool available_,
             std::vector<MgrMap::ModuleInfo>&& modules_,
             map<string,string>&& metadata_,
+             std::vector<entity_addrvec_t> clients,
             uint64_t feat)
     : PaxosServiceMessage{MSG_MGR_BEACON, 0, HEAD_VERSION, COMPAT_VERSION},
       gid(gid_), server_addrs(server_addrs_), available(available_), name(name_),
       fsid(fsid_), modules(std::move(modules_)), metadata(std::move(metadata_)),
+      clients(std::move(clients)),
       mgr_features(feat)
   {
   }
@@ -98,6 +102,11 @@ public:
     return modules;
   }
 
+  const auto& get_clients() const
+  {
+    return clients;
+  }
+
 private:
   ~MMgrBeacon() override {}
 
@@ -143,6 +152,7 @@ public:
 
     encode(modules, payload);
     encode(mgr_features, payload);
+    encode(clients, payload, features);
   }
   void decode_payload() override {
     auto p = payload.cbegin();
@@ -182,6 +192,9 @@ public:
     if (header.version >= 9) {
       decode(mgr_features, p);
     }
+    if (header.version >= 10) {
+      decode(clients, p);
+    }
   }
 private:
   template<class T, typename... Args>
index 4707a7d74d482ba33c6e8f4a15aceee315619135..36a5f74bafea5701e98147b904bcd6bbe71fdf37 100644 (file)
@@ -1080,3 +1080,25 @@ void ActivePyModules::cluster_log(const std::string &channel, clog_type prio,
     clog->do_log(prio, message);
   }
 }
+
+void ActivePyModules::register_client(std::string_view name, std::string addrs)
+{
+  std::lock_guard l(lock);
+
+  entity_addrvec_t addrv;
+  addrv.parse(addrs.data());
+
+  dout(7) << "registering msgr client handle " << addrv << dendl;
+  py_module_registry.register_client(name, std::move(addrv));
+}
+
+void ActivePyModules::unregister_client(std::string_view name, std::string addrs)
+{
+  std::lock_guard l(lock);
+
+  entity_addrvec_t addrv;
+  addrv.parse(addrs.data());
+
+  dout(7) << "unregistering msgr client handle " << addrv << dendl;
+  py_module_registry.unregister_client(name, addrv);
+}
index caa5915efa918063bc772a23171b712665f9f4f7..a8a0f3c4cf0503e4476651d6d0544c5f03328a80 100644 (file)
@@ -134,6 +134,9 @@ public:
   void clear_all_progress_events();
   void get_progress_events(std::map<std::string,ProgressEvent>* events);
 
+  void register_client(std::string_view name, std::string addrs);
+  void unregister_client(std::string_view name, std::string addrs);
+
   void config_notify();
 
   void set_uri(const std::string& module_name, const std::string &uri);
index a6963bb28eb65d16ab000ee0ee5a0aad07651a66..c8d30875bf4124c4a437d10411c7f0550d8deb85 100644 (file)
@@ -1050,6 +1050,32 @@ ceph_is_authorized(BaseMgrModule *self, PyObject *args)
   Py_RETURN_FALSE;
 }
 
+static PyObject*
+ceph_register_client(BaseMgrModule *self, PyObject *args)
+{
+  char *addrs = nullptr;
+  if (!PyArg_ParseTuple(args, "s:ceph_register_client", &addrs)) {
+    return nullptr;
+  }
+
+  self->py_modules->register_client(self->this_module->get_name(), addrs);
+
+  Py_RETURN_NONE;
+}
+
+static PyObject*
+ceph_unregister_client(BaseMgrModule *self, PyObject *args)
+{
+  char *addrs = nullptr;
+  if (!PyArg_ParseTuple(args, "s:ceph_unregister_client", &addrs)) {
+    return nullptr;
+  }
+
+  self->py_modules->unregister_client(self->this_module->get_name(), addrs);
+
+  Py_RETURN_NONE;
+}
+
 PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_get", (PyCFunction)ceph_state_get, METH_VARARGS,
    "Get a cluster object"},
@@ -1146,6 +1172,12 @@ PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_is_authorized", (PyCFunction)ceph_is_authorized,
     METH_VARARGS, "Verify the current session caps are valid"},
 
+  {"_ceph_register_client", (PyCFunction)ceph_register_client,
+    METH_VARARGS, "Register RADOS instance for potential blacklisting"},
+
+  {"_ceph_unregister_client", (PyCFunction)ceph_unregister_client,
+    METH_VARARGS, "Unregister RADOS instance for potential blacklisting"},
+
   {NULL, NULL, 0, NULL}
 };
 
index 1d806569c3976ae045e1e7256557e84f2bda6e69..16c82b72e88d8548e8c6669943a0173bd0642684 100644 (file)
@@ -206,6 +206,11 @@ void MgrStandby::send_beacon()
     module_info.push_back(std::move(info));
   }
 
+  auto clients = py_module_registry.get_clients();
+  for (const auto& client : clients) {
+    dout(15) << "noting RADOS client for blacklist: " << client << dendl;
+  }
+
   // Whether I think I am available (request MgrMonitor to set me
   // as available in the map)
   bool available = active_mgr != nullptr && active_mgr->is_initialized();
@@ -225,6 +230,7 @@ void MgrStandby::send_beacon()
                                  available,
                                 std::move(module_info),
                                 std::move(metadata),
+                                 std::move(clients),
                                 CEPH_FEATURES_ALL);
 
   if (available) {
index c7f8b806d964453446c50fe10f2040bb92f5cd39..12bcb93e8ac53ad86d24f1f21fbc567eb3ffee10 100644 (file)
@@ -44,6 +44,7 @@ private:
   LogChannelRef clog;
 
   std::map<std::string, PyModuleRef> modules;
+  std::multimap<std::string, entity_addrvec_t> clients;
 
   std::unique_ptr<ActivePyModules> active_modules;
   std::unique_ptr<StandbyPyModules> standby_modules;
@@ -182,5 +183,30 @@ public:
     ceph_assert(active_modules);
     return active_modules->get_services();
   }
+
+  void register_client(std::string_view name, entity_addrvec_t addrs)
+  {
+    clients.emplace(std::string(name), std::move(addrs));
+  }
+  void unregister_client(std::string_view name, const entity_addrvec_t& addrs)
+  {
+    auto itp = clients.equal_range(std::string(name));
+    for (auto it = itp.first; it != itp.second; ++it) {
+      if (it->second == addrs) {
+        it = clients.erase(it);
+      }
+    }
+  }
+
+  auto get_clients() const
+  {
+    std::scoped_lock l(lock);
+    std::vector<entity_addrvec_t> v;
+    for (const auto& p : clients) {
+      v.push_back(p.second);
+    }
+    return v;
+  }
+
   // <<< (end of ActivePyModules cheeky call-throughs)
 };
index 410ebd2f6110eff6eb3aa3e04f2b47d842d527d7..d6a0e69af5a59b8ed15eca05b83538fc9565a4a5 100644 (file)
@@ -238,6 +238,8 @@ public:
   /// features
   uint64_t active_mgr_features = 0;
 
+  std::vector<entity_addrvec_t> clients; // for blacklist
+
   std::map<uint64_t, StandbyInfo> standbys;
 
   // Modules which are enabled
@@ -381,7 +383,7 @@ public:
       ENCODE_FINISH(bl);
       return;
     }
-    ENCODE_START(10, 6, bl);
+    ENCODE_START(11, 6, bl);
     encode(epoch, bl);
     encode(active_addrs, bl, features);
     encode(active_gid, bl);
@@ -395,13 +397,14 @@ public:
     encode(always_on_modules, bl);
     encode(active_mgr_features, bl);
     encode(last_failure_osd_epoch, bl);
+    encode(clients, bl, features);
     ENCODE_FINISH(bl);
     return;
   }
 
   void decode(ceph::buffer::list::const_iterator& p)
   {
-    DECODE_START(8, p);
+    DECODE_START(11, p);
     decode(epoch, p);
     decode(active_addrs, p);
     decode(active_gid, p);
@@ -446,6 +449,9 @@ public:
     if (struct_v >= 10) {
       decode(last_failure_osd_epoch, p);
     }
+    if (struct_v >= 11) {
+      decode(clients, p);
+    }
     DECODE_FINISH(p);
   }
 
@@ -498,6 +504,11 @@ public:
       f->close_section();
     }
     f->dump_int("last_failure_osd_epoch", last_failure_osd_epoch);
+    f->open_array_section("active_clients");
+    for (const auto &c : clients) {
+      f->dump_object("client", c);
+    }
+    f->close_section();
     f->close_section();
   }
 
index a361978fcdcae42cf3962e3816c9358a5cafc9bd..515c3467c1b94679a6965234c82f2a41106f67b8 100644 (file)
@@ -524,6 +524,13 @@ bool MgrMonitor::prepare_beacon(MonOpRequestRef op)
       pending_map.available_modules = m->get_available_modules();
       updated = true;
     }
+    const auto& clients = m->get_clients();
+    if (pending_map.clients != clients) {
+      dout(4) << "active's RADOS clients " << clients
+             << " (was " << pending_map.clients << ")" << dendl;
+      pending_map.clients = clients;
+      updated = true;
+    }
   } else if (pending_map.active_gid == 0) {
     // There is no currently active daemon, select this one.
     if (pending_map.standbys.count(m->get_gid())) {
@@ -834,6 +841,11 @@ void MgrMonitor::drop_active()
           << pending_map.active_gid << " ("
           << pending_map.active_addrs << ")" << dendl;
   auto blacklist_epoch = mon->osdmon()->blacklist(pending_map.active_addrs, until);
+
+  /* blacklist RADOS clients in use by the mgr */
+  for (const auto& a : pending_map.clients) {
+    mon->osdmon()->blacklist(a, until);
+  }
   request_proposal(mon->osdmon());
 
   pending_metadata_rm.insert(pending_map.active_name);
@@ -845,6 +857,7 @@ void MgrMonitor::drop_active()
   pending_map.available = false;
   pending_map.active_addrs = entity_addrvec_t();
   pending_map.services.clear();
+  pending_map.clients.clear();
   pending_map.last_failure_osd_epoch = blacklist_epoch;
 
   // So that when new active mgr subscribes to mgrdigest, it will
index e80bd8a61abc15d8a0d3e7ceda3ded3f6b7f8494..6bf24b1b7486ba7042ec41f83bf4397bc0a0ea95 100644 (file)
@@ -130,6 +130,7 @@ cdef extern from "cephfs/libcephfs.h" nogil:
     int ceph_init(ceph_mount_info *cmount)
     void ceph_shutdown(ceph_mount_info *cmount)
 
+    int ceph_getaddrs(ceph_mount_info* cmount, char** addrs)
     int ceph_conf_read_file(ceph_mount_info *cmount, const char *path_list)
     int ceph_conf_parse_argv(ceph_mount_info *cmount, int argc, const char **argv)
     int ceph_conf_get(ceph_mount_info *cmount, const char *option, char *buf, size_t len)
@@ -503,6 +504,27 @@ cdef class LibCephFS(object):
             for key, value in conf.iteritems():
                 self.conf_set(key, value)
 
+    def get_addrs(self):
+        """
+        Get associated client addresses with this RADOS session.
+        """
+        self.require_state("mounted")
+
+        cdef:
+            char* addrs = NULL
+
+        try:
+
+            with nogil:
+                ret = ceph_getaddrs(self.cluster, &addrs)
+            if ret:
+                raise make_ex(ret, "error calling getaddrs")
+
+            return decode_cstr(addrs)
+        finally:
+            free(addrs)
+
+
     def conf_read_file(self, conffile=None):
         """
         Load the ceph configuration from the specified config file.
index c17d98219edd02f344d4bc2ed86750a28d04ad6c..060288f44714993e35b9baf1ef54a582220bc4de 100644 (file)
@@ -775,7 +775,9 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         :return: None
         """
         if self._rados:
+            addrs = self._rados.get_addrs()
             self._rados.shutdown()
+            self._ceph_unregister_client(addrs)
 
     def get(self, data_name):
         """
@@ -1376,7 +1378,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         ctx_capsule = self.get_context()
         self._rados = rados.Rados(context=ctx_capsule)
         self._rados.connect()
-
+        self._ceph_register_client(self._rados.get_addrs())
         return self._rados
 
     @staticmethod
index 42ee42a4b40b4eb575fc75fbc4c63765018e7ce3..3800615aab35b26e7ca71083ee03199f2734f7a6 100644 (file)
@@ -82,11 +82,14 @@ class ConnectionPool(object):
             log.debug("CephFS mounting...")
             self.fs.mount(filesystem_name=self.fs_name.encode('utf-8'))
             log.debug("Connection to cephfs '{0}' complete".format(self.fs_name))
+            self.mgr._ceph_register_client(self.fs.get_addrs())
 
         def disconnect(self):
             assert self.ops_in_progress == 0
             log.info("disconnecting from cephfs '{0}'".format(self.fs_name))
+            addrs = self.fs.get_addrs()
             self.fs.shutdown()
+            self.mgr._ceph_unregister_client(addrs)
             self.fs = None
 
         def abort(self):
index 4ad8152d1ba73379e9ec7aa713d5401d5e090230..931ce6946ad2a58f0698dfd84030cf12ee938113 100644 (file)
@@ -152,6 +152,7 @@ cdef extern from "rados/librados.h" nogil:
     int rados_cluster_stat(rados_t cluster, rados_cluster_stat_t *result)
     int rados_cluster_fsid(rados_t cluster, char *buf, size_t len)
     int rados_blacklist_add(rados_t cluster, char *client_address, uint32_t expire_seconds)
+    int rados_getaddrs(rados_t cluster, char** addrs)
     int rados_application_enable(rados_ioctx_t io, const char *app_name,
                                  int force)
     void rados_set_osdmap_full_try(rados_ioctx_t io)
@@ -739,6 +740,26 @@ cdef class Rados(object):
             for key, value in conf.items():
                 self.conf_set(key, value)
 
+    def get_addrs(self):
+        """
+        Get associated client addresses with this RADOS session.
+        """
+        self.require_state("configuring", "connected")
+
+        cdef:
+            char* addrs = NULL
+
+        try:
+
+            with nogil:
+                ret = rados_getaddrs(self.cluster, &addrs)
+            if ret:
+                raise make_ex(ret, "error calling getaddrs")
+
+            return decode_cstr(addrs)
+        finally:
+            free(addrs)
+
     def require_state(self, *args):
         """
         Checks if the Rados object is in a special state