]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
pybind/mgr: add a `one-shot` parameter to send_command
authorLeonid Usov <leonid.usov@ibm.com>
Sun, 21 Jan 2024 17:37:52 +0000 (19:37 +0200)
committerLeonid Usov <leonid.usov@ibm.com>
Mon, 4 Mar 2024 11:48:03 +0000 (13:48 +0200)
with the parameter set, the message won't be held on to when the remote end resets
or fails to reconnect.

Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
src/client/Client.cc
src/client/Client.h
src/mgr/BaseMgrModule.cc
src/pybind/mgr/ceph_module.pyi
src/pybind/mgr/mgr_module.py
src/pybind/mgr/tests/__init__.py

index 473b6cb229fe61dad25abed1514d11e7440555fc..bbbc3b60e05cf3c4248fdf693250b4ae38d48aac 100644 (file)
@@ -3028,15 +3028,10 @@ void Client::handle_fs_map_user(const MConstRef<MFSMapUser>& m)
 // Cancel all the commands for missing or laggy GIDs
 void Client::cancel_commands(const MDSMap& newmap)
 {
-  std::vector<ceph_tid_t> cancel_ops;
-
-  std::scoped_lock cmd_lock(command_lock);
-  auto &commands = command_table.get_commands();
-  for (const auto &[tid, op] : commands) {
+  cancel_commands_if([=, this](MDSCommandOp const& op) {
     const mds_gid_t op_mds_gid = op.mds_gid;
     if (newmap.is_dne_gid(op_mds_gid) || newmap.is_laggy_gid(op_mds_gid)) {
-      ldout(cct, 1) << __func__ << ": cancelling command op " << tid << dendl;
-      cancel_ops.push_back(tid);
+      ldout(cct, 1) << "cancel_commands: cancelling command op " << op.tid << dendl;
       if (op.outs) {
         std::ostringstream ss;
         ss << "MDS " << op_mds_gid << " went away";
@@ -3048,13 +3043,10 @@ void Client::cancel_commands(const MDSMap& newmap)
        * has its own lock.
        */
       op.con->mark_down();
-      if (op.on_finish)
-        op.on_finish->complete(-CEPHFS_ETIMEDOUT);
+      return -CEPHFS_ETIMEDOUT;
     }
-  }
-
-  for (const auto &tid : cancel_ops)
-    command_table.erase(tid);
+    return 0;
+  });
 }
 
 void Client::handle_mds_map(const MConstRef<MMDSMap>& m)
@@ -6416,7 +6408,8 @@ int Client::mds_command(
     const bufferlist& inbl,
     bufferlist *outbl,
     string *outs,
-    Context *onfinish)
+    Context *onfinish,
+    bool one_shot)
 {
   RWRef_t iref_reader(initialize_state, CLIENT_INITIALIZED);
   if (!iref_reader.is_state_satisfied())
@@ -6475,6 +6468,9 @@ int Client::mds_command(
 
     // Open a connection to the target MDS
     ConnectionRef conn = messenger->connect_to_mds(info.get_addrs());
+    if (one_shot) {
+      conn->send_keepalive();
+    }
 
     cl.unlock();
     {
@@ -6489,6 +6485,7 @@ int Client::mds_command(
       op.inbl = inbl;
       op.mds_gid = target_gid;
       op.con = conn;
+      op.one_shot = one_shot;
 
       ldout(cct, 4) << __func__ << ": new command op to " << target_gid
         << " tid=" << op.tid << " multi_id=" << op.multi_target_id << " "<< cmd << dendl;
@@ -16639,13 +16636,41 @@ void Client::ms_handle_connect(Connection *con)
 bool Client::ms_handle_reset(Connection *con)
 {
   ldout(cct, 0) << __func__ << " on " << con->get_peer_addr() << dendl;
+
+  cancel_commands_if([=, this](MDSCommandOp const& op) {
+    if (op.one_shot && op.con.get() == con) {
+      ldout(cct, 1) << "ms_handle_reset: aborting one-shot command op " << op.tid << dendl;
+      if (op.outs) {
+        std::ostringstream ss;
+        ss << "MDS connection reset";
+        *(op.outs) = ss.str();
+      }
+      return -EPIPE;
+    }
+    return 0;
+  });
+
   return false;
 }
 
 void Client::ms_handle_remote_reset(Connection *con)
 {
-  std::scoped_lock lock(client_lock);
   ldout(cct, 0) << __func__ << " on " << con->get_peer_addr() << dendl;
+
+  cancel_commands_if([=, this](MDSCommandOp const& op) {
+    if (op.one_shot && op.con.get() == con) {
+      ldout(cct, 1) << "ms_handle_remote_reset: aborting one-shot command op " << op.tid << dendl;
+      if (op.outs) {
+        std::ostringstream ss;
+        ss << "MDS remote session reset";
+        *(op.outs) = ss.str();
+      }
+      return -EPIPE;
+    }
+    return 0;
+  });
+
+  std::scoped_lock lock(client_lock);
   switch (con->get_peer_type()) {
   case CEPH_ENTITY_TYPE_MDS:
     {
@@ -16704,6 +16729,20 @@ void Client::ms_handle_remote_reset(Connection *con)
 bool Client::ms_handle_refused(Connection *con)
 {
   ldout(cct, 1) << __func__ << " on " << con->get_peer_addr() << dendl;
+
+  cancel_commands_if([=, this](MDSCommandOp const& op) {
+    if (op.one_shot && op.con.get() == con) {
+      ldout(cct, 1) << "ms_handle_refused: aborting one-shot command op " << op.tid << dendl;
+      if (op.outs) {
+        std::ostringstream ss;
+        ss << "MDS connection refused";
+        *(op.outs) = ss.str();
+      }
+      return -EPIPE;
+    }
+    return 0;
+  });
+
   return false;
 }
 
index 712583aa0d3c5d7c137286df26a5e938706e06b5..e611b6693dbb74e628814981e9f52f52478ebcb6 100644 (file)
@@ -96,6 +96,7 @@ class MDSCommandOp : public CommandOp
 {
   public:
   mds_gid_t     mds_gid;
+  bool          one_shot = false;
 
   explicit MDSCommandOp(ceph_tid_t t) : CommandOp(t) {}
   explicit MDSCommandOp(ceph_tid_t t, ceph_tid_t multi_id) : CommandOp(t, multi_id) {}
@@ -333,7 +334,7 @@ public:
     const std::string &mds_spec,
     const std::vector<std::string>& cmd,
     const bufferlist& inbl,
-    bufferlist *poutbl, std::string *prs, Context *onfinish);
+    bufferlist *poutbl, std::string *prs, Context *onfinish, bool one_shot = false);
 
   // these should (more or less) mirror the actual system calls.
   int statfs(const char *path, struct statvfs *stbuf, const UserPerm& perms);
@@ -712,6 +713,27 @@ public:
   virtual void shutdown();
 
   // messaging
+  int cancel_commands_if(std::regular_invocable<MDSCommandOp const&> auto && error_for_op)
+  {
+    std::vector<ceph_tid_t> cancel_ops;
+
+    std::scoped_lock cmd_lock(command_lock);
+    auto& commands = command_table.get_commands();
+    for (const auto &[tid, op]: commands) {
+      int rc = static_cast<int>(error_for_op(op));
+      if (rc) {
+        cancel_ops.push_back(tid);
+        if (op.on_finish)
+          op.on_finish->complete(rc);
+      }
+    }
+
+    for (const auto& tid : cancel_ops)
+      command_table.erase(tid);
+
+    return cancel_ops.size();
+  }
+
   void cancel_commands(const MDSMap& newmap);
   void handle_mds_map(const MConstRef<MMDSMap>& m);
   void handle_fs_map(const MConstRef<MFSMap>& m);
index 6cb3a6bce2458d2cb00a6a487545f8163971ee78..67d9986ef8e99335c68ac97d4481cb4ec828c101 100644 (file)
@@ -108,7 +108,7 @@ public:
 
 
 static PyObject*
-ceph_send_command(BaseMgrModule *self, PyObject *args)
+ceph_send_command(BaseMgrModule *self, PyObject *args, PyObject *kwargs)
 {
   // Like mon, osd, mds
   char *type = nullptr;
@@ -122,9 +122,23 @@ ceph_send_command(BaseMgrModule *self, PyObject *args)
   Py_ssize_t inbuf_len = 0;
   bufferlist inbuf = {};
 
+  static const char * keywords[] {
+    "result",
+    "svc_type",
+    "svc_id",
+    "command",
+    "tag",
+    "inbuf",
+    // --- kwargs star here
+    "one_shot",   // whether to keep the command while we reestablish connection
+    nullptr       // must be the last element
+  };
+
+  int one_shot = false;
+
   PyObject *completion = nullptr;
-  if (!PyArg_ParseTuple(args, "Ossssz#:ceph_send_command",
-        &completion, &type, &name, &cmd_json, &tag, &inbuf_ptr, &inbuf_len)) {
+  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "Ossssz#|$p:ceph_send_command", const_cast<char**>(keywords),
+        &completion, &type, &name, &cmd_json, &tag, &inbuf_ptr, &inbuf_len, &one_shot)) {
     return nullptr;
   }
 
@@ -199,7 +213,8 @@ ceph_send_command(BaseMgrModule *self, PyObject *args)
         inbuf,
         &command_c->outbl,
         &command_c->outs,
-        new C_OnFinisher(command_c, &self->py_modules->cmd_finisher));
+        new C_OnFinisher(command_c, &self->py_modules->cmd_finisher),
+        one_shot);
     if (r != 0) {
       string msg("failed to send command to mds: ");
       msg.append(cpp_strerror(r));
@@ -1426,7 +1441,7 @@ PyMethodDef BaseMgrModule_methods[] = {
   {"_ceph_get_daemon_status", (PyCFunction)get_daemon_status, METH_VARARGS,
    "Get a service's status"},
 
-  {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS,
+  {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS | METH_KEYWORDS,
    "Send a mon command"},
 
   {"_ceph_set_health_checks", (PyCFunction)ceph_set_health_checks, METH_VARARGS,
index df4a3782a0c96ed3ce0244a2008b56834279b907..3777c469a1fdda61f69d76dc37ce54388ad664e5 100644 (file)
@@ -83,7 +83,9 @@ class BaseMgrModule(object):
                            svc_id: str,
                            command: str,
                            tag: str,
-                           inbuf: Optional[str]) -> None: ...
+                           inbuf: Optional[str],
+                           *,
+                           one_shot: bool) -> None: ...
     def _ceph_set_health_checks(self, checks: Mapping[str, HealthCheckT]) -> None: ...
     def _ceph_get_mgr_id(self) -> str: ...
     def _ceph_get_ceph_conf_path(self) -> str: ...
index 13cf9386bcdb581d1dcd2cb2411ed9c31b7ea198..51ed4271821eac4785ca03395222f4065c7beb46 100644 (file)
@@ -1707,7 +1707,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
 
         return r
 
-    def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+    def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None, one_shot: bool = False) -> Tuple[int, str, str]:
         """
         Helper for `ceph tell` command execution.
 
@@ -1722,7 +1722,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
         """
         t1 = time.time()
         result = CommandResult()
-        self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
+        self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf, one_shot=one_shot)
         r = result.wait()
         t2 = time.time()
 
@@ -1739,7 +1739,9 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             svc_id: str,
             command: str,
             tag: str,
-            inbuf: Optional[str] = None) -> None:
+            inbuf: Optional[str] = None,
+            *, # kw-only args go below
+            one_shot: bool = False) -> None:
         """
         Called by the plugin to send a command to the mon
         cluster.
@@ -1760,8 +1762,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin):
             triggered, with notify_type set to "command", and notify_id set to
             the tag of the command.
         :param str inbuf: input buffer for sending additional data.
+        :param bool one_shot: a keyword-only param to make the command abort
+            with EPIPE when the target resets or refuses to reconnect
         """
-        self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
+        self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf, one_shot=one_shot)
 
     def tool_exec(
         self,
index 633959084ccbed3158c05f091fa5b6eb4c33e60e..8ae6ea54b462d7ce95875850fcd6161d7975ff58 100644 (file)
@@ -100,7 +100,7 @@ if 'UNITTEST' in os.environ:
         def _ceph_get(self, data_name):
             return self.mock_store_get('_ceph_get', data_name, mock.MagicMock())
 
-        def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf):
+        def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf, *, one_shot=False):
 
             cmd = json.loads(command)
             getattr(self, '_mon_commands_sent', []).append(cmd)