]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
msg: move DispatchStrategy into test/
authorhaoyixing <haoyixing@kuaishou.com>
Sun, 18 Oct 2020 01:23:48 +0000 (09:23 +0800)
committerhaoyixing <haoyixing@kuaishou.com>
Fri, 23 Oct 2020 03:27:02 +0000 (11:27 +0800)
Since commit cc9a914, xio related code were all removed, which made
direct_messenger become the only user of DispatchStraegy. So move
these code there.

Signed-off-by: haoyixing <haoyixing@kuaishou.com>
12 files changed:
src/msg/CMakeLists.txt
src/msg/DispatchStrategy.h [deleted file]
src/msg/FastStrategy.h [deleted file]
src/msg/QueueStrategy.cc [deleted file]
src/msg/QueueStrategy.h [deleted file]
src/test/direct_messenger/CMakeLists.txt
src/test/direct_messenger/DirectMessenger.cc
src/test/direct_messenger/DispatchStrategy.h [new file with mode: 0644]
src/test/direct_messenger/FastStrategy.h [new file with mode: 0644]
src/test/direct_messenger/QueueStrategy.cc [new file with mode: 0644]
src/test/direct_messenger/QueueStrategy.h [new file with mode: 0644]
src/test/direct_messenger/test_direct_messenger.cc

index fada39b457f4f80a0ca4cca4ab0c3f4eff98a4de..e6d0b589b42997c4def507855fd23a0ecb10249b 100644 (file)
@@ -2,7 +2,6 @@ set(msg_srcs
   DispatchQueue.cc
   Message.cc
   Messenger.cc
-  QueueStrategy.cc
   Connection.cc
   msg_types.cc)
 
