Create a libcommon service thread. Use it to handle SIGHUP.
Handle it by means of a flag that gets set. Using a queue would raise
the complicated question of what to do when the queue was full.
Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
check_PROGRAMS += unittest_librgw
endif
-unittest_SignalSafeQueue_SOURCES = test/SignalSafeQueue.cc
-unittest_SignalSafeQueue_LDFLAGS = -pthread -lcurl ${AM_LDFLAGS}
-unittest_SignalSafeQueue_LDADD = ${CRYPTO_LIBS} libcommon.a ${UNITTEST_LDADD}
-unittest_SignalSafeQueue_CXXFLAGS = ${CRYPTO_CFLAGS} ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
-check_PROGRAMS += unittest_SignalSafeQueue
-
# shell scripts
editpaths = sed \
-e 's|@bindir[@]|$(bindir)|g' \
common/Clock.cc \
common/Timer.cc \
common/Finisher.cc \
- common/SignalSafeQueue.cc \
common/environment.cc\
common/sctp_crc32.c\
common/assert.cc \
common/ConfUtils.h\
common/DecayCounter.h\
common/Finisher.h\
- common/SignalSafeQueue.h\
common/ProfLogType.h\
common/ProfLogger.h\
common/MemoryModel.h\
///////////////////////////// DoutStreambuf /////////////////////////////
template <typename charT, typename traits>
DoutStreambuf<charT, traits>::DoutStreambuf()
- : flags(0), ofd(-1), rlr(false)
+ : flags(0), ofd(-1)
{
// Initialize get pointer to zero so that underflow is called on the first read.
this->setg(0, 0, 0);
// Initialize output_buffer
_clear_output_buffer();
-
- pthread_spin_init(&rlr_lock, PTHREAD_PROCESS_PRIVATE);
}
template <typename charT, typename traits>
TEMP_FAILURE_RETRY(::close(ofd));
ofd = -1;
}
- pthread_spin_destroy(&rlr_lock);
}
// This function is called when the output buffer is filled.
template <typename charT, typename traits>
void DoutStreambuf<charT, traits>::
-request_log_reopen(void)
-{
- pthread_spin_lock(&rlr_lock);
- rlr = true;
- pthread_spin_unlock(&rlr_lock);
-}
-
-template <typename charT, typename traits>
-void DoutStreambuf<charT, traits>::
-handle_log_reopen_requests(const md_config_t *conf)
+reopen_logs(const md_config_t *conf)
{
- bool need;
- pthread_spin_lock(&rlr_lock);
- need = rlr;
- pthread_spin_unlock(&rlr_lock);
- if (!need)
- return;
std::set <std::string> changed;
const char **keys = get_tracked_conf_keys();
for (const char **k = keys; *k; ++k) {
// (if those sinks are active)
void dout_emergency_to_file_and_syslog(const char * const str) const;
- // The next two functions are used to implement the Ceph daemons'
- // SIGHUP handling.
-
- // Set the request_log_reopen bit.
- // Signal-safe.
- void request_log_reopen(void);
-
- // Read the request_log_reopen bit.
- // If it's set, reopen the log.
- // This is meant to be called from an event loop. Not signal-safe.
- void handle_log_reopen_requests(const md_config_t *conf);
+ // Reopen the logs
+ void reopen_logs(const md_config_t *conf);
protected:
// Called when the buffer fills up
std::string opath;
std::string symlink_dir;
std::string isym_path;
-
- pthread_spinlock_t rlr_lock;
- bool rlr;
};
// Secret evil interfaces for writing logs without taking the lock.
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2011 New Dream Network
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "common/config.h"
-#include "common/SignalSafeQueue.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <limits.h>
-#include <pthread.h>
-#include <stdio.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <unistd.h>
-
-SignalSafeQueue *SignalSafeQueue::
-create_queue()
-{
- return new SignalSafeQueue();
-}
-
-SignalSafeQueue::
-SignalSafeQueue()
- : _item_sz(0)
-{
- _fd[0] = -1;
- _fd[1] = -1;
-}
-
-SignalSafeQueue::
-~SignalSafeQueue()
-{
- if (_fd[0] != -1) {
- TEMP_FAILURE_RETRY(close(_fd[0]));
- _fd[0] = -1;
- }
- if (_fd[1] != -1) {
- TEMP_FAILURE_RETRY(close(_fd[1]));
- _fd[1] = -1;
- }
-}
-
-void SignalSafeQueue::
-wake_readers_and_shutdown(void)
-{
- /* Close write file descriptor.
- * Readers will get EPIPE. */
- TEMP_FAILURE_RETRY(close(_fd[1]));
- _fd[1] = -1;
-}
-
-int SignalSafeQueue::
-init(size_t item_sz)
-{
- int ret;
- assert(_fd[0] < 0);
- assert(_fd[1] < 0);
- assert(_item_sz < PIPE_BUF);
-
- _item_sz = item_sz;
- ret = pipe2(_fd, O_CLOEXEC);
- if (ret)
- return ret;
- return 0;
-}
-
-void SignalSafeQueue::
-push(void *buf)
-{
- /* Writing less than PIPE_BUF bytes to the pipe should always be atomic */
- int ret = write(_fd[1], buf, _item_sz);
- assert(ret == (int)_item_sz);
-}
-
-int SignalSafeQueue::
-pop(void *buf)
-{
- int ret;
- /* Read less than PIPE_BUF bytes from the pipe should always be atomic */
- ret = read(_fd[0], buf, _item_sz);
- if (ret < 0) {
- ret = errno;
- if (ret == EPIPE) {
- /* EPIPE means that the other side has closed the pipe */
- return ret;
- }
- derr << "SignalSafeQueue::dequeue() failed with error " << ret << dendl;
- return ret;
- }
- else if (ret == 0) {
- return EPIPE;
- }
- else if (ret != (int)_item_sz) {
- derr << "SignalSafeQueue::dequeue() only read " << ret << " bytes (should "
- << "not be possible)" << dendl;
- return EIO;
- }
- return 0;
-}
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2011 New Dream Network
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#ifndef CEPH_LOCKLESS_QUEUE_H
-#define CEPH_LOCKLESS_QUEUE_H
-
-#include <unistd.h>
-
-/* A simple signal-safe queue. It uses pipe2 internally. */
-
-class SignalSafeQueue
-{
-public:
- static SignalSafeQueue* create_queue();
-
- ~SignalSafeQueue();
-
- /* Unblock all readers.
- * After this function has been called, no further push() operations may be
- * done. However, the memory for the class will continue to exist until the
- * destructor is called.
- *
- * Assumes no concurrent writers exist while we are shutting down.
- */
- void wake_readers_and_shutdown();
-
- /* Initialize the queue. Item size is set to item_sz.
- *
- * Returns: 0 on success; error code otherwise.
- */
- int init(size_t item_sz);
-
- /* Puts an item into the queue using a blocking write().
- * This is safe to call from a signal handler.
- *
- * It is assumed that buf is a buffer of length 'item_sz'
- *
- * This function is reentrant and any number of writers and readers can
- * exist.
- */
- void push(void *buf);
-
- /* Blocks until there is something available in the queue.
- * When it is available, it will be copied into the provided buffer,
- * which we assume is of length 'item_sz'
- *
- * This function is reentrant and any number of writers and readers can
- * exist.
- *
- * Returns: 0 on success
- * EPIPE if the queue has been shut down
- * Other error codes if an unexpected error has occurred.
- */
- int pop(void *buf);
-
-private:
- /* Force heap allocation. */
- SignalSafeQueue();
-
- size_t _item_sz;
- int _fd[2];
-};
-
-#endif
#include "common/DoutStreambuf.h"
#include "common/ProfLogger.h"
+#include "common/Thread.h"
#include "common/ceph_context.h"
#include "common/config.h"
#include <iostream>
+#include <pthread.h>
+#include <semaphore.h>
// FIXME
// These variables are here temporarily to make the transition easier.
*/
pthread_mutex_t _dout_lock = PTHREAD_MUTEX_INITIALIZER;
+class CephContextServiceThread : public Thread
+{
+public:
+ CephContextServiceThread(CephContext *cct)
+ : _reopen_logs(false), _exit_thread(false), _cct(cct)
+ {
+ sem_init(&_sem, 0, 0);
+ };
+
+ ~CephContextServiceThread()
+ {
+ sem_destroy(&_sem);
+ };
+
+ void *entry()
+ {
+ while (1) {
+ sem_wait(&_sem);
+ if (_exit_thread) {
+ break;
+ }
+ if (_reopen_logs) {
+ _cct->_doss->reopen_logs(_cct->_conf);
+ _reopen_logs = false;
+ }
+ }
+ return NULL;
+ }
+
+ void reopen_logs()
+ {
+ _reopen_logs = true;
+ sem_post(&_sem);
+ }
+
+ void exit_thread()
+ {
+ _exit_thread = true;
+ sem_post(&_sem);
+ }
+
+private:
+ volatile bool _reopen_logs;
+ volatile bool _exit_thread;
+ sem_t _sem;
+ CephContext *_cct;
+};
+
CephContext::
CephContext()
: _doss(new DoutStreambuf <char, std::basic_string<char>::traits_type>()),
_dout(_doss),
- _prof_logger_conf_obs(new ProfLoggerConfObs())
+ _prof_logger_conf_obs(new ProfLoggerConfObs()),
+ _service_thread(NULL)
{
_conf = new md_config_t();
_conf->add_observer(_doss);
_conf->add_observer(_prof_logger_conf_obs);
+ pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);
}
CephContext::
~CephContext()
{
+ join_service_thread();
+
_conf->remove_observer(_prof_logger_conf_obs);
_conf->remove_observer(_doss);
_prof_logger_conf_obs = NULL;
delete _conf;
+ pthread_spin_destroy(&_service_thread_lock);
+}
+
+void CephContext::
+start_service_thread()
+{
+ pthread_spin_lock(&_service_thread_lock);
+ if (_service_thread) {
+ pthread_spin_unlock(&_service_thread_lock);
+ return;
+ }
+ _service_thread = new CephContextServiceThread(this);
+ _service_thread->create();
+ pthread_spin_unlock(&_service_thread_lock);
+}
+
+void CephContext::
+reopen_logs()
+{
+ pthread_spin_lock(&_service_thread_lock);
+ if (_service_thread)
+ _service_thread->reopen_logs();
+ pthread_spin_unlock(&_service_thread_lock);
+}
+
+void CephContext::
+join_service_thread()
+{
+ pthread_spin_lock(&_service_thread_lock);
+ CephContextServiceThread *thread = _service_thread;
+ if (!thread) {
+ pthread_spin_unlock(&_service_thread_lock);
+ return;
+ }
+ _service_thread = NULL;
+ pthread_spin_unlock(&_service_thread_lock);
+
+ thread->exit_thread();
+ thread->join();
+ delete thread;
}
class md_config_t;
class md_config_obs_t;
+class CephContextServiceThread;
/* A CephContext represents the context held by a single library user.
* There can be multiple CephContexts in the same process.
DoutStreambuf <char, std::basic_string<char>::traits_type> *_doss;
std::ostream _dout;
+ /* Start the Ceph Context's service thread */
+ void start_service_thread();
+
+ /* Reopen the log files */
+ void reopen_logs();
+
private:
+ /* Stop and join the Ceph Context's service thread */
+ void join_service_thread();
+
md_config_obs_t *_prof_logger_conf_obs;
+
+ /* libcommon service thread.
+ * SIGHUP wakes this thread, which then reopens logfiles */
+ friend class CephContextServiceThread;
+ CephContextServiceThread *_service_thread;
+
+ char foo[512];
+ /* lock which protects service thread creation, destruction, etc. */
+ pthread_spinlock_t _service_thread_lock;
+ char bar[512];
};
/* Globals (FIXME: remove) */
#elif USE_NSS
void ceph::crypto::init() {
+ if (crypto_init)
+ return;
crypto_init = true;
SECStatus s;
s = NSS_NoDB_Init(NULL);
}
void ceph::crypto::shutdown() {
+ if (!crypto_init)
+ return;
crypto_init = false;
SECStatus s;
s = NSS_Shutdown();
dout(1) << "finished common_init_daemonize" << dendl;
}
+/* Please be sure that this can safely be called multiple times by the
+ * same application. */
void common_init_finish(CephContext *cct)
{
ceph::crypto::init();
keyring_init(cct);
+ cct->start_service_thread();
}
CINIT_FLAG_NO_CLOSE_STDERR = 0x4,
};
-int keyring_init(CephContext *cct);
CephContext *common_preinit(const CephInitParameters &iparams,
enum code_environment_t code_env, int flags);
void complain_about_parse_errors(std::deque<std::string> *parse_errors);
void sighup_handler(int signum)
{
- g_ceph_context._doss->request_log_reopen();
+ g_ceph_context.reopen_logs();
}
static void reraise_fatal(int signum)
if (mounted)
return -EDOM;
+ common_init_finish(cct);
+
//monmap
monclient = new MonClient();
if (monclient->build_initial_monmap() < 0) {
extern "C" int ceph_mount(struct ceph_mount_info *cmount, const char *root)
{
std::string mount_root;
-
- keyring_init(cmount->get_ceph_context());
-
if (root)
mount_root = root;
return cmount->mount(mount_root);
int librados::RadosClient::
connect()
{
+ common_init_finish(cct);
+
int err;
uint64_t nonce;
int librados::Rados::
connect()
{
- int ret = keyring_init(client->cct);
- if (ret)
- return ret;
return client->connect();
}
extern "C" int rados_connect(rados_t cluster)
{
- librados::RadosClient *radosp = (librados::RadosClient *)cluster;
- int ret = keyring_init(radosp->cct);
- if (ret)
- return ret;
-
- return radosp->connect();
+ librados::RadosClient *client = (librados::RadosClient *)cluster;
+ return client->connect();
}
extern "C" void rados_shutdown(rados_t cluster)
if (snapserver)
snapserver->check_osd_map(false);
}
-
- g_ceph_context._doss->handle_log_reopen_requests(&g_conf);
}
}
}
- g_ceph_context._doss->handle_log_reopen_requests(&g_conf);
-
new_tick();
}
dispatch_running = false;
dispatch_cond.Signal();
}
-
- g_ceph_context._doss->handle_log_reopen_requests(&g_conf);
}
// =========================================
int librgw_create(librgw_t *rgw, const char * const id)
{
librgw_init_mutex.Lock();
+ CephContext *cct;
if (!librgw_initialized) {
CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT, CEPH_CONF_FILE_DEFAULT);
iparams.conf_file = "";
cct->_conf->apply_changes();
++librgw_initialized;
+ common_init_finish(cct);
+ }
+ else {
+ cct = &g_ceph_context;
}
librgw_init_mutex.Unlock();
*rgw = &g_ceph_context;
+++ /dev/null
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2011 New Dream Network
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation. See file COPYING.
- *
- */
-
-#include "common/errno.h"
-#include "common/SignalSafeQueue.h"
-#include "gtest/gtest.h"
-
-#include <errno.h>
-#include <iostream>
-#include <pthread.h>
-
-using std::cout;
-
-void *SUCCESS_RET = 0;
-void *ERROR_RET = (void*)-1;
-
-static void *dthread1(void *v)
-{
- SignalSafeQueue *ssq = (SignalSafeQueue*)v;
- int item;
- int ret = ssq->pop((void*)&item);
- if (ret) {
- cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl;
- pthread_exit(ERROR_RET);
- }
- if (item != 123) {
- cout << "expected 123, got " << item << std::endl;
- pthread_exit(ERROR_RET);
- }
- pthread_exit(SUCCESS_RET);
-}
-
-TEST(EnqueueDequeue, Test1) {
- int i, ret;
- void *thread_ret;
- SignalSafeQueue *ssq = SignalSafeQueue::create_queue();
- ret = ssq->init(sizeof(int));
- ASSERT_EQ(ret, 0);
-
- pthread_t t1;
- pthread_create(&t1, NULL, dthread1, (void*)ssq);
-
- i = 123;
- ssq->push((void*)&i);
-
- ret = pthread_join(t1, &thread_ret);
- ASSERT_EQ(ret, 0);
- ASSERT_EQ(thread_ret, SUCCESS_RET);
-
- delete ssq;
-}
-
-static int test2_total = 0;
-
-void increment_test2_total(int amount)
-{
- static pthread_mutex_t test2_lock = PTHREAD_MUTEX_INITIALIZER;
- pthread_mutex_lock(&test2_lock);
- test2_total += amount;
- pthread_mutex_unlock(&test2_lock);
-}
-
-static void *dthread2(void *v)
-{
- SignalSafeQueue *ssq = (SignalSafeQueue*)v;
- int item;
- int ret = ssq->pop((void*)&item);
- if (ret) {
- cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl;
- pthread_exit(ERROR_RET);
- }
- increment_test2_total(item);
-
- ret = ssq->pop((void*)&item);
- if (ret) {
- cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl;
- pthread_exit(ERROR_RET);
- }
- increment_test2_total(item);
-
- pthread_exit(SUCCESS_RET);
-}
-
-TEST(EnqueueDequeue, Test2) {
- int i, ret;
- void *thread_ret;
- SignalSafeQueue *ssq = SignalSafeQueue::create_queue();
- ret = ssq->init(sizeof(int));
- ASSERT_EQ(ret, 0);
-
- pthread_t t2_threadA, t2_threadB;
- pthread_create(&t2_threadA, NULL, dthread2, (void*)ssq);
- pthread_create(&t2_threadB, NULL, dthread2, (void*)ssq);
-
- i = 50;
- ssq->push((void*)&i);
- i = 100;
- ssq->push((void*)&i);
- i = 0;
- ssq->push((void*)&i);
- i = 50;
- ssq->push((void*)&i);
-
- ret = pthread_join(t2_threadA, &thread_ret);
- ASSERT_EQ(ret, 0);
- ASSERT_EQ(thread_ret, SUCCESS_RET);
-
- ret = pthread_join(t2_threadB, &thread_ret);
- ASSERT_EQ(ret, 0);
- ASSERT_EQ(thread_ret, SUCCESS_RET);
-
- ASSERT_EQ(test2_total, 200);
-
- delete ssq;
-}
-
-static pthread_mutex_t shutdown_test_lock = PTHREAD_MUTEX_INITIALIZER;
-
-void *shutdown_thread(void *v)
-{
- int ret, i;
- SignalSafeQueue *ssq = (SignalSafeQueue*)v;
-
- ret = ssq->pop((void*)&i);
- if (ret != 0) {
- printf("shutdown_thread: failed to pop the first element off the queue. "
- "Error %d\n", ret);
- pthread_exit(ERROR_RET);
- }
- if (i != 456) {
- printf("shutdown_thread: unexpected value for first element. "
- "Got %d, Expected %d\n", i, 456);
- pthread_exit(ERROR_RET);
- }
-
- pthread_mutex_lock(&shutdown_test_lock);
- // block until the parent has finished shutting down the queue
- pthread_mutex_unlock(&shutdown_test_lock);
-
- ret = ssq->pop((void*)&i);
- if (ret == EPIPE) {
- pthread_exit(SUCCESS_RET);
- }
- printf("shutdown_thread: expected to get EPIPE, but got return code %d\n",
- ret);
- pthread_exit(ERROR_RET);
-}
-
-TEST(ShutdownTest, ShutdownTest1) {
- int i, ret;
- void *thread_ret;
- SignalSafeQueue *ssq = SignalSafeQueue::create_queue();
- ret = ssq->init(sizeof(int));
- ASSERT_EQ(ret, 0);
-
- pthread_mutex_lock(&shutdown_test_lock);
- pthread_t s_thread;
- pthread_create(&s_thread, NULL, shutdown_thread, (void*)ssq);
-
- // send 456
- i = 456;
- ssq->push((void*)&i);
-
- // shutdown
- ssq->wake_readers_and_shutdown();
-
- pthread_mutex_unlock(&shutdown_test_lock);
-
- ret = pthread_join(s_thread, &thread_ret);
- ASSERT_EQ(ret, 0);
- ASSERT_EQ(thread_ret, SUCCESS_RET);
-
- delete ssq;
-}