cmds_CXXFLAGS = ${AM_CFLAGS}
# admin tools
-ceph_SOURCES = ceph.cc msg/SimpleMessenger.cc
+ceph_SOURCES = tools/ceph.cc msg/SimpleMessenger.cc
ceph_LDADD = libcrush.a libcommon.a -ledit -lpthread -lm -lcrypto
cconf_SOURCES = cconf.cc
cconf_LDADD = libcommon.a -lpthread -lm -lcrypto
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include <sys/stat.h>
-#include <iostream>
-#include <string>
-using namespace std;
-
-#include "config.h"
-
-#include "mon/MonMap.h"
-#include "mon/MonClient.h"
-#include "msg/SimpleMessenger.h"
-#include "messages/MMonCommand.h"
-#include "messages/MMonCommandAck.h"
-
-#include "common/Timer.h"
-#include "common/common_init.h"
-
-#ifndef DARWIN
-#include <envz.h>
-#endif // DARWIN
-
-#include <sys/types.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-extern "C" {
-#include <histedit.h>
-}
-
-
-
-Mutex lock("ceph.cc lock");
-Cond cond;
-SimpleMessenger *messenger = 0;
-SafeTimer timer(lock);
-MonClient mc;
-
-const char *outfile = 0;
-
-
-
-// sync command
-vector<string> pending_cmd;
-bufferlist pending_bl;
-bool reply;
-string reply_rs;
-int reply_rc;
-bufferlist reply_bl;
-entity_inst_t reply_from;
-Context *resend_event = 0;
-
-
-
-// observe (push)
-#include "mon/PGMap.h"
-#include "osd/OSDMap.h"
-#include "mds/MDSMap.h"
-#include "include/LogEntry.h"
-#include "include/ClassLibrary.h"
-
-#include "mon/mon_types.h"
-
-#include "messages/MMonObserve.h"
-#include "messages/MMonObserveNotify.h"
-
-int observe = 0;
-bool one_shot = false;
-static PGMap pgmap;
-static MDSMap mdsmap;
-static OSDMap osdmap;
-
-static set<int> registered, seen;
-
-version_t map_ver[PAXOS_NUM];
-
-version_t last_seen_version = 0;
-
-void handle_observe(MMonObserve *observe)
-{
- dout(1) << observe->get_source() << " -> " << get_paxos_name(observe->machine_id)
- << " registered" << dendl;
- lock.Lock();
- registered.insert(observe->machine_id);
- lock.Unlock();
- observe->put();
-}
-
-void handle_notify(MMonObserveNotify *notify)
-{
- utime_t now = g_clock.now();
-
- dout(1) << notify->get_source() << " -> " << get_paxos_name(notify->machine_id)
- << " v" << notify->ver
- << (notify->is_latest ? " (latest)" : "")
- << dendl;
-
- if (ceph_fsid_compare(¬ify->fsid, &mc.monmap.fsid)) {
- dout(0) << notify->get_source_inst() << " notify fsid " << notify->fsid << " != " << mc.monmap.fsid << dendl;
- notify->put();
- return;
- }
-
- if (map_ver[notify->machine_id] >= notify->ver)
- return;
-
- switch (notify->machine_id) {
- case PAXOS_PGMAP:
- {
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- pgmap.decode(p);
- } else {
- PGMap::Incremental inc;
- inc.decode(p);
- pgmap.apply_incremental(inc);
- }
- cout << now << " pg " << pgmap << std::endl;
- break;
- }
-
- case PAXOS_MDSMAP:
- mdsmap.decode(notify->bl);
- cout << now << " mds " << mdsmap << std::endl;
- break;
-
- case PAXOS_OSDMAP:
- {
- if (notify->is_latest) {
- osdmap.decode(notify->bl);
- } else {
- OSDMap::Incremental inc(notify->bl);
- osdmap.apply_incremental(inc);
- }
- cout << now << " osd " << osdmap << std::endl;
- }
- break;
-
- case PAXOS_LOG:
- {
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- LogSummary summary;
- ::decode(summary, p);
- // show last log message
- if (!summary.tail.empty())
- cout << now << " log " << summary.tail.back() << std::endl;
- } else {
- LogEntry le;
- __u8 v;
- ::decode(v, p);
- while (!p.end()) {
- le.decode(p);
- cout << now << " log " << le << std::endl;
- }
- }
- break;
- }
-
- case PAXOS_CLASS:
- {
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- ClassLibrary list;
- ::decode(list, p);
- // show the first class info
- map<string, ClassVersionMap>::iterator mapiter = list.library_map.begin();
- if (mapiter != list.library_map.end()) {
- ClassVersionMap& map = mapiter->second;
- tClassVersionMap::iterator iter = map.begin();
-
- if (iter != map.end())
- cout << now << " class " << iter->second << std::endl;
- }
- } else {
- __u8 v;
- ::decode(v, p);
- while (!p.end()) {
- ClassLibraryIncremental inc;
- ::decode(inc, p);
- ClassInfo info;
- inc.decode_info(info);
- cout << now << " class " << info << std::endl;
- }
- }
- break;
- }
-
- case PAXOS_AUTH:
- {
-#if 0
- bufferlist::iterator p = notify->bl.begin();
- if (notify->is_latest) {
- KeyServerData data;
- ::decode(data, p);
- cout << now << " auth " << std::endl;
- } else {
- while (!p.end()) {
- AuthMonitor::Incremental inc;
- inc.decode(p);
- cout << now << " auth " << inc.name.to_str() << std::endl;
- }
- }
-#endif
- /* ignoring auth incremental.. don't want to decode it */
- break;
- }
-
- case PAXOS_MONMAP:
- {
- mc.monmap.decode(notify->bl);
- cout << now << " mon " << mc.monmap << std::endl;
- }
- break;
-
- default:
- cout << now << " ignoring unknown machine id " << notify->machine_id << std::endl;
- }
-
- map_ver[notify->machine_id] = notify->ver;
-
- // have we seen them all?
- seen.insert(notify->machine_id);
- if (one_shot && seen.size() == PAXOS_NUM) {
- messenger->shutdown();
- }
-
- notify->put();
-}
-
-static void send_observe_requests();
-
-class C_ObserverRefresh : public Context {
-public:
- bool newmon;
- C_ObserverRefresh(bool n) : newmon(n) {}
- void finish(int r) {
- send_observe_requests();
- }
-};
-
-static void send_observe_requests()
-{
- dout(1) << "send_observe_requests " << dendl;
-
- bool sent = false;
- for (int i=0; i<PAXOS_NUM; i++) {
- MMonObserve *m = new MMonObserve(mc.monmap.fsid, i, map_ver[i]);
- dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl;
- mc.send_mon_message(m);
- sent = true;
- }
-
- registered.clear();
- float seconds = g_conf.paxos_observer_timeout/2;
- dout(1) << " refresh after " << seconds << " with same mon" << dendl;
- timer.add_event_after(seconds, new C_ObserverRefresh(false));
-}
-
-
-
-
-int lines = 0;
-
-void handle_ack(MMonCommandAck *ack)
-{
- lock.Lock();
- reply = true;
- reply_from = ack->get_source_inst();
- reply_rs = ack->rs;
- reply_rc = ack->r;
- reply_bl = ack->get_data();
- cond.Signal();
- if (resend_event) {
- timer.cancel_event(resend_event);
- resend_event = 0;
- }
- lock.Unlock();
- ack->put();
-}
-
-void send_command()
-{
- MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
- m->cmd = pending_cmd;
- m->set_data(pending_bl);
-
- cout << g_clock.now() << " mon" << " <- " << pending_cmd << std::endl;
- mc.send_mon_message(m);
-}
-
-
-class Admin : public Dispatcher {
- bool ms_dispatch(Message *m) {
- switch (m->get_type()) {
- case MSG_MON_COMMAND_ACK:
- handle_ack((MMonCommandAck*)m);
- break;
- case MSG_MON_OBSERVE_NOTIFY:
- handle_notify((MMonObserveNotify *)m);
- break;
- case MSG_MON_OBSERVE:
- handle_observe((MMonObserve *)m);
- break;
- case CEPH_MSG_MON_MAP:
- m->put();
- break;
- default:
- return false;
- }
- return true;
- }
-
- void ms_handle_connect(Connection *con) {
- if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
- lock.Lock();
- if (observe)
- send_observe_requests();
- if (pending_cmd.size())
- send_command();
- lock.Unlock();
- }
- }
- bool ms_handle_reset(Connection *con) { return false; }
- void ms_handle_remote_reset(Connection *con) {}
-
-} dispatcher;
-
-int do_command(vector<string>& cmd, bufferlist& bl, string& rs, bufferlist& rbl)
-{
- Mutex::Locker l(lock);
-
- pending_cmd = cmd;
- pending_bl = bl;
- reply = false;
-
- send_command();
-
- while (!reply)
- cond.Wait(lock);
-
- rs = rs;
- rbl = reply_bl;
- cout << g_clock.now() << " "
- << reply_from.name << " -> '"
- << reply_rs << "' (" << reply_rc << ")"
- << std::endl;
-
- return reply_rc;
-}
-
-
-
-void usage()
-{
- cerr << "usage: ceph [options] [commands]" << std::endl;
- cerr << "If no commands are specified, enter interactive mode.\n";
- cerr << "Commands:" << std::endl;
- cerr << " stop -- cleanly shut down file system" << std::endl
- << " (osd|pg|mds) stat -- get monitor subsystem status" << std::endl
- << " ..." << std::endl;
- cerr << "Options:" << std::endl;
- cerr << " -i infile\n";
- cerr << " -o outfile\n";
- cerr << " specify input or output file (for certain commands)\n";
- cerr << " -s or --status\n";
- cerr << " print current system status\n";
- cerr << " -w or --watch\n";
- cerr << " watch system status changes in real time (push)\n";
- generic_client_usage();
-}
-
-
-const char *cli_prompt(EditLine *e) {
- return "ceph> ";
-}
-
-int do_cli()
-{
- /* emacs style */
- EditLine *el = el_init("ceph", stdin, stdout, stderr);
- el_set(el, EL_PROMPT, &cli_prompt);
- el_set(el, EL_EDITOR, "emacs");
-
- History *myhistory = history_init();
- if (myhistory == 0) {
- fprintf(stderr, "history could not be initialized\n");
- return 1;
- }
-
- HistEvent ev;
-
- /* Set the size of the history */
- history(myhistory, &ev, H_SETSIZE, 800);
-
- /* This sets up the call back functions for history functionality */
- el_set(el, EL_HIST, history, myhistory);
-
- Tokenizer *tok = tok_init(NULL);
-
- bufferlist in;
- while (1) {
- int count; // # chars read
- const char *line = el_gets(el, &count);
-
- if (!count) {
- cout << "quit" << std::endl;
- break;
- }
-
- //cout << "typed '" << line << "'" << std::endl;
-
- if (strcmp(line, "quit\n") == 0)
- break;
-
- history(myhistory, &ev, H_ENTER, line);
-
- int argc;
- const char **argv;
- tok_str(tok, line, &argc, &argv);
- tok_reset(tok);
-
- vector<string> cmd;
- const char *infile = 0;
- const char *outfile = 0;
- for (int i=0; i<argc; i++) {
- if (strcmp(argv[i], ">") == 0 && i < argc-1) {
- outfile = argv[++i];
- continue;
- }
- if (argv[i][0] == '>') {
- outfile = argv[i] + 1;
- while (*outfile == ' ') outfile++;
- continue;
- }
- if (strcmp(argv[i], "<") == 0 && i < argc-1) {
- infile = argv[++i];
- continue;
- }
- if (argv[i][0] == '<') {
- infile = argv[i] + 1;
- while (*infile == ' ') infile++;
- continue;
- }
- cmd.push_back(argv[i]);
- }
- if (cmd.empty())
- continue;
-
- if (cmd.size() == 1 && cmd[0] == "print") {
- cout << "----" << std::endl;
- write(1, in.c_str(), in.length());
- cout << "---- (" << in.length() << " bytes)" << std::endl;
- continue;
- }
-
- //cout << "cmd is " << cmd << std::endl;
-
- bufferlist out;
- if (infile) {
- if (out.read_file(infile) == 0) {
- cout << "read " << out.length() << " from " << infile << std::endl;
- } else {
- char buf[80];
- cerr << "couldn't read from " << infile << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
- continue;
- }
- }
-
- in.clear();
- string rs;
- do_command(cmd, out, rs, in);
-
- if (in.length()) {
- if (outfile) {
- if (strcmp(outfile, "-") == 0) {
- cout << "----" << std::endl;
- write(1, in.c_str(), in.length());
- cout << "---- (" << in.length() << " bytes)" << std::endl;
- } else {
- in.write_file(outfile);
- cout << "wrote " << in.length() << " to " << outfile << std::endl;
- }
- } else {
- cout << "got " << in.length() << " byte payload; 'print' to dump to terminal, or add '>-' to command." << std::endl;
- }
- }
- }
-
- history_end(myhistory);
- el_end(el);
-
- return 0;
-}
-
-
-
-
-
-int main(int argc, const char **argv, const char *envp[])
-{
- DEFINE_CONF_VARS(usage);
- vector<const char*> args;
- argv_to_vec(argc, argv, args);
- env_to_vec(args);
-
- ceph_set_default_id("admin");
-
- common_set_defaults(false);
- common_init(args, "ceph", true);
-
- vec_to_argv(args, argc, argv);
-
- srand(getpid());
-
- // default to 'admin' user
- if (!g_conf.id || !g_conf.id[0])
- g_conf.id = strdup("admin");
-
- char *fname;
- bufferlist indata;
- vector<const char*> nargs;
-
- FOR_EACH_ARG(args) {
- if (CONF_ARG_EQ("out_file", 'o')) {
- CONF_SAFE_SET_ARG_VAL(&outfile, OPT_STR);
- } else if (CONF_ARG_EQ("in_file", 'i')) {
- CONF_SAFE_SET_ARG_VAL(&fname, OPT_STR);
- int fd = ::open(fname, O_RDONLY);
- struct stat st;
- if (::fstat(fd, &st) == 0) {
- indata.push_back(buffer::create(st.st_size));
- indata.zero();
- ::read(fd, indata.c_str(), st.st_size);
- ::close(fd);
- cout << "read " << st.st_size << " bytes from " << args[i] << std::endl;
- }
- } else if (CONF_ARG_EQ("status", 's')) {
- CONF_SAFE_SET_ARG_VAL(&observe, OPT_BOOL);
- one_shot = true;
- } else if (CONF_ARG_EQ("watch", 'w')) {
- CONF_SAFE_SET_ARG_VAL(&observe, OPT_BOOL);
- } else if (CONF_ARG_EQ("help", 'h')) {
- usage();
- } else if (args[i][0] == '-' && nargs.empty()) {
- cerr << "unrecognized option " << args[i] << std::endl;
- usage();
- } else
- nargs.push_back(args[i]);
- }
-
- // build command
- vector<string> vcmd;
- string cmd;
- for (unsigned i=0; i<nargs.size(); i++) {
- if (i) cmd += " ";
- cmd += nargs[i];
- vcmd.push_back(string(nargs[i]));
- }
-
- // get monmap
- if (mc.build_initial_monmap() < 0)
- return -1;
-
- // start up network
- messenger = new SimpleMessenger();
- messenger->register_entity(entity_name_t::CLIENT());
- messenger->add_dispatcher_head(&dispatcher);
-
- messenger->start();
-
- mc.set_messenger(messenger);
- mc.init();
-
- if (mc.authenticate() < 0) {
- cerr << "unable to authenticate as " << *g_conf.entity_name << std::endl;
- return -1;
- }
- if (mc.get_monmap() < 0) {
- cerr << "unable to get monmap" << std::endl;
- return -1;
- }
-
- int ret = 0;
-
- if (observe) {
- lock.Lock();
- send_observe_requests();
- lock.Unlock();
- } else {
- if (vcmd.size()) {
-
- string rs;
- bufferlist odata;
- ret = do_command(vcmd, indata, rs, odata);
-
- int len = odata.length();
- if (len) {
- if (outfile) {
- if (strcmp(outfile, "-") == 0) {
- ::write(1, odata.c_str(), len);
- } else {
- odata.write_file(outfile);
- }
- cout << g_clock.now() << " wrote " << len << " byte payload to " << outfile << std::endl;
- } else {
- cout << g_clock.now() << " got " << len << " byte payload, discarding (specify -o <outfile)" << std::endl;
- }
- }
- } else {
- // interactive mode
- do_cli();
- }
-
- messenger->shutdown();
- }
-
-
- // wait for messenger to finish
- messenger->wait();
- messenger->destroy();
- return ret;
-}
-
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "config.h"
+
+#include "mon/MonMap.h"
+#include "mon/MonClient.h"
+#include "msg/SimpleMessenger.h"
+#include "messages/MMonCommand.h"
+#include "messages/MMonCommandAck.h"
+
+#include "common/Timer.h"
+#include "common/common_init.h"
+
+#ifndef DARWIN
+#include <envz.h>
+#endif // DARWIN
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
+extern "C" {
+#include <histedit.h>
+}
+
+
+
+Mutex lock("ceph.cc lock");
+Cond cond;
+SimpleMessenger *messenger = 0;
+SafeTimer timer(lock);
+MonClient mc;
+
+const char *outfile = 0;
+
+
+
+// sync command
+vector<string> pending_cmd;
+bufferlist pending_bl;
+bool reply;
+string reply_rs;
+int reply_rc;
+bufferlist reply_bl;
+entity_inst_t reply_from;
+Context *resend_event = 0;
+
+
+
+// observe (push)
+#include "mon/PGMap.h"
+#include "osd/OSDMap.h"
+#include "mds/MDSMap.h"
+#include "include/LogEntry.h"
+#include "include/ClassLibrary.h"
+
+#include "mon/mon_types.h"
+
+#include "messages/MMonObserve.h"
+#include "messages/MMonObserveNotify.h"
+
+int observe = 0;
+bool one_shot = false;
+static PGMap pgmap;
+static MDSMap mdsmap;
+static OSDMap osdmap;
+
+static set<int> registered, seen;
+
+version_t map_ver[PAXOS_NUM];
+
+version_t last_seen_version = 0;
+
+void handle_observe(MMonObserve *observe)
+{
+ dout(1) << observe->get_source() << " -> " << get_paxos_name(observe->machine_id)
+ << " registered" << dendl;
+ lock.Lock();
+ registered.insert(observe->machine_id);
+ lock.Unlock();
+ observe->put();
+}
+
+void handle_notify(MMonObserveNotify *notify)
+{
+ utime_t now = g_clock.now();
+
+ dout(1) << notify->get_source() << " -> " << get_paxos_name(notify->machine_id)
+ << " v" << notify->ver
+ << (notify->is_latest ? " (latest)" : "")
+ << dendl;
+
+ if (ceph_fsid_compare(¬ify->fsid, &mc.monmap.fsid)) {
+ dout(0) << notify->get_source_inst() << " notify fsid " << notify->fsid << " != " << mc.monmap.fsid << dendl;
+ notify->put();
+ return;
+ }
+
+ if (map_ver[notify->machine_id] >= notify->ver)
+ return;
+
+ switch (notify->machine_id) {
+ case PAXOS_PGMAP:
+ {
+ bufferlist::iterator p = notify->bl.begin();
+ if (notify->is_latest) {
+ pgmap.decode(p);
+ } else {
+ PGMap::Incremental inc;
+ inc.decode(p);
+ pgmap.apply_incremental(inc);
+ }
+ cout << now << " pg " << pgmap << std::endl;
+ break;
+ }
+
+ case PAXOS_MDSMAP:
+ mdsmap.decode(notify->bl);
+ cout << now << " mds " << mdsmap << std::endl;
+ break;
+
+ case PAXOS_OSDMAP:
+ {
+ if (notify->is_latest) {
+ osdmap.decode(notify->bl);
+ } else {
+ OSDMap::Incremental inc(notify->bl);
+ osdmap.apply_incremental(inc);
+ }
+ cout << now << " osd " << osdmap << std::endl;
+ }
+ break;
+
+ case PAXOS_LOG:
+ {
+ bufferlist::iterator p = notify->bl.begin();
+ if (notify->is_latest) {
+ LogSummary summary;
+ ::decode(summary, p);
+ // show last log message
+ if (!summary.tail.empty())
+ cout << now << " log " << summary.tail.back() << std::endl;
+ } else {
+ LogEntry le;
+ __u8 v;
+ ::decode(v, p);
+ while (!p.end()) {
+ le.decode(p);
+ cout << now << " log " << le << std::endl;
+ }
+ }
+ break;
+ }
+
+ case PAXOS_CLASS:
+ {
+ bufferlist::iterator p = notify->bl.begin();
+ if (notify->is_latest) {
+ ClassLibrary list;
+ ::decode(list, p);
+ // show the first class info
+ map<string, ClassVersionMap>::iterator mapiter = list.library_map.begin();
+ if (mapiter != list.library_map.end()) {
+ ClassVersionMap& map = mapiter->second;
+ tClassVersionMap::iterator iter = map.begin();
+
+ if (iter != map.end())
+ cout << now << " class " << iter->second << std::endl;
+ }
+ } else {
+ __u8 v;
+ ::decode(v, p);
+ while (!p.end()) {
+ ClassLibraryIncremental inc;
+ ::decode(inc, p);
+ ClassInfo info;
+ inc.decode_info(info);
+ cout << now << " class " << info << std::endl;
+ }
+ }
+ break;
+ }
+
+ case PAXOS_AUTH:
+ {
+#if 0
+ bufferlist::iterator p = notify->bl.begin();
+ if (notify->is_latest) {
+ KeyServerData data;
+ ::decode(data, p);
+ cout << now << " auth " << std::endl;
+ } else {
+ while (!p.end()) {
+ AuthMonitor::Incremental inc;
+ inc.decode(p);
+ cout << now << " auth " << inc.name.to_str() << std::endl;
+ }
+ }
+#endif
+ /* ignoring auth incremental.. don't want to decode it */
+ break;
+ }
+
+ case PAXOS_MONMAP:
+ {
+ mc.monmap.decode(notify->bl);
+ cout << now << " mon " << mc.monmap << std::endl;
+ }
+ break;
+
+ default:
+ cout << now << " ignoring unknown machine id " << notify->machine_id << std::endl;
+ }
+
+ map_ver[notify->machine_id] = notify->ver;
+
+ // have we seen them all?
+ seen.insert(notify->machine_id);
+ if (one_shot && seen.size() == PAXOS_NUM) {
+ messenger->shutdown();
+ }
+
+ notify->put();
+}
+
+static void send_observe_requests();
+
+class C_ObserverRefresh : public Context {
+public:
+ bool newmon;
+ C_ObserverRefresh(bool n) : newmon(n) {}
+ void finish(int r) {
+ send_observe_requests();
+ }
+};
+
+static void send_observe_requests()
+{
+ dout(1) << "send_observe_requests " << dendl;
+
+ bool sent = false;
+ for (int i=0; i<PAXOS_NUM; i++) {
+ MMonObserve *m = new MMonObserve(mc.monmap.fsid, i, map_ver[i]);
+ dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl;
+ mc.send_mon_message(m);
+ sent = true;
+ }
+
+ registered.clear();
+ float seconds = g_conf.paxos_observer_timeout/2;
+ dout(1) << " refresh after " << seconds << " with same mon" << dendl;
+ timer.add_event_after(seconds, new C_ObserverRefresh(false));
+}
+
+
+
+
+int lines = 0;
+
+void handle_ack(MMonCommandAck *ack)
+{
+ lock.Lock();
+ reply = true;
+ reply_from = ack->get_source_inst();
+ reply_rs = ack->rs;
+ reply_rc = ack->r;
+ reply_bl = ack->get_data();
+ cond.Signal();
+ if (resend_event) {
+ timer.cancel_event(resend_event);
+ resend_event = 0;
+ }
+ lock.Unlock();
+ ack->put();
+}
+
+void send_command()
+{
+ MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
+ m->cmd = pending_cmd;
+ m->set_data(pending_bl);
+
+ cout << g_clock.now() << " mon" << " <- " << pending_cmd << std::endl;
+ mc.send_mon_message(m);
+}
+
+
+class Admin : public Dispatcher {
+ bool ms_dispatch(Message *m) {
+ switch (m->get_type()) {
+ case MSG_MON_COMMAND_ACK:
+ handle_ack((MMonCommandAck*)m);
+ break;
+ case MSG_MON_OBSERVE_NOTIFY:
+ handle_notify((MMonObserveNotify *)m);
+ break;
+ case MSG_MON_OBSERVE:
+ handle_observe((MMonObserve *)m);
+ break;
+ case CEPH_MSG_MON_MAP:
+ m->put();
+ break;
+ default:
+ return false;
+ }
+ return true;
+ }
+
+ void ms_handle_connect(Connection *con) {
+ if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) {
+ lock.Lock();
+ if (observe)
+ send_observe_requests();
+ if (pending_cmd.size())
+ send_command();
+ lock.Unlock();
+ }
+ }
+ bool ms_handle_reset(Connection *con) { return false; }
+ void ms_handle_remote_reset(Connection *con) {}
+
+} dispatcher;
+
+int do_command(vector<string>& cmd, bufferlist& bl, string& rs, bufferlist& rbl)
+{
+ Mutex::Locker l(lock);
+
+ pending_cmd = cmd;
+ pending_bl = bl;
+ reply = false;
+
+ send_command();
+
+ while (!reply)
+ cond.Wait(lock);
+
+ rs = rs;
+ rbl = reply_bl;
+ cout << g_clock.now() << " "
+ << reply_from.name << " -> '"
+ << reply_rs << "' (" << reply_rc << ")"
+ << std::endl;
+
+ return reply_rc;
+}
+
+
+
+void usage()
+{
+ cerr << "usage: ceph [options] [commands]" << std::endl;
+ cerr << "If no commands are specified, enter interactive mode.\n";
+ cerr << "Commands:" << std::endl;
+ cerr << " stop -- cleanly shut down file system" << std::endl
+ << " (osd|pg|mds) stat -- get monitor subsystem status" << std::endl
+ << " ..." << std::endl;
+ cerr << "Options:" << std::endl;
+ cerr << " -i infile\n";
+ cerr << " -o outfile\n";
+ cerr << " specify input or output file (for certain commands)\n";
+ cerr << " -s or --status\n";
+ cerr << " print current system status\n";
+ cerr << " -w or --watch\n";
+ cerr << " watch system status changes in real time (push)\n";
+ generic_client_usage();
+}
+
+
+const char *cli_prompt(EditLine *e) {
+ return "ceph> ";
+}
+
+int do_cli()
+{
+ /* emacs style */
+ EditLine *el = el_init("ceph", stdin, stdout, stderr);
+ el_set(el, EL_PROMPT, &cli_prompt);
+ el_set(el, EL_EDITOR, "emacs");
+
+ History *myhistory = history_init();
+ if (myhistory == 0) {
+ fprintf(stderr, "history could not be initialized\n");
+ return 1;
+ }
+
+ HistEvent ev;
+
+ /* Set the size of the history */
+ history(myhistory, &ev, H_SETSIZE, 800);
+
+ /* This sets up the call back functions for history functionality */
+ el_set(el, EL_HIST, history, myhistory);
+
+ Tokenizer *tok = tok_init(NULL);
+
+ bufferlist in;
+ while (1) {
+ int count; // # chars read
+ const char *line = el_gets(el, &count);
+
+ if (!count) {
+ cout << "quit" << std::endl;
+ break;
+ }
+
+ //cout << "typed '" << line << "'" << std::endl;
+
+ if (strcmp(line, "quit\n") == 0)
+ break;
+
+ history(myhistory, &ev, H_ENTER, line);
+
+ int argc;
+ const char **argv;
+ tok_str(tok, line, &argc, &argv);
+ tok_reset(tok);
+
+ vector<string> cmd;
+ const char *infile = 0;
+ const char *outfile = 0;
+ for (int i=0; i<argc; i++) {
+ if (strcmp(argv[i], ">") == 0 && i < argc-1) {
+ outfile = argv[++i];
+ continue;
+ }
+ if (argv[i][0] == '>') {
+ outfile = argv[i] + 1;
+ while (*outfile == ' ') outfile++;
+ continue;
+ }
+ if (strcmp(argv[i], "<") == 0 && i < argc-1) {
+ infile = argv[++i];
+ continue;
+ }
+ if (argv[i][0] == '<') {
+ infile = argv[i] + 1;
+ while (*infile == ' ') infile++;
+ continue;
+ }
+ cmd.push_back(argv[i]);
+ }
+ if (cmd.empty())
+ continue;
+
+ if (cmd.size() == 1 && cmd[0] == "print") {
+ cout << "----" << std::endl;
+ write(1, in.c_str(), in.length());
+ cout << "---- (" << in.length() << " bytes)" << std::endl;
+ continue;
+ }
+
+ //cout << "cmd is " << cmd << std::endl;
+
+ bufferlist out;
+ if (infile) {
+ if (out.read_file(infile) == 0) {
+ cout << "read " << out.length() << " from " << infile << std::endl;
+ } else {
+ char buf[80];
+ cerr << "couldn't read from " << infile << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl;
+ continue;
+ }
+ }
+
+ in.clear();
+ string rs;
+ do_command(cmd, out, rs, in);
+
+ if (in.length()) {
+ if (outfile) {
+ if (strcmp(outfile, "-") == 0) {
+ cout << "----" << std::endl;
+ write(1, in.c_str(), in.length());
+ cout << "---- (" << in.length() << " bytes)" << std::endl;
+ } else {
+ in.write_file(outfile);
+ cout << "wrote " << in.length() << " to " << outfile << std::endl;
+ }
+ } else {
+ cout << "got " << in.length() << " byte payload; 'print' to dump to terminal, or add '>-' to command." << std::endl;
+ }
+ }
+ }
+
+ history_end(myhistory);
+ el_end(el);
+
+ return 0;
+}
+
+
+
+
+
+int main(int argc, const char **argv, const char *envp[])
+{
+ DEFINE_CONF_VARS(usage);
+ vector<const char*> args;
+ argv_to_vec(argc, argv, args);
+ env_to_vec(args);
+
+ ceph_set_default_id("admin");
+
+ common_set_defaults(false);
+ common_init(args, "ceph", true);
+
+ vec_to_argv(args, argc, argv);
+
+ srand(getpid());
+
+ // default to 'admin' user
+ if (!g_conf.id || !g_conf.id[0])
+ g_conf.id = strdup("admin");
+
+ char *fname;
+ bufferlist indata;
+ vector<const char*> nargs;
+
+ FOR_EACH_ARG(args) {
+ if (CONF_ARG_EQ("out_file", 'o')) {
+ CONF_SAFE_SET_ARG_VAL(&outfile, OPT_STR);
+ } else if (CONF_ARG_EQ("in_file", 'i')) {
+ CONF_SAFE_SET_ARG_VAL(&fname, OPT_STR);
+ int fd = ::open(fname, O_RDONLY);
+ struct stat st;
+ if (::fstat(fd, &st) == 0) {
+ indata.push_back(buffer::create(st.st_size));
+ indata.zero();
+ ::read(fd, indata.c_str(), st.st_size);
+ ::close(fd);
+ cout << "read " << st.st_size << " bytes from " << args[i] << std::endl;
+ }
+ } else if (CONF_ARG_EQ("status", 's')) {
+ CONF_SAFE_SET_ARG_VAL(&observe, OPT_BOOL);
+ one_shot = true;
+ } else if (CONF_ARG_EQ("watch", 'w')) {
+ CONF_SAFE_SET_ARG_VAL(&observe, OPT_BOOL);
+ } else if (CONF_ARG_EQ("help", 'h')) {
+ usage();
+ } else if (args[i][0] == '-' && nargs.empty()) {
+ cerr << "unrecognized option " << args[i] << std::endl;
+ usage();
+ } else
+ nargs.push_back(args[i]);
+ }
+
+ // build command
+ vector<string> vcmd;
+ string cmd;
+ for (unsigned i=0; i<nargs.size(); i++) {
+ if (i) cmd += " ";
+ cmd += nargs[i];
+ vcmd.push_back(string(nargs[i]));
+ }
+
+ // get monmap
+ if (mc.build_initial_monmap() < 0)
+ return -1;
+
+ // start up network
+ messenger = new SimpleMessenger();
+ messenger->register_entity(entity_name_t::CLIENT());
+ messenger->add_dispatcher_head(&dispatcher);
+
+ messenger->start();
+
+ mc.set_messenger(messenger);
+ mc.init();
+
+ if (mc.authenticate() < 0) {
+ cerr << "unable to authenticate as " << *g_conf.entity_name << std::endl;
+ return -1;
+ }
+ if (mc.get_monmap() < 0) {
+ cerr << "unable to get monmap" << std::endl;
+ return -1;
+ }
+
+ int ret = 0;
+
+ if (observe) {
+ lock.Lock();
+ send_observe_requests();
+ lock.Unlock();
+ } else {
+ if (vcmd.size()) {
+
+ string rs;
+ bufferlist odata;
+ ret = do_command(vcmd, indata, rs, odata);
+
+ int len = odata.length();
+ if (len) {
+ if (outfile) {
+ if (strcmp(outfile, "-") == 0) {
+ ::write(1, odata.c_str(), len);
+ } else {
+ odata.write_file(outfile);
+ }
+ cout << g_clock.now() << " wrote " << len << " byte payload to " << outfile << std::endl;
+ } else {
+ cout << g_clock.now() << " got " << len << " byte payload, discarding (specify -o <outfile)" << std::endl;
+ }
+ }
+ } else {
+ // interactive mode
+ do_cli();
+ }
+
+ messenger->shutdown();
+ }
+
+
+ // wait for messenger to finish
+ messenger->wait();
+ messenger->destroy();
+ return ret;
+}
+