diff --git a/src/msg/DispatchStrategy.h b/src/msg/DispatchStrategy.h
deleted file mode 100644 (file)
index 4c9726e..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-#ifndef DISPATCH_STRATEGY_H
-#define DISPATCH_STRATEGY_H
-
-#include "msg/Message.h"
-
-class Messenger;
-
-class DispatchStrategy
-{
-protected:
-  Messenger *msgr = nullptr;
-public:
-  DispatchStrategy() {}
-  Messenger *get_messenger() { return msgr; }
-  void set_messenger(Messenger *_msgr) { msgr = _msgr; }
-  virtual void ds_dispatch(Message *m) = 0;
-  virtual void shutdown() = 0;
-  virtual void start() = 0;
-  virtual void wait() = 0;
-  virtual ~DispatchStrategy() {}
-};
-
-#endif /* DISPATCH_STRATEGY_H */
diff --git a/src/msg/FastStrategy.h b/src/msg/FastStrategy.h
deleted file mode 100644 (file)
index 001ff40..0000000
+++ /dev/null
@@ -1,35 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-
-#ifndef FAST_STRATEGY_H
-#define FAST_STRATEGY_H
-#include "DispatchStrategy.h"
-
-class FastStrategy : public DispatchStrategy {
-public:
-  FastStrategy() {}
-  void ds_dispatch(Message *m) override {
-    msgr->ms_fast_preprocess(m);
-    if (msgr->ms_can_fast_dispatch(m))
-      msgr->ms_fast_dispatch(m);
-    else
-      msgr->ms_deliver_dispatch(m);
-  }
-  void shutdown() override {}
-  void start() override {}
-  void wait() override {}
-  virtual ~FastStrategy() {}
-};
-#endif /* FAST_STRATEGY_H */
diff --git a/src/msg/QueueStrategy.cc b/src/msg/QueueStrategy.cc
deleted file mode 100644 (file)
index 342494c..0000000
+++ /dev/null
@@ -1,107 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-#include <string>
-#include "QueueStrategy.h"
-#define dout_subsys ceph_subsys_ms
-#include "common/debug.h"
-
-QueueStrategy::QueueStrategy(int _n_threads)
-  : n_threads(_n_threads),
-    stop(false),
-    mqueue(),
-    disp_threads()
-{
-}
-
-void QueueStrategy::ds_dispatch(Message *m) {
-  msgr->ms_fast_preprocess(m);
-  if (msgr->ms_can_fast_dispatch(m)) {
-    msgr->ms_fast_dispatch(m);
-    return;
-  }
-  std::lock_guard l{lock};
-  mqueue.push_back(*m);
-  if (disp_threads.size()) {
-    if (! disp_threads.empty()) {
-      QSThread *thrd = &disp_threads.front();
-      disp_threads.pop_front();
-      thrd->cond.notify_all();
-    }
-  }
-}
-
-void QueueStrategy::entry(QSThread *thrd)
-{
-  for (;;) {
-    ceph::ref_t<Message> m;
-    std::unique_lock l{lock};
-    for (;;) {
-      if (! mqueue.empty()) {
-       m = ceph::ref_t<Message>(&mqueue.front(), false);
-       mqueue.pop_front();
-       break;
-      }
-      if (stop)
-       break;
-      disp_threads.push_front(*thrd);
-      thrd->cond.wait(l);
-    }
-    l.unlock();
-    if (stop) {
-       if (!m) break;
-       continue;
-    }
-    get_messenger()->ms_deliver_dispatch(m);
-  }
-}
-
-void QueueStrategy::shutdown()
-{
-  QSThread *thrd;
-  std::lock_guard l{lock};
-  stop = true;
-  while (disp_threads.size()) {
-    thrd = &(disp_threads.front());
-    disp_threads.pop_front();
-    thrd->cond.notify_all();
-  }
-}
-
-void QueueStrategy::wait()
-{
-  std::unique_lock l{lock};
-  ceph_assert(stop);
-  for (auto& thread : threads) {
-    l.unlock();
-
-    // join outside of lock
-    thread->join();
-
-    l.lock();
-  }
-}
-
-void QueueStrategy::start()
-{
-  ceph_assert(!stop);
-  std::lock_guard l{lock};
-  threads.reserve(n_threads);
-  for (int ix = 0; ix < n_threads; ++ix) {
-    std::string thread_name = "ms_qs_";
-    thread_name.append(std::to_string(ix));
-    auto thrd = std::make_unique<QSThread>(this);
-    thrd->create(thread_name.c_str());
-    threads.emplace_back(std::move(thrd));
-  }
-}
diff --git a/src/msg/QueueStrategy.h b/src/msg/QueueStrategy.h
deleted file mode 100644 (file)
index b7f6df8..0000000
+++ /dev/null
@@ -1,63 +0,0 @@
-// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
-/*
- * Ceph - scalable distributed file system
- *
- * Copyright (C) 2014 CohortFS, LLC
- *
- * This is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License version 2.1, as published by the Free Software
- * Foundation.  See file COPYING.
- *
- */
-
-
-#ifndef QUEUE_STRATEGY_H
-#define QUEUE_STRATEGY_H
-
-#include <vector>
-#include <memory>
-#include <boost/intrusive/list.hpp>
-#include "DispatchStrategy.h"
-#include "msg/Messenger.h"
-
-namespace bi = boost::intrusive;
-
-class QueueStrategy : public DispatchStrategy {
-  ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock");
-  const int n_threads;
-  bool stop;
-
-  Message::Queue mqueue;
-
-  class QSThread : public Thread {
-  public:
-    bi::list_member_hook<> thread_q;
-    QueueStrategy *dq;
-    ceph::condition_variable cond;
-    explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {}
-    void* entry() {
-      dq->entry(this);
-      return NULL;
-    }
-
-    typedef bi::list< QSThread,
-                     bi::member_hook< QSThread,
-                                      bi::list_member_hook<>,
-                                      &QSThread::thread_q > > Queue;
-  };
-
-  std::vector<std::unique_ptr<QSThread>> threads; //< all threads
-  QSThread::Queue disp_threads; //< waiting threads
-
-public:
-  explicit QueueStrategy(int n_threads);
-  void ds_dispatch(Message *m) override;
-  void shutdown() override;
-  void start() override;
-  void wait() override;
-  void entry(QSThread *thrd);
-  virtual ~QueueStrategy() {}
-};
-#endif /* QUEUE_STRATEGY_H */
index 2e7f2780002171a7ccbf71befddb01d6c021a27f..6ca56946eb410502e9e1038501b5a19b03f52fb6 100644 (file)
@@ -1,4 +1,5 @@
 # unittest_direct_messenger
