]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Create a libcommon service thread
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Mon, 23 May 2011 23:25:57 +0000 (16:25 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Tue, 24 May 2011 19:21:02 +0000 (12:21 -0700)
Create a libcommon service thread. Use it to handle SIGHUP.

Handle it by means of a flag that gets set. Using a queue would raise
the complicated question of what to do when the queue was full.

Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
18 files changed:
src/Makefile.am
src/common/DoutStreambuf.cc
src/common/DoutStreambuf.h
src/common/SignalSafeQueue.cc [deleted file]
src/common/SignalSafeQueue.h [deleted file]
src/common/ceph_context.cc
src/common/ceph_context.h
src/common/ceph_crypto.cc
src/common/common_init.cc
src/common/common_init.h
src/common/signal.cc
src/libceph.cc
src/librados.cc
src/mds/MDS.cc
src/mon/Monitor.cc
src/osd/OSD.cc
src/rgw/librgw.cc
src/test/SignalSafeQueue.cc [deleted file]

index 376eebd8e8ec15c1f44d487868f0e0c763ac9fbe..7cc922c6294e807a7f517954db360c4584d9935d 100644 (file)
@@ -456,12 +456,6 @@ unittest_librgw_CXXFLAGS = ${CRYPTO_CFLAGS} ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
 check_PROGRAMS += unittest_librgw
 endif
 
-unittest_SignalSafeQueue_SOURCES = test/SignalSafeQueue.cc
-unittest_SignalSafeQueue_LDFLAGS = -pthread -lcurl ${AM_LDFLAGS}
-unittest_SignalSafeQueue_LDADD =  ${CRYPTO_LIBS} libcommon.a ${UNITTEST_LDADD}
-unittest_SignalSafeQueue_CXXFLAGS = ${CRYPTO_CFLAGS} ${AM_CXXFLAGS} ${UNITTEST_CXXFLAGS}
-check_PROGRAMS += unittest_SignalSafeQueue
-
 # shell scripts
 editpaths = sed \
        -e 's|@bindir[@]|$(bindir)|g' \
@@ -618,7 +612,6 @@ libcommon_files = \
        common/Clock.cc \
        common/Timer.cc \
        common/Finisher.cc \
-       common/SignalSafeQueue.cc \
        common/environment.cc\
        common/sctp_crc32.c\
        common/assert.cc \
@@ -800,7 +793,6 @@ noinst_HEADERS = \
         common/ConfUtils.h\
         common/DecayCounter.h\
         common/Finisher.h\
-        common/SignalSafeQueue.h\
         common/ProfLogType.h\
         common/ProfLogger.h\
         common/MemoryModel.h\
index 2c73bcc58d8d515a0d86e0081984b33e4cfb02a6..8ac56acc665c9a7bec1ae1f0dc9b4a38e91a7eb6 100644 (file)
@@ -137,15 +137,13 @@ static int create_symlink(string oldpath, const string &newpath)
 ///////////////////////////// DoutStreambuf /////////////////////////////
 template <typename charT, typename traits>
 DoutStreambuf<charT, traits>::DoutStreambuf()
-  : flags(0), ofd(-1), rlr(false)
+  : flags(0), ofd(-1)
 {
   // Initialize get pointer to zero so that underflow is called on the first read.
   this->setg(0, 0, 0);
 
   // Initialize output_buffer
   _clear_output_buffer();
-
-  pthread_spin_init(&rlr_lock, PTHREAD_PROCESS_PRIVATE);
 }
 
 template <typename charT, typename traits>
@@ -155,7 +153,6 @@ DoutStreambuf<charT, traits>::~DoutStreambuf()
     TEMP_FAILURE_RETRY(::close(ofd));
     ofd = -1;
   }
-  pthread_spin_destroy(&rlr_lock);
 }
 
 // This function is called when the output buffer is filled.
@@ -388,23 +385,8 @@ DoutStreambuf<charT, traits>::underflow()
 
 template <typename charT, typename traits>
 void DoutStreambuf<charT, traits>::
