From: Leonid Usov Date: Sun, 21 Jan 2024 17:37:52 +0000 (+0200) Subject: pybind/mgr: add a `one-shot` parameter to send_command X-Git-Tag: v20.0.0~2418^2~16 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=9907efd0132197feed4b7cc4e7c17a7ec9104a47;p=ceph.git pybind/mgr: add a `one-shot` parameter to send_command with the parameter set, the message won't be held on to when the remote end resets or fails to reconnect. Signed-off-by: Leonid Usov --- diff --git a/src/client/Client.cc b/src/client/Client.cc index 473b6cb229fe6..bbbc3b60e05cf 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -3028,15 +3028,10 @@ void Client::handle_fs_map_user(const MConstRef& m) // Cancel all the commands for missing or laggy GIDs void Client::cancel_commands(const MDSMap& newmap) { - std::vector cancel_ops; - - std::scoped_lock cmd_lock(command_lock); - auto &commands = command_table.get_commands(); - for (const auto &[tid, op] : commands) { + cancel_commands_if([=, this](MDSCommandOp const& op) { const mds_gid_t op_mds_gid = op.mds_gid; if (newmap.is_dne_gid(op_mds_gid) || newmap.is_laggy_gid(op_mds_gid)) { - ldout(cct, 1) << __func__ << ": cancelling command op " << tid << dendl; - cancel_ops.push_back(tid); + ldout(cct, 1) << "cancel_commands: cancelling command op " << op.tid << dendl; if (op.outs) { std::ostringstream ss; ss << "MDS " << op_mds_gid << " went away"; @@ -3048,13 +3043,10 @@ void Client::cancel_commands(const MDSMap& newmap) * has its own lock. */ op.con->mark_down(); - if (op.on_finish) - op.on_finish->complete(-CEPHFS_ETIMEDOUT); + return -CEPHFS_ETIMEDOUT; } - } - - for (const auto &tid : cancel_ops) - command_table.erase(tid); + return 0; + }); } void Client::handle_mds_map(const MConstRef& m) @@ -6416,7 +6408,8 @@ int Client::mds_command( const bufferlist& inbl, bufferlist *outbl, string *outs, - Context *onfinish) + Context *onfinish, + bool one_shot) { RWRef_t iref_reader(initialize_state, CLIENT_INITIALIZED); if (!iref_reader.is_state_satisfied()) @@ -6475,6 +6468,9 @@ int Client::mds_command( // Open a connection to the target MDS ConnectionRef conn = messenger->connect_to_mds(info.get_addrs()); + if (one_shot) { + conn->send_keepalive(); + } cl.unlock(); { @@ -6489,6 +6485,7 @@ int Client::mds_command( op.inbl = inbl; op.mds_gid = target_gid; op.con = conn; + op.one_shot = one_shot; ldout(cct, 4) << __func__ << ": new command op to " << target_gid << " tid=" << op.tid << " multi_id=" << op.multi_target_id << " "<< cmd << dendl; @@ -16639,13 +16636,41 @@ void Client::ms_handle_connect(Connection *con) bool Client::ms_handle_reset(Connection *con) { ldout(cct, 0) << __func__ << " on " << con->get_peer_addr() << dendl; + + cancel_commands_if([=, this](MDSCommandOp const& op) { + if (op.one_shot && op.con.get() == con) { + ldout(cct, 1) << "ms_handle_reset: aborting one-shot command op " << op.tid << dendl; + if (op.outs) { + std::ostringstream ss; + ss << "MDS connection reset"; + *(op.outs) = ss.str(); + } + return -EPIPE; + } + return 0; + }); + return false; } void Client::ms_handle_remote_reset(Connection *con) { - std::scoped_lock lock(client_lock); ldout(cct, 0) << __func__ << " on " << con->get_peer_addr() << dendl; + + cancel_commands_if([=, this](MDSCommandOp const& op) { + if (op.one_shot && op.con.get() == con) { + ldout(cct, 1) << "ms_handle_remote_reset: aborting one-shot command op " << op.tid << dendl; + if (op.outs) { + std::ostringstream ss; + ss << "MDS remote session reset"; + *(op.outs) = ss.str(); + } + return -EPIPE; + } + return 0; + }); + + std::scoped_lock lock(client_lock); switch (con->get_peer_type()) { case CEPH_ENTITY_TYPE_MDS: { @@ -16704,6 +16729,20 @@ void Client::ms_handle_remote_reset(Connection *con) bool Client::ms_handle_refused(Connection *con) { ldout(cct, 1) << __func__ << " on " << con->get_peer_addr() << dendl; + + cancel_commands_if([=, this](MDSCommandOp const& op) { + if (op.one_shot && op.con.get() == con) { + ldout(cct, 1) << "ms_handle_refused: aborting one-shot command op " << op.tid << dendl; + if (op.outs) { + std::ostringstream ss; + ss << "MDS connection refused"; + *(op.outs) = ss.str(); + } + return -EPIPE; + } + return 0; + }); + return false; } diff --git a/src/client/Client.h b/src/client/Client.h index 712583aa0d3c5..e611b6693dbb7 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -96,6 +96,7 @@ class MDSCommandOp : public CommandOp { public: mds_gid_t mds_gid; + bool one_shot = false; explicit MDSCommandOp(ceph_tid_t t) : CommandOp(t) {} explicit MDSCommandOp(ceph_tid_t t, ceph_tid_t multi_id) : CommandOp(t, multi_id) {} @@ -333,7 +334,7 @@ public: const std::string &mds_spec, const std::vector& cmd, const bufferlist& inbl, - bufferlist *poutbl, std::string *prs, Context *onfinish); + bufferlist *poutbl, std::string *prs, Context *onfinish, bool one_shot = false); // these should (more or less) mirror the actual system calls. int statfs(const char *path, struct statvfs *stbuf, const UserPerm& perms); @@ -712,6 +713,27 @@ public: virtual void shutdown(); // messaging + int cancel_commands_if(std::regular_invocable auto && error_for_op) + { + std::vector cancel_ops; + + std::scoped_lock cmd_lock(command_lock); + auto& commands = command_table.get_commands(); + for (const auto &[tid, op]: commands) { + int rc = static_cast(error_for_op(op)); + if (rc) { + cancel_ops.push_back(tid); + if (op.on_finish) + op.on_finish->complete(rc); + } + } + + for (const auto& tid : cancel_ops) + command_table.erase(tid); + + return cancel_ops.size(); + } + void cancel_commands(const MDSMap& newmap); void handle_mds_map(const MConstRef& m); void handle_fs_map(const MConstRef& m); diff --git a/src/mgr/BaseMgrModule.cc b/src/mgr/BaseMgrModule.cc index 6cb3a6bce2458..67d9986ef8e99 100644 --- a/src/mgr/BaseMgrModule.cc +++ b/src/mgr/BaseMgrModule.cc @@ -108,7 +108,7 @@ public: static PyObject* -ceph_send_command(BaseMgrModule *self, PyObject *args) +ceph_send_command(BaseMgrModule *self, PyObject *args, PyObject *kwargs) { // Like mon, osd, mds char *type = nullptr; @@ -122,9 +122,23 @@ ceph_send_command(BaseMgrModule *self, PyObject *args) Py_ssize_t inbuf_len = 0; bufferlist inbuf = {}; + static const char * keywords[] { + "result", + "svc_type", + "svc_id", + "command", + "tag", + "inbuf", + // --- kwargs star here + "one_shot", // whether to keep the command while we reestablish connection + nullptr // must be the last element + }; + + int one_shot = false; + PyObject *completion = nullptr; - if (!PyArg_ParseTuple(args, "Ossssz#:ceph_send_command", - &completion, &type, &name, &cmd_json, &tag, &inbuf_ptr, &inbuf_len)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "Ossssz#|$p:ceph_send_command", const_cast(keywords), + &completion, &type, &name, &cmd_json, &tag, &inbuf_ptr, &inbuf_len, &one_shot)) { return nullptr; } @@ -199,7 +213,8 @@ ceph_send_command(BaseMgrModule *self, PyObject *args) inbuf, &command_c->outbl, &command_c->outs, - new C_OnFinisher(command_c, &self->py_modules->cmd_finisher)); + new C_OnFinisher(command_c, &self->py_modules->cmd_finisher), + one_shot); if (r != 0) { string msg("failed to send command to mds: "); msg.append(cpp_strerror(r)); @@ -1426,7 +1441,7 @@ PyMethodDef BaseMgrModule_methods[] = { {"_ceph_get_daemon_status", (PyCFunction)get_daemon_status, METH_VARARGS, "Get a service's status"}, - {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS, + {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS | METH_KEYWORDS, "Send a mon command"}, {"_ceph_set_health_checks", (PyCFunction)ceph_set_health_checks, METH_VARARGS, diff --git a/src/pybind/mgr/ceph_module.pyi b/src/pybind/mgr/ceph_module.pyi index df4a3782a0c96..3777c469a1fdd 100644 --- a/src/pybind/mgr/ceph_module.pyi +++ b/src/pybind/mgr/ceph_module.pyi @@ -83,7 +83,9 @@ class BaseMgrModule(object): svc_id: str, command: str, tag: str, - inbuf: Optional[str]) -> None: ... + inbuf: Optional[str], + *, + one_shot: bool) -> None: ... def _ceph_set_health_checks(self, checks: Mapping[str, HealthCheckT]) -> None: ... def _ceph_get_mgr_id(self) -> str: ... def _ceph_get_ceph_conf_path(self) -> str: ... diff --git a/src/pybind/mgr/mgr_module.py b/src/pybind/mgr/mgr_module.py index 13cf9386bcdb5..51ed4271821ea 100644 --- a/src/pybind/mgr/mgr_module.py +++ b/src/pybind/mgr/mgr_module.py @@ -1707,7 +1707,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): return r - def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]: + def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None, one_shot: bool = False) -> Tuple[int, str, str]: """ Helper for `ceph tell` command execution. @@ -1722,7 +1722,7 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): """ t1 = time.time() result = CommandResult() - self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf) + self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf, one_shot=one_shot) r = result.wait() t2 = time.time() @@ -1739,7 +1739,9 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): svc_id: str, command: str, tag: str, - inbuf: Optional[str] = None) -> None: + inbuf: Optional[str] = None, + *, # kw-only args go below + one_shot: bool = False) -> None: """ Called by the plugin to send a command to the mon cluster. @@ -1760,8 +1762,10 @@ class MgrModule(ceph_module.BaseMgrModule, MgrModuleLoggingMixin): triggered, with notify_type set to "command", and notify_id set to the tag of the command. :param str inbuf: input buffer for sending additional data. + :param bool one_shot: a keyword-only param to make the command abort + with EPIPE when the target resets or refuses to reconnect """ - self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf) + self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf, one_shot=one_shot) def tool_exec( self, diff --git a/src/pybind/mgr/tests/__init__.py b/src/pybind/mgr/tests/__init__.py index 633959084ccbe..8ae6ea54b462d 100644 --- a/src/pybind/mgr/tests/__init__.py +++ b/src/pybind/mgr/tests/__init__.py @@ -100,7 +100,7 @@ if 'UNITTEST' in os.environ: def _ceph_get(self, data_name): return self.mock_store_get('_ceph_get', data_name, mock.MagicMock()) - def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf): + def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf, *, one_shot=False): cmd = json.loads(command) getattr(self, '_mon_commands_sent', []).append(cmd)