From: Colin Patrick McCabe Date: Mon, 23 May 2011 23:25:57 +0000 (-0700) Subject: Create a libcommon service thread X-Git-Tag: v0.29~42^2~2^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=9ff7cc7c3712fbb597f4e97572ba46cd01403775;p=ceph.git Create a libcommon service thread Create a libcommon service thread. Use it to handle SIGHUP. Handle it by means of a flag that gets set. Using a queue would raise the complicated question of what to do when the queue was full. Signed-off-by: Colin McCabe --- diff --git a/src/Makefile.am b/src/Makefile.am index 376eebd8e8ec..7cc922c6294e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -456,12 +456,6 @@ unittest_librgw_CXXFLAGS = ${CRYPTO_CFLAGS} ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} check_PROGRAMS += unittest_librgw endif -unittest_SignalSafeQueue_SOURCES = test/SignalSafeQueue.cc -unittest_SignalSafeQueue_LDFLAGS = -pthread -lcurl ${AM_LDFLAGS} -unittest_SignalSafeQueue_LDADD = ${CRYPTO_LIBS} libcommon.a ${UNITTEST_LDADD} -unittest_SignalSafeQueue_CXXFLAGS = ${CRYPTO_CFLAGS} ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS} -check_PROGRAMS += unittest_SignalSafeQueue - # shell scripts editpaths = sed \ -e 's|@bindir[@]|$(bindir)|g' \ @@ -618,7 +612,6 @@ libcommon_files = \ common/Clock.cc \ common/Timer.cc \ common/Finisher.cc \ - common/SignalSafeQueue.cc \ common/environment.cc\ common/sctp_crc32.c\ common/assert.cc \ @@ -800,7 +793,6 @@ noinst_HEADERS = \ common/ConfUtils.h\ common/DecayCounter.h\ common/Finisher.h\ - common/SignalSafeQueue.h\ common/ProfLogType.h\ common/ProfLogger.h\ common/MemoryModel.h\ diff --git a/src/common/DoutStreambuf.cc b/src/common/DoutStreambuf.cc index 2c73bcc58d8d..8ac56acc665c 100644 --- a/src/common/DoutStreambuf.cc +++ b/src/common/DoutStreambuf.cc @@ -137,15 +137,13 @@ static int create_symlink(string oldpath, const string &newpath) ///////////////////////////// DoutStreambuf ///////////////////////////// template DoutStreambuf::DoutStreambuf() - : flags(0), ofd(-1), rlr(false) + : flags(0), ofd(-1) { // Initialize get pointer to zero so that underflow is called on the first read. this->setg(0, 0, 0); // Initialize output_buffer _clear_output_buffer(); - - pthread_spin_init(&rlr_lock, PTHREAD_PROCESS_PRIVATE); } template @@ -155,7 +153,6 @@ DoutStreambuf::~DoutStreambuf() TEMP_FAILURE_RETRY(::close(ofd)); ofd = -1; } - pthread_spin_destroy(&rlr_lock); } // This function is called when the output buffer is filled. @@ -388,23 +385,8 @@ DoutStreambuf::underflow() template void DoutStreambuf:: -request_log_reopen(void) -{ - pthread_spin_lock(&rlr_lock); - rlr = true; - pthread_spin_unlock(&rlr_lock); -} - -template -void DoutStreambuf:: -handle_log_reopen_requests(const md_config_t *conf) +reopen_logs(const md_config_t *conf) { - bool need; - pthread_spin_lock(&rlr_lock); - need = rlr; - pthread_spin_unlock(&rlr_lock); - if (!need) - return; std::set changed; const char **keys = get_tracked_conf_keys(); for (const char **k = keys; *k; ++k) { diff --git a/src/common/DoutStreambuf.h b/src/common/DoutStreambuf.h index 4731ae2f6059..b2bcfb299da6 100644 --- a/src/common/DoutStreambuf.h +++ b/src/common/DoutStreambuf.h @@ -75,17 +75,8 @@ public: // (if those sinks are active) void dout_emergency_to_file_and_syslog(const char * const str) const; - // The next two functions are used to implement the Ceph daemons' - // SIGHUP handling. - - // Set the request_log_reopen bit. - // Signal-safe. - void request_log_reopen(void); - - // Read the request_log_reopen bit. - // If it's set, reopen the log. - // This is meant to be called from an event loop. Not signal-safe. - void handle_log_reopen_requests(const md_config_t *conf); + // Reopen the logs + void reopen_logs(const md_config_t *conf); protected: // Called when the buffer fills up @@ -120,9 +111,6 @@ private: std::string opath; std::string symlink_dir; std::string isym_path; - - pthread_spinlock_t rlr_lock; - bool rlr; }; // Secret evil interfaces for writing logs without taking the lock. diff --git a/src/common/SignalSafeQueue.cc b/src/common/SignalSafeQueue.cc deleted file mode 100644 index e8c8d4a0b3d2..000000000000 --- a/src/common/SignalSafeQueue.cc +++ /dev/null @@ -1,110 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2011 New Dream Network - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "common/config.h" -#include "common/SignalSafeQueue.h" - -#include -#include -#include -#include -#include -#include -#include -#include - -SignalSafeQueue *SignalSafeQueue:: -create_queue() -{ - return new SignalSafeQueue(); -} - -SignalSafeQueue:: -SignalSafeQueue() - : _item_sz(0) -{ - _fd[0] = -1; - _fd[1] = -1; -} - -SignalSafeQueue:: -~SignalSafeQueue() -{ - if (_fd[0] != -1) { - TEMP_FAILURE_RETRY(close(_fd[0])); - _fd[0] = -1; - } - if (_fd[1] != -1) { - TEMP_FAILURE_RETRY(close(_fd[1])); - _fd[1] = -1; - } -} - -void SignalSafeQueue:: -wake_readers_and_shutdown(void) -{ - /* Close write file descriptor. - * Readers will get EPIPE. */ - TEMP_FAILURE_RETRY(close(_fd[1])); - _fd[1] = -1; -} - -int SignalSafeQueue:: -init(size_t item_sz) -{ - int ret; - assert(_fd[0] < 0); - assert(_fd[1] < 0); - assert(_item_sz < PIPE_BUF); - - _item_sz = item_sz; - ret = pipe2(_fd, O_CLOEXEC); - if (ret) - return ret; - return 0; -} - -void SignalSafeQueue:: -push(void *buf) -{ - /* Writing less than PIPE_BUF bytes to the pipe should always be atomic */ - int ret = write(_fd[1], buf, _item_sz); - assert(ret == (int)_item_sz); -} - -int SignalSafeQueue:: -pop(void *buf) -{ - int ret; - /* Read less than PIPE_BUF bytes from the pipe should always be atomic */ - ret = read(_fd[0], buf, _item_sz); - if (ret < 0) { - ret = errno; - if (ret == EPIPE) { - /* EPIPE means that the other side has closed the pipe */ - return ret; - } - derr << "SignalSafeQueue::dequeue() failed with error " << ret << dendl; - return ret; - } - else if (ret == 0) { - return EPIPE; - } - else if (ret != (int)_item_sz) { - derr << "SignalSafeQueue::dequeue() only read " << ret << " bytes (should " - << "not be possible)" << dendl; - return EIO; - } - return 0; -} diff --git a/src/common/SignalSafeQueue.h b/src/common/SignalSafeQueue.h deleted file mode 100644 index 44f6efd920e3..000000000000 --- a/src/common/SignalSafeQueue.h +++ /dev/null @@ -1,75 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2011 New Dream Network - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef CEPH_LOCKLESS_QUEUE_H -#define CEPH_LOCKLESS_QUEUE_H - -#include - -/* A simple signal-safe queue. It uses pipe2 internally. */ - -class SignalSafeQueue -{ -public: - static SignalSafeQueue* create_queue(); - - ~SignalSafeQueue(); - - /* Unblock all readers. - * After this function has been called, no further push() operations may be - * done. However, the memory for the class will continue to exist until the - * destructor is called. - * - * Assumes no concurrent writers exist while we are shutting down. - */ - void wake_readers_and_shutdown(); - - /* Initialize the queue. Item size is set to item_sz. - * - * Returns: 0 on success; error code otherwise. - */ - int init(size_t item_sz); - - /* Puts an item into the queue using a blocking write(). - * This is safe to call from a signal handler. - * - * It is assumed that buf is a buffer of length 'item_sz' - * - * This function is reentrant and any number of writers and readers can - * exist. - */ - void push(void *buf); - - /* Blocks until there is something available in the queue. - * When it is available, it will be copied into the provided buffer, - * which we assume is of length 'item_sz' - * - * This function is reentrant and any number of writers and readers can - * exist. - * - * Returns: 0 on success - * EPIPE if the queue has been shut down - * Other error codes if an unexpected error has occurred. - */ - int pop(void *buf); - -private: - /* Force heap allocation. */ - SignalSafeQueue(); - - size_t _item_sz; - int _fd[2]; -}; - -#endif diff --git a/src/common/ceph_context.cc b/src/common/ceph_context.cc index 80aec2c658c3..b093d7301934 100644 --- a/src/common/ceph_context.cc +++ b/src/common/ceph_context.cc @@ -14,10 +14,13 @@ #include "common/DoutStreambuf.h" #include "common/ProfLogger.h" +#include "common/Thread.h" #include "common/ceph_context.h" #include "common/config.h" #include +#include +#include // FIXME // These variables are here temporarily to make the transition easier. @@ -32,20 +35,72 @@ DoutStreambuf ::traits_type> *_doss(g_ceph_context */ pthread_mutex_t _dout_lock = PTHREAD_MUTEX_INITIALIZER; +class CephContextServiceThread : public Thread +{ +public: + CephContextServiceThread(CephContext *cct) + : _reopen_logs(false), _exit_thread(false), _cct(cct) + { + sem_init(&_sem, 0, 0); + }; + + ~CephContextServiceThread() + { + sem_destroy(&_sem); + }; + + void *entry() + { + while (1) { + sem_wait(&_sem); + if (_exit_thread) { + break; + } + if (_reopen_logs) { + _cct->_doss->reopen_logs(_cct->_conf); + _reopen_logs = false; + } + } + return NULL; + } + + void reopen_logs() + { + _reopen_logs = true; + sem_post(&_sem); + } + + void exit_thread() + { + _exit_thread = true; + sem_post(&_sem); + } + +private: + volatile bool _reopen_logs; + volatile bool _exit_thread; + sem_t _sem; + CephContext *_cct; +}; + CephContext:: CephContext() : _doss(new DoutStreambuf ::traits_type>()), _dout(_doss), - _prof_logger_conf_obs(new ProfLoggerConfObs()) + _prof_logger_conf_obs(new ProfLoggerConfObs()), + _service_thread(NULL) { _conf = new md_config_t(); _conf->add_observer(_doss); _conf->add_observer(_prof_logger_conf_obs); + pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED); } CephContext:: ~CephContext() { + join_service_thread(); + _conf->remove_observer(_prof_logger_conf_obs); _conf->remove_observer(_doss); @@ -55,4 +110,44 @@ CephContext:: _prof_logger_conf_obs = NULL; delete _conf; + pthread_spin_destroy(&_service_thread_lock); +} + +void CephContext:: +start_service_thread() +{ + pthread_spin_lock(&_service_thread_lock); + if (_service_thread) { + pthread_spin_unlock(&_service_thread_lock); + return; + } + _service_thread = new CephContextServiceThread(this); + _service_thread->create(); + pthread_spin_unlock(&_service_thread_lock); +} + +void CephContext:: +reopen_logs() +{ + pthread_spin_lock(&_service_thread_lock); + if (_service_thread) + _service_thread->reopen_logs(); + pthread_spin_unlock(&_service_thread_lock); +} + +void CephContext:: +join_service_thread() +{ + pthread_spin_lock(&_service_thread_lock); + CephContextServiceThread *thread = _service_thread; + if (!thread) { + pthread_spin_unlock(&_service_thread_lock); + return; + } + _service_thread = NULL; + pthread_spin_unlock(&_service_thread_lock); + + thread->exit_thread(); + thread->join(); + delete thread; } diff --git a/src/common/ceph_context.h b/src/common/ceph_context.h index 35aa7e599dad..1d80a772655f 100644 --- a/src/common/ceph_context.h +++ b/src/common/ceph_context.h @@ -23,6 +23,7 @@ class DoutStreambuf; class md_config_t; class md_config_obs_t; +class CephContextServiceThread; /* A CephContext represents the context held by a single library user. * There can be multiple CephContexts in the same process. @@ -39,8 +40,27 @@ public: DoutStreambuf ::traits_type> *_doss; std::ostream _dout; + /* Start the Ceph Context's service thread */ + void start_service_thread(); + + /* Reopen the log files */ + void reopen_logs(); + private: + /* Stop and join the Ceph Context's service thread */ + void join_service_thread(); + md_config_obs_t *_prof_logger_conf_obs; + + /* libcommon service thread. + * SIGHUP wakes this thread, which then reopens logfiles */ + friend class CephContextServiceThread; + CephContextServiceThread *_service_thread; + + char foo[512]; + /* lock which protects service thread creation, destruction, etc. */ + pthread_spinlock_t _service_thread_lock; + char bar[512]; }; /* Globals (FIXME: remove) */ diff --git a/src/common/ceph_crypto.cc b/src/common/ceph_crypto.cc index 940c42f40352..19b775c05885 100644 --- a/src/common/ceph_crypto.cc +++ b/src/common/ceph_crypto.cc @@ -39,6 +39,8 @@ ceph::crypto::HMACSHA1::~HMACSHA1() #elif USE_NSS void ceph::crypto::init() { + if (crypto_init) + return; crypto_init = true; SECStatus s; s = NSS_NoDB_Init(NULL); @@ -46,6 +48,8 @@ void ceph::crypto::init() { } void ceph::crypto::shutdown() { + if (!crypto_init) + return; crypto_init = false; SECStatus s; s = NSS_Shutdown(); diff --git a/src/common/common_init.cc b/src/common/common_init.cc index e335e1efd3a7..b3c2bd999dfc 100644 --- a/src/common/common_init.cc +++ b/src/common/common_init.cc @@ -294,8 +294,11 @@ void common_init_daemonize(const CephContext *cct, int flags) dout(1) << "finished common_init_daemonize" << dendl; } +/* Please be sure that this can safely be called multiple times by the + * same application. */ void common_init_finish(CephContext *cct) { ceph::crypto::init(); keyring_init(cct); + cct->start_service_thread(); } diff --git a/src/common/common_init.h b/src/common/common_init.h index 5be95b4ec22d..7459f79c437f 100644 --- a/src/common/common_init.h +++ b/src/common/common_init.h @@ -22,7 +22,6 @@ enum common_init_flags_t { CINIT_FLAG_NO_CLOSE_STDERR = 0x4, }; -int keyring_init(CephContext *cct); CephContext *common_preinit(const CephInitParameters &iparams, enum code_environment_t code_env, int flags); void complain_about_parse_errors(std::deque *parse_errors); diff --git a/src/common/signal.cc b/src/common/signal.cc index 07e3ade70b91..6bc15ed56983 100644 --- a/src/common/signal.cc +++ b/src/common/signal.cc @@ -52,7 +52,7 @@ void install_sighandler(int signum, signal_handler_t handler, int flags) void sighup_handler(int signum) { - g_ceph_context._doss->request_log_reopen(); + g_ceph_context.reopen_logs(); } static void reraise_fatal(int signum) diff --git a/src/libceph.cc b/src/libceph.cc index e8ff17ee91ac..e5b678233758 100644 --- a/src/libceph.cc +++ b/src/libceph.cc @@ -70,6 +70,8 @@ public: if (mounted) return -EDOM; + common_init_finish(cct); + //monmap monclient = new MonClient(); if (monclient->build_initial_monmap() < 0) { @@ -283,9 +285,6 @@ extern "C" int ceph_conf_get(struct ceph_mount_info *cmount, const char *option, extern "C" int ceph_mount(struct ceph_mount_info *cmount, const char *root) { std::string mount_root; - - keyring_init(cmount->get_ceph_context()); - if (root) mount_root = root; return cmount->mount(mount_root); diff --git a/src/librados.cc b/src/librados.cc index d3290d30acce..8e3f2e6342a2 100644 --- a/src/librados.cc +++ b/src/librados.cc @@ -638,6 +638,8 @@ IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s) int librados::RadosClient:: connect() { + common_init_finish(cct); + int err; uint64_t nonce; @@ -2646,9 +2648,6 @@ init_with_context(CephContext *cct_) int librados::Rados:: connect() { - int ret = keyring_init(client->cct); - if (ret) - return ret; return client->connect(); } @@ -2853,12 +2852,8 @@ extern "C" int rados_create_with_context(rados_t *pcluster, CephContext *cct_) extern "C" int rados_connect(rados_t cluster) { - librados::RadosClient *radosp = (librados::RadosClient *)cluster; - int ret = keyring_init(radosp->cct); - if (ret) - return ret; - - return radosp->connect(); + librados::RadosClient *client = (librados::RadosClient *)cluster; + return client->connect(); } extern "C" void rados_shutdown(rados_t cluster) diff --git a/src/mds/MDS.cc b/src/mds/MDS.cc index d1ae8354255f..d2cdaaccd844 100644 --- a/src/mds/MDS.cc +++ b/src/mds/MDS.cc @@ -586,8 +586,6 @@ void MDS::tick() if (snapserver) snapserver->check_osd_map(false); } - - g_ceph_context._doss->handle_log_reopen_requests(&g_conf); } diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 49d8e488ab7d..c27bc03e7967 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -965,8 +965,6 @@ void Monitor::tick() } } - g_ceph_context._doss->handle_log_reopen_requests(&g_conf); - new_tick(); } diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index aa9b6039fb57..549e0ba2e1a8 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -1792,8 +1792,6 @@ void OSD::tick() dispatch_running = false; dispatch_cond.Signal(); } - - g_ceph_context._doss->handle_log_reopen_requests(&g_conf); } // ========================================= diff --git a/src/rgw/librgw.cc b/src/rgw/librgw.cc index 58fecdbfd39f..e0f07776cbf5 100644 --- a/src/rgw/librgw.cc +++ b/src/rgw/librgw.cc @@ -32,6 +32,7 @@ static int librgw_initialized = 0; int librgw_create(librgw_t *rgw, const char * const id) { librgw_init_mutex.Lock(); + CephContext *cct; if (!librgw_initialized) { CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT, CEPH_CONF_FILE_DEFAULT); iparams.conf_file = ""; @@ -44,6 +45,10 @@ int librgw_create(librgw_t *rgw, const char * const id) cct->_conf->apply_changes(); ++librgw_initialized; + common_init_finish(cct); + } + else { + cct = &g_ceph_context; } librgw_init_mutex.Unlock(); *rgw = &g_ceph_context; diff --git a/src/test/SignalSafeQueue.cc b/src/test/SignalSafeQueue.cc deleted file mode 100644 index 41cd9931d687..000000000000 --- a/src/test/SignalSafeQueue.cc +++ /dev/null @@ -1,185 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -// vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2011 New Dream Network - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#include "common/errno.h" -#include "common/SignalSafeQueue.h" -#include "gtest/gtest.h" - -#include -#include -#include - -using std::cout; - -void *SUCCESS_RET = 0; -void *ERROR_RET = (void*)-1; - -static void *dthread1(void *v) -{ - SignalSafeQueue *ssq = (SignalSafeQueue*)v; - int item; - int ret = ssq->pop((void*)&item); - if (ret) { - cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl; - pthread_exit(ERROR_RET); - } - if (item != 123) { - cout << "expected 123, got " << item << std::endl; - pthread_exit(ERROR_RET); - } - pthread_exit(SUCCESS_RET); -} - -TEST(EnqueueDequeue, Test1) { - int i, ret; - void *thread_ret; - SignalSafeQueue *ssq = SignalSafeQueue::create_queue(); - ret = ssq->init(sizeof(int)); - ASSERT_EQ(ret, 0); - - pthread_t t1; - pthread_create(&t1, NULL, dthread1, (void*)ssq); - - i = 123; - ssq->push((void*)&i); - - ret = pthread_join(t1, &thread_ret); - ASSERT_EQ(ret, 0); - ASSERT_EQ(thread_ret, SUCCESS_RET); - - delete ssq; -} - -static int test2_total = 0; - -void increment_test2_total(int amount) -{ - static pthread_mutex_t test2_lock = PTHREAD_MUTEX_INITIALIZER; - pthread_mutex_lock(&test2_lock); - test2_total += amount; - pthread_mutex_unlock(&test2_lock); -} - -static void *dthread2(void *v) -{ - SignalSafeQueue *ssq = (SignalSafeQueue*)v; - int item; - int ret = ssq->pop((void*)&item); - if (ret) { - cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl; - pthread_exit(ERROR_RET); - } - increment_test2_total(item); - - ret = ssq->pop((void*)&item); - if (ret) { - cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl; - pthread_exit(ERROR_RET); - } - increment_test2_total(item); - - pthread_exit(SUCCESS_RET); -} - -TEST(EnqueueDequeue, Test2) { - int i, ret; - void *thread_ret; - SignalSafeQueue *ssq = SignalSafeQueue::create_queue(); - ret = ssq->init(sizeof(int)); - ASSERT_EQ(ret, 0); - - pthread_t t2_threadA, t2_threadB; - pthread_create(&t2_threadA, NULL, dthread2, (void*)ssq); - pthread_create(&t2_threadB, NULL, dthread2, (void*)ssq); - - i = 50; - ssq->push((void*)&i); - i = 100; - ssq->push((void*)&i); - i = 0; - ssq->push((void*)&i); - i = 50; - ssq->push((void*)&i); - - ret = pthread_join(t2_threadA, &thread_ret); - ASSERT_EQ(ret, 0); - ASSERT_EQ(thread_ret, SUCCESS_RET); - - ret = pthread_join(t2_threadB, &thread_ret); - ASSERT_EQ(ret, 0); - ASSERT_EQ(thread_ret, SUCCESS_RET); - - ASSERT_EQ(test2_total, 200); - - delete ssq; -} - -static pthread_mutex_t shutdown_test_lock = PTHREAD_MUTEX_INITIALIZER; - -void *shutdown_thread(void *v) -{ - int ret, i; - SignalSafeQueue *ssq = (SignalSafeQueue*)v; - - ret = ssq->pop((void*)&i); - if (ret != 0) { - printf("shutdown_thread: failed to pop the first element off the queue. " - "Error %d\n", ret); - pthread_exit(ERROR_RET); - } - if (i != 456) { - printf("shutdown_thread: unexpected value for first element. " - "Got %d, Expected %d\n", i, 456); - pthread_exit(ERROR_RET); - } - - pthread_mutex_lock(&shutdown_test_lock); - // block until the parent has finished shutting down the queue - pthread_mutex_unlock(&shutdown_test_lock); - - ret = ssq->pop((void*)&i); - if (ret == EPIPE) { - pthread_exit(SUCCESS_RET); - } - printf("shutdown_thread: expected to get EPIPE, but got return code %d\n", - ret); - pthread_exit(ERROR_RET); -} - -TEST(ShutdownTest, ShutdownTest1) { - int i, ret; - void *thread_ret; - SignalSafeQueue *ssq = SignalSafeQueue::create_queue(); - ret = ssq->init(sizeof(int)); - ASSERT_EQ(ret, 0); - - pthread_mutex_lock(&shutdown_test_lock); - pthread_t s_thread; - pthread_create(&s_thread, NULL, shutdown_thread, (void*)ssq); - - // send 456 - i = 456; - ssq->push((void*)&i); - - // shutdown - ssq->wake_readers_and_shutdown(); - - pthread_mutex_unlock(&shutdown_test_lock); - - ret = pthread_join(s_thread, &thread_ret); - ASSERT_EQ(ret, 0); - ASSERT_EQ(thread_ret, SUCCESS_RET); - - delete ssq; -}