From 8158f9ea770104d3dfe7e4e0d81e4cd5abf48c59 Mon Sep 17 00:00:00 2001 From: sage Date: Tue, 7 Jun 2005 18:29:20 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@278 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 1 + ceph/client/Client.cc | 14 ++- ceph/client/Client.h | 3 +- ceph/client/fuse.cc | 17 +-- ceph/common/Timer.cc | 70 ++++++++++-- ceph/common/Timer.h | 42 +------ ceph/config.cc | 4 +- ceph/mds/MDS.cc | 15 +-- ceph/mds/OSDMonitor.cc | 94 ++++------------ ceph/mds/OSDMonitor.h | 6 +- ceph/messages/MFailure.h | 28 +++++ ceph/messages/MFailureAck.h | 28 +++++ ceph/messages/MPing.h | 14 +-- ceph/messages/MPingAck.h | 26 +++++ ceph/msg/CheesySerializer.h | 4 +- ceph/msg/FakeMessenger.cc | 2 +- ceph/msg/HostMonitor.cc | 213 ++++++++++++++++++++++++++++++++++++ ceph/msg/HostMonitor.h | 80 ++++++++++++++ ceph/msg/MPIMessenger.h | 2 +- ceph/msg/Message.h | 8 +- ceph/msg/Messenger.cc | 12 ++ ceph/msg/Messenger.h | 5 +- ceph/osd/OSD.cc | 69 +++++++++--- ceph/osd/OSD.h | 6 +- 24 files changed, 585 insertions(+), 178 deletions(-) create mode 100644 ceph/messages/MFailure.h create mode 100644 ceph/messages/MFailureAck.h create mode 100644 ceph/messages/MPingAck.h create mode 100644 ceph/msg/HostMonitor.cc create mode 100644 ceph/msg/HostMonitor.h diff --git a/ceph/Makefile b/ceph/Makefile index 16a265e5bec3f..0862e519f535e 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -31,6 +31,7 @@ COMMON_OBJS= \ msg/Messenger.o\ msg/Dispatcher.o\ msg/error.o\ + msg/HostMonitor.o\ osd/FakeStore.o\ osd/Filer.o\ osd/OSDCluster.o\ diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 6fbc2b4d68ad0..70b1ee777bec2 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -1100,19 +1100,25 @@ int Client::write(fileh_t fh, const char *buf, size_t size, off_t offset) } -int Client::flush(fileh_t fh) +int Client::fsync(fileh_t fh, bool syncdataonly) { client_lock.Lock(); int r = 0; - dout(3) << "flush fh " << fh << endl; - assert(fh_map.count(fh)); Fh *f = fh_map[fh]; Inode *in = f->inode; - + + dout(3) << "fsync fh " << fh << " ino " << in->inode.ino << " syncdataonly " << syncdataonly << endl; + flush_inode_buffers(in); + if (syncdataonly && + (f->caps & CFILE_CAP_WR)) { + // flush metadata too.. size, mtime + // ... + } + client_lock.Unlock(); return r; } diff --git a/ceph/client/Client.h b/ceph/client/Client.h index bb7b30c80497f..f5f219e060b06 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -288,9 +288,8 @@ class Client : public Dispatcher { int close(fileh_t fh); int read(fileh_t fh, char *buf, size_t size, off_t offset); int write(fileh_t fh, const char *buf, size_t size, off_t offset); - int flush(fileh_t fh); int truncate(fileh_t fh, off_t size); - int fsync(fileh_t fh); + int fsync(fileh_t fh, bool syncdataonly); }; diff --git a/ceph/client/fuse.cc b/ceph/client/fuse.cc index 0915dc37fb29d..fcac60f6d7f59 100644 --- a/ceph/client/fuse.cc +++ b/ceph/client/fuse.cc @@ -190,15 +190,13 @@ static int ceph_write(const char *path, const char *buf, size_t size, return client->write(fh, buf, size, offset); } +/* static int ceph_flush(const char *path, struct fuse_file_info *fi) { - /*pfh_lock.Lock(); - fileh_t fh = pfh_map[fi->fh]; - pfh_lock.Unlock(); - */ fileh_t fh = fi->fh; return client->flush(fh); } +*/ static int ceph_statfs(const char *path, struct statfs *stbuf) { @@ -223,13 +221,8 @@ static int ceph_release(const char *path, struct fuse_file_info *fi) static int ceph_fsync(const char *path, int isdatasync, struct fuse_file_info *fi) { - /* Just a stub. This method is optional and can safely be left - unimplemented */ - - (void) path; - (void) isdatasync; - (void) fi; - return 0; + fileh_t fh = fi->fh; + return client->fsync(fh, isdatasync ? true:false); } @@ -252,7 +245,7 @@ static struct fuse_operations ceph_oper = { read: ceph_read, write: ceph_write, statfs: ceph_statfs, - flush: ceph_flush, + flush: 0, //ceph_flush, release: ceph_release, fsync: ceph_fsync }; diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index 4d748ccaac4f4..5901c0651ae09 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -11,6 +11,7 @@ #undef dout #define dout(x) if (x <= g_conf.debug) cout << "Timer: " +#define DBL 20 #include #include @@ -58,27 +59,27 @@ void Timer::timer_thread() if (it->first > now) break; timepair_t t = it->first; - dout(5) << "queuing event(s) scheduled at " << t << endl; + dout(DBL) << "queuing event(s) scheduled at " << t << endl; pending[t] = it->second; it++; scheduled.erase(t); } - dout(5) << "kicking messenger" << endl; + dout(DBL) << "kicking messenger" << endl; messenger_to_kick->trigger_timer(this); } else { // sleep if (event) { - dout(5) << "sleeping until " << next << endl; + dout(DBL) << "sleeping until " << next << endl; struct timeval tv; tv.tv_sec = next.first; tv.tv_usec = next.second; cond.Wait(lock, &tv); // wait for waker or time } else { - dout(5) << "sleeping" << endl; + dout(DBL) << "sleeping" << endl; cond.Wait(lock); // wait for waker } } @@ -93,6 +94,7 @@ void Timer::timer_thread() * Timer bits */ + void Timer::set_messenger(Messenger *m) { dout(10) << "messenger to kick is " << m << endl; @@ -108,10 +110,10 @@ void Timer::unset_messenger() void Timer::register_timer() { if (thread_id) { - dout(10) << "register_timer kicking thread" << endl; + dout(DBL) << "register_timer kicking thread" << endl; cond.Signal(); } else { - dout(10) << "register_timer starting thread" << endl; + dout(DBL) << "register_timer starting thread" << endl; pthread_create(&thread_id, NULL, timer_thread_entrypoint, (void*)this); } } @@ -137,6 +139,58 @@ void Timer::cancel_timer() } +/* + * schedule + */ + + +void Timer::add_event_after(float seconds, + Context *callback) +{ + struct timeval tv; + g_clock.gettime(&tv); + tv.tv_sec += seconds; + add_event_at(&tv, callback); +} + +void Timer::add_event_at(struct timeval *tv, + Context *callback) +{ + // insert + timepair_t when = timepair_t(tv->tv_sec,tv->tv_usec); + + dout(DBL) << "add_event " << callback << " at " << when << endl; + + lock.Lock(); + scheduled[ when ].insert(callback); + event_times[callback] = when; + lock.Unlock(); + + // make sure i wake up + register_timer(); +} + +bool Timer::cancel_event(Context *callback) +{ + lock.Lock(); + + dout(DBL) << "cancel_event " << callback << endl; + + if (!event_times.count(callback)) { + dout(DBL) << "cancel_event " << callback << " wasn't scheduled?" << endl; + lock.Unlock(); + return false; // wasn't scheduled. + } + + timepair_t tp = event_times[callback]; + + event_times.erase(callback); + scheduled.erase(tp); + pending.erase(tp); + + lock.Unlock(); + return true; +} /*** * do user callbacks @@ -154,14 +208,14 @@ void Timer::execute_pending() lock.Unlock(); - dout(5) << "executing event " << event << " scheduled for " << when << endl; + dout(DBL) << "executing event " << event << " scheduled for " << when << endl; event->finish(0); delete event; lock.Lock(); } - dout(12) << "no more events for now" << endl; + dout(DBL) << "no more events for now" << endl; lock.Unlock(); } diff --git a/ceph/common/Timer.h b/ceph/common/Timer.h index b8cf53ef20b00..9854009d25997 100644 --- a/ceph/common/Timer.h +++ b/ceph/common/Timer.h @@ -79,46 +79,10 @@ class Timer { // schedule events void add_event_after(float seconds, - Context *callback) { - struct timeval tv; - g_clock.gettime(&tv); - tv.tv_sec += seconds; - add_event_at(&tv, callback); - } - + Context *callback); void add_event_at(struct timeval *tv, - Context *callback) { - // insert - timepair_t when = timepair_t(tv->tv_sec,tv->tv_usec); - - lock.Lock(); - scheduled[ when ].insert(callback); - event_times[callback] = when; - lock.Unlock(); - - // make sure i wake up - register_timer(); - } - - /* - bool cancel_event(Context *callback) { - lock.Lock(); - - if (!event_times.count(callback)) { - lock.Unlock(); - return false; // wasn't scheduled. - } - - timepair_t tp = event_times[callback]; - - event_times.erase(callback); - event_map[ tp ].erase(callback); - if (event_map[ tp ].empty()) event_map.erase( tp ); - - lock.Unlock(); - return true; - } - */ + Context *callback); + bool cancel_event(Context *callback); // execute pending events void execute_pending(); diff --git a/ceph/config.cc b/ceph/config.cc index 646d04502771e..6510a0b9b2f2b 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -13,7 +13,7 @@ md_config_t g_conf = { num_mds: 2, - num_osd: 2, + num_osd: 5, num_client: 1, osd_cow: false, // crashy? true, @@ -25,7 +25,7 @@ md_config_t g_conf = { fake_clock: false, fakemessenger_serialize: true, - debug: 3, + debug: 15, // --- client --- client_cache_size: 400, diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index 7359fa44b7598..0feb1c9a9ae55 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -23,6 +23,7 @@ #include "common/LogType.h" #include "messages/MPing.h" +#include "messages/MPingAck.h" #include "messages/MGenericMessage.h" #include "messages/MClientMount.h" @@ -374,6 +375,7 @@ void MDS::my_dispatch(Message *m) // HACK FOR NOW + /* static bool did_heartbeat_hack = false; if (!shutting_down && !shut_down && false && @@ -381,6 +383,7 @@ void MDS::my_dispatch(Message *m) osdmonitor->initiate_heartbeat(); did_heartbeat_hack = true; } + */ // finish any triggered contexts @@ -493,14 +496,12 @@ void MDS::handle_client_unmount(Message *m) void MDS::handle_ping(MPing *m) { - dout(10) << " received ping from " << MSG_ADDR_NICE(m->get_source()) << " with ttl " << m->ttl << endl; - if (m->ttl > 0) { - //cout << "mds" << whoami << " responding to " << m->get_source() << endl; - messenger->send_message(new MPing(m->ttl - 1), - m->get_source(), m->get_source_port(), - MDS_PORT_MAIN); - } + dout(10) << " received ping from " << MSG_ADDR_NICE(m->get_source()) << " with seq " << m->seq << endl; + messenger->send_message(new MPingAck(m), + m->get_source(), m->get_source_port(), + MDS_PORT_MAIN); + delete m; } diff --git a/ceph/mds/OSDMonitor.cc b/ceph/mds/OSDMonitor.cc index a007027dd2fc9..75e846cdd2e6b 100644 --- a/ceph/mds/OSDMonitor.cc +++ b/ceph/mds/OSDMonitor.cc @@ -4,104 +4,56 @@ #include "osd/OSDCluster.h" #include "msg/Message.h" +#include "msg/Messenger.h" #include "messages/MPing.h" +#include "messages/MPingAck.h" +#include "messages/MFailure.h" +#include "messages/MFailureAck.h" #include "common/Timer.h" #include "common/Clock.h" -/* to send a message, - -Message *messageptr = ......; -mds->messenger->send_message(messageptr, - MSG_ADDR_OSD(osdnum), 0, MDS_PORT_OSDMON); - - -timer example: - -class C_Test : public Context { - OSDMonitor *om; -public: - C_Test(OSDMonitor *om) { - this->om = om; - } - void finish(int r) { - cout << "C_Test->finish(" << r << ")" << endl; - om->check_for_ping_timeouts_or_something(); - } -}; - -g_timer.add_event_after(10, new C_Test); - - -to tell which mds we are, mds->get_nodeid() (out of mds->get_cluster()->get_num_mds()) - -*/ +#include "include/config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug) cout << "mds" << mds->get_nodeid() << ".osdmon: " void OSDMonitor::init_my_stuff() { - set all_osds; - mds->osdcluster->get_all_osds(all_osds); - - // pick mine - for (set::iterator it = all_osds.begin(); - it != all_osds.end(); - it++) { - if (1 /* something */) my_osds.insert(*it); - } + } void OSDMonitor::proc_message(Message *m) { switch (m->get_type()) { - case MSG_PING: - handle_ping((MPing*)m); + case MSG_FAILURE: + handle_failure((MFailure*)m); break; - } -} - - -// simple heartbeat for now - -class C_CheckHeartbeat : public Context { - OSDMonitor *om; -public: - C_CheckHeartbeat(OSDMonitor *om) { - this->om = om; - } - void finish(int r) { - om->check_heartbeat(); + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; } -}; - -void OSDMonitor::initiate_heartbeat() -{ - // send out pings - - - // set timer for 10s later - g_timer.add_event_after(10, new C_CheckHeartbeat(this)); - } -void OSDMonitor::check_heartbeat() + +void OSDMonitor::handle_ping_ack(MPingAck *m) { - // blah - + // ... + delete m; } - -void OSDMonitor::handle_ping(MPing *m) +void OSDMonitor::handle_failure(MFailure *m) { + dout(1) << "osd failure: " << MSG_ADDR_NICE(m->get_failed()) << " from " << MSG_ADDR_NICE(m->get_source()) << endl; - // check m->get_osd_status(); - - - + // ack + mds->messenger->send_message(new MFailureAck(m), + m->get_source(), m->get_source_port()); + delete m; } diff --git a/ceph/mds/OSDMonitor.h b/ceph/mds/OSDMonitor.h index 1de7750e48d43..d38450a134004 100644 --- a/ceph/mds/OSDMonitor.h +++ b/ceph/mds/OSDMonitor.h @@ -28,10 +28,8 @@ class OSDMonitor { void init_my_stuff(); void proc_message(Message *m); - void handle_ping(class MPing *m); - - void initiate_heartbeat(); - void check_heartbeat(); + void handle_ping_ack(class MPingAck *m); + void handle_failure(class MFailure *m); }; diff --git a/ceph/messages/MFailure.h b/ceph/messages/MFailure.h new file mode 100644 index 0000000000000..dfd58221b7c48 --- /dev/null +++ b/ceph/messages/MFailure.h @@ -0,0 +1,28 @@ +#ifndef __MFAILURE_H +#define __MFAILURE_H + +#include "msg/Message.h" + + +class MFailure : public Message { + public: + msg_addr_t failed; + MFailure(msg_addr_t failed) : Message(MSG_FAILURE) { + this->failed = failed; + } + MFailure() {} + + msg_addr_t get_failed() { return failed; } + + virtual void decode_payload(crope& s, int& off) { + s.copy(0, sizeof(failed), (char*)&failed); + off += sizeof(failed); + } + virtual void encode_payload(crope& s) { + s.append((char*)&failed, sizeof(failed)); + } + + virtual char *get_type_name() { return "fail"; } +}; + +#endif diff --git a/ceph/messages/MFailureAck.h b/ceph/messages/MFailureAck.h new file mode 100644 index 0000000000000..86fc7e9876891 --- /dev/null +++ b/ceph/messages/MFailureAck.h @@ -0,0 +1,28 @@ +#ifndef __MFAILUREACK_H +#define __MFAILUREACK_H + +#include "MFailure.h" + + +class MFailureAck : public Message { + public: + msg_addr_t failed; + MFailureAck(MFailure *m) : Message(MSG_FAILURE_ACK) { + this->failed = m->get_failed(); + } + MFailureAck() {} + + msg_addr_t get_failed() { return failed; } + + virtual void decode_payload(crope& s, int& off) { + s.copy(0, sizeof(failed), (char*)&failed); + off += sizeof(failed); + } + virtual void encode_payload(crope& s) { + s.append((char*)&failed, sizeof(failed)); + } + + virtual char *get_type_name() { return "faila"; } +}; + +#endif diff --git a/ceph/messages/MPing.h b/ceph/messages/MPing.h index 4580d54a0cead..bcd00ecb37fac 100644 --- a/ceph/messages/MPing.h +++ b/ceph/messages/MPing.h @@ -7,18 +7,18 @@ class MPing : public Message { public: - int ttl; - int osd_status; - MPing(int n) : Message(MSG_PING) { - ttl = n; + int seq; + MPing(int s) : Message(MSG_PING) { + seq = s; } - MPing() {} + MPing() : Message(MSG_PING) {} virtual void decode_payload(crope& s, int& off) { - s.copy(0, sizeof(ttl), (char*)&ttl); + s.copy(0, sizeof(seq), (char*)&seq); + off += sizeof(seq); } virtual void encode_payload(crope& s) { - s.append((char*)&ttl, sizeof(ttl)); + s.append((char*)&seq, sizeof(seq)); } virtual char *get_type_name() { return "ping"; } diff --git a/ceph/messages/MPingAck.h b/ceph/messages/MPingAck.h new file mode 100644 index 0000000000000..396f7bfbf1f40 --- /dev/null +++ b/ceph/messages/MPingAck.h @@ -0,0 +1,26 @@ +#ifndef __MPINGACK_H +#define __MPINGACK_H + +#include "MPing.h" + + +class MPingAck : public Message { + public: + int seq; + MPingAck() {} + MPingAck(MPing *p) : Message(MSG_PING_ACK) { + this->seq = p->seq; + } + + virtual void decode_payload(crope& s, int& off) { + s.copy(0, sizeof(seq), (char*)&seq); + off += sizeof(seq); + } + virtual void encode_payload(crope& s) { + s.append((char*)&seq, sizeof(seq)); + } + + virtual char *get_type_name() { return "pinga"; } +}; + +#endif diff --git a/ceph/msg/CheesySerializer.h b/ceph/msg/CheesySerializer.h index 6b4414ce5af20..24c58fed25910 100644 --- a/ceph/msg/CheesySerializer.h +++ b/ceph/msg/CheesySerializer.h @@ -26,12 +26,12 @@ class CheesySerializer : public Messenger, map call_reply; public: - CheesySerializer(Messenger *msg) { + CheesySerializer(Messenger *msg) : Messenger(msg->get_myaddr()) { messenger = msg; messenger->set_dispatcher(this); last_pcid = 1; } - CheesySerializer() { + ~CheesySerializer() { if (messenger) delete messenger; } diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index 62cae7c27b32d..0f938f63f408e 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -161,7 +161,7 @@ int fakemessenger_do_loop_2() // class -FakeMessenger::FakeMessenger(long me) +FakeMessenger::FakeMessenger(long me) : Messenger(me) { whoami = me; directory[ whoami ] = this; diff --git a/ceph/msg/HostMonitor.cc b/ceph/msg/HostMonitor.cc new file mode 100644 index 0000000000000..15c21bdbf4ee7 --- /dev/null +++ b/ceph/msg/HostMonitor.cc @@ -0,0 +1,213 @@ + +#include "HostMonitor.h" + +#include "msg/Message.h" +#include "msg/Messenger.h" + +#include "messages/MPing.h" +#include "messages/MPingAck.h" +#include "messages/MFailure.h" +#include "messages/MFailureAck.h" + +#include "common/Timer.h" +#include "common/Clock.h" + +#define DBL 10 + + +#include "include/config.h" +#undef dout +#define dout(l) if (l<=g_conf.debug) cout << whoami << " hostmon: " + + +// timer contexts + +class C_HM_InitiateHeartbeat : public Context { + HostMonitor *hm; +public: + C_HM_InitiateHeartbeat(HostMonitor *hm) { + this->hm = hm; + } + void finish(int r) { + //cout << "HEARTBEAT" << endl; + hm->initiate_heartbeat(); + } +}; + +class C_HM_CheckHeartbeat : public Context { + HostMonitor *hm; +public: + C_HM_CheckHeartbeat(HostMonitor *hm) { + this->hm = hm; + } + void finish(int r) { + //cout << "CHECK" << endl; + hm->check_heartbeat(); + } +}; + + + +// startup/shutdown + +void HostMonitor::init() +{ + dout(DBL) << "init" << endl; + + // hack params for now + heartbeat_interval = 10; + max_ping_time = 2; + max_heartbeat_misses = 3; + notify_retry_interval = 10; + + // schedule first hb + schedule_heartbeat(); +} + + +void HostMonitor::shutdown() +{ + // cancel any events + for (set::iterator it = pending_events.begin(); + it != pending_events.end(); + it++) { + g_timer.cancel_event(*it); + } + pending_events.clear(); +} + + +// schedule next heartbeat + +void HostMonitor::schedule_heartbeat() +{ + dout(DBL) << "schedule_heartbeat" << endl; + Context *e = new C_HM_InitiateHeartbeat(this); + pending_events.insert(e); + g_timer.add_event_after(heartbeat_interval, e); +} + + +// take note of a live host + +void HostMonitor::host_is_alive(msg_addr_t host) +{ + if (hosts.count(host)) + status[host].last_heard_from = g_clock.gettime(); +} + + +// do heartbeat + +void HostMonitor::initiate_heartbeat() +{ + time_t now = g_clock.gettime(); + + // send out pings + inflight_pings.clear(); + for (set::iterator it = hosts.begin(); + it != hosts.end(); + it++) { + // have i heard from them recently? + if (now - status[*it].last_heard_from < heartbeat_interval) { + dout(DBL) << "skipping " << *it << ", i heard from them recently" << endl; + } else { + dout(DBL) << "pinging " << *it << endl; + status[*it].last_pinged = now; + inflight_pings.insert(*it); + + messenger->send_message(new MPing(1), *it, 0); + } + } + + // set timer to check results + Context *e = new C_HM_CheckHeartbeat(this); + pending_events.insert(e); + g_timer.add_event_after(max_ping_time, e); + dout(10) << "scheduled check " << e << endl; + + schedule_heartbeat(); // schedule next heartbeat +} + + +// check results + +void HostMonitor::check_heartbeat() +{ + dout(DBL) << "check_heartbeat()" << endl; + + // check inflight pings + for (set::iterator it = inflight_pings.begin(); + it != inflight_pings.end(); + it++) { + status[*it].num_heartbeats_missed++; + + dout(DBL) << "no response from " << *it << " for " << status[*it].num_heartbeats_missed << " beats" << endl; + + if (status[*it].num_heartbeats_missed >= max_heartbeat_misses) { + if (acked_failures.count(*it)) { + dout(DBL) << *it << " is already failed" << endl; + } else { + if (unacked_failures.count(*it)) { + dout(DBL) << *it << " is already failed, but unacked, sending another failure message" << endl; + } else { + dout(DBL) << "failing " << *it << endl; + unacked_failures.insert(*it); + } + + for (set::iterator nit = notify.begin(); + nit != notify.end(); + nit++) { + messenger->send_message(new MFailure(*it), + *nit, notify_port, 0); + } + } + } + } + + // forget about the pings. + inflight_pings.clear(); +} + + +// incoming messages + +void HostMonitor::proc_message(Message *m) +{ + switch (m->get_type()) { + + case MSG_PING_ACK: + handle_ping_ack((MPingAck*)m); + break; + + case MSG_FAILURE_ACK: + handle_failure_ack((MFailureAck*)m); + break; + + } +} + +void HostMonitor::handle_ping_ack(MPingAck *m) +{ + msg_addr_t from = m->get_source(); + + dout(DBL) << "ping ack from " << from << endl; + status[from].last_pinged = g_clock.gettime(); + status[from].num_heartbeats_missed = 0; + inflight_pings.erase(from); + + delete m; +} + +void HostMonitor::handle_failure_ack(MFailureAck *m) +{ + // the higher-up's acknowledged our failure notification, we can stop resending it. + msg_addr_t failed = m->get_failed(); + dout(DBL) << "handle_failure_ack " << failed << endl; + unacked_failures.erase(failed); + acked_failures.insert(failed); + + // FIXME: this doesn't handle failed -> alive transitions gracefully at all.. +} + + diff --git a/ceph/msg/HostMonitor.h b/ceph/msg/HostMonitor.h new file mode 100644 index 0000000000000..b3627e9852c04 --- /dev/null +++ b/ceph/msg/HostMonitor.h @@ -0,0 +1,80 @@ +#ifndef __HOSTMONITOR_H +#define __HOSTMONITOR_H + +#include + +#include +#include +using namespace std; + +#include "include/Context.h" +#include "msg/Message.h" + +class Message; + +typedef struct { + time_t last_heard_from; + time_t last_pinged; + int num_heartbeats_missed; +} monitor_rec_t; + +class HostMonitor { + Messenger *messenger; + string whoami; + + // hosts i monitor + set hosts; + + // who i tell when they fail + set notify; + int notify_port; + + // their status + map status; + + set inflight_pings; // pings we sent that haven't replied yet + + set unacked_failures; // failed hosts that haven't been acked yet. + set acked_failures; // these failures have been acked. + + float heartbeat_interval; // how often to do a heartbeat + float max_ping_time; // how long before it's a miss + int max_heartbeat_misses; // how many misses before i tell + float notify_retry_interval; // how often to retry failure notification + + set pending_events; + + void schedule_heartbeat(); + + public: + HostMonitor(Messenger *m, string& whoami) { + this->messenger = m; + this->whoami = whoami; + notify_port = 0; + } + set& get_hosts() { return hosts; } + set& get_notify() { return notify; } + void set_notify_port(int p) { notify_port = p; } + + void remove_host(msg_addr_t h) { + hosts.erase(h); + status.erase(h); + unacked_failures.erase(h); + acked_failures.erase(h); + } + + void init(); + void shutdown(); + + void host_is_alive(msg_addr_t who); + + void proc_message(Message *m); + void handle_ping_ack(class MPingAck *m); + void handle_failure_ack(class MFailureAck *m); + + void initiate_heartbeat(); + void check_heartbeat(); + +}; + +#endif diff --git a/ceph/msg/MPIMessenger.h b/ceph/msg/MPIMessenger.h index c56e25d541c62..a9c7e5fb02c47 100644 --- a/ceph/msg/MPIMessenger.h +++ b/ceph/msg/MPIMessenger.h @@ -18,7 +18,7 @@ class MPIMessenger : public Messenger { //class Logger *logger; // for logging public: - MPIMessenger(msg_addr_t myaddr); + MPIMessenger(msg_addr_t myaddr) : Messenger(myaddr); ~MPIMessenger(); // init, shutdown MPI and associated event loop thread. diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 998f3279b92f7..c2b120fac2ee7 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -1,10 +1,14 @@ #ifndef __MESSAGE_H #define __MESSAGE_H + +#define MSG_PING 2 +#define MSG_PING_ACK 3 -#define MSG_PING 2 +#define MSG_FAILURE 4 +#define MSG_FAILURE_ACK 5 -#define MSG_SHUTDOWN 3 +#define MSG_SHUTDOWN 6 #define MSG_OSD_READ 10 #define MSG_OSD_READREPLY 11 diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index d44cf0e6b26bb..c5c70087fd1d5 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -14,6 +14,9 @@ using namespace std; #include "messages/MPing.h" +#include "messages/MPingAck.h" +#include "messages/MFailure.h" +#include "messages/MFailureAck.h" #include "messages/MOSDPing.h" #include "messages/MOSDRead.h" @@ -93,6 +96,15 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_PING: m = new MPing(); break; + case MSG_PING_ACK: + m = new MPingAck(); + break; + case MSG_FAILURE: + m = new MFailure(); + break; + case MSG_FAILURE_ACK: + m = new MFailureAck(); + break; case MSG_OSD_PING: m = new MOSDPing(); diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index b91b6ef2ed908..0987071a40e36 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -15,10 +15,13 @@ class Timer; class Messenger { private: Dispatcher *dispatcher; + msg_addr_t _myaddr; public: - Messenger() : dispatcher(0) { } + Messenger(msg_addr_t w) : dispatcher(0), _myaddr(w) { } + msg_addr_t get_myaddr() { return _myaddr; } + virtual int shutdown() = 0; // dispatching incoming messages diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 0257fd9b1f134..53d8518140ffc 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -2,13 +2,17 @@ #include "include/types.h" #include "OSD.h" - #include "FakeStore.h" +#include "mds/MDS.h" + #include "msg/Messenger.h" #include "msg/Message.h" +#include "msg/HostMonitor.h" + #include "messages/MPing.h" +#include "messages/MPingAck.h" #include "messages/MOSDRead.h" #include "messages/MOSDReadReply.h" #include "messages/MOSDWrite.h" @@ -41,6 +45,24 @@ OSD::OSD(int id, Messenger *m) // use fake store store = new FakeStore(osd_base_path, whoami); + + // monitor + char s[80]; + sprintf(s, "osd%d", whoami); + string st = s; + monitor = new HostMonitor(m, st); + monitor->set_notify_port(MDS_PORT_OSDMON); + + // hack + int i = whoami; + if (++i == g_conf.num_osd) i = 0; + monitor->get_hosts().insert(MSG_ADDR_OSD(i)); + if (++i == g_conf.num_osd) i = 0; + monitor->get_hosts().insert(MSG_ADDR_OSD(i)); + if (++i == g_conf.num_osd) i = 0; + monitor->get_hosts().insert(MSG_ADDR_OSD(i)); + + monitor->get_notify().insert(MSG_ADDR_MDS(0)); } OSD::~OSD() @@ -51,15 +73,21 @@ OSD::~OSD() int OSD::init() { osd_lock.Lock(); + int r = store->init(); + + monitor->init(); + osd_lock.Unlock(); return r; } int OSD::shutdown() { + monitor->shutdown(); messenger->shutdown(); - return store->finalize(); + int r = store->finalize(); + return r; } @@ -71,21 +99,32 @@ void OSD::dispatch(Message *m) osd_lock.Lock(); switch (m->get_type()) { + // host monitor + case MSG_PING_ACK: + case MSG_FAILURE_ACK: + monitor->proc_message(m); + break; + + // osd + case MSG_SHUTDOWN: shutdown(); break; case MSG_PING: + // take note. + monitor->host_is_alive(m->get_source()); + handle_ping((MPing*)m); break; case MSG_OSD_READ: - read((MOSDRead*)m); + handle_read((MOSDRead*)m); break; case MSG_OSD_WRITE: - write((MOSDWrite*)m); + handle_write((MOSDWrite*)m); break; case MSG_OSD_OP: @@ -96,8 +135,6 @@ void OSD::dispatch(Message *m) dout(1) << " got unknown message " << m->get_type() << endl; } - delete m; - osd_lock.Unlock(); } @@ -105,14 +142,14 @@ void OSD::dispatch(Message *m) void OSD::handle_ping(MPing *m) { // play dead? - if (whoami == 3) { + if (whoami == 1) { + dout(7) << "playing dead" << endl; + } else { dout(7) << "got ping, replying" << endl; - messenger->send_message(new MPing(0), + messenger->send_message(new MPingAck(m), m->get_source(), m->get_source_port(), 0); - } else { - dout(7) << "playing dead" << endl; } - + delete m; } @@ -160,12 +197,14 @@ void OSD::handle_op(MOSDOp *op) default: assert(0); } + + delete op; } -void OSD::read(MOSDRead *r) +void OSD::handle_read(MOSDRead *r) { // read into a buffer bufferptr bptr = new buffer(r->get_len()); // prealloc space for entire read @@ -188,12 +227,14 @@ void OSD::read(MOSDRead *r) // send it messenger->send_message(reply, r->get_source(), r->get_source_port()); + + delete r; } // -- osd_write -void OSD::write(MOSDWrite *m) +void OSD::handle_write(MOSDWrite *m) { // take buffers from the message bufferlist bl; @@ -219,5 +260,7 @@ void OSD::write(MOSDWrite *m) // reply MOSDWriteReply *reply = new MOSDWriteReply(m, 0); messenger->send_message(reply, m->get_source(), m->get_source_port()); + + delete m; } diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 182fa0b6cc54d..ed4a46efa66ff 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -11,6 +11,7 @@ class MOSDRead; class MOSDWrite; class Message; class ObjectStore; +class HostMonitor; class OSD : public Dispatcher { protected: @@ -18,6 +19,7 @@ class OSD : public Dispatcher { int whoami; ObjectStore *store; + HostMonitor *monitor; Mutex osd_lock; @@ -32,8 +34,8 @@ class OSD : public Dispatcher { void handle_ping(class MPing *m); void handle_op(class MOSDOp *m); - void read(MOSDRead *m); - void write(MOSDWrite *m); + void handle_read(MOSDRead *m); + void handle_write(MOSDWrite *m); }; #endif -- 2.39.5