#define COMMAND_TABLE_H_
#include "messages/MCommand.h"
+#include "messages/MMgrCommand.h"
class CommandOp
{
ceph::buffer::list *outbl;
std::string *outs;
- ceph::ref_t<MCommand> get_message(const uuid_d &fsid) const
+ MessageRef get_message(const uuid_d &fsid,
+ bool mgr=false) const
{
- auto m = make_message<MCommand>(fsid);
- m->cmd = cmd;
- m->set_data(inbl);
- m->set_tid(tid);
-
- return m;
+ if (mgr) {
+ auto m = make_message<MMgrCommand>(fsid);
+ m->cmd = cmd;
+ m->set_data(inbl);
+ m->set_tid(tid);
+ return m;
+ } else {
+ auto m = make_message<MCommand>(fsid);
+ m->cmd = cmd;
+ m->set_data(inbl);
+ m->set_tid(tid);
+ return m;
+ }
}
CommandOp(const ceph_tid_t t) : tid(t), on_finish(nullptr),
#include "messages/MMgrConfigure.h"
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
+#include "messages/MMgrCommand.h"
+#include "messages/MMgrCommandReply.h"
#include "messages/MPGStats.h"
using std::string;
return handle_mgr_close(ref_cast<MMgrClose>(m));
case MSG_COMMAND_REPLY:
if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
- handle_command_reply(ref_cast<MCommandReply>(m));
+ MCommandReply *c = static_cast<MCommandReply*>(m.get());
+ handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
+ return true;
+ } else {
+ return false;
+ }
+ case MSG_MGR_COMMAND_REPLY:
+ if (m->get_source().type() == CEPH_ENTITY_TYPE_MGR) {
+ MMgrCommandReply *c = static_cast<MMgrCommandReply*>(m.get());
+ handle_command_reply(c->get_tid(), c->get_data(), c->rs, c->r);
return true;
} else {
return false;
// resend any pending commands
for (const auto &p : command_table.get_commands()) {
- auto m = p.second.get_message({});
+ auto m = p.second.get_message(
+ {},
+ HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
ceph_assert(session);
ceph_assert(session->con);
session->con->send_message2(std::move(m));
if (session && session->con) {
// Leaving fsid argument null because it isn't used.
- auto m = op.get_message({});
+ auto m = op.get_message(
+ {},
+ HAVE_FEATURE(map.active_mgr_features, SERVER_OCTOPUS));
session->con->send_message2(std::move(m));
} else {
ldout(cct, 5) << "no mgr session (no running mgr daemon?), waiting" << dendl;
return 0;
}
-bool MgrClient::handle_command_reply(ref_t<MCommandReply> m)
+bool MgrClient::handle_command_reply(
+ uint64_t tid,
+ bufferlist& data,
+ const std::string& rs,
+ int r)
{
ceph_assert(ceph_mutex_is_locked_by_me(lock));
- ldout(cct, 20) << *m << dendl;
+ ldout(cct, 20) << "tid " << tid << " r " << r << dendl;
- const auto tid = m->get_tid();
if (!command_table.exists(tid)) {
- ldout(cct, 4) << "handle_command_reply tid " << m->get_tid()
+ ldout(cct, 4) << "handle_command_reply tid " << tid
<< " not found" << dendl;
return true;
}
auto &op = command_table.get_command(tid);
if (op.outbl) {
- op.outbl->claim(m->get_data());
+ op.outbl->claim(data);
}
if (op.outs) {
- *(op.outs) = m->rs;
+ *(op.outs) = rs;
}
if (op.on_finish) {
- op.on_finish->complete(m->r);
+ op.on_finish->complete(r);
}
command_table.erase(tid);