#include <stdint.h>
#include <arpa/inet.h>
#include "include/Context.h"
+#include "include/atomic.h"
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "msg/async/Event.h"
center.delete_file_event(*it, EVENT_READABLE);
}
+
+class Worker : public Thread {
+ CephContext *cct;
+ bool done;
+
+ public:
+ EventCenter center;
+ Worker(CephContext *c): cct(c), done(false), center(c) {
+ center.init(100);
+ }
+ void stop() {
+ done = true;
+ center.wakeup();
+ }
+ void* entry() {
+ center.set_owner(pthread_self());
+ while (!done)
+ center.process_events(1000000);
+ return 0;
+ }
+};
+
+class CountEvent: public EventCallback {
+ atomic_t *count;
+ Mutex *lock;
+ Cond *cond;
+
+ public:
+ CountEvent(atomic_t *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
+ void do_request(int id) {
+ lock->Lock();
+ count->dec();
+ cond->Signal();
+ lock->Unlock();
+ }
+};
+
+TEST(EventCenterTest, DispatchTest) {
+ Worker worker1(g_ceph_context), worker2(g_ceph_context);
+ atomic_t count(0);
+ Mutex lock("DispatchTest::lock");
+ Cond cond;
+ worker1.create();
+ worker2.create();
+ for (int i = 0; i < 10000; ++i) {
+ count.inc();
+ worker1.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
+ count.inc();
+ worker2.center.dispatch_event_external(EventCallbackRef(new CountEvent(&count, &lock, &cond)));
+ Mutex::Locker l(lock);
+ while (count.read())
+ cond.Wait(lock);
+ }
+ worker1.stop();
+ worker2.stop();
+}
+
INSTANTIATE_TEST_CASE_P(
AsyncMessenger,
EventDriverTest,