-#add_executable(unittest_direct_messenger test_direct_messenger.cc DirectMessenger.cc)
+#add_library(QueueStrategy OBJECT QueueStrategy.cc)
+#add_executable(unittest_direct_messenger $<TARGET_OBJECTS:QueueStrategy> test_direct_messenger.cc DirectMessenger.cc)
 #add_ceph_unittest(unittest_direct_messenger)
 #target_link_libraries(unittest_direct_messenger global)
index c554a3794029376bbc37f828bcd14789ab0b7c70..3aeff4feee6f52236a9fd7509a3e1e41715c3dd2 100644 (file)
@@ -13,7 +13,7 @@
  */
 
 #include "DirectMessenger.h"
-#include "msg/DispatchStrategy.h"
+#include "DispatchStrategy.h"
 
 
 class DirectConnection : public Connection {
diff --git a/src/test/direct_messenger/DispatchStrategy.h b/src/test/direct_messenger/DispatchStrategy.h
new file mode 100644 (file)
index 0000000..4c9726e
--- /dev/null
@@ -0,0 +1,37 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef DISPATCH_STRATEGY_H
+#define DISPATCH_STRATEGY_H
+
+#include "msg/Message.h"
+
+class Messenger;
+
+class DispatchStrategy
+{
+protected:
+  Messenger *msgr = nullptr;
+public:
+  DispatchStrategy() {}
+  Messenger *get_messenger() { return msgr; }
+  void set_messenger(Messenger *_msgr) { msgr = _msgr; }
+  virtual void ds_dispatch(Message *m) = 0;
+  virtual void shutdown() = 0;
+  virtual void start() = 0;
+  virtual void wait() = 0;
+  virtual ~DispatchStrategy() {}
+};
+
+#endif /* DISPATCH_STRATEGY_H */
diff --git a/src/test/direct_messenger/FastStrategy.h b/src/test/direct_messenger/FastStrategy.h
new file mode 100644 (file)
index 0000000..001ff40
--- /dev/null
@@ -0,0 +1,35 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef FAST_STRATEGY_H
+#define FAST_STRATEGY_H
+#include "DispatchStrategy.h"
+
+class FastStrategy : public DispatchStrategy {
+public:
+  FastStrategy() {}
+  void ds_dispatch(Message *m) override {
+    msgr->ms_fast_preprocess(m);
+    if (msgr->ms_can_fast_dispatch(m))
+      msgr->ms_fast_dispatch(m);
+    else
+      msgr->ms_deliver_dispatch(m);
+  }
+  void shutdown() override {}
+  void start() override {}
+  void wait() override {}
+  virtual ~FastStrategy() {}
+};
+#endif /* FAST_STRATEGY_H */
diff --git a/src/test/direct_messenger/QueueStrategy.cc b/src/test/direct_messenger/QueueStrategy.cc
new file mode 100644 (file)
index 0000000..342494c
--- /dev/null
@@ -0,0 +1,107 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+#include <string>
+#include "QueueStrategy.h"
+#define dout_subsys ceph_subsys_ms
+#include "common/debug.h"
+
+QueueStrategy::QueueStrategy(int _n_threads)
+  : n_threads(_n_threads),
+    stop(false),
+    mqueue(),
+    disp_threads()
+{
+}
+
+void QueueStrategy::ds_dispatch(Message *m) {
+  msgr->ms_fast_preprocess(m);
+  if (msgr->ms_can_fast_dispatch(m)) {
+    msgr->ms_fast_dispatch(m);
+    return;
+  }
+  std::lock_guard l{lock};
+  mqueue.push_back(*m);
+  if (disp_threads.size()) {
+    if (! disp_threads.empty()) {
+      QSThread *thrd = &disp_threads.front();
+      disp_threads.pop_front();
+      thrd->cond.notify_all();
+    }
+  }
+}
+
+void QueueStrategy::entry(QSThread *thrd)
+{
+  for (;;) {
+    ceph::ref_t<Message> m;
+    std::unique_lock l{lock};
+    for (;;) {
+      if (! mqueue.empty()) {
+       m = ceph::ref_t<Message>(&mqueue.front(), false);
+       mqueue.pop_front();
+       break;
+      }
+      if (stop)
+       break;
+      disp_threads.push_front(*thrd);
+      thrd->cond.wait(l);
+    }
+    l.unlock();
+    if (stop) {
+       if (!m) break;
+       continue;
+    }
+    get_messenger()->ms_deliver_dispatch(m);
+  }
+}
+
+void QueueStrategy::shutdown()
+{
+  QSThread *thrd;
+  std::lock_guard l{lock};
+  stop = true;
+  while (disp_threads.size()) {
+    thrd = &(disp_threads.front());
+    disp_threads.pop_front();
+    thrd->cond.notify_all();
+  }
+}
+
+void QueueStrategy::wait()
+{
+  std::unique_lock l{lock};
+  ceph_assert(stop);
+  for (auto& thread : threads) {
+    l.unlock();
+
+    // join outside of lock
+    thread->join();
+
+    l.lock();
+  }
+}
+
+void QueueStrategy::start()
+{
+  ceph_assert(!stop);
+  std::lock_guard l{lock};
+  threads.reserve(n_threads);
+  for (int ix = 0; ix < n_threads; ++ix) {
+    std::string thread_name = "ms_qs_";
+    thread_name.append(std::to_string(ix));
+    auto thrd = std::make_unique<QSThread>(this);
+    thrd->create(thread_name.c_str());
+    threads.emplace_back(std::move(thrd));
+  }
+}
diff --git a/src/test/direct_messenger/QueueStrategy.h b/src/test/direct_messenger/QueueStrategy.h
new file mode 100644 (file)
index 0000000..b7f6df8
--- /dev/null
@@ -0,0 +1,63 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2014 CohortFS, LLC
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+
+#ifndef QUEUE_STRATEGY_H
+#define QUEUE_STRATEGY_H
+
+#include <vector>
+#include <memory>
+#include <boost/intrusive/list.hpp>
+#include "DispatchStrategy.h"
+#include "msg/Messenger.h"
+
+namespace bi = boost::intrusive;
+
+class QueueStrategy : public DispatchStrategy {
+  ceph::mutex lock = ceph::make_mutex("QueueStrategy::lock");
+  const int n_threads;
+  bool stop;
+
+  Message::Queue mqueue;
+
+  class QSThread : public Thread {
+  public:
+    bi::list_member_hook<> thread_q;
+    QueueStrategy *dq;
+    ceph::condition_variable cond;
+    explicit QSThread(QueueStrategy *dq) : thread_q(), dq(dq) {}
+    void* entry() {
+      dq->entry(this);
+      return NULL;
+    }
+
+    typedef bi::list< QSThread,
+                     bi::member_hook< QSThread,
+                                      bi::list_member_hook<>,
+                                      &QSThread::thread_q > > Queue;
+  };
+
+  std::vector<std::unique_ptr<QSThread>> threads; //< all threads
+  QSThread::Queue disp_threads; //< waiting threads
+
+public:
+  explicit QueueStrategy(int n_threads);
+  void ds_dispatch(Message *m) override;
+  void shutdown() override;
+  void start() override;
+  void wait() override;
+  void entry(QSThread *thrd);
+  virtual ~QueueStrategy() {}
+};
+#endif /* QUEUE_STRATEGY_H */
index 2789e42277806fef79e015ba52fe27cb16fecdab..77093f13e47d0291a0a891fc563764ad9ee4c9a4 100644 (file)
@@ -11,8 +11,8 @@
 #include "common/ceph_argparse.h"
 
 #include "DirectMessenger.h"
-#include "msg/FastStrategy.h"
-#include "msg/QueueStrategy.h"
+#include "FastStrategy.h"
+#include "QueueStrategy.h"
 #include "messages/MPing.h"