// osd interfaces
osdmap = new OSDMap; // initially blank.. see mount()
mdsmap = new MDSMap;
- objecter = new Objecter(messenger, monclient, osdmap, client_lock);
+ objecter = new Objecter(messenger, monclient, osdmap, client_lock, timer);
objecter->set_client_incarnation(0); // client always 0, for now.
objectcacher = new ObjectCacher(objecter, client_lock,
0, // all ack callback
// per-process lock. lame, but this way I protect LogType too!
Mutex logger_lock("logger_lock");
-std::auto_ptr < SafeTimer >logger_timer;
+SafeTimer logger_timer(logger_lock);
Context *logger_event = 0;
list<Logger*> logger_list;
utime_t start;
void logger_start()
{
Mutex::Locker l(logger_lock);
+ logger_timer.init();
flush_all_loggers();
}
<< " next=" << next
<< dendl;
logger_event = new C_FlushLoggers;
- if (!logger_timer.get())
- logger_timer.reset(new SafeTimer(logger_lock));
- logger_timer->add_event_at(next, logger_event);
+ logger_timer.add_event_at(next, logger_event);
}
static void stop()
{
- logger_timer.reset(NULL);
+ logger_timer.shutdown();
}
thread(NULL),
stopping(false)
{
- thread = new SafeTimerThread(this);
- thread->create();
}
SafeTimer::~SafeTimer()
{
- shutdown();
+ assert(thread == NULL);
+}
+
+void SafeTimer::init()
+{
+ dout(10) << "init" << dendl;
+ thread = new SafeTimerThread(this);
+ thread->create();
}
void SafeTimer::shutdown()
{
+ dout(10) << "shutdown" << dendl;
if (thread) {
- lock.Lock();
cancel_all_events();
-
stopping = true;
cond.Signal();
lock.Unlock();
thread->join();
+ lock.Lock();
delete thread;
thread = NULL;
}
*
* If there are any events that still have to run, they will need to take
* the event_lock first. */
+ void init();
void shutdown();
/* Schedule an event in the future
Mutex lock;
Cond cond;
+ SafeTimer timer;
public:
- RadosClient() : messenger(NULL), lock("radosclient") {
+ RadosClient() : messenger(NULL), lock("radosclient"), timer(lock) {
messenger = new SimpleMessenger();
}
return false;
dout(1) << "starting objecter" << dendl;
- objecter = new Objecter(messenger, &monclient, &osdmap, lock);
+ objecter = new Objecter(messenger, &monclient, &osdmap, lock, timer);
if (!objecter)
return false;
objecter->set_balanced_budget();
lock.Lock();
+ timer.init();
+
objecter->set_client_incarnation(0);
objecter->init();
monclient.renew_subs();
{
lock.Lock();
objecter->shutdown();
+ timer.shutdown();
lock.Unlock();
messenger->shutdown();
messenger->wait();
return *authorizer != NULL;
}
-void Dumper::init() {
+void Dumper::init()
+{
inodeno_t ino = MDS_INO_LOG_OFFSET + strtol(g_conf.id, 0, 0);
unsigned pg_pool = CEPH_METADATA_RULE;
osdmap = new OSDMap();
- objecter = new Objecter(messenger, monc, osdmap, lock);
+ objecter = new Objecter(messenger, monc, osdmap, lock, timer);
journaler = new Journaler(ino, pg_pool, CEPH_FS_ONDISK_MAGIC,
- objecter, 0, 0, &lock);
+ objecter, 0, 0, &timer);
objecter->set_client_incarnation(0);
lock.Lock();
objecter->init();
objecter->wait_for_osd_map();
+ timer.init();
+ lock.Unlock();
+}
+
+void Dumper::shutdown()
+{
+ lock.Lock();
+ timer.shutdown();
lock.Unlock();
}
// wait for messenger to finish
messenger->wait();
+ shutdown();
}
SimpleMessenger *messenger;
MonClient *monc;
Mutex lock;
+ SafeTimer timer;
/*
* The messenger should be a valid SimpleMessenger. You should call bind()
* The MonClient needs to be valid, and you should have called
* build_initial_monmap().
*/
- Dumper(SimpleMessenger *messenger_, MonClient *monc_) : messenger(messenger_),
- monc(monc_),
- lock("Dumper::lock")
+ Dumper(SimpleMessenger *messenger_, MonClient *monc_) :
+ messenger(messenger_),
+ monc(monc_),
+ lock("Dumper::lock"), timer(lock)
{}
bool ms_dispatch(Message *m) {
bool ms_get_authorizer(int dest_type, AuthAuthorizer **authorizer,
bool force_new);
void init();
+ void shutdown();
void dump(const char *dumpfile);
};
if (journaler) delete journaler;
journaler = new Journaler(ino, mds->mdsmap->get_metadata_pg_pool(), CEPH_FS_ONDISK_MAGIC, mds->objecter,
logger, l_mdl_jlat,
- &mds->mds_lock);
+ &mds->timer);
}
void MDLog::write_head(Context *c)
mdsmap = new MDSMap;
osdmap = new OSDMap;
- objecter = new Objecter(messenger, monc, osdmap, mds_lock);
+ objecter = new Objecter(messenger, monc, osdmap, mds_lock, timer);
objecter->unset_honor_osdmap_full();
filer = new Filer(objecter);
}
MDS::~MDS() {
- timer.shutdown();
Mutex::Locker lock(mds_lock);
if (mdcache) { delete mdcache; mdcache = NULL; }
mds_lock.Lock();
+ timer.init();
+
// starting beacon. this will induce an MDSMap from the monitor
want_state = wanted_state;
if (wanted_state == MDSMap::STATE_STANDBY && g_conf.id)
messenger->shutdown();
monc->shutdown();
+
+ timer.shutdown();
}
void MDS::respawn()
void Elector::shutdown()
{
if (expire_event)
- mon->timer->cancel_event(expire_event);
+ mon->timer.cancel_event(expire_event);
}
void Elector::bump_epoch(epoch_t e)
// set the timer
cancel_timer();
expire_event = new C_ElectionExpire(this);
- mon->timer->add_event_after(g_conf.mon_lease + plus,
+ mon->timer.add_event_after(g_conf.mon_lease + plus,
expire_event);
}
void Elector::cancel_timer()
{
if (expire_event) {
- mon->timer->cancel_event(expire_event);
+ mon->timer.cancel_event(expire_event);
expire_event = 0;
}
}
void MonClient::init()
{
dout(10) << "init" << dendl;
- timer.reset(new SafeTimer(monc_lock));
+
messenger->add_dispatcher_head(this);
entity_name = *g_conf.entity_name;
Mutex::Locker l(monc_lock);
+ timer.init();
schedule_tick();
// seed rng so we choose a different monitor each time
void MonClient::shutdown()
{
monc_lock.Lock();
- timer->cancel_all_events();
+ timer.shutdown();
monc_lock.Unlock();
}
void MonClient::schedule_tick()
{
if (hunting)
- timer->add_event_after(g_conf.mon_client_hunt_interval, new C_Tick(this));
+ timer.add_event_after(g_conf.mon_client_hunt_interval, new C_Tick(this));
else
- timer->add_event_after(g_conf.mon_client_ping_interval, new C_Tick(this));
+ timer.add_event_after(g_conf.mon_client_ping_interval, new C_Tick(this));
}
entity_addr_t my_addr;
Mutex monc_lock;
- std::auto_ptr < SafeTimer > timer;
+ SafeTimer timer;
set<__u32> auth_supported;
state(MC_STATE_NONE),
messenger(NULL),
monc_lock("MonClient::monc_lock"),
+ timer(monc_lock),
hunting(true),
want_monmap(true),
want_keys(0), global_id(0),
rank(-1),
messenger(m),
lock("Monitor::lock"),
+ timer(lock),
monmap(map),
logclient(messenger, monmap),
store(s),
messenger->add_dispatcher_head(&logclient);
// start ticker
- timer.reset(new SafeTimer(lock));
+ timer.init();
new_tick();
// call election?
for (vector<PaxosService*>::iterator p = paxos_service.begin(); p != paxos_service.end(); p++)
(*p)->shutdown();
- // Cancel all events. The timer thread will be joined later in ~SafeTimer
- timer->cancel_all_events();
+ timer.shutdown();
// die.
messenger->shutdown();
void Monitor::new_tick()
{
C_Mon_Tick *ctx = new C_Mon_Tick(this);
- timer->add_event_after(g_conf.mon_tick_interval, ctx);
+ timer.add_event_after(g_conf.mon_tick_interval, ctx);
}
void Monitor::tick()
entity_addr_t myaddr;
Messenger *messenger;
Mutex lock;
+ SafeTimer timer;
MonMap *monmap;
LogClient logclient;
KeyServer key_server;
- // timer.
- std::auto_ptr < SafeTimer > timer;
private:
void new_tick();
friend class C_Mon_Tick;
// set timeout event
collect_timeout_event = new C_CollectTimeout(this);
- mon->timer->add_event_after(g_conf.mon_accept_timeout, collect_timeout_event);
+ mon->timer.add_event_after(g_conf.mon_accept_timeout, collect_timeout_event);
}
dout(10) << " they had a higher pn than us, picking a new one." << dendl;
// cancel timeout event
- mon->timer->cancel_event(collect_timeout_event);
+ mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
collect(last->pn);
// is that everyone?
if (num_last == mon->get_quorum().size()) {
// cancel timeout event
- mon->timer->cancel_event(collect_timeout_event);
+ mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
// almost...
// set timeout event
accept_timeout_event = new C_AcceptTimeout(this);
- mon->timer->add_event_after(g_conf.mon_accept_timeout, accept_timeout_event);
+ mon->timer.add_event_after(g_conf.mon_accept_timeout, accept_timeout_event);
}
// peon
if (accepted == mon->get_quorum()) {
dout(10) << " got quorum, done with update" << dendl;
// cancel timeout event
- mon->timer->cancel_event(accept_timeout_event);
+ mon->timer.cancel_event(accept_timeout_event);
accept_timeout_event = 0;
// yay!
// if old timeout is still in place, leave it.
if (!lease_ack_timeout_event) {
lease_ack_timeout_event = new C_LeaseAckTimeout(this);
- mon->timer->add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event);
+ mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_ack_timeout_event);
}
// set renew event
utime_t at = lease_expire;
at -= g_conf.mon_lease;
at += g_conf.mon_lease_renew_interval;
- mon->timer->add_event_at(at, lease_renew_event);
+ mon->timer.add_event_at(at, lease_renew_event);
}
// (re)set timeout event.
if (lease_timeout_event)
- mon->timer->cancel_event(lease_timeout_event);
+ mon->timer.cancel_event(lease_timeout_event);
lease_timeout_event = new C_LeaseTimeout(this);
- mon->timer->add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event);
+ mon->timer.add_event_after(g_conf.mon_lease_ack_timeout, lease_timeout_event);
// trim?
trim_to(lease->first_committed);
// yay!
dout(10) << "handle_lease_ack from " << ack->get_source()
<< " -- got everyone" << dendl;
- mon->timer->cancel_event(lease_ack_timeout_event);
+ mon->timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
} else {
dout(10) << "handle_lease_ack from " << ack->get_source()
void Paxos::cancel_events()
{
if (collect_timeout_event) {
- mon->timer->cancel_event(collect_timeout_event);
+ mon->timer.cancel_event(collect_timeout_event);
collect_timeout_event = 0;
}
if (accept_timeout_event) {
- mon->timer->cancel_event(accept_timeout_event);
+ mon->timer.cancel_event(accept_timeout_event);
accept_timeout_event = 0;
}
if (lease_renew_event) {
- mon->timer->cancel_event(lease_renew_event);
+ mon->timer.cancel_event(lease_renew_event);
lease_renew_event = 0;
}
if (lease_ack_timeout_event) {
- mon->timer->cancel_event(lease_ack_timeout_event);
+ mon->timer.cancel_event(lease_ack_timeout_event);
lease_ack_timeout_event = 0;
}
if (lease_timeout_event) {
- mon->timer->cancel_event(lease_timeout_event);
+ mon->timer.cancel_event(lease_timeout_event);
lease_timeout_event = 0;
}
}
if (!proposal_timer) {
dout(10) << " setting propose timer with delay of " << delay << dendl;
proposal_timer = new C_Propose(this);
- mon->timer->add_event_after(delay, proposal_timer);
+ mon->timer.add_event_after(delay, proposal_timer);
} else {
dout(10) << " propose timer already set" << dendl;
}
assert(mon->is_leader() && paxos->is_active());
if (proposal_timer) {
- mon->timer->cancel_event(proposal_timer);
+ mon->timer.cancel_event(proposal_timer);
proposal_timer = 0;
}
{
dout(10) << "election_starting" << dendl;
if (proposal_timer) {
- mon->timer->cancel_event(proposal_timer);
+ mon->timer.cancel_event(proposal_timer);
proposal_timer = 0;
}
paxos->cancel_events();
if (proposal_timer) {
- mon->timer->cancel_event(proposal_timer);
+ mon->timer.cancel_event(proposal_timer);
proposal_timer = 0;
}
}
OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, Messenger *hbm, MonClient *mc, const char *dev, const char *jdev) :
osd_lock("OSD::osd_lock"),
+ timer(osd_lock),
cluster_messenger(internal_messenger),
client_messenger(external_messenger),
monc(mc),
int OSD::init()
{
- timer.reset(new SafeTimer(osd_lock));
Mutex::Locker lock(osd_lock);
+ timer.init();
+
// mount.
dout(2) << "mounting " << dev_path << " " << (journal_path ? journal_path : "(no journal)") << dendl;
assert(store); // call pre_init() first!
heartbeat_thread.create();
// tick
- timer->add_event_after(g_conf.osd_heartbeat_interval, new C_Tick(this));
+ timer.add_event_after(g_conf.osd_heartbeat_interval, new C_Tick(this));
if (false) {
signal(SIGTERM, handle_signal);
state = STATE_STOPPING;
- // Cancel all timers. The timer thread will be destroyed by ~SafeTimer
- timer->cancel_all_events();
+ timer.shutdown();
heartbeat_lock.Lock();
heartbeat_stop = true;
logclient.send_log();
- timer->add_event_after(1.0, new C_Tick(this));
+ timer.add_event_after(1.0, new C_Tick(this));
// only do waiters if dispatch() isn't currently running. (if it is,
// it'll do the waiters, and doing them here may screw up ordering
/** OSD **/
protected:
Mutex osd_lock; // global lock
- std::auto_ptr < SafeTimer > timer; // safe timer (osd_lock)
+ SafeTimer timer; // safe timer (osd_lock)
Messenger *cluster_messenger;
Messenger *client_messenger;
if (write_buf.length() < g_conf.journaler_batch_max) {
// delay! schedule an event.
dout(20) << "flush delaying flush" << dendl;
- if (delay_flush_event) timer.cancel_event(delay_flush_event);
+ if (delay_flush_event)
+ timer->cancel_event(delay_flush_event);
delay_flush_event = new C_DelayFlush(this);
- timer.add_event_after(g_conf.journaler_batch_interval, delay_flush_event);
+ timer->add_event_after(g_conf.journaler_batch_interval, delay_flush_event);
} else {
dout(20) << "flush not delaying flush" << dendl;
_do_flush();
Logger *logger;
int logger_key_lat;
- Mutex *lock;
- SafeTimer timer;
+ SafeTimer *timer;
class C_DelayFlush : public Context {
Journaler *journaler;
friend class C_Trim;
public:
- Journaler(inodeno_t ino_, int pool, const char *mag, Objecter *obj, Logger *l, int lkey, Mutex *lk) :
+ Journaler(inodeno_t ino_, int pool, const char *mag, Objecter *obj, Logger *l, int lkey, SafeTimer *tim) :
last_written(mag), last_committed(mag),
ino(ino_), pg_pool(pool), magic(mag),
objecter(obj), filer(objecter), logger(l), logger_key_lat(lkey),
- lock(lk), timer(*lk), delay_flush_event(0),
+ timer(tim), delay_flush_event(0),
state(STATE_UNDEF), error(0),
write_pos(0), flush_pos(0), ack_pos(0), safe_pos(0),
read_pos(0), requested_pos(0), received_pos(0),
void Objecter::init()
{
assert(client_lock.is_locked());
- timer.reset(new SafeTimer(client_lock));
- timer->add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
+ timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
maybe_request_map();
}
void Objecter::shutdown()
{
- client_lock.Unlock();
- timer->shutdown();
- client_lock.Lock();
}
messenger->send_message(new MPing, osdmap->get_inst(*p));
// reschedule
- timer->add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
+ timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
}
void Objecter::resend_mon_ops()
version_t last_seen_pgmap_version;
Mutex &client_lock;
- std::auto_ptr < SafeTimer > timer;
+ SafeTimer &timer;
class C_Tick : public Context {
Objecter *ob;
Throttle op_throttler;
public:
- Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l) :
+ Objecter(Messenger *m, MonClient *mc, OSDMap *om, Mutex& l, SafeTimer& t) :
messenger(m), monc(mc), osdmap(om),
last_tid(0), client_inc(-1),
num_unacked(0), num_uncommitted(0),
keep_balanced_budget(false), honor_osdmap_full(true),
last_seen_osdmap_version(0),
last_seen_pgmap_version(0),
- client_lock(l),
+ client_lock(l), timer(t),
op_throttler(g_conf.objecter_inflight_op_bytes)
{ }
~Objecter() { }
static Cond cmd_cond;
static SimpleMessenger *messenger = 0;
-static std::auto_ptr < SafeTimer > timer;
static Tokenizer *tok;
static const char *outfile = 0;
registered.clear();
float seconds = g_conf.paxos_observer_timeout/2;
dout(1) << " refresh after " << seconds << " with same mon" << dendl;
- timer->add_event_after(seconds, new C_ObserverRefresh(false));
+ g.timer.add_event_after(seconds, new C_ObserverRefresh(false));
}
static void handle_ack(MMonCommandAck *ack)
reply_bl = ack->get_data();
cmd_cond.Signal();
if (resend_event) {
- timer->cancel_event(resend_event);
+ g.timer.cancel_event(resend_event);
resend_event = 0;
}
g.lock.Unlock();
common_set_defaults(false);
common_init(args, "ceph", true);
- timer.reset(new SafeTimer(g.lock));
vec_to_argv(args, argc, argv);
messenger->start();
+ g.lock.Lock();
+ g.timer.init();
+ g.lock.Unlock();
+
g.mc.set_messenger(messenger);
g.mc.init();
+ int ret = -1;
+
if (g.mc.authenticate() < 0) {
cerr << "unable to authenticate as " << *g_conf.entity_name << std::endl;
- return -1;
+ goto out;
}
if (g.mc.get_monmap() < 0) {
cerr << "unable to get monmap" << std::endl;
- return -1;
+ goto out;
}
- int ret = 0;
-
switch (ceph_tool_mode)
{
case CEPH_TOOL_MODE_OBSERVER:
break;
}
+ ret = 0;
+ out:
+
// wait for messenger to finish
messenger->wait();
messenger->destroy();
- timer->shutdown();
tok_end(tok);
+
+ g.lock.Lock();
+ g.mc.shutdown();
+ g.timer.shutdown();
+ g.lock.Unlock();
return ret;
}
#include "mon/PGMap.h"
#include "mds/MDSMap.h"
#include "osd/OSDMap.h"
+#include "common/Timer.h"
#include <iosfwd>
#include <stdint.h>
// The ceph-tool lock
Mutex lock;
+ SafeTimer timer;
// A condition variable used to wake up the GUI thread
Cond gui_cond;
- ceph_tool_data()
- : updates(EVERYTHING_UPDATE),
- log(&std::cout),
- slog(NULL),
- lock("ceph.cc lock")
+ ceph_tool_data() :
+ updates(EVERYTHING_UPDATE),
+ log(&std::cout),
+ slog(NULL),
+ lock("ceph.cc lock"), timer(lock)
{
}
};