]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
core: introduce DirectMessenger 14755/head
authorCasey Bodley <cbodley@redhat.com>
Sun, 13 Mar 2016 20:38:20 +0000 (16:38 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 25 Apr 2017 14:37:50 +0000 (10:37 -0400)
DirectMessenger provides an efficient mechanism to support
in-process embedding of Ceph components (e.g., embedding of the
Ceph OSD in storage targets such as NFSv4 or iSCSI targets).

Signed-off-by: Casey Bodley <cbodley@redhat.com>
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
15 files changed:
src/CMakeLists.txt
src/msg/DispatchStrategy.h [new file with mode: 0644]
src/msg/FastStrategy.h [new file with mode: 0644]
src/msg/QueueStrategy.cc [new file with mode: 0644]
src/msg/QueueStrategy.h [new file with mode: 0644]
src/msg/xio/DispatchStrategy.h [deleted file]
src/msg/xio/FastStrategy.h [deleted file]
src/msg/xio/QueueStrategy.cc [deleted file]
src/msg/xio/QueueStrategy.h [deleted file]
src/test/CMakeLists.txt
src/test/direct_messenger/CMakeLists.txt [new file with mode: 0644]
src/test/direct_messenger/DirectMessenger.cc [new file with mode: 0644]
src/test/direct_messenger/DirectMessenger.h [new file with mode: 0644]
src/test/direct_messenger/test_direct_messenger.cc [new file with mode: 0644]
src/test/messenger/CMakeLists.txt

index f5e90761586c2728e815e9b9a3b9562d6c43f08b..77b647bf6f6de6b2b3fc9cc26e4079455ac67c18 100644 (file)
@@ -339,8 +339,7 @@ if(HAVE_XIO)
     msg/xio/XioMsg.cc
     msg/xio/XioPool.cc
     msg/xio/XioMessenger.cc
-    msg/xio/XioPortal.cc
-    msg/xio/QueueStrategy.cc)
+    msg/xio/XioPortal.cc)
 endif(HAVE_XIO)
 
 set(async_rdma_common_srcs)