-request_log_reopen(void)
-{
-  pthread_spin_lock(&rlr_lock);
-  rlr = true;
-  pthread_spin_unlock(&rlr_lock);
-}
-
-template <typename charT, typename traits>
-void DoutStreambuf<charT, traits>::
-handle_log_reopen_requests(const md_config_t *conf)
+reopen_logs(const md_config_t *conf)
 {
-  bool need;
-  pthread_spin_lock(&rlr_lock);
-  need = rlr;
-  pthread_spin_unlock(&rlr_lock);
-  if (!need)
-    return;
   std::set <std::string> changed;
   const char **keys = get_tracked_conf_keys();
   for (const char **k = keys; *k; ++k) {
index 4731ae2f6059a3132abd56a8f2aaaec83754bc2c..b2bcfb299da61d25d83eaa3aef6e41e6545285a6 100644 (file)
@@ -75,17 +75,8 @@ public:
   // (if those sinks are active)
   void dout_emergency_to_file_and_syslog(const char * const str) const;
 
-  // The next two functions are used to implement the Ceph daemons'
-  // SIGHUP handling.
-
-  // Set the request_log_reopen bit.
-  // Signal-safe.
-  void request_log_reopen(void);
-
-  // Read the request_log_reopen bit.
-  // If it's set, reopen the log.
-  // This is meant to be called from an event loop. Not signal-safe.
-  void handle_log_reopen_requests(const md_config_t *conf);
+  // Reopen the logs
+  void reopen_logs(const md_config_t *conf);
 
 protected:
   // Called when the buffer fills up
@@ -120,9 +111,6 @@ private:
   std::string opath;
   std::string symlink_dir;
   std::string isym_path;
-
-  pthread_spinlock_t rlr_lock;
-  bool rlr;
 };
 
 // Secret evil interfaces for writing logs without taking the lock.
diff --git a/src/common/SignalSafeQueue.cc b/src/common/SignalSafeQueue.cc
deleted file mode 100644 (file)
index e8c8d4a..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2011 New Dream Network
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "common/config.h"
-#include "common/SignalSafeQueue.h"
-
-#include <errno.h>
-#include <fcntl.h>
-#include <limits.h>
-#include <pthread.h>
-#include <stdio.h>
-#include <sys/mman.h>
-#include <sys/stat.h>
-#include <unistd.h>
-
-SignalSafeQueue *SignalSafeQueue::
-create_queue()
-{
-  return new SignalSafeQueue();
-}
-
-SignalSafeQueue::
-SignalSafeQueue()
-  : _item_sz(0)
-{
-  _fd[0] = -1;
-  _fd[1] = -1;
-}
-
-SignalSafeQueue::
-~SignalSafeQueue()
-{
-  if (_fd[0] != -1) {
-    TEMP_FAILURE_RETRY(close(_fd[0]));
-    _fd[0] = -1;
-  }
-  if (_fd[1] != -1) {
-    TEMP_FAILURE_RETRY(close(_fd[1]));
-    _fd[1] = -1;
-  }
-}
-
-void SignalSafeQueue::
-wake_readers_and_shutdown(void)
-{
-  /* Close write file descriptor.
-   * Readers will get EPIPE. */
-  TEMP_FAILURE_RETRY(close(_fd[1]));
-  _fd[1] = -1;
-}
-
-int SignalSafeQueue::
-init(size_t item_sz)
-{
-  int ret;
-  assert(_fd[0] < 0);
-  assert(_fd[1] < 0);
-  assert(_item_sz < PIPE_BUF);
-
-  _item_sz = item_sz;
-  ret = pipe2(_fd, O_CLOEXEC);
-  if (ret)
-    return ret;
-  return 0;
-}
-
-void SignalSafeQueue::
-push(void *buf)
-{
-  /* Writing less than PIPE_BUF bytes to the pipe should always be atomic */
-  int ret = write(_fd[1], buf, _item_sz);
-  assert(ret == (int)_item_sz);
-}
-
-int SignalSafeQueue::
-pop(void *buf)
-{
-  int ret;
-  /* Read less than PIPE_BUF bytes from the pipe should always be atomic */
-  ret = read(_fd[0], buf, _item_sz);
-  if (ret < 0) {
-    ret = errno;
-    if (ret == EPIPE) {
-      /* EPIPE means that the other side has closed the pipe */
-      return ret;
-    }
-    derr << "SignalSafeQueue::dequeue() failed with error " << ret << dendl;
-    return ret;
-  }
-  else if (ret == 0) {
-    return EPIPE;
-  }
-  else if (ret != (int)_item_sz) {
-    derr << "SignalSafeQueue::dequeue() only read " << ret << " bytes (should "
-         << "not be possible)" << dendl;
-    return EIO; 
-  }
-  return 0;
-}
diff --git a/src/common/SignalSafeQueue.h b/src/common/SignalSafeQueue.h
deleted file mode 100644 (file)
index 44f6efd..0000000
+++ /dev/null
@@ -1,75 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2011 New Dream Network
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef CEPH_LOCKLESS_QUEUE_H
-#define CEPH_LOCKLESS_QUEUE_H
-
-#include <unistd.h>
-
-/* A simple signal-safe queue. It uses pipe2 internally. */
-
-class SignalSafeQueue
-{
-public:
-  static SignalSafeQueue* create_queue();
-
-  ~SignalSafeQueue();
-
-  /* Unblock all readers.
-   * After this function has been called, no further push() operations may be
-   * done. However, the memory for the class will continue to exist until the
-   * destructor is called.
-   *
-   * Assumes no concurrent writers exist while we are shutting down.
-   */
-  void wake_readers_and_shutdown();
-
-  /* Initialize the queue. Item size is set to item_sz.
-   *
-   * Returns: 0 on success; error code otherwise.
-   */
-  int init(size_t item_sz);
-
-  /* Puts an item into the queue using a blocking write().
-   * This is safe to call from a signal handler.
-   *
-   * It is assumed that buf is a buffer of length 'item_sz'
-   *
-   * This function is reentrant and any number of writers and readers can
-   * exist.
-   */
-  void push(void *buf);
-
-  /* Blocks until there is something available in the queue.
-   * When it is available, it will be copied into the provided buffer,
-   * which we assume is of length 'item_sz'
-   *
-   * This function is reentrant and any number of writers and readers can
-   * exist.
-   *
-   * Returns: 0 on success
-   *          EPIPE if the queue has been shut down
-   *         Other error codes if an unexpected error has occurred.
-   */
-  int pop(void *buf);
-
-private:
-  /* Force heap allocation. */
-  SignalSafeQueue();
-
-  size_t _item_sz;
-  int _fd[2];
-};
-
-#endif
index 80aec2c658c35e46a157c8bbb44239ba60fffb02..b093d730193437a61e0954618af9143b597aad39 100644 (file)
 
 #include "common/DoutStreambuf.h"
 #include "common/ProfLogger.h"
+#include "common/Thread.h"
 #include "common/ceph_context.h"
 #include "common/config.h"
 
 #include <iostream>
+#include <pthread.h>
+#include <semaphore.h>
 
 // FIXME
 // These variables are here temporarily to make the transition easier.
@@ -32,20 +35,72 @@ DoutStreambuf <char, std::basic_string<char>::traits_type> *_doss(g_ceph_context
  */
 pthread_mutex_t _dout_lock = PTHREAD_MUTEX_INITIALIZER;
 
+class CephContextServiceThread : public Thread
+{
+public:
+  CephContextServiceThread(CephContext *cct)
+    : _reopen_logs(false), _exit_thread(false), _cct(cct)
+  {
+    sem_init(&_sem, 0, 0);
+  };
+
+  ~CephContextServiceThread()
+  {
+    sem_destroy(&_sem);
+  };
+
+  void *entry()
+  {
+    while (1) {
+      sem_wait(&_sem);
+      if (_exit_thread) {
+       break;
+      }
+      if (_reopen_logs) {
+       _cct->_doss->reopen_logs(_cct->_conf);
+       _reopen_logs = false;
+      }
+    }
+    return NULL;
+  }
+
+  void reopen_logs()
+  {
+    _reopen_logs = true;
+    sem_post(&_sem);
+  }
+
+  void exit_thread()
+  {
+    _exit_thread = true;
+    sem_post(&_sem);
+  }
+
+private:
+  volatile bool _reopen_logs;
+  volatile bool _exit_thread;
+  sem_t _sem;
+  CephContext *_cct;
+};
+
 CephContext::
 CephContext()
   : _doss(new DoutStreambuf <char, std::basic_string<char>::traits_type>()),
     _dout(_doss),
-    _prof_logger_conf_obs(new ProfLoggerConfObs())
+    _prof_logger_conf_obs(new ProfLoggerConfObs()),
+    _service_thread(NULL)
 {
   _conf = new md_config_t();
   _conf->add_observer(_doss);
   _conf->add_observer(_prof_logger_conf_obs);
+  pthread_spin_init(&_service_thread_lock, PTHREAD_PROCESS_SHARED);
 }
 
 CephContext::
 ~CephContext()
 {
+  join_service_thread();
+
   _conf->remove_observer(_prof_logger_conf_obs);
   _conf->remove_observer(_doss);
 
@@ -55,4 +110,44 @@ CephContext::
   _prof_logger_conf_obs = NULL;
 
   delete _conf;
+  pthread_spin_destroy(&_service_thread_lock);
+}
+
+void CephContext::
+start_service_thread()
+{
+  pthread_spin_lock(&_service_thread_lock);
+  if (_service_thread) {
+    pthread_spin_unlock(&_service_thread_lock);
+    return;
+  }
+  _service_thread = new CephContextServiceThread(this);
+  _service_thread->create();
+  pthread_spin_unlock(&_service_thread_lock);
+}
+
+void CephContext::
+reopen_logs()
+{
+  pthread_spin_lock(&_service_thread_lock);
+  if (_service_thread)
+    _service_thread->reopen_logs();
+  pthread_spin_unlock(&_service_thread_lock);
+}
+
+void CephContext::
+join_service_thread()
+{
+  pthread_spin_lock(&_service_thread_lock);
+  CephContextServiceThread *thread = _service_thread;
+  if (!thread) {
+    pthread_spin_unlock(&_service_thread_lock);
+    return;
+  }
+  _service_thread = NULL;
+  pthread_spin_unlock(&_service_thread_lock);
+
+  thread->exit_thread();
+  thread->join();
+  delete thread;
 }
index 35aa7e599dad52a81893303c63081420d45204e5..1d80a772655f98121bbbc41d7d23882e9872a0b0 100644 (file)
@@ -23,6 +23,7 @@ class DoutStreambuf;
 
 class md_config_t;
 class md_config_obs_t;
+class CephContextServiceThread;
 
 /* A CephContext represents the context held by a single library user.
  * There can be multiple CephContexts in the same process.
@@ -39,8 +40,27 @@ public:
   DoutStreambuf <char, std::basic_string<char>::traits_type> *_doss;
   std::ostream _dout;
 
+  /* Start the Ceph Context's service thread */
+  void start_service_thread();
+
+  /* Reopen the log files */
+  void reopen_logs();
+
 private:
+  /* Stop and join the Ceph Context's service thread */
+  void join_service_thread();
+
   md_config_obs_t *_prof_logger_conf_obs;
+
+  /* libcommon service thread.
+   * SIGHUP wakes this thread, which then reopens logfiles */
+  friend class CephContextServiceThread;
+  CephContextServiceThread *_service_thread;
+
+  char foo[512];
+  /* lock which protects service thread creation, destruction, etc. */
+  pthread_spinlock_t _service_thread_lock;
+  char bar[512];
 };
 
 /* Globals (FIXME: remove) */ 
index 940c42f40352bd93f82187875d3a8eeb2bbe8609..19b775c05885dfc4fb32795109f5b7ecd0216f9c 100644 (file)
@@ -39,6 +39,8 @@ ceph::crypto::HMACSHA1::~HMACSHA1()
 #elif USE_NSS
 
 void ceph::crypto::init() {
+  if (crypto_init)
+    return;
   crypto_init = true;
   SECStatus s;
   s = NSS_NoDB_Init(NULL);
@@ -46,6 +48,8 @@ void ceph::crypto::init() {
 }
 
 void ceph::crypto::shutdown() {
+  if (!crypto_init)
+    return;
   crypto_init = false;
   SECStatus s;
   s = NSS_Shutdown();
index e335e1efd3a754f911fc613a366b4aa3ff69460b..b3c2bd999dfc775a1f1ec950aa40f4680d4f7636 100644 (file)
@@ -294,8 +294,11 @@ void common_init_daemonize(const CephContext *cct, int flags)
   dout(1) << "finished common_init_daemonize" << dendl;
 }
 
+/* Please be sure that this can safely be called multiple times by the
+ * same application. */
 void common_init_finish(CephContext *cct)
 {
   ceph::crypto::init();
   keyring_init(cct);
+  cct->start_service_thread();
 }
index 5be95b4ec22d099aa0d48cfa64aae0c11cbae65b..7459f79c437fd040e6b75ee65d1866a3a488a35c 100644 (file)
@@ -22,7 +22,6 @@ enum common_init_flags_t {
   CINIT_FLAG_NO_CLOSE_STDERR = 0x4,
 };
 
-int keyring_init(CephContext *cct);
 CephContext *common_preinit(const CephInitParameters &iparams,
                            enum code_environment_t code_env, int flags);
 void complain_about_parse_errors(std::deque<std::string> *parse_errors);
index 07e3ade70b91f48a2477f1b78418f3a10002d9bc..6bc15ed569839369000b847783ae7dfed595c0bf 100644 (file)
@@ -52,7 +52,7 @@ void install_sighandler(int signum, signal_handler_t handler, int flags)
 
 void sighup_handler(int signum)
 {
-  g_ceph_context._doss->request_log_reopen();
+  g_ceph_context.reopen_logs();
 }
 
 static void reraise_fatal(int signum)
index e8ff17ee91acaebb2c62c68594e25a38e9f4026f..e5b6782337587d7c69893f7ff35838db6c58cfe9 100644 (file)
@@ -70,6 +70,8 @@ public:
     if (mounted)
       return -EDOM;
 
+    common_init_finish(cct);
+
     //monmap
     monclient = new MonClient();
     if (monclient->build_initial_monmap() < 0) {
@@ -283,9 +285,6 @@ extern "C" int ceph_conf_get(struct ceph_mount_info *cmount, const char *option,
 extern "C" int ceph_mount(struct ceph_mount_info *cmount, const char *root)
 {
   std::string mount_root;
-
-  keyring_init(cmount->get_ceph_context());
-
   if (root)
     mount_root = root;
   return cmount->mount(mount_root);
index d3290d30acce6ea63186bf12d6ce89b18361a770..8e3f2e6342a2e34c03ec4249719c801fbfe375b3 100644 (file)
@@ -638,6 +638,8 @@ IoCtxImpl(RadosClient *c, int pid, const char *pool_name_, snapid_t s)
 int librados::RadosClient::
 connect()
 {
+  common_init_finish(cct);
+
   int err;
   uint64_t nonce;
 
@@ -2646,9 +2648,6 @@ init_with_context(CephContext *cct_)
 int librados::Rados::
 connect()
 {
-  int ret = keyring_init(client->cct);
-  if (ret)
-    return ret;
   return client->connect();
 }
 
@@ -2853,12 +2852,8 @@ extern "C" int rados_create_with_context(rados_t *pcluster, CephContext *cct_)
 
 extern "C" int rados_connect(rados_t cluster)
 {
-  librados::RadosClient *radosp = (librados::RadosClient *)cluster;
-  int ret = keyring_init(radosp->cct);
-  if (ret)
-    return ret;
-
-  return radosp->connect();
+  librados::RadosClient *client = (librados::RadosClient *)cluster;
+  return client->connect();
 }
 
 extern "C" void rados_shutdown(rados_t cluster)
index d1ae8354255fef32e326b1e27d73e946deabd4e4..d2cdaaccd8442320c21c616f7f4e87c903a8dd5c 100644 (file)
@@ -586,8 +586,6 @@ void MDS::tick()
     if (snapserver)
       snapserver->check_osd_map(false);
   }
-
-  g_ceph_context._doss->handle_log_reopen_requests(&g_conf);
 }
 
 
index 49d8e488ab7d6960dfc4acc13bb130f5414e4e79..c27bc03e7967e02751847dd295e8a51288f360a3 100644 (file)
@@ -965,8 +965,6 @@ void Monitor::tick()
     }
   }
 
-  g_ceph_context._doss->handle_log_reopen_requests(&g_conf);
-
   new_tick();
 }
 
index aa9b6039fb571abe3b8f59f420ac6fe88f741e51..549e0ba2e1a8fbb9ede46fa34105ce5f3589fbc4 100644 (file)
@@ -1792,8 +1792,6 @@ void OSD::tick()
     dispatch_running = false;
     dispatch_cond.Signal();
   }
