]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
msg/async/rdma: Extract sockets stuff from RDMAStack.h 14179/head
authorAmir Vadai <amir@vadai.me>
Wed, 22 Mar 2017 07:03:31 +0000 (09:03 +0200)
committerAdir Lev <adirl@mellanox.com>
Tue, 28 Mar 2017 06:52:07 +0000 (09:52 +0300)
This is a preparation commit, in order to make review easier. In this
commit I move code from RDMAStack.h into the new file
RDMAConnectedSocketImpl.h - without changing the code.

In the next commit, the actual logic changes will be done and socket
classes will be split into a base RDMAConnected classes and child
classes with TCP connection establishment specific code.

Issue: 995322
Change-Id: I639fda490a6fbd02addb95d3158c5ac1e7390ef0
Signed-off-by: Amir Vadai <amir@vadai.me>
src/msg/async/rdma/RDMAConnectedSocketImpl.h [new file with mode: 0644]
src/msg/async/rdma/RDMAStack.h

diff --git a/src/msg/async/rdma/RDMAConnectedSocketImpl.h b/src/msg/async/rdma/RDMAConnectedSocketImpl.h
new file mode 100644 (file)
index 0000000..e9b1ef9
--- /dev/null
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2016 XSKY <haomai@xsky.com>
+ *
+ * Author: Haomai Wang <haomaiwang@gmail.com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
+#define CEPH_MSG_RDMA_CONNECTED_SOCKET_IMPL_H
+
+#include "common/ceph_context.h"
+#include "common/debug.h"
+#include "common/errno.h"
+#include "msg/async/Stack.h"
+#include "Infiniband.h"
+
+class RDMAWorker;
+class RDMADispatcher;
+
+class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
+ public:
+  typedef Infiniband::MemoryManager::Chunk Chunk;
+  typedef Infiniband::CompletionChannel CompletionChannel;
+  typedef Infiniband::CompletionQueue CompletionQueue;
+
+ private:
+  CephContext *cct;
+  Infiniband::QueuePair *qp;
+  Device *ibdev;
+  int ibport;
+  IBSYNMsg peer_msg;
+  IBSYNMsg my_msg;
+  int connected;
+  int error;
+  Infiniband* infiniband;
+  RDMADispatcher* dispatcher;
+  RDMAWorker* worker;
+  std::vector<Chunk*> buffers;
+  int notify_fd = -1;
+  bufferlist pending_bl;
+
+  Mutex lock;
+  std::vector<ibv_wc> wc;
+  bool is_server;
+  EventCallbackRef con_handler;
+  int tcp_fd = -1;
+  bool active;// qp is active ?
+
+  void notify();
+  ssize_t read_buffers(char* buf, size_t len);
+  int post_work_request(std::vector<Chunk*>&);
+
+ public:
+  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
+                          RDMAWorker *w);
+  virtual ~RDMAConnectedSocketImpl();
+
+  Device *get_device() { return ibdev; }
+
+  void pass_wc(std::vector<ibv_wc> &&v);
+  void get_wc(std::vector<ibv_wc> &w);
+  virtual int is_connected() override { return connected; }
+
+  virtual ssize_t read(char* buf, size_t len) override;
+  virtual ssize_t zero_copy_read(bufferptr &data) override;
+  virtual ssize_t send(bufferlist &bl, bool more) override;
+  virtual void shutdown() override;
+  virtual void close() override;
+  virtual int fd() const override { return notify_fd; }
+  void fault();
+  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
+  ssize_t submit(bool more);
+  int activate();
+  void fin();
+  void handle_connection();
+  void cleanup();
+  void set_accept_fd(int sd);
+  int try_connect(const entity_addr_t&, const SocketOptions &opt);
+
+  class C_handle_connection : public EventCallback {
+    RDMAConnectedSocketImpl *csi;
+    bool active;
+   public:
+    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
+    void do_request(int fd) {
+      if (active)
+        csi->handle_connection();
+    }
+    void close() {
+      active = false;
+    }
+  };
+};
+
+class RDMAServerSocketImpl : public ServerSocketImpl {
+  CephContext *cct;
+  Device *ibdev;
+  int ibport;
+  NetHandler net;
+  int server_setup_socket;
+  Infiniband* infiniband;
+  RDMADispatcher *dispatcher;
+  RDMAWorker *worker;
+  entity_addr_t sa;
+
+ public:
+  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
+
+  int listen(entity_addr_t &sa, const SocketOptions &opt);
+  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
+  virtual void abort_accept() override;
+  virtual int fd() const override { return server_setup_socket; }
+  int get_fd() { return server_setup_socket; }
+};
+
+#endif
+
index d5bd0ac3b5e50d5e8f009445dd110dc97c2e9d87..21355daab3863b989323546af343406595d28d0c 100644 (file)
@@ -28,6 +28,7 @@
 #include "common/errno.h"
 #include "msg/async/Stack.h"
 #include "Infiniband.h"
