with the parameter set, the message won't be held on to when the remote end resets
or fails to reconnect.
Signed-off-by: Leonid Usov <leonid.usov@ibm.com>
// Cancel all the commands for missing or laggy GIDs
void Client::cancel_commands(const MDSMap& newmap)
{
- std::vector<ceph_tid_t> cancel_ops;
-
- std::scoped_lock cmd_lock(command_lock);
- auto &commands = command_table.get_commands();
- for (const auto &[tid, op] : commands) {
+ cancel_commands_if([=, this](MDSCommandOp const& op) {
const mds_gid_t op_mds_gid = op.mds_gid;
if (newmap.is_dne_gid(op_mds_gid) || newmap.is_laggy_gid(op_mds_gid)) {
- ldout(cct, 1) << __func__ << ": cancelling command op " << tid << dendl;
- cancel_ops.push_back(tid);
+ ldout(cct, 1) << "cancel_commands: cancelling command op " << op.tid << dendl;
if (op.outs) {
std::ostringstream ss;
ss << "MDS " << op_mds_gid << " went away";
* has its own lock.
*/
op.con->mark_down();
- if (op.on_finish)
- op.on_finish->complete(-CEPHFS_ETIMEDOUT);
+ return -CEPHFS_ETIMEDOUT;
}
- }
-
- for (const auto &tid : cancel_ops)
- command_table.erase(tid);
+ return 0;
+ });
}
void Client::handle_mds_map(const MConstRef<MMDSMap>& m)
const bufferlist& inbl,
bufferlist *outbl,
string *outs,
- Context *onfinish)
+ Context *onfinish,
+ bool one_shot)
{
RWRef_t iref_reader(initialize_state, CLIENT_INITIALIZED);
if (!iref_reader.is_state_satisfied())
// Open a connection to the target MDS
ConnectionRef conn = messenger->connect_to_mds(info.get_addrs());
+ if (one_shot) {
+ conn->send_keepalive();
+ }
cl.unlock();
{
op.inbl = inbl;
op.mds_gid = target_gid;
op.con = conn;
+ op.one_shot = one_shot;
ldout(cct, 4) << __func__ << ": new command op to " << target_gid
<< " tid=" << op.tid << " multi_id=" << op.multi_target_id << " "<< cmd << dendl;
bool Client::ms_handle_reset(Connection *con)
{
ldout(cct, 0) << __func__ << " on " << con->get_peer_addr() << dendl;
+
+ cancel_commands_if([=, this](MDSCommandOp const& op) {
+ if (op.one_shot && op.con.get() == con) {
+ ldout(cct, 1) << "ms_handle_reset: aborting one-shot command op " << op.tid << dendl;
+ if (op.outs) {
+ std::ostringstream ss;
+ ss << "MDS connection reset";
+ *(op.outs) = ss.str();
+ }
+ return -EPIPE;
+ }
+ return 0;
+ });
+
return false;
}
void Client::ms_handle_remote_reset(Connection *con)
{
- std::scoped_lock lock(client_lock);
ldout(cct, 0) << __func__ << " on " << con->get_peer_addr() << dendl;
+
+ cancel_commands_if([=, this](MDSCommandOp const& op) {
+ if (op.one_shot && op.con.get() == con) {
+ ldout(cct, 1) << "ms_handle_remote_reset: aborting one-shot command op " << op.tid << dendl;
+ if (op.outs) {
+ std::ostringstream ss;
+ ss << "MDS remote session reset";
+ *(op.outs) = ss.str();
+ }
+ return -EPIPE;
+ }
+ return 0;
+ });
+
+ std::scoped_lock lock(client_lock);
switch (con->get_peer_type()) {
case CEPH_ENTITY_TYPE_MDS:
{
bool Client::ms_handle_refused(Connection *con)
{
ldout(cct, 1) << __func__ << " on " << con->get_peer_addr() << dendl;
+
+ cancel_commands_if([=, this](MDSCommandOp const& op) {
+ if (op.one_shot && op.con.get() == con) {
+ ldout(cct, 1) << "ms_handle_refused: aborting one-shot command op " << op.tid << dendl;
+ if (op.outs) {
+ std::ostringstream ss;
+ ss << "MDS connection refused";
+ *(op.outs) = ss.str();
+ }
+ return -EPIPE;
+ }
+ return 0;
+ });
+
return false;
}
{
public:
mds_gid_t mds_gid;
+ bool one_shot = false;
explicit MDSCommandOp(ceph_tid_t t) : CommandOp(t) {}
explicit MDSCommandOp(ceph_tid_t t, ceph_tid_t multi_id) : CommandOp(t, multi_id) {}
const std::string &mds_spec,
const std::vector<std::string>& cmd,
const bufferlist& inbl,
- bufferlist *poutbl, std::string *prs, Context *onfinish);
+ bufferlist *poutbl, std::string *prs, Context *onfinish, bool one_shot = false);
// these should (more or less) mirror the actual system calls.
int statfs(const char *path, struct statvfs *stbuf, const UserPerm& perms);
virtual void shutdown();
// messaging
+ int cancel_commands_if(std::regular_invocable<MDSCommandOp const&> auto && error_for_op)
+ {
+ std::vector<ceph_tid_t> cancel_ops;
+
+ std::scoped_lock cmd_lock(command_lock);
+ auto& commands = command_table.get_commands();
+ for (const auto &[tid, op]: commands) {
+ int rc = static_cast<int>(error_for_op(op));
+ if (rc) {
+ cancel_ops.push_back(tid);
+ if (op.on_finish)
+ op.on_finish->complete(rc);
+ }
+ }
+
+ for (const auto& tid : cancel_ops)
+ command_table.erase(tid);
+
+ return cancel_ops.size();
+ }
+
void cancel_commands(const MDSMap& newmap);
void handle_mds_map(const MConstRef<MMDSMap>& m);
void handle_fs_map(const MConstRef<MFSMap>& m);
static PyObject*
-ceph_send_command(BaseMgrModule *self, PyObject *args)
+ceph_send_command(BaseMgrModule *self, PyObject *args, PyObject *kwargs)
{
// Like mon, osd, mds
char *type = nullptr;
Py_ssize_t inbuf_len = 0;
bufferlist inbuf = {};
+ static const char * keywords[] {
+ "result",
+ "svc_type",
+ "svc_id",
+ "command",
+ "tag",
+ "inbuf",
+ // --- kwargs star here
+ "one_shot", // whether to keep the command while we reestablish connection
+ nullptr // must be the last element
+ };
+
+ int one_shot = false;
+
PyObject *completion = nullptr;
- if (!PyArg_ParseTuple(args, "Ossssz#:ceph_send_command",
- &completion, &type, &name, &cmd_json, &tag, &inbuf_ptr, &inbuf_len)) {
+ if (!PyArg_ParseTupleAndKeywords(args, kwargs, "Ossssz#|$p:ceph_send_command", const_cast<char**>(keywords),
+ &completion, &type, &name, &cmd_json, &tag, &inbuf_ptr, &inbuf_len, &one_shot)) {
return nullptr;
}
inbuf,
&command_c->outbl,
&command_c->outs,
- new C_OnFinisher(command_c, &self->py_modules->cmd_finisher));
+ new C_OnFinisher(command_c, &self->py_modules->cmd_finisher),
+ one_shot);
if (r != 0) {
string msg("failed to send command to mds: ");
msg.append(cpp_strerror(r));
{"_ceph_get_daemon_status", (PyCFunction)get_daemon_status, METH_VARARGS,
"Get a service's status"},
- {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS,
+ {"_ceph_send_command", (PyCFunction)ceph_send_command, METH_VARARGS | METH_KEYWORDS,
"Send a mon command"},
{"_ceph_set_health_checks", (PyCFunction)ceph_set_health_checks, METH_VARARGS,
svc_id: str,
command: str,
tag: str,
- inbuf: Optional[str]) -> None: ...
+ inbuf: Optional[str],
+ *,
+ one_shot: bool) -> None: ...
def _ceph_set_health_checks(self, checks: Mapping[str, HealthCheckT]) -> None: ...
def _ceph_get_mgr_id(self) -> str: ...
def _ceph_get_ceph_conf_path(self) -> str: ...
return r
- def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None) -> Tuple[int, str, str]:
+ def tell_command(self, daemon_type: str, daemon_id: str, cmd_dict: dict, inbuf: Optional[str] = None, one_shot: bool = False) -> Tuple[int, str, str]:
"""
Helper for `ceph tell` command execution.
"""
t1 = time.time()
result = CommandResult()
- self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf)
+ self.send_command(result, daemon_type, daemon_id, json.dumps(cmd_dict), "", inbuf, one_shot=one_shot)
r = result.wait()
t2 = time.time()
svc_id: str,
command: str,
tag: str,
- inbuf: Optional[str] = None) -> None:
+ inbuf: Optional[str] = None,
+ *, # kw-only args go below
+ one_shot: bool = False) -> None:
"""
Called by the plugin to send a command to the mon
cluster.
triggered, with notify_type set to "command", and notify_id set to
the tag of the command.
:param str inbuf: input buffer for sending additional data.
+ :param bool one_shot: a keyword-only param to make the command abort
+ with EPIPE when the target resets or refuses to reconnect
"""
- self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf)
+ self._ceph_send_command(result, svc_type, svc_id, command, tag, inbuf, one_shot=one_shot)
def tool_exec(
self,
def _ceph_get(self, data_name):
return self.mock_store_get('_ceph_get', data_name, mock.MagicMock())
- def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf):
+ def _ceph_send_command(self, res, svc_type, svc_id, command, tag, inbuf, *, one_shot=False):
cmd = json.loads(command)
getattr(self, '_mon_commands_sent', []).append(cmd)