-
-  g_ceph_context._doss->handle_log_reopen_requests(&g_conf);
 }
 
 // =========================================
index 58fecdbfd39f8de2afe40873900e3c258f1a4ec2..e0f07776cbf592f82eb90d58284b9800de441423 100644 (file)
@@ -32,6 +32,7 @@ static int librgw_initialized = 0;
 int librgw_create(librgw_t *rgw, const char * const id)
 {
   librgw_init_mutex.Lock();
+  CephContext *cct;
   if (!librgw_initialized) {
     CephInitParameters iparams(CEPH_ENTITY_TYPE_CLIENT, CEPH_CONF_FILE_DEFAULT);
     iparams.conf_file = "";
@@ -44,6 +45,10 @@ int librgw_create(librgw_t *rgw, const char * const id)
     cct->_conf->apply_changes();
 
     ++librgw_initialized;
+    common_init_finish(cct);
+  }
+  else {
+    cct = &g_ceph_context;
   }
   librgw_init_mutex.Unlock();
   *rgw = &g_ceph_context;
diff --git a/src/test/SignalSafeQueue.cc b/src/test/SignalSafeQueue.cc
deleted file mode 100644 (file)
index 41cd993..0000000
+++ /dev/null
@@ -1,185 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2011 New Dream Network
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#include "common/errno.h"
-#include "common/SignalSafeQueue.h"
-#include "gtest/gtest.h"
-
-#include <errno.h>
-#include <iostream>
-#include <pthread.h>
-
-using std::cout;
-
-void *SUCCESS_RET = 0;
-void *ERROR_RET = (void*)-1;
-
-static void *dthread1(void *v)
-{
-  SignalSafeQueue *ssq = (SignalSafeQueue*)v;
-  int item;
-  int ret = ssq->pop((void*)&item);
-  if (ret) {
-    cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl;
-    pthread_exit(ERROR_RET);
-  }
-  if (item != 123) {
-    cout << "expected 123, got " << item << std::endl;
-    pthread_exit(ERROR_RET);
-  }
-  pthread_exit(SUCCESS_RET);
-}
-
-TEST(EnqueueDequeue, Test1) {
-  int i, ret;
-  void *thread_ret;
-  SignalSafeQueue *ssq = SignalSafeQueue::create_queue();
-  ret = ssq->init(sizeof(int));
-  ASSERT_EQ(ret, 0);
-
-  pthread_t t1;
-  pthread_create(&t1, NULL, dthread1, (void*)ssq);
-
-  i = 123;
-  ssq->push((void*)&i);
-
-  ret = pthread_join(t1, &thread_ret);
-  ASSERT_EQ(ret, 0);
-  ASSERT_EQ(thread_ret, SUCCESS_RET);
-
-  delete ssq;
-}
-
-static int test2_total = 0;
-
-void increment_test2_total(int amount)
-{
-  static pthread_mutex_t test2_lock = PTHREAD_MUTEX_INITIALIZER;
-  pthread_mutex_lock(&test2_lock);
-  test2_total += amount;
-  pthread_mutex_unlock(&test2_lock);
-}
-
-static void *dthread2(void *v)
-{
-  SignalSafeQueue *ssq = (SignalSafeQueue*)v;
-  int item;
-  int ret = ssq->pop((void*)&item);
-  if (ret) {
-    cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl;
-    pthread_exit(ERROR_RET);
-  }
-  increment_test2_total(item);
-
-  ret = ssq->pop((void*)&item);
-  if (ret) {
-    cout << "pop returned " << ret << "(" << cpp_strerror(ret) << std::endl;
-    pthread_exit(ERROR_RET);
-  }
-  increment_test2_total(item);
-
-  pthread_exit(SUCCESS_RET);
-}
-
-TEST(EnqueueDequeue, Test2) {
-  int i, ret;
-  void *thread_ret;
-  SignalSafeQueue *ssq = SignalSafeQueue::create_queue();
-  ret = ssq->init(sizeof(int));
-  ASSERT_EQ(ret, 0);
-
-  pthread_t t2_threadA, t2_threadB;
-  pthread_create(&t2_threadA, NULL, dthread2, (void*)ssq);
-  pthread_create(&t2_threadB, NULL, dthread2, (void*)ssq);
-
-  i = 50;
-  ssq->push((void*)&i);
-  i = 100;
-  ssq->push((void*)&i);
-  i = 0;
-  ssq->push((void*)&i);
-  i = 50;
-  ssq->push((void*)&i);
-
-  ret = pthread_join(t2_threadA, &thread_ret);
-  ASSERT_EQ(ret, 0);
-  ASSERT_EQ(thread_ret, SUCCESS_RET);
-
-  ret = pthread_join(t2_threadB, &thread_ret);
-  ASSERT_EQ(ret, 0);
-  ASSERT_EQ(thread_ret, SUCCESS_RET);
-
-  ASSERT_EQ(test2_total, 200);
-
-  delete ssq;
-}
-
-static pthread_mutex_t shutdown_test_lock = PTHREAD_MUTEX_INITIALIZER;
-
-void *shutdown_thread(void *v)
-{
-  int ret, i;
-  SignalSafeQueue *ssq = (SignalSafeQueue*)v;
-
-  ret = ssq->pop((void*)&i);
-  if (ret != 0) {
-    printf("shutdown_thread: failed to pop the first element off the queue. "
-          "Error %d\n", ret);
-    pthread_exit(ERROR_RET);
-  }
-  if (i != 456) {
-    printf("shutdown_thread: unexpected value for first element. "
-          "Got %d, Expected %d\n", i, 456);
-    pthread_exit(ERROR_RET);
-  }
-
-  pthread_mutex_lock(&shutdown_test_lock);
-  // block until the parent has finished shutting down the queue
-  pthread_mutex_unlock(&shutdown_test_lock);
-
-  ret = ssq->pop((void*)&i);
-  if (ret == EPIPE) {
-    pthread_exit(SUCCESS_RET);
-  }
-  printf("shutdown_thread: expected to get EPIPE, but got return code %d\n",
-        ret);
-  pthread_exit(ERROR_RET);
-}
-
-TEST(ShutdownTest, ShutdownTest1) {
-  int i, ret;
-  void *thread_ret;
-  SignalSafeQueue *ssq = SignalSafeQueue::create_queue();
-  ret = ssq->init(sizeof(int));
-  ASSERT_EQ(ret, 0);
-
-  pthread_mutex_lock(&shutdown_test_lock);
-  pthread_t s_thread;
-  pthread_create(&s_thread, NULL, shutdown_thread, (void*)ssq);
-
-  // send 456
-  i = 456;
-  ssq->push((void*)&i);
-
-  // shutdown
-  ssq->wake_readers_and_shutdown();
-
-  pthread_mutex_unlock(&shutdown_test_lock);
-
-  ret = pthread_join(s_thread, &thread_ret);
-  ASSERT_EQ(ret, 0);
-  ASSERT_EQ(thread_ret, SUCCESS_RET);
-
-  delete ssq;
-}