+#include "RDMAConnectedSocketImpl.h"
 
 class RDMAConnectedSocketImpl;
 class RDMAServerSocketImpl;
@@ -190,101 +191,6 @@ class RDMAWorker : public Worker {
   }
 };
 
-class RDMAConnectedSocketImpl : public ConnectedSocketImpl {
- public:
-  typedef Infiniband::MemoryManager::Chunk Chunk;
-  typedef Infiniband::CompletionChannel CompletionChannel;
-  typedef Infiniband::CompletionQueue CompletionQueue;
-
- private:
-  CephContext *cct;
-  Infiniband::QueuePair *qp;
-  Device *ibdev;
-  int ibport;
-  IBSYNMsg peer_msg;
-  IBSYNMsg my_msg;
-  int connected;
-  int error;
-  Infiniband* infiniband;
-  RDMADispatcher* dispatcher;
-  RDMAWorker* worker;
-  std::vector<Chunk*> buffers;
-  int notify_fd = -1;
-  bufferlist pending_bl;
-
-  Mutex lock;
-  std::vector<ibv_wc> wc;
-  bool is_server;
-  EventCallbackRef con_handler;
-  int tcp_fd = -1;
-  bool active;// qp is active ?
-
-  void notify();
-  ssize_t read_buffers(char* buf, size_t len);
-  int post_work_request(std::vector<Chunk*>&);
-
- public:
-  RDMAConnectedSocketImpl(CephContext *cct, Infiniband* ib, RDMADispatcher* s,
-                          RDMAWorker *w);
-  virtual ~RDMAConnectedSocketImpl();
-
-  Device *get_device() { return ibdev; }
-
-  void pass_wc(std::vector<ibv_wc> &&v);
-  void get_wc(std::vector<ibv_wc> &w);
-  virtual int is_connected() override { return connected; }
-
-  virtual ssize_t read(char* buf, size_t len) override;
-  virtual ssize_t zero_copy_read(bufferptr &data) override;
-  virtual ssize_t send(bufferlist &bl, bool more) override;
-  virtual void shutdown() override;
-  virtual void close() override;
-  virtual int fd() const override { return notify_fd; }
-  void fault();
-  const char* get_qp_state() { return Infiniband::qp_state_string(qp->get_state()); }
-  ssize_t submit(bool more);
-  int activate();
-  void fin();
-  void handle_connection();
-  void cleanup();
-  void set_accept_fd(int sd);
-  int try_connect(const entity_addr_t&, const SocketOptions &opt);
-
-  class C_handle_connection : public EventCallback {
-    RDMAConnectedSocketImpl *csi;
-    bool active;
-   public:
-    C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
-    void do_request(int fd) {
-      if (active)
-        csi->handle_connection();
-    }
-    void close() {
-      active = false;
-    }
-  };
-};
-
-class RDMAServerSocketImpl : public ServerSocketImpl {
-  CephContext *cct;
-  Device *ibdev;
-  int ibport;
-  NetHandler net;
-  int server_setup_socket;
-  Infiniband* infiniband;
-  RDMADispatcher *dispatcher;
-  RDMAWorker *worker;
-  entity_addr_t sa;
-
- public:
-  RDMAServerSocketImpl(CephContext *cct, Infiniband* i, RDMADispatcher *s, RDMAWorker *w, entity_addr_t& a);
-
-  int listen(entity_addr_t &sa, const SocketOptions &opt);
-  virtual int accept(ConnectedSocket *s, const SocketOptions &opts, entity_addr_t *out, Worker *w) override;
-  virtual void abort_accept() override;
-  virtual int fd() const override { return server_setup_socket; }
-  int get_fd() { return server_setup_socket; }
-};
 
 class RDMAStack : public NetworkStack {
   vector<std::thread> threads;
@@ -301,5 +207,4 @@ class RDMAStack : public NetworkStack {
   virtual void join_worker(unsigned i) override;
   RDMADispatcher *get_dispatcher() { return dispatcher; }
 };
-
 #endif