From: Sage Weil Date: Mon, 15 Dec 2008 19:45:10 +0000 (-0800) Subject: ceph: fold cobserver into ceph X-Git-Tag: v0.6~62 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=483dba8d6850e85713398f987d05163dafe20c94;p=ceph.git ceph: fold cobserver into ceph --- diff --git a/src/Makefile.am b/src/Makefile.am index 42e1ab11d6c49..60b71e1cbaad2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -9,10 +9,8 @@ cmon_SOURCES = cmon.cc msg/SimpleMessenger.cc cmon_LDADD = libcommon.a libmon.a libcrush.a libcommon.a # admin tools -cobserver_SOURCES = cobserver.cc msg/SimpleMessenger.cc -cobserver_LDADD = libcrush.a libcommon.a ceph_SOURCES = ceph.cc msg/SimpleMessenger.cc -ceph_LDADD = libcommon.a -ledit +ceph_LDADD = libcrush.a libcommon.a -ledit mkmonfs_SOURCES = mkmonfs.cc mkmonfs_LDADD = libcommon.a libmon.a libcrush.a libcommon.a monmaptool_SOURCES = monmaptool.cc @@ -43,7 +41,7 @@ csyn_LDADD = libcommon.a libclient.a libosdc.a libcrush.a libcommon.a bin_PROGRAMS = \ cmon cmds cosd csyn \ - ceph cobserver \ + ceph \ mkmonfs monmaptool osdmaptool crushtool \ streamtest dupstore dumpjournal testmsgr diff --git a/src/ceph.cc b/src/ceph.cc index 323fd6dc0fb58..023395c87fb4c 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -43,13 +43,13 @@ extern "C" { Mutex lock("ceph.cc lock"); Cond cond; - Messenger *messenger = 0; +MonMap monmap; +SafeTimer timer(lock); const char *outfile = 0; -int watch = 0; -MonMap monmap; + // sync command vector pending_cmd; @@ -62,7 +62,151 @@ entity_inst_t reply_from; Context *resend_event = 0; -// watch + +// observe (push) +#include "mon/ClientMap.h" +#include "mon/PGMap.h" +#include "mon/ClientMap.h" +#include "osd/OSDMap.h" +#include "mds/MDSMap.h" +#include "include/LogEntry.h" + +#include "mon/mon_types.h" + +#include "messages/MMonObserve.h" +#include "messages/MMonObserveNotify.h" + +int observe = 0; +static PGMap pgmap; +static MDSMap mdsmap; +static OSDMap osdmap; +static ClientMap clientmap; + +static bool got_response = false; +static bool first_response = false; + +version_t map_ver[PAXOS_NUM]; + +void handle_notify(MMonObserveNotify *notify) +{ + generic_dout(1) << notify->get_source() << " -> " << get_paxos_name(notify->machine_id) + << " v" << notify->ver + << (notify->is_latest ? " (latest)" : "") + << dendl; + + lock.Lock(); + if (!got_response) { + first_response = true; + got_response = true; + } + lock.Unlock(); + + if (map_ver[notify->machine_id] >= notify->ver) + return; + + switch (notify->machine_id) { + case PAXOS_PGMAP: + { + bufferlist::iterator p = notify->bl.begin(); + if (notify->is_latest) { + pgmap.decode(p); + } else { + PGMap::Incremental inc; + inc.decode(p); + pgmap.apply_incremental(inc); + } + dout(0) << " pg " << pgmap << dendl; + break; + } + + case PAXOS_MDSMAP: + mdsmap.decode(notify->bl); + dout(0) << " mds " << mdsmap << dendl; + break; + + case PAXOS_OSDMAP: + { + if (notify->is_latest) { + osdmap.decode(notify->bl); + } else { + OSDMap::Incremental inc(notify->bl); + osdmap.apply_incremental(inc); + } + dout(0) << " osd " << osdmap << dendl; + } + break; + + case PAXOS_CLIENTMAP: + { + bufferlist::iterator p = notify->bl.begin(); + if (notify->is_latest) { + clientmap.decode(p); + } else { + ClientMap::Incremental inc; + inc.decode(p); + clientmap.apply_incremental(inc); + } + dout(0) << "client " << clientmap << dendl; + } + break; + + case PAXOS_LOG: + { + LogEntry le; + bufferlist::iterator p = notify->bl.begin(); + while (!p.end()) { + le.decode(p); + dout(0) << " log " << le << dendl; + } + break; + } + } + + map_ver[notify->machine_id] = notify->ver; +} + +static void send_observe_requests(); + +class C_ObserverRefresh : public Context { +public: + C_ObserverRefresh() {} + void finish(int r) { + send_observe_requests(); + } +}; + +static void send_observe_requests() +{ + bufferlist indata; + float seconds = g_conf.paxos_observer_timeout/2; + +#define RETRY_SECONDS 5 + if (first_response) { + first_response = false; + if (RETRY_SECONDS < seconds) + return; + } + + for (int i=0; iset_data(indata); + int mon = monmap.pick_mon(); + generic_dout(1) << "mon" << mon << " <- observe " << get_paxos_name(i) << dendl; + messenger->send_message(m, monmap.get_inst(mon)); + } + + C_ObserverRefresh *observe_refresh_event = new C_ObserverRefresh(); + + if (!got_response) + seconds = (seconds < RETRY_SECONDS ? seconds : RETRY_SECONDS); + + timer.add_event_after(seconds, observe_refresh_event); +} + + + +// watch (poll) +int watch = 0; enum { OSD, MON, MDS, CLIENT, LAST }; int which = 0; int same = 0; @@ -80,7 +224,6 @@ struct C_Refresh : public Context { } }; -SafeTimer timer(lock); Context *event = 0; void get_status(bool newmon) @@ -99,7 +242,6 @@ void get_status(bool newmon) timer.add_event_after(.2, event); } - void handle_ack(MMonCommandAck *ack) { if (watch) { @@ -150,6 +292,9 @@ class Admin : public Dispatcher { case MSG_MON_COMMAND_ACK: handle_ack((MMonCommandAck*)m); break; + case MSG_MON_OBSERVE_NOTIFY: + handle_notify((MMonObserveNotify *)m); + break; default: return false; } @@ -211,7 +356,8 @@ void usage() cerr << " -m monhost -- specify monitor hostname or ip" << std::endl; cerr << " -i infile -- specify input file" << std::endl; cerr << " -o outfile -- specify output file" << std::endl; - cerr << " -w or --watch -- watch mds, osd, pg status" << std::endl; + cerr << " -w or --watch -- watch mds, osd, pg status (push)" << std::endl; + cerr << " -p or --poll -- watch mds, osd, pg status (poll)" << std::endl; cerr << "Commands:" << std::endl; cerr << " stop -- cleanly shut down file system" << std::endl << " (osd|pg|mds) stat -- get monitor subsystem status" << std::endl @@ -221,7 +367,7 @@ void usage() const char *cli_prompt(EditLine *e) { - return "monctl> "; + return "ceph> "; } int do_cli() @@ -319,6 +465,9 @@ int main(int argc, const char **argv, const char *envp[]) { } } else if (strcmp(args[i], "-w") == 0 || strcmp(args[i], "--watch") == 0) { + observe = 1; + } else if (strcmp(args[i], "-p") == 0 || + strcmp(args[i], "--poll") == 0) { watch = 1; } else nargs.push_back(args[i]); @@ -353,7 +502,13 @@ int main(int argc, const char **argv, const char *envp[]) { lock.Lock(); get_status(); lock.Unlock(); - } else { + } + if (observe) { + lock.Lock(); + send_observe_requests(); + lock.Unlock(); + } + if (!watch && !observe) { if (vcmd.size()) { string rs; diff --git a/src/cobserver.cc b/src/cobserver.cc deleted file mode 100644 index 9d12c2002d961..0000000000000 --- a/src/cobserver.cc +++ /dev/null @@ -1,242 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include -#include -#include -using namespace std; - -#include "config.h" - -#include "mon/MonMap.h" -#include "mon/MonClient.h" -#include "mon/mon_types.h" - -#include "mon/ClientMap.h" -#include "mon/PGMap.h" -#include "mon/ClientMap.h" -#include "osd/OSDMap.h" -#include "mds/MDSMap.h" -#include "include/LogEntry.h" - -#include "msg/SimpleMessenger.h" - -#include "messages/MMonObserve.h" -#include "messages/MMonObserveNotify.h" - -#include "common/Timer.h" - -#ifndef DARWIN -#include -#endif // DARWIN - -#include -#include -#include - -#include - -Mutex lock("cobserver.cc lock"); -Messenger *messenger = 0; - -static PGMap pgmap; -static MonMap monmap; -static MDSMap mdsmap; -static OSDMap osdmap; -static ClientMap clientmap; - -static bool got_response = false; -static bool first_response = false; - -version_t map_ver[PAXOS_NUM]; - -SafeTimer timer(lock); - - -void handle_notify(MMonObserveNotify *notify) -{ - generic_dout(1) << notify->get_source() << " -> " << get_paxos_name(notify->machine_id) - << " v" << notify->ver - << (notify->is_latest ? " (latest)" : "") - << dendl; - - lock.Lock(); - if (!got_response) { - first_response = true; - got_response = true; - } - lock.Unlock(); - - if (map_ver[notify->machine_id] >= notify->ver) - return; - - switch (notify->machine_id) { - case PAXOS_PGMAP: - { - bufferlist::iterator p = notify->bl.begin(); - if (notify->is_latest) { - pgmap.decode(p); - } else { - PGMap::Incremental inc; - inc.decode(p); - pgmap.apply_incremental(inc); - } - dout(0) << " pg " << pgmap << dendl; - break; - } - - case PAXOS_MDSMAP: - mdsmap.decode(notify->bl); - dout(0) << " mds " << mdsmap << dendl; - break; - - case PAXOS_OSDMAP: - { - if (notify->is_latest) { - osdmap.decode(notify->bl); - } else { - OSDMap::Incremental inc(notify->bl); - osdmap.apply_incremental(inc); - } - dout(0) << " osd " << osdmap << dendl; - } - break; - - case PAXOS_CLIENTMAP: - { - bufferlist::iterator p = notify->bl.begin(); - if (notify->is_latest) { - clientmap.decode(p); - } else { - ClientMap::Incremental inc; - inc.decode(p); - clientmap.apply_incremental(inc); - } - dout(0) << "client " << clientmap << dendl; - } - break; - - case PAXOS_LOG: - { - LogEntry le; - bufferlist::iterator p = notify->bl.begin(); - while (!p.end()) { - le.decode(p); - dout(0) << " log " << le << dendl; - } - break; - } - } - - map_ver[notify->machine_id] = notify->ver; -} - -class Admin : public Dispatcher { - bool dispatch_impl(Message *m) { - switch (m->get_type()) { - case MSG_MON_OBSERVE_NOTIFY: - handle_notify((MMonObserveNotify *)m); - break; - default: - return false; - } - return true; - } -} dispatcher; - - -void usage() -{ - cerr << "usage: cobserver [options]" << std::endl; - cerr << "Options:" << std::endl; - cerr << " -m monhost -- specify monitor hostname or ip" << std::endl; - exit(1); -} - -static void send_requests(); - -class C_ObserverRefresh : public Context { -public: - C_ObserverRefresh() {} - void finish(int r) { - send_requests(); - } -}; - -static void send_requests() -{ - bufferlist indata; - float seconds=g_conf.paxos_observer_timeout/2; - -#define RETRY_SECONDS 5 - if (first_response) { - first_response = false; - if (RETRY_SECONDS < seconds) - return; - } - - for (int i=0; iset_data(indata); - int mon = monmap.pick_mon(); - generic_dout(1) << "mon" << mon << " <- observe " << get_paxos_name(i) << dendl; - messenger->send_message(m, monmap.get_inst(mon)); - } - - C_ObserverRefresh *observe_refresh_event = new C_ObserverRefresh(); - - if (!got_response) - seconds = (seconds < RETRY_SECONDS ? seconds : RETRY_SECONDS); - - timer.add_event_after(seconds, observe_refresh_event); -} - -int main(int argc, const char **argv, const char *envp[]) { - - vector args; - argv_to_vec(argc, argv, args); - env_to_vec(args); - parse_config_options(args); - - vec_to_argv(args, argc, argv); - - srand(getpid()); - - vector nargs; - - // get monmap - MonClient mc; - if (mc.get_monmap(&monmap) < 0) - return -1; - memset(map_ver, 0, sizeof(map_ver)); - - // start up network - rank.bind(); - g_conf.daemonize = false; // not us! - messenger = rank.register_entity(entity_name_t::ADMIN()); - messenger->set_dispatcher(&dispatcher); - - rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0)); - - lock.Lock(); - send_requests(); - lock.Unlock(); - - // wait for messenger to finish - rank.wait(); - messenger->destroy(); - return 0; -} -