]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/Stack: add abstract Stack
authorHaomai Wang <haomai@xsky.com>
Mon, 4 Jul 2016 06:41:13 +0000 (14:41 +0800)
committerHaomai Wang <haomai@xsky.com>
Tue, 16 Aug 2016 15:16:07 +0000 (23:16 +0800)
Stack is a network IO framework which encapsulates all necessary basic network
interface, then it manages threads to work.

Different network backend like posix, dpdk even RDMA need to inherit Stack
class to implement necessary interfaces. So it will make ease for other
network backend to integrated into ceph. Otherwise, each backend need to
implement the whole Messenger logics like reconnect, policy handle, session
maintain...

Signed-off-by: Haomai Wang <haomai@xsky.com>
src/CMakeLists.txt
src/msg/Makefile.am
src/msg/async/Stack.cc [new file with mode: 0644]
src/msg/async/Stack.h [new file with mode: 0644]

index 5a7e6126b9fa5a274b50b6e37a5d6d7ffe472334..8ade253592b1942a7b7cbc2f57389f099ae91f30 100644 (file)
@@ -424,6 +424,7 @@ set(libcommon_files
   msg/async/Event.cc
   msg/async/EventEpoll.cc
   msg/async/EventSelect.cc
+  msg/async/Stack.cc
   msg/async/net_handler.cc
   ${xio_common_srcs}
   msg/msg_types.cc
index 3081566d80d6504570866d7e83dd7592da1d450b..0c6ad392c8aede23db726ec5e00b702f4f3f293b 100644 (file)
@@ -22,6 +22,7 @@ libmsg_la_SOURCES += \
        msg/async/AsyncMessenger.cc \
        msg/async/Event.cc \
        msg/async/net_handler.cc \
+       msg/async/Stack.cc \
        msg/async/EventSelect.cc
 
 if LINUX
@@ -47,6 +48,7 @@ noinst_HEADERS += \
        msg/async/Event.h \
        msg/async/EventEpoll.h \
        msg/async/EventSelect.h \
+       msg/async/Stack.h \
        msg/async/net_handler.h
 
 if LINUX
diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc
new file mode 100644 (file)
index 0000000..9b27b0e
--- /dev/null
@@ -0,0 +1,162 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSky <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#include "common/Cond.h"
+#include "common/errno.h"
+
+#include "common/dout.h"
+#include "include/assert.h"
+
+#define dout_subsys ceph_subsys_ms
+#undef dout_prefix
+#define dout_prefix *_dout << "stack "
+
+void NetworkStack::add_thread(unsigned i)
+{
+  assert(threads.size() <= i);
+  Worker *w = workers[i];
+  threads.emplace_back(
+    [this, w]() {
+      const uint64_t InitEventNumber = 5000;
+      const uint64_t EventMaxWaitUs = 30000000;
+      w->center.init(InitEventNumber, w->id);
+      ldout(cct, 10) << __func__ << " starting" << dendl;
+      w->initialize();
+      w->init_done();
+      while (!w->done) {
+        ldout(cct, 30) << __func__ << " calling event process" << dendl;
+
+        int r = w->center.process_events(EventMaxWaitUs);
+        if (r < 0) {
+          ldout(cct, 20) << __func__ << " process events failed: "
+                         << cpp_strerror(errno) << dendl;
+          // TODO do something?
+        }
+      }
+      w->reset();
+    }
+  );
+}
+
+std::shared_ptr<NetworkStack> NetworkStack::create(CephContext *c, const string &t)
+{
+  return nullptr;
+}
+
+Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
+{
+  return nullptr;
+}
+
+NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
+{
+  for (unsigned i = 0; i < cct->_conf->ms_async_max_op_threads; ++i) {
+    Worker *w = create_worker(cct, type, i);
+    workers.push_back(w);
+  }
+  num_workers = cct->_conf->ms_async_op_threads;
+}
+
+void NetworkStack::start()
+{
+  pool_spin.lock();
+  if (started) {
+    pool_spin.unlock();
+    return ;
+  }
+  for (unsigned i = 0; i < num_workers; ++i)
+    add_thread(i);
+  spawn_workers(threads);
+  started = true;
+  pool_spin.unlock();
+
+  for (unsigned i = 0; i < num_workers; ++i)
+    workers[i]->wait_for_init();
+}
+
+Worker* NetworkStack::get_worker()
+{
+  ldout(cct, 10) << __func__ << dendl;
+
+   // start with some reasonably large number
+  unsigned min_load = std::numeric_limits<int>::max();
+  Worker* current_best = nullptr;
+
+  pool_spin.lock();
+  // find worker with least references
+  // tempting case is returning on references == 0, but in reality
+  // this will happen so rarely that there's no need for special case.
+  for (unsigned i = 0; i < num_workers; ++i) {
+    unsigned worker_load = workers[i]->references.load();
+    if (worker_load < min_load) {
+      current_best = workers[i];
+      min_load = worker_load;
+    }
+  }
+
+  pool_spin.unlock();
+  assert(current_best);
+  ++current_best->references;
+  return current_best;
+}
+
+void NetworkStack::stop()
+{
+  Spinlock::Locker l(pool_spin);
+  for (unsigned i = 0; i < num_workers; ++i) {
+    workers[i]->done = true;
+    workers[i]->center.wakeup();
+  }
+  join_workers();
+  threads.clear();
+  started = false;
+}
+
+class C_drain : public EventCallback {
+  Mutex drain_lock;
+  Cond drain_cond;
+  std::atomic<unsigned> drain_count;
+
+ public:
+  explicit C_drain(size_t c)
+      : drain_lock("C_drain::drain_lock"),
+        drain_count(c) {}
+  void do_request(int id) {
+    Mutex::Locker l(drain_lock);
+    drain_count--;
+    drain_cond.Signal();
+  }
+  void wait() {
+    Mutex::Locker l(drain_lock);
+    while (drain_count.load())
+      drain_cond.Wait(drain_lock);
+  }
+};
+
+void NetworkStack::drain()
+{
+  ldout(cct, 10) << __func__ << " started." << dendl;
+  pthread_t cur = pthread_self();
+  pool_spin.lock();
+  C_drain drain(num_workers);
+  for (unsigned i = 0; i < num_workers; ++i) {
+    assert(cur != workers[i]->center.get_owner());
+    workers[i]->center.dispatch_event_external(EventCallbackRef(&drain));
+  }
+  pool_spin.unlock();
+  drain.wait();
+  ldout(cct, 10) << __func__ << " end." << dendl;
+}
diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h
new file mode 100644 (file)
index 0000000..9996ed2
--- /dev/null
@@ -0,0 +1,327 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_ASYNC_STACK_H
+#define CEPH_MSG_ASYNC_STACK_H
+
+#include "include/Spinlock.h"
+#include "common/perf_counters.h"
+#include "common/simple_spin.h"
+#include "msg/msg_types.h"
+#include "msg/async/Event.h"
+
+class ConnectedSocketImpl {
+ public:
+  virtual ~ConnectedSocketImpl() {}
+  virtual int is_connected() = 0;
+  virtual ssize_t read(char*, size_t) = 0;
+  virtual ssize_t zero_copy_read(bufferptr&) = 0;
+  virtual ssize_t send(bufferlist &bl, bool more) = 0;
+  virtual void shutdown() = 0;
+  virtual void close() = 0;
+  virtual int fd() const = 0;
+};
+
+class ConnectedSocket;
+struct SocketOptions {
+  bool nonblock = true;
+  bool nodelay = true;
+  int rcbuf_size = 0;
+};
+
+/// \cond internal
+class ServerSocketImpl {
+ public:
+  virtual ~ServerSocketImpl() {}
+  virtual int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) = 0;
+  virtual void abort_accept() = 0;
+  /// Get file descriptor
+  virtual int fd() const = 0;
+};
+/// \endcond
+
+/// \addtogroup networking-module
+/// @{
+
+/// A TCP (or other stream-based protocol) connection.
+///
+/// A \c ConnectedSocket represents a full-duplex stream between
+/// two endpoints, a local endpoint and a remote endpoint.
+class ConnectedSocket {
+  std::unique_ptr<ConnectedSocketImpl> _csi;
+
+ public:
+  /// Constructs a \c ConnectedSocket not corresponding to a connection
+  ConnectedSocket() {};
+  /// \cond internal
+  explicit ConnectedSocket(std::unique_ptr<ConnectedSocketImpl> csi)
+      : _csi(std::move(csi)) {}
+  /// \endcond
+   ~ConnectedSocket() {
+    if (_csi)
+      _csi->close();
+  }
+  /// Moves a \c ConnectedSocket object.
+  ConnectedSocket(ConnectedSocket&& cs) = default;
+  /// Move-assigns a \c ConnectedSocket object.
+  ConnectedSocket& operator=(ConnectedSocket&& cs) = default;
+
+  int is_connected() {
+    return _csi->is_connected();
+  }
+  /// Read the input stream with copy.
+  ///
+  /// Copy an object returning data sent from the remote endpoint.
+  ssize_t read(char* buf, size_t len) {
+    return _csi->read(buf, len);
+  }
+  /// Gets the input stream.
+  ///
+  /// Gets an object returning data sent from the remote endpoint.
+  ssize_t zero_copy_read(bufferptr &data) {
+    return _csi->zero_copy_read(data);
+  }
+  /// Gets the output stream.
+  ///
+  /// Gets an object that sends data to the remote endpoint.
+  ssize_t send(bufferlist &bl, bool more) {
+    return _csi->send(bl, more);
+  }
+  /// Disables output to the socket.
+  ///
+  /// Current or future writes that have not been successfully flushed
+  /// will immediately fail with an error.  This is useful to abort
+  /// operations on a socket that is not making progress due to a
+  /// peer failure.
+  void shutdown() {
+    return _csi->shutdown();
+  }
+  /// Disables input from the socket.
+  ///
+  /// Current or future reads will immediately fail with an error.
+  /// This is useful to abort operations on a socket that is not making
+  /// progress due to a peer failure.
+  void close() {
+    _csi->close();
+    _csi.reset();
+  }
+
+  /// Get file descriptor
+  int fd() const {
+    return _csi->fd();
+  }
+
+  explicit operator bool() const {
+    return _csi.get();
+  }
+};
+/// @}
+
+/// \addtogroup networking-module
+/// @{
+
+/// A listening socket, waiting to accept incoming network connections.
+class ServerSocket {
+  std::unique_ptr<ServerSocketImpl> _ssi;
+ public:
+  /// Constructs a \c ServerSocket not corresponding to a connection
+  ServerSocket() {}
+  /// \cond internal
+  explicit ServerSocket(std::unique_ptr<ServerSocketImpl> ssi)
+      : _ssi(std::move(ssi)) {}
+  ~ServerSocket() {
+    if (_ssi)
+      _ssi->abort_accept();
+  }
+  /// \endcond
+  /// Moves a \c ServerSocket object.
+  ServerSocket(ServerSocket&& ss) = default;
+  /// Move-assigns a \c ServerSocket object.
+  ServerSocket& operator=(ServerSocket&& cs) = default;
+
+  /// Accepts the next connection to successfully connect to this socket.
+  ///
+  /// \Accepts a \ref ConnectedSocket representing the connection, and
+  ///          a \ref entity_addr_t describing the remote endpoint.
+  int accept(ConnectedSocket *sock, const SocketOptions &opt, entity_addr_t *out) {
+    return _ssi->accept(sock, opt, out);
+  }
+
+  /// Stops any \ref accept() in progress.
+  ///
+  /// Current and future \ref accept() calls will terminate immediately
+  /// with an error.
+  void abort_accept() {
+    _ssi->abort_accept();
+    _ssi.reset();
+  }
+
+  /// Get file descriptor
+  int fd() const {
+    return _ssi->fd();
+  }
+
+  explicit operator bool() const {
+    return _ssi.get();
+  }
+};
+/// @}
+
+class NetworkStack;
+
+enum {
+  l_msgr_first = 94000,
+  l_msgr_recv_messages,
+  l_msgr_send_messages,
+  l_msgr_send_messages_inline,
+  l_msgr_recv_bytes,
+  l_msgr_send_bytes,
+  l_msgr_created_connections,
+  l_msgr_active_connections,
+  l_msgr_last,
+};
+
+class Worker {
+  std::mutex init_lock;
+  std::condition_variable init_cond;
+  bool init = false;
+
+ public:
+  bool done = false;
+
+  CephContext *cct;
+  PerfCounters *perf_logger;
+  unsigned id;
+
+  std::atomic_uint references;
+  EventCenter center;
+
+  Worker(const Worker&) = delete;
+  Worker& operator=(const Worker&) = delete;
+
+  Worker(CephContext *c, unsigned i)
+    : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
+    char name[128];
+    sprintf(name, "AsyncMessenger::Worker-%d", id);
+    // initialize perf_logger
+    PerfCountersBuilder plb(cct, name, l_msgr_first, l_msgr_last);
+
+    plb.add_u64_counter(l_msgr_recv_messages, "msgr_recv_messages", "Network received messages");
+    plb.add_u64_counter(l_msgr_send_messages, "msgr_send_messages", "Network sent messages");
+    plb.add_u64_counter(l_msgr_send_messages_inline, "msgr_send_messages_inline", "Network sent inline messages");
+    plb.add_u64_counter(l_msgr_recv_bytes, "msgr_recv_bytes", "Network received bytes");
+    plb.add_u64_counter(l_msgr_send_bytes, "msgr_send_bytes", "Network received bytes");
+    plb.add_u64_counter(l_msgr_active_connections, "msgr_active_connections", "Active connection number");
+    plb.add_u64_counter(l_msgr_created_connections, "msgr_created_connections", "Created connection number");
+
+    perf_logger = plb.create_perf_counters();
+    cct->get_perfcounters_collection()->add(perf_logger);
+  }
+  virtual ~Worker() {
+    if (perf_logger) {
+      cct->get_perfcounters_collection()->remove(perf_logger);
+      delete perf_logger;
+    }
+  }
+
+  virtual int listen(entity_addr_t &addr,
+                     const SocketOptions &opts, ServerSocket *) = 0;
+  virtual int connect(const entity_addr_t &addr,
+                      const SocketOptions &opts, ConnectedSocket *socket) = 0;
+
+  virtual void initialize() {}
+  PerfCounters *get_perf_counter() { return perf_logger; }
+  void release_worker() {
+    int oldref = references.fetch_sub(1);
+    assert(oldref > 0);
+  }
+  void init_done() {
+    init_lock.lock();
+    init = true;
+    init_cond.notify_all();
+    init_lock.unlock();
+  }
+  void wait_for_init() {
+    std::unique_lock<std::mutex> l(init_lock);
+    while (!init)
+      init_cond.wait(l);
+  }
+  void reset() {
+    init_lock.lock();
+    init = false;
+    init_cond.notify_all();
+    init_lock.unlock();
+    done = false;
+  }
+};
+
+class NetworkStack {
+  std::string type;
+  std::atomic_bool started;
+  unsigned num_workers = 0;
+  Spinlock pool_spin;
+
+  void add_thread(unsigned i);
+
+ protected:
+  CephContext *cct;
+  vector<Worker*> workers;
+  std::vector<std::function<void ()>> threads;
+  // Used to indicate whether thread started
+
+  explicit NetworkStack(CephContext *c, const string &t);
+ public:
+  virtual ~NetworkStack() {
+    for (auto &&w : workers)
+      delete w;
+  }
+
+  static std::shared_ptr<NetworkStack> create(
+          CephContext *c, const string &type);
+
+  static Worker* create_worker(
+          CephContext *c, const string &t, unsigned i);
+  // backend need to override this method if supports zero copy read
+  virtual bool support_zero_copy_read() const { return false; }
+  // backend need to override this method if backend doesn't support shared
+  // listen table.
+  // For example, posix backend has in kernel global listen table. If one
+  // thread bind a port, other threads also aware this.
+  // But for dpdk backend, we maintain listen table in each thread. So we
+  // need to let each thread do binding port.
+  virtual bool support_local_listen_table() const { return false; }
+
+  void start();
+  void stop();
+  virtual Worker *get_worker();
+  Worker *get_worker(unsigned i) {
+    return workers[i];
+  }
+  void drain();
+  unsigned get_num_worker() const {
+    return num_workers;
+  }
+
+  // direct is used in tests only
+  virtual void spawn_workers(std::vector<std::function<void ()>> &) = 0;
+  virtual void join_workers() = 0;
+
+ private:
+  NetworkStack(const NetworkStack &);
+  NetworkStack& operator=(const NetworkStack &);
+};
+
+#endif //CEPH_MSG_ASYNC_STACK_H