common/simple_spin.cc \
common/Thread.cc \
common/Formatter.cc \
+ common/HeartbeatMap.cc \
include/ceph_fs.cc \
include/ceph_hash.cc \
include/ceph_strings.cc \
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <time.h>
+
+#include "HeartbeatMap.h"
+#include "ceph_context.h"
+
+#include "debug.h"
+#define DOUT_SUBSYS heartbeatmap
+#undef dout_prefix
+#define dout_prefix *_dout << "heartbeat_map "
+
+
+HeartbeatMap::HeartbeatMap(CephContext *cct)
+ : m_cct(cct),
+ m_rwlock("HeartbeatMap::m_rwlock")
+{
+}
+
+HeartbeatMap::~HeartbeatMap()
+{
+}
+
+heartbeat_handle_d *HeartbeatMap::add_worker(pthread_t thread, string name)
+{
+ m_rwlock.get_write();
+ ldout(m_cct, 10) << "add_worker " << thread << " '" << name << "'" << dendl;
+ assert(m_workers.count(thread) == 0);
+ heartbeat_handle_d *h = new heartbeat_handle_d(thread, name);
+ m_workers[thread] = h;
+ m_rwlock.put_write();
+ return h;
+}
+
+void HeartbeatMap::remove_worker(heartbeat_handle_d *h)
+{
+ m_rwlock.get_write();
+ ldout(m_cct, 10) << "remove_worker " << h->thread << " '" << h->name << "'" << dendl;
+ map<pthread_t, heartbeat_handle_d*>::iterator p = m_workers.find(h->thread);
+ assert(p != m_workers.end());
+ m_workers.erase(p);
+ m_rwlock.put_write();
+ delete h;
+}
+
+void HeartbeatMap::touch_worker(heartbeat_handle_d *h, time_t grace)
+{
+ ldout(m_cct, 20) << "touch_worker " << h->thread << " grace " << grace << dendl;
+ h->timeout = time(NULL) + grace;
+}
+
+bool HeartbeatMap::is_healthy()
+{
+ m_rwlock.get_read();
+ time_t now = time(NULL);
+ bool healthy = true;
+ for (map<pthread_t, heartbeat_handle_d*>::iterator p = m_workers.begin();
+ p != m_workers.end();
+ ++p)
+ if (p->second->timeout && p->second->timeout < now) {
+ ldout(m_cct, 0) << "is_healthy " << p->first << " '" << p->second->name << "'"
+ << " timed out" << dendl;
+ healthy = false;
+ }
+ m_rwlock.put_read();
+ return healthy;
+}
+
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2011 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef CEPH_HEARTBEATMAP_H
+#define CEPH_HEARTBEATMAP_H
+
+#include <pthread.h>
+
+#include <string>
+#include <map>
+
+#include "RWLock.h"
+
+class CephContext;
+
+struct heartbeat_handle_d {
+ pthread_t thread;
+ std::string name;
+ time_t timeout;
+
+ heartbeat_handle_d(pthread_t t, const std::string& n)
+ : thread(t), name(n),
+ timeout(0)
+ { }
+};
+
+class HeartbeatMap {
+ public:
+ heartbeat_handle_d *add_worker(pthread_t thread, std::string name);
+ void remove_worker(heartbeat_handle_d *h);
+ void touch_worker(heartbeat_handle_d *h, time_t grace);
+
+ bool is_healthy();
+
+ HeartbeatMap(CephContext *cct);
+ ~HeartbeatMap();
+
+ private:
+ CephContext *m_cct;
+ RWLock m_rwlock;
+ std::map<unsigned long, heartbeat_handle_d*> m_workers;
+};
+
+#endif
#include "common/ceph_context.h"
#include "common/config.h"
#include "common/debug.h"
+#include "common/HeartbeatMap.h"
#include <iostream>
#include <pthread.h>
_module_type(module_type_),
_service_thread(NULL),
_admin_socket_config_obs(NULL),
- _perf_counters_collection(NULL)
+ _perf_counters_collection(NULL),
+ _heartbeat_map(NULL)
{
pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);
_perf_counters_collection = new PerfCountersCollection(this);
_conf->add_observer(_doss);
_admin_socket_config_obs = new AdminSocketConfigObs(this);
_conf->add_observer(_admin_socket_config_obs);
+ _heartbeat_map = new HeartbeatMap(this);
}
CephContext::
{
join_service_thread();
+ delete _heartbeat_map;
+
_conf->remove_observer(_admin_socket_config_obs);
_conf->remove_observer(_doss);
class PerfCountersCollection;
class md_config_obs_t;
class md_config_t;
+class HeartbeatMap;
/* A CephContext represents the context held by a single library user.
* There can be multiple CephContexts in the same process.
/* Get the PerfCountersCollection of this CephContext */
PerfCountersCollection *GetPerfCountersCollection();
+ HeartbeatMap *get_heartbeat_map() {
+ return _heartbeat_map;
+ }
+
private:
CephContext(const CephContext &rhs);
CephContext &operator=(const CephContext &rhs);
PerfCountersCollection *_perf_counters_collection;
md_config_obs_t *_perf_counters_conf_obs;
+
+ HeartbeatMap *_heartbeat_map;
};
#endif
OPTION(debug_tp, OPT_INT, 0),
OPTION(debug_auth, OPT_INT, 1),
OPTION(debug_finisher, OPT_INT, 1),
+ OPTION(debug_heartbeatmap, OPT_INT, 1),
OPTION(key, OPT_STR, 0),
OPTION(keyfile, OPT_STR, 0),
OPTION(keyring, OPT_STR, "/etc/ceph/keyring,/etc/ceph/keyring.bin"),
int debug_tp;
int debug_auth;
int debug_finisher;
+ int debug_heartbeatmap;
// auth
std::string key;