#include "messages/MMonCommandAck.h"
#include "mon/MonClient.h"
#include "mon/MonMap.h"
+#include "osd/OSDMap.h"
#include "msg/SimpleMessenger.h"
#include "tools/common.h"
static Tokenizer *tok;
// sync command
+bool pending_tell; // is tell, vs monitor command
+uint64_t pending_tid = 0;
+EntityName pending_target;
vector<string> pending_cmd;
bufferlist pending_bl;
bool reply;
entity_inst_t reply_from;
Context *resend_event = 0;
-
+OSDMap *osdmap = 0;
// observe (push)
#include "mon/PGMap.h"
#include "messages/MMonObserve.h"
#include "messages/MMonObserveNotify.h"
+#include "messages/MOSDMap.h"
+#include "messages/MCommand.h"
+#include "messages/MCommandReply.h"
static set<int> registered, seen;
version_t map_ver[PAXOS_NUM];
+static void handle_osd_map(CephToolCtx *ctx, MOSDMap *m)
+{
+ epoch_t e = m->get_first();
+ assert(m->maps.count(e));
+ ctx->lock.Lock();
+ delete osdmap;
+ osdmap = new OSDMap;
+ osdmap->decode(m->maps[e]);
+ cmd_cond.Signal();
+ ctx->lock.Unlock();
+ m->put();
+}
+
static void handle_observe(CephToolCtx *ctx, MMonObserve *observe)
{
dout(1) << observe->get_source() << " -> " << get_paxos_name(observe->machine_id)
ack->put();
}
+static void handle_ack(CephToolCtx *ctx, MCommandReply *ack)
+{
+ ctx->lock.Lock();
+ if (ack->get_tid() == pending_tid) {
+ reply = true;
+ reply_from = ack->get_source_inst();
+ reply_rs = ack->rs;
+ reply_rc = ack->r;
+ reply_bl = ack->get_data();
+ cmd_cond.Signal();
+ if (resend_event) {
+ ctx->timer.cancel_event(resend_event);
+ resend_event = 0;
+ }
+ }
+ ctx->lock.Unlock();
+ ack->put();
+}
+
static void send_command(CephToolCtx *ctx)
{
- version_t last_seen_version = 0;
- MMonCommand *m = new MMonCommand(ctx->mc.monmap.fsid, last_seen_version);
+ if (!pending_tell) {
+ version_t last_seen_version = 0;
+ MMonCommand *m = new MMonCommand(ctx->mc.monmap.fsid, last_seen_version);
+ m->cmd = pending_cmd;
+ m->set_data(pending_bl);
+
+ if (!ctx->concise)
+ *ctx->log << ceph_clock_now(g_ceph_context) << " mon" << " <- " << pending_cmd << std::endl;
+
+ ctx->mc.send_mon_message(m);
+ return;
+ }
+
+ if (!ctx->concise)
+ *ctx->log << ceph_clock_now(g_ceph_context) << " " << pending_target << " <- " << pending_cmd << std::endl;
+
+ MCommand *m = new MCommand(ctx->mc.monmap.fsid);
m->cmd = pending_cmd;
m->set_data(pending_bl);
+ m->set_tid(++pending_tid);
+
+ if (pending_target.get_type() == (int)entity_name_t::TYPE_OSD) {
+ if (!osdmap) {
+ ctx->mc.sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
+ ctx->mc.renew_subs();
+ while (!osdmap)
+ cmd_cond.Wait(ctx->lock);
+ }
- if (!ctx->concise)
- *ctx->log << ceph_clock_now(g_ceph_context) << " mon" << " <- " << pending_cmd << std::endl;
- ctx->mc.send_mon_message(m);
+ const char *start = pending_target.get_id().c_str();
+ char *end;
+ int n = strtoll(start, &end, 10);
+ if (end <= start) {
+ reply_rc = -EINVAL;
+ reply = true;
+ return;
+ }
+
+ if (!osdmap->is_up(n)) {
+ reply_rc = -ENOENT;
+ reply = true;
+ }
+
+ messenger->send_message(m, osdmap->get_inst(n));
+ return;
+ }
+
+ reply_rc = -EINVAL;
+ reply = true;
}
class Admin : public Dispatcher {
case MSG_MON_COMMAND_ACK:
handle_ack(ctx, (MMonCommandAck*)m);
break;
+ case MSG_COMMAND_REPLY:
+ handle_ack(ctx, (MCommandReply*)m);
+ break;
case MSG_MON_OBSERVE_NOTIFY:
handle_notify(ctx, (MMonObserveNotify *)m);
break;
case CEPH_MSG_MON_MAP:
m->put();
break;
+ case CEPH_MSG_OSD_MAP:
+ handle_osd_map(ctx, (MOSDMap *)m);
+ break;
default:
return false;
}
bool ms_handle_reset(Connection *con) { return false; }
void ms_handle_remote_reset(Connection *con) {}
+ bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer, bool force_new)
+ {
+ if (dest_type == CEPH_ENTITY_TYPE_MON)
+ return true;
+ *authorizer = ctx->mc.auth->build_authorizer(dest_type);
+ return true;
+ }
+
private:
CephToolCtx *ctx;
};
{
Mutex::Locker l(ctx->lock);
+ pending_target = EntityName();
pending_cmd = cmd;
pending_bl = bl;
+ pending_tell = false;
reply = false;
+ if (cmd.size() > 0 && cmd[0] == "tell") {
+ if (cmd.size() == 1) {
+ cerr << "no tell target specified" << std::endl;
+ return -EINVAL;
+ }
+ if (!pending_target.from_str(cmd[1])) {
+ cerr << "tell target '" << cmd[1] << "' not a valid entity name" << std::endl;
+ return -EINVAL;
+ }
+ pending_cmd.erase(pending_cmd.begin(), pending_cmd.begin() + 2);
+ pending_tell = true;
+ }
+
send_command(ctx);
while (!reply)
ctx->mc.set_messenger(messenger);
ctx->mc.init();
+ // in case we 'tell ...'
+ ctx->mc.set_want_keys(CEPH_ENTITY_TYPE_MDS | CEPH_ENTITY_TYPE_OSD);
+
if (ctx->mc.authenticate() < 0) {
derr << "unable to authenticate as " << g_conf->name << dendl;
ceph_tool_messenger_shutdown();