@@ -448,6 +447,7 @@ set(libcommon_files
   msg/async/Stack.cc
   msg/async/PosixStack.cc
   msg/async/net_handler.cc
+  msg/QueueStrategy.cc
   ${xio_common_srcs}
   ${async_rdma_common_srcs}
   ${dpdk_common_srcs}
diff --git a/src/msg/DispatchStrategy.h b/src/msg/DispatchStrategy.h
new file mode 100644 (file)
index 0000000..44d63d4
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef DISPATCH_STRATEGY_H
+#define DISPATCH_STRATEGY_H
+
+#include "msg/Message.h"
+
+class Messenger;
+
+class DispatchStrategy
+{
+protected:
+  Messenger *msgr;
+public:
+  DispatchStrategy() {}
+  Messenger *get_messenger() { return msgr; }
+  void set_messenger(Messenger *_msgr) { msgr = _msgr; }
+  virtual void ds_dispatch(Message *m) = 0;
+  virtual void shutdown() = 0;
+  virtual void start() = 0;
+  virtual void wait() = 0;
+  virtual ~DispatchStrategy() {}
+};
+
+#endif /* DISPATCH_STRATEGY_H */
diff --git a/src/msg/FastStrategy.h b/src/msg/FastStrategy.h
new file mode 100644 (file)
index 0000000..001ff40
--- /dev/null
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef FAST_STRATEGY_H
+#define FAST_STRATEGY_H
+#include "DispatchStrategy.h"
+
+class FastStrategy : public DispatchStrategy {
+public:
+  FastStrategy() {}
+  void ds_dispatch(Message *m) override {
+    msgr->ms_fast_preprocess(m);
+    if (msgr->ms_can_fast_dispatch(m))
+      msgr->ms_fast_dispatch(m);
+    else
+      msgr->ms_deliver_dispatch(m);
+  }
+  void shutdown() override {}
+  void start() override {}
+  void wait() override {}
+  virtual ~FastStrategy() {}
+};
+#endif /* FAST_STRATEGY_H */
diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc
new file mode 100644 (file)
index 0000000..0ce279b
--- /dev/null
@@ -0,0 +1,116 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+#include <string>
+#include "QueueStrategy.h"
+#define dout_subsys ceph_subsys_ms
+#include "common/debug.h"
+
+QueueStrategy::QueueStrategy(int _n_threads)
+  : lock("QueueStrategy::lock"),
+    n_threads(_n_threads),
+    stop(false),
+    mqueue(),
+    disp_threads()
+{
+}
+
+void QueueStrategy::ds_dispatch(Message *m) {
+  msgr->ms_fast_preprocess(m);
+  if (msgr->ms_can_fast_dispatch(m)) {
+    msgr->ms_fast_dispatch(m);
+    return;
+  }
+  lock.Lock();
+  mqueue.push_back(*m);
+  if (disp_threads.size()) {
+    if (! disp_threads.empty()) {
+      QSThread *thrd = &disp_threads.front();
+      disp_threads.pop_front();
+      thrd->cond.Signal();
+    }
+  }
+  lock.Unlock();
+}
+
+void QueueStrategy::entry(QSThread *thrd)
+{
+  Message *m = NULL;
+  for (;;) {
+    lock.Lock();
+    for (;;) {
+      if (! mqueue.empty()) {
+       m = &(mqueue.front());
+       mqueue.pop_front();
+       break;
+      }
+      m = NULL;
+      if (stop)
+       break;
+      disp_threads.push_front(*thrd);
+      thrd->cond.Wait(lock);
+    }
+    lock.Unlock();
+    if (stop) {
+       if (!m) break;
+       m->put();
+       continue;
+    }
+    get_messenger()->ms_deliver_dispatch(m);
+  }
+}
+
+void QueueStrategy::shutdown()
+{
+  QSThread *thrd;
+  lock.Lock();
+  stop = true;
+  while (disp_threads.size()) {
+    thrd = &(disp_threads.front());
+    disp_threads.pop_front();
+    thrd->cond.Signal();
+  }
+  lock.Unlock();
+}
+
+void QueueStrategy::wait()
+{
+  QSThread *thrd;
+  lock.Lock();
+  assert(stop);
+  while (disp_threads.size()) {
+    thrd = &(disp_threads.front());
+    disp_threads.pop_front();
+    lock.Unlock();
+
+    // join outside of lock
+    thrd->join();
+
+    lock.Lock();
+  }
+  lock.Unlock();
+}
+
+void QueueStrategy::start()
+{
+  QSThread *thrd;
+  assert(!stop);
+  lock.Lock();
+  for (int ix = 0; ix < n_threads; ++ix) {
+    string thread_name = "ms_xio_qs_";
+    thread_name.append(std::to_string(ix));
+    thrd = new QSThread(this);
+    thrd->create(thread_name.c_str());
+  }
+  lock.Unlock();
+}
diff --git a/src/msg/QueueStrategy.h b/src/msg/QueueStrategy.h
new file mode 100644 (file)
index 0000000..41f28bb
--- /dev/null
@@ -0,0 +1,61 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef QUEUE_STRATEGY_H
+#define QUEUE_STRATEGY_H
+
+#include <boost/intrusive/list.hpp>
+#include "DispatchStrategy.h"
+#include "msg/Messenger.h"
+
+namespace bi = boost::intrusive;
+
+class QueueStrategy : public DispatchStrategy {
+  Mutex lock;
+  int n_threads;
+  bool stop;
+
+  Message::Queue mqueue;
+
+  class QSThread : public Thread {
+  public:
+    bi::list_member_hook<> thread_q;
+    QueueStrategy *dq;
+    Cond cond;
+    explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq), cond() {}
+    void* entry() {
+      dq->entry(this);
+      delete(this);
+      return NULL;
+    }
+
+    typedef bi::list< QSThread,
+                     bi::member_hook< QSThread,
+                                      bi::list_member_hook<>,
+                                      &QSThread::thread_q > > Queue;
+  };
+
+  QSThread::Queue disp_threads;
+
+public:
+  explicit QueueStrategy(int n_threads);
+  void ds_dispatch(Message *m) override;
+  void shutdown() override;
+  void start() override;
+  void wait() override;
+  void entry(QSThread *thrd);
+  virtual ~QueueStrategy() {}
+};
+#endif /* QUEUE_STRATEGY_H */
diff --git a/src/msg/xio/DispatchStrategy.h b/src/msg/xio/DispatchStrategy.h
deleted file mode 100644 (file)
index 44d63d4..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef DISPATCH_STRATEGY_H
-#define DISPATCH_STRATEGY_H
-
-#include "msg/Message.h"
-
-class Messenger;
-
-class DispatchStrategy
-{
-protected:
-  Messenger *msgr;
-public:
-  DispatchStrategy() {}
-  Messenger *get_messenger() { return msgr; }
-  void set_messenger(Messenger *_msgr) { msgr = _msgr; }
-  virtual void ds_dispatch(Message *m) = 0;
-  virtual void shutdown() = 0;
-  virtual void start() = 0;
-  virtual void wait() = 0;
-  virtual ~DispatchStrategy() {}
-};
-
-#endif /* DISPATCH_STRATEGY_H */
diff --git a/src/msg/xio/FastStrategy.h b/src/msg/xio/FastStrategy.h
deleted file mode 100644 (file)
index 001ff40..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-
-#ifndef FAST_STRATEGY_H
-#define FAST_STRATEGY_H
-#include "DispatchStrategy.h"
-
-class FastStrategy : public DispatchStrategy {
-public:
-  FastStrategy() {}
-  void ds_dispatch(Message *m) override {
-    msgr->ms_fast_preprocess(m);
-    if (msgr->ms_can_fast_dispatch(m))
-      msgr->ms_fast_dispatch(m);
-    else
-      msgr->ms_deliver_dispatch(m);
-  }
-  void shutdown() override {}
-  void start() override {}
-  void wait() override {}
-  virtual ~FastStrategy() {}
-};
-#endif /* FAST_STRATEGY_H */
diff --git a/src/msg/xio/QueueStrategy.cc b/src/msg/xio/QueueStrategy.cc
deleted file mode 100644 (file)
index 0ce279b..0000000
+++ /dev/null
@@ -1,116 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-#include <string>
-#include "QueueStrategy.h"
-#define dout_subsys ceph_subsys_ms
-#include "common/debug.h"
-
-QueueStrategy::QueueStrategy(int _n_threads)
-  : lock("QueueStrategy::lock"),
-    n_threads(_n_threads),
-    stop(false),
-    mqueue(),
-    disp_threads()
-{
-}
-
-void QueueStrategy::ds_dispatch(Message *m) {
-  msgr->ms_fast_preprocess(m);
-  if (msgr->ms_can_fast_dispatch(m)) {
-    msgr->ms_fast_dispatch(m);
-    return;
-  }
-  lock.Lock();
-  mqueue.push_back(*m);
-  if (disp_threads.size()) {
-    if (! disp_threads.empty()) {
-      QSThread *thrd = &disp_threads.front();
-      disp_threads.pop_front();
-      thrd->cond.Signal();
-    }
-  }
-  lock.Unlock();
-}
-
-void QueueStrategy::entry(QSThread *thrd)
-{
-  Message *m = NULL;
-  for (;;) {
-    lock.Lock();
-    for (;;) {
-      if (! mqueue.empty()) {
-       m = &(mqueue.front());
-       mqueue.pop_front();
-       break;
-      }
-      m = NULL;
-      if (stop)
-       break;
-      disp_threads.push_front(*thrd);
-      thrd->cond.Wait(lock);
-    }
-    lock.Unlock();
-    if (stop) {
-       if (!m) break;
-       m->put();
-       continue;
-    }
-    get_messenger()->ms_deliver_dispatch(m);
-  }
-}
-
-void QueueStrategy::shutdown()
-{
-  QSThread *thrd;
-  lock.Lock();
-  stop = true;
-  while (disp_threads.size()) {
-    thrd = &(disp_threads.front());
-    disp_threads.pop_front();
-    thrd->cond.Signal();
-  }
-  lock.Unlock();
-}
-
-void QueueStrategy::wait()
-{
-  QSThread *thrd;
-  lock.Lock();
-  assert(stop);
-  while (disp_threads.size()) {
-    thrd = &(disp_threads.front());
-    disp_threads.pop_front();
-    lock.Unlock();
-
-    // join outside of lock
-    thrd->join();
-
-    lock.Lock();
-  }
-  lock.Unlock();
-}
-
-void QueueStrategy::start()
-{
-  QSThread *thrd;
-  assert(!stop);
-  lock.Lock();
-  for (int ix = 0; ix < n_threads; ++ix) {
-    string thread_name = "ms_xio_qs_";
-    thread_name.append(std::to_string(ix));
-    thrd = new QSThread(this);
-    thrd->create(thread_name.c_str());
-  }
-  lock.Unlock();
-}
diff --git a/src/msg/xio/QueueStrategy.h b/src/msg/xio/QueueStrategy.h
deleted file mode 100644 (file)
index 41f28bb..0000000
+++ /dev/null
@@ -1,61 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-
-#ifndef QUEUE_STRATEGY_H
-#define QUEUE_STRATEGY_H
-
-#include <boost/intrusive/list.hpp>
-#include "DispatchStrategy.h"
-#include "msg/Messenger.h"
-
-namespace bi = boost::intrusive;
-
-class QueueStrategy : public DispatchStrategy {
-  Mutex lock;
-  int n_threads;
-  bool stop;
-
-  Message::Queue mqueue;
-
-  class QSThread : public Thread {
-  public:
-    bi::list_member_hook<> thread_q;
-    QueueStrategy *dq;
-    Cond cond;
-    explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq), cond() {}
-    void* entry() {
-      dq->entry(this);
-      delete(this);
-      return NULL;
-    }
-
-    typedef bi::list< QSThread,
-                     bi::member_hook< QSThread,
-                                      bi::list_member_hook<>,
-                                      &QSThread::thread_q > > Queue;
-  };
-
-  QSThread::Queue disp_threads;
-
-public:
-  explicit QueueStrategy(int n_threads);
-  void ds_dispatch(Message *m) override;
-  void shutdown() override;
-  void start() override;
-  void wait() override;
-  void entry(QSThread *thrd);
-  virtual ~QueueStrategy() {}
-};
-#endif /* QUEUE_STRATEGY_H */
index 378a1563b056b6663ced4fd282100a79681a1aba..4da3ef05922542914b1d8042e511db0c99ac5fa2 100644 (file)
@@ -24,6 +24,7 @@ add_subdirectory(cls_lua)
 add_subdirectory(common)
 add_subdirectory(compressor)
 add_subdirectory(crush)
+add_subdirectory(direct_messenger)
 add_subdirectory(encoding)
 add_subdirectory(erasure-code)
 add_subdirectory(filestore)
diff --git a/src/test/direct_messenger/CMakeLists.txt b/src/test/direct_messenger/CMakeLists.txt
new file mode 100644 (file)
index 0000000..6776cae
--- /dev/null
@@ -0,0 +1,4 @@
+# unittest_direct_messenger
+add_executable(unittest_direct_messenger test_direct_messenger.cc DirectMessenger.cc)
+add_ceph_unittest(unittest_direct_messenger ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_direct_messenger)
+target_link_libraries(unittest_direct_messenger global)
diff --git a/src/test/direct_messenger/DirectMessenger.cc b/src/test/direct_messenger/DirectMessenger.cc
new file mode 100644 (file)
index 0000000..ea6439e
--- /dev/null
@@ -0,0 +1,252 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "DirectMessenger.h"
+#include "msg/DispatchStrategy.h"
+
+
+class DirectConnection : public Connection {
+  /// sent messages are dispatched here
+  DispatchStrategy *const dispatchers;
+
+  /// the connection that will be attached to outgoing messages, so that replies
+  /// can be dispatched back to the sender. the pointer is atomic for
+  /// thread-safety between mark_down() and send_message(). no reference is held
+  /// on this Connection to avoid cyclical refs. we don't need a reference
+  /// because its owning DirectMessenger will mark both connections down (and
+  /// clear this pointer) before dropping its own reference
+  std::atomic<Connection*> reply_connection{nullptr};
+
+ public:
+  DirectConnection(CephContext *cct, DirectMessenger *m,
+                   DispatchStrategy *dispatchers)
+    : Connection(cct, m),
+      dispatchers(dispatchers)
+  {}
+
+  /// sets the Connection that will receive replies to outgoing messages
+  void set_direct_reply_connection(ConnectionRef conn);
+
+  /// return true if a peer connection exists
+  bool is_connected() override;
+
+  /// pass the given message directly to our dispatchers
+  int send_message(Message *m) override;
+
+  /// release our pointer to the peer connection. later calls to is_connected()
+  /// will return false, and send_message() will fail with -ENOTCONN
+  void mark_down() override;
+
+  /// noop - keepalive messages are not needed within a process
+  void send_keepalive() override {}
+
+  /// noop - reconnect/recovery semantics are not needed within a process
+  void mark_disposable() override {}
+};
+
+void DirectConnection::set_direct_reply_connection(ConnectionRef conn)
+{
+  reply_connection.store(conn.get());
+}
+
+bool DirectConnection::is_connected()
+{
+  // true between calls to set_direct_reply_connection() and mark_down()
+  return reply_connection.load() != nullptr;
+}
+
+int DirectConnection::send_message(Message *m)
+{
+  // read reply_connection atomically and take a reference
+  ConnectionRef conn = reply_connection.load();
+  if (!conn) {
+    m->put();
+    return -ENOTCONN;
+  }
+  // attach reply_connection to the Message, so that calls to
+  // m->get_connection()->send_message() can be dispatched back to the sender
+  m->set_connection(conn);
+
+  dispatchers->ds_dispatch(m);
+  return 0;
+}
+
+void DirectConnection::mark_down()
+{
+  Connection *conn = reply_connection.load();
+  if (!conn) {
+    return; // already marked down
+  }
+  if (!reply_connection.compare_exchange_weak(conn, nullptr)) {
+    return; // lost the race to mark down
+  }
+  // called only once to avoid loops
+  conn->mark_down();
+}
+
+
+static ConnectionRef create_loopback(DirectMessenger *m,
+                                     entity_name_t name,
+                                     DispatchStrategy *dispatchers)
+{
+  auto loopback = boost::intrusive_ptr<DirectConnection>(
+      new DirectConnection(m->cct, m, dispatchers));
+  // loopback replies go to itself
+  loopback->set_direct_reply_connection(loopback);
+  loopback->set_peer_type(name.type());
+  loopback->set_features(CEPH_FEATURES_ALL);
+  return loopback;
+}
+
+DirectMessenger::DirectMessenger(CephContext *cct, entity_name_t name,
+                                 string mname, uint64_t nonce,
+                                 DispatchStrategy *dispatchers)
+  : SimplePolicyMessenger(cct, name, mname, nonce),
+    dispatchers(dispatchers),
+    loopback_connection(create_loopback(this, name, dispatchers))
+{
+  dispatchers->set_messenger(this);
+}
+
+DirectMessenger::~DirectMessenger()
+{
+}
+
+int DirectMessenger::set_direct_peer(DirectMessenger *peer)
+{
+  if (get_myinst() == peer->get_myinst()) {
+    return -EADDRINUSE; // must have a different entity instance
+  }
+  peer_inst = peer->get_myinst();
+
+  // allocate a Connection that dispatches to the peer messenger
+  auto direct_connection = boost::intrusive_ptr<DirectConnection>(
+      new DirectConnection(cct, peer, peer->dispatchers.get()));
+
+  direct_connection->set_peer_addr(peer_inst.addr);
+  direct_connection->set_peer_type(peer_inst.name.type());
+  direct_connection->set_features(CEPH_FEATURES_ALL);
+
+  // if set_direct_peer() was already called on the peer messenger, we can
+  // finish by attaching their connections. if not, the later call to
+  // peer->set_direct_peer() will attach their connection to ours
+  auto connection = peer->get_connection(get_myinst());
+  if (connection) {
+    auto p = static_cast<DirectConnection*>(connection.get());
+
+    p->set_direct_reply_connection(direct_connection);
+    direct_connection->set_direct_reply_connection(p);
+  }
+
+  peer_connection = std::move(direct_connection);
+  return 0;
+}
+
+int DirectMessenger::bind(const entity_addr_t &bind_addr)
+{
+  if (peer_connection) {
+    return -EINVAL; // can't change address after sharing it with the peer
+  }
+  set_myaddr(bind_addr);
+  loopback_connection->set_peer_addr(bind_addr);
+  return 0;
+}
+
+int DirectMessenger::client_bind(const entity_addr_t &bind_addr)
+{
+  // same as bind
+  return bind(bind_addr);
+}
+
+int DirectMessenger::start()
+{
+  if (!peer_connection) {
+    return -EINVAL; // did not connect to a peer
+  }
+  if (started) {
+    return -EINVAL; // already started
+  }
+
+  dispatchers->start();
+  return SimplePolicyMessenger::start();
+}
+
+int DirectMessenger::shutdown()
+{
+  if (!started) {
+    return -EINVAL; // not started
+  }
+
+  mark_down_all();
+  peer_connection.reset();
+  loopback_connection.reset();
+
+  dispatchers->shutdown();
+  SimplePolicyMessenger::shutdown();
+  sem.Put(); // signal wait()
+  return 0;
+}
+
+void DirectMessenger::wait()
+{
+  sem.Get(); // wait on signal from shutdown()
+  dispatchers->wait();
+}
+
+ConnectionRef DirectMessenger::get_connection(const entity_inst_t& dst)
+{
+  if (dst == peer_inst) {
+    return peer_connection;
+  }
+  if (dst == get_myinst()) {
+    return loopback_connection;
+  }
+  return nullptr;
+}
+
+ConnectionRef DirectMessenger::get_loopback_connection()
+{
+  return loopback_connection;
+}
+
+int DirectMessenger::send_message(Message *m, const entity_inst_t& dst)
+{
+  auto conn = get_connection(dst);
+  if (!conn) {
+    m->put();
+    return -ENOTCONN;
+  }
+  return conn->send_message(m);
+}
+
+void DirectMessenger::mark_down(const entity_addr_t& addr)
+{
+  ConnectionRef conn;
+  if (addr == peer_inst.addr) {
+    conn = peer_connection;
+  } else if (addr == get_myaddr()) {
+    conn = loopback_connection;
+  }
+  if (conn) {
+    conn->mark_down();
+  }
+}
+
+void DirectMessenger::mark_down_all()
+{
+  if (peer_connection) {
+    peer_connection->mark_down();
+  }
+  loopback_connection->mark_down();
+}
diff --git a/src/test/direct_messenger/DirectMessenger.h b/src/test/direct_messenger/DirectMessenger.h
new file mode 100644 (file)
index 0000000..dd9d39e
--- /dev/null
@@ -0,0 +1,97 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_DIRECTMESSENGER_H
+#define CEPH_MSG_DIRECTMESSENGER_H
+
+#include "msg/SimplePolicyMessenger.h"
+#include "common/Semaphore.h"
+
+
+class DispatchStrategy;
+
+/**
+ * DirectMessenger provides a direct path between two messengers
+ * within a process. A pair of DirectMessengers share their
+ * DispatchStrategy with each other, and calls to send_message()
+ * forward the message directly to the other.
+ *
+ * This is for testing and i/o injection only, and cannot be used
+ * for normal messengers with ms_type.
+ */
+class DirectMessenger : public SimplePolicyMessenger {
+ private:
+  /// strategy for local dispatch
+  std::unique_ptr<DispatchStrategy> dispatchers;
+  /// peer instance for comparison in get_connection()
+  entity_inst_t peer_inst;
+  /// connection that sends to the peer's dispatchers
+  ConnectionRef peer_connection;
+  /// connection that sends to my own dispatchers
+  ConnectionRef loopback_connection;
+  /// semaphore for signalling wait() from shutdown()
+  Semaphore sem;
+
+ public:
+  DirectMessenger(CephContext *cct, entity_name_t name,
+                  string mname, uint64_t nonce,
+                  DispatchStrategy *dispatchers);
+  ~DirectMessenger();
+
+  /// attach to a peer messenger. must be called before start()
+  int set_direct_peer(DirectMessenger *peer);
+
+
+  // Messenger interface
+
+  /// sets the addr. must not be called after set_direct_peer() or start()
+  int bind(const entity_addr_t& bind_addr) override;
+
+  /// sets the addr. must not be called after set_direct_peer() or start()
+  int client_bind(const entity_addr_t& bind_addr) override;
+
+  /// starts dispatchers
+  int start() override;
+
+  /// breaks connections, stops dispatchers, and unblocks callers of wait()
+  int shutdown() override;
+
+  /// blocks until shutdown() completes
+  void wait() override;
+
+  /// returns a connection to the peer instance, a loopback connection to our
+  /// own instance, or null if not connected
+  ConnectionRef get_connection(const entity_inst_t& dst) override;
+
+  /// returns a loopback connection that dispatches to this messenger
+  ConnectionRef get_loopback_connection() override;
+
+  /// dispatches a message to the peer instance if connected
+  int send_message(Message *m, const entity_inst_t& dst) override;
+
+  /// mark down the connection for the given address
+  void mark_down(const entity_addr_t& a) override;
+
+  /// mark down all connections
+  void mark_down_all() override;
+
+
+  // unimplemented Messenger interface
+  void set_addr_unknowns(const entity_addr_t &addr) override {}
+  int get_dispatch_queue_len() override { return 0; }
+  double get_dispatch_queue_max_age(utime_t now) override { return 0; }
+  void set_cluster_protocol(int p) override {}
+};
+
+#endif
diff --git a/src/test/direct_messenger/test_direct_messenger.cc b/src/test/direct_messenger/test_direct_messenger.cc
new file mode 100644 (file)
index 0000000..0540baa
--- /dev/null
@@ -0,0 +1,437 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "global/global_init.h"
+#include "common/ceph_argparse.h"
+
+#include "DirectMessenger.h"
+#include "msg/FastStrategy.h"
+#include "msg/QueueStrategy.h"
+#include "messages/MPing.h"
+
+
+/// mock dispatcher that calls the given callback
+class MockDispatcher : public Dispatcher {
+  std::function<void(Message*)> callback;
+ public:
+  MockDispatcher(CephContext *cct, std::function<void(Message*)> callback)
+    : Dispatcher(cct), callback(std::move(callback)) {}
+  bool ms_handle_reset(Connection *con) override { return false; }
+  void ms_handle_remote_reset(Connection *con) override {}
+  bool ms_handle_refused(Connection *con) override { return false; }
+  bool ms_dispatch(Message *m) override {
+    callback(m);
+    m->put();
+    return true;
+  }
+};
+
+/// test synchronous dispatch of messenger and connection interfaces
+TEST(DirectMessenger, SyncDispatch)
+{
+  auto cct = g_ceph_context;
+
+  // use FastStrategy for synchronous dispatch
+  DirectMessenger client(cct, entity_name_t::CLIENT(1),
+                         "client", 0, new FastStrategy());
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  ASSERT_EQ(0, client.set_direct_peer(&server));
+  ASSERT_EQ(0, server.set_direct_peer(&client));
+
+  bool got_request = false;
+  bool got_reply = false;
+
+  MockDispatcher client_dispatcher(cct, [&] (Message *m) {
+    got_reply = true;
+  });
+  client.add_dispatcher_head(&client_dispatcher);
+
+  MockDispatcher server_dispatcher(cct, [&] (Message *m) {
+    got_request = true;
+    ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
+  });
+  server.add_dispatcher_head(&server_dispatcher);
+
+  ASSERT_EQ(0, client.start());
+  ASSERT_EQ(0, server.start());
+
+  // test DirectMessenger::send_message()
+  ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
+  ASSERT_TRUE(got_request);
+  ASSERT_TRUE(got_reply);
+
+  // test DirectConnection::send_message()
+  {
+    got_request = false;
+    got_reply = false;
+    auto conn = client.get_connection(server.get_myinst());
+    ASSERT_EQ(0, conn->send_message(new MPing()));
+    ASSERT_TRUE(got_request);
+    ASSERT_TRUE(got_reply);
+  }
+
+  // test DirectMessenger::send_message() with loopback address
+  got_request = false;
+  got_reply = false;
+  ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
+  ASSERT_FALSE(got_request); // server should never see this
+  ASSERT_TRUE(got_reply);
+
+  // test DirectConnection::send_message() with loopback address
+  {
+    got_request = false;
+    got_reply = false;
+    auto conn = client.get_connection(client.get_myinst());
+    ASSERT_EQ(0, conn->send_message(new MPing()));
+    ASSERT_FALSE(got_request); // server should never see this
+    ASSERT_TRUE(got_reply);
+  }
+
+  // test DirectConnection::send_message() with loopback connection
+  {
+    got_request = false;
+    got_reply = false;
+    auto conn = client.get_loopback_connection();
+    ASSERT_EQ(0, conn->send_message(new MPing()));
+    ASSERT_FALSE(got_request); // server should never see this
+    ASSERT_TRUE(got_reply);
+  }
+
+  ASSERT_EQ(0, client.shutdown());
+  client.wait();
+
+  ASSERT_EQ(0, server.shutdown());
+  server.wait();
+}
+
+/// test asynchronous dispatch of messenger and connection interfaces
+TEST(DirectMessenger, AsyncDispatch)
+{
+  auto cct = g_ceph_context;
+
+  // use QueueStrategy for async replies
+  DirectMessenger client(cct, entity_name_t::CLIENT(1),
+                         "client", 0, new QueueStrategy(1));
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  ASSERT_EQ(0, client.set_direct_peer(&server));
+  ASSERT_EQ(0, server.set_direct_peer(&client));
+
+  // condition variable to wait on ping reply
+  std::mutex mutex;
+  std::condition_variable cond;
+  bool done = false;
+
+  auto wait_for_reply = [&] {
+    std::unique_lock<std::mutex> lock(mutex);
+    while (!done) {
+      cond.wait(lock);
+    }
+    done = false; // clear for reuse
+  };
+
+  // client dispatcher signals the condition variable on reply
+  MockDispatcher client_dispatcher(cct, [&] (Message *m) {
+    std::lock_guard<std::mutex> lock(mutex);
+    done = true;
+    cond.notify_one();
+  });
+  client.add_dispatcher_head(&client_dispatcher);
+
+  MockDispatcher server_dispatcher(cct, [&] (Message *m) {
+    // hold the lock over the call to send_message() to prove that the client's
+    // dispatch is asynchronous. if it isn't, it will deadlock
+    std::lock_guard<std::mutex> lock(mutex);
+    ASSERT_EQ(0, m->get_connection()->send_message(new MPing()));
+  });
+  server.add_dispatcher_head(&server_dispatcher);
+
+  ASSERT_EQ(0, client.start());
+  ASSERT_EQ(0, server.start());
+
+  // test DirectMessenger::send_message()
+  ASSERT_EQ(0, client.send_message(new MPing(), server.get_myinst()));
+  wait_for_reply();
+
+  // test DirectConnection::send_message()
+  {
+    auto conn = client.get_connection(server.get_myinst());
+    ASSERT_EQ(0, conn->send_message(new MPing()));
+  }
+  wait_for_reply();
+
+  // test DirectMessenger::send_message() with loopback address
+  {
+    // hold the lock to test that loopback dispatch is asynchronous
+    std::lock_guard<std::mutex> lock(mutex);
+    ASSERT_EQ(0, client.send_message(new MPing(), client.get_myinst()));
+  }
+  wait_for_reply();
+
+  // test DirectConnection::send_message() with loopback address
+  {
+    auto conn = client.get_connection(client.get_myinst());
+    // hold the lock to test that loopback dispatch is asynchronous
+    std::lock_guard<std::mutex> lock(mutex);
+    ASSERT_EQ(0, conn->send_message(new MPing()));
+  }
+  wait_for_reply();
+
+  // test DirectConnection::send_message() with loopback connection
+  {
+    auto conn = client.get_loopback_connection();
+    // hold the lock to test that loopback dispatch is asynchronous
+    std::lock_guard<std::mutex> lock(mutex);
+    ASSERT_EQ(0, conn->send_message(new MPing()));
+  }
+  wait_for_reply();
+
+  ASSERT_EQ(0, client.shutdown());
+  client.wait();
+
+  ASSERT_EQ(0, server.shutdown());
+  server.wait();
+}
+
+/// test that wait() blocks until shutdown()
+TEST(DirectMessenger, WaitShutdown)
+{
+  auto cct = g_ceph_context;
+
+  // test wait() with both Queue- and FastStrategy
+  DirectMessenger client(cct, entity_name_t::CLIENT(1),
+                         "client", 0, new QueueStrategy(1));
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  ASSERT_EQ(0, client.set_direct_peer(&server));
+  ASSERT_EQ(0, server.set_direct_peer(&client));
+
+  ASSERT_EQ(0, client.start());
+  ASSERT_EQ(0, server.start());
+
+  std::atomic<bool> client_waiting{false};
+  std::atomic<bool> server_waiting{false};
+
+  // spawn threads to wait() on each of the messengers
+  std::thread client_thread([&] {
+    client_waiting = true;
+    client.wait();
+    client_waiting = false;
+  });
+  std::thread server_thread([&] {
+    server_waiting = true;
+    server.wait();
+    server_waiting = false;
+  });
+
+  // give them time to start
+  std::this_thread::sleep_for(std::chrono::milliseconds(50));
+
+  ASSERT_TRUE(client_waiting);
+  ASSERT_TRUE(server_waiting);
+
+  // call shutdown to unblock the waiting threads
+  ASSERT_EQ(0, client.shutdown());
+  ASSERT_EQ(0, server.shutdown());
+
+  client_thread.join();
+  server_thread.join();
+
+  ASSERT_FALSE(client_waiting);
+  ASSERT_FALSE(server_waiting);
+}
+
+/// test connection and messenger interfaces after mark_down()
+TEST(DirectMessenger, MarkDown)
+{
+  auto cct = g_ceph_context;
+
+  DirectMessenger client(cct, entity_name_t::CLIENT(1),
+                         "client", 0, new FastStrategy());
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  ASSERT_EQ(0, client.set_direct_peer(&server));
+  ASSERT_EQ(0, server.set_direct_peer(&client));
+
+  ASSERT_EQ(0, client.start());
+  ASSERT_EQ(0, server.start());
+
+  auto client_to_server = client.get_connection(server.get_myinst());
+  auto server_to_client = server.get_connection(client.get_myinst());
+
+  ASSERT_TRUE(client_to_server->is_connected());
+  ASSERT_TRUE(server_to_client->is_connected());
+
+  // mark_down() breaks the connection on both sides
+  client_to_server->mark_down();
+
+  ASSERT_FALSE(client_to_server->is_connected());
+  ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
+  ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
+
+  ASSERT_FALSE(server_to_client->is_connected());
+  ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
+  ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client.get_myinst()));
+
+  ASSERT_EQ(0, client.shutdown());
+  client.wait();
+
+  ASSERT_EQ(0, server.shutdown());
+  server.wait();
+}
+
+/// test connection and messenger interfaces after shutdown()
+TEST(DirectMessenger, SendShutdown)
+{
+  auto cct = g_ceph_context;
+
+  // put client on the heap so we can free it early
+  std::unique_ptr<DirectMessenger> client{
+    new DirectMessenger(cct, entity_name_t::CLIENT(1),
+                        "client", 0, new FastStrategy())};
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  ASSERT_EQ(0, client->set_direct_peer(&server));
+  ASSERT_EQ(0, server.set_direct_peer(client.get()));
+
+  ASSERT_EQ(0, client->start());
+  ASSERT_EQ(0, server.start());
+
+  const auto client_inst = client->get_myinst();
+  const auto server_inst = server.get_myinst();
+
+  auto client_to_server = client->get_connection(server_inst);
+  auto server_to_client = server.get_connection(client_inst);
+
+  ASSERT_TRUE(client_to_server->is_connected());
+  ASSERT_TRUE(server_to_client->is_connected());
+
+  // shut down the client to break connections
+  ASSERT_EQ(0, client->shutdown());
+  client->wait();
+
+  ASSERT_FALSE(client_to_server->is_connected());
+  ASSERT_EQ(-ENOTCONN, client_to_server->send_message(new MPing()));
+  ASSERT_EQ(-ENOTCONN, client->send_message(new MPing(), server_inst));
+
+  // free the client connection/messenger to test that calls to the server no
+  // longer try to dereference them
+  client_to_server.reset();
+  client.reset();
+
+  ASSERT_FALSE(server_to_client->is_connected());
+  ASSERT_EQ(-ENOTCONN, server_to_client->send_message(new MPing()));
+  ASSERT_EQ(-ENOTCONN, server.send_message(new MPing(), client_inst));
+
+  ASSERT_EQ(0, server.shutdown());
+  server.wait();
+}
+
+/// test connection and messenger interfaces after bind()
+TEST(DirectMessenger, Bind)
+{
+  auto cct = g_ceph_context;
+
+  DirectMessenger client(cct, entity_name_t::CLIENT(1),
+                         "client", 0, new FastStrategy());
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  entity_addr_t client_addr;
+  client_addr.set_family(AF_INET);
+  client_addr.set_port(1);
+
+  // client bind succeeds before set_direct_peer()
+  ASSERT_EQ(0, client.bind(client_addr));
+
+  ASSERT_EQ(0, client.set_direct_peer(&server));
+  ASSERT_EQ(0, server.set_direct_peer(&client));
+
+  // server bind fails after set_direct_peer()
+  entity_addr_t empty_addr;
+  ASSERT_EQ(-EINVAL, server.bind(empty_addr));
+
+  ASSERT_EQ(0, client.start());
+  ASSERT_EQ(0, server.start());
+
+  auto client_to_server = client.get_connection(server.get_myinst());
+  auto server_to_client = server.get_connection(client.get_myinst());
+
+  ASSERT_TRUE(client_to_server->is_connected());
+  ASSERT_TRUE(server_to_client->is_connected());
+
+  // no address in connection to server
+  ASSERT_EQ(empty_addr, client_to_server->get_peer_addr());
+  // bind address is reflected in connection to client
+  ASSERT_EQ(client_addr, server_to_client->get_peer_addr());
+
+  // mark_down() with bind address breaks the connection
+  server.mark_down(client_addr);
+
+  ASSERT_FALSE(client_to_server->is_connected());
+  ASSERT_FALSE(server_to_client->is_connected());
+
+  ASSERT_EQ(0, client.shutdown());
+  client.wait();
+
+  ASSERT_EQ(0, server.shutdown());
+  server.wait();
+}
+
+/// test connection and messenger interfaces before calls to set_direct_peer()
+TEST(DirectMessenger, StartWithoutPeer)
+{
+  auto cct = g_ceph_context;
+
+  DirectMessenger client(cct, entity_name_t::CLIENT(1),
+                         "client", 0, new FastStrategy());
+  DirectMessenger server(cct, entity_name_t::CLIENT(2),
+                         "server", 0, new FastStrategy());
+
+  // can't start until set_direct_peer()
+  ASSERT_EQ(-EINVAL, client.start());
+  ASSERT_EQ(-EINVAL, server.start());
+
+  ASSERT_EQ(0, client.set_direct_peer(&server));
+
+  // only client can start
+  ASSERT_EQ(0, client.start());
+  ASSERT_EQ(-EINVAL, server.start());
+
+  // client has a connection but can't send
+  auto conn = client.get_connection(server.get_myinst());
+  ASSERT_NE(nullptr, conn);
+  ASSERT_FALSE(conn->is_connected());
+  ASSERT_EQ(-ENOTCONN, conn->send_message(new MPing()));
+  ASSERT_EQ(-ENOTCONN, client.send_message(new MPing(), server.get_myinst()));
+
+  ASSERT_EQ(0, client.shutdown());
+  client.wait();
+}
+
+int main(int argc, char **argv)
+{
+  // command-line arguments
+  vector<const char*> args;
+  argv_to_vec(argc, (const char **)argv, args);
+  env_to_vec(args);
+
+  auto cct = global_init(nullptr, args, CEPH_ENTITY_TYPE_ANY,
+                         CODE_ENVIRONMENT_DAEMON, 0);
+  common_init_finish(cct.get());
+
+  ::testing::InitGoogleTest(&argc, argv);
+  return RUN_ALL_TESTS();
+}
index c1b7c3a3ae1550e3eae81a3eba7718a285d93a5b..ebdd00f2081637d4bf83635e76ed50b4cb6f6ebd 100644 (file)
@@ -41,4 +41,3 @@ if(HAVE_XIO)
     ${CMAKE_DL_LIBS}
     )
 endif(HAVE_XIO)
-