From 07feb134cd9a97f260bbb9c3c0578fee9a42d82b Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 14 Sep 2015 16:37:36 -0700 Subject: [PATCH] rgw: coroutine stack wait util Instead of using a separate coroutine for waiting on a condition. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 61 ++++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_coroutine.h | 35 +++++++++++++++++++++-- 2 files changed, 94 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 06b20f8d90643..48a824ed73269 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -8,9 +8,26 @@ #define dout_subsys ceph_subsys_rgw +RGWCompletionManager::RGWCompletionManager(CephContext *_cct) : cct(_cct), lock("RGWCompletionManager::lock"), + timer(cct, lock) +{ + timer.init(); +} + +RGWCompletionManager::~RGWCompletionManager() +{ + Mutex::Locker l(lock); + timer.cancel_all_events(); +} + void RGWCompletionManager::complete(void *user_info) { Mutex::Locker l(lock); + _complete(user_info); +} + +void RGWCompletionManager::_complete(void *user_info) +{ complete_reqs.push_back(user_info); cond.Signal(); } @@ -47,6 +64,26 @@ void RGWCompletionManager::go_down() cond.Signal(); } +void RGWCompletionManager::wait_interval(void *opaque, utime_t& interval, void *user_info) +{ + Mutex::Locker l(lock); + assert(waiters.find(opaque) != waiters.end()); + waiters[opaque] = user_info; + timer.add_event_after(interval, new WaitContext(this, opaque)); +} + +void RGWCompletionManager::wakeup(void *opaque) +{ + Mutex::Locker l(lock); + map::iterator iter = waiters.find(opaque); + if (iter != waiters.end()) { + void *user_id = iter->second; + waiters.erase(iter); + _complete(user_id); + } +} + + void RGWCoroutine::set_io_blocked(bool flag) { stack->set_io_blocked(flag); } @@ -150,6 +187,20 @@ void RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) spawn(NULL, op, wait); } +int RGWCoroutinesStack::wait(utime_t& interval) +{ + RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); + completion_mgr->wait_interval((void *)this, interval, (void *)this); + set_io_blocked(true); + return 0; +} + +void RGWCoroutinesStack::wakeup() +{ + RGWCompletionManager *completion_mgr = env->manager->get_completion_mgr(); + completion_mgr->wakeup((void *)this); +} + int RGWCoroutinesStack::unwind(int retcode) { rgw_spawned_stacks *src_spawned = &(*pos)->spawned; @@ -376,6 +427,16 @@ bool RGWCoroutine::collect(int *ret) /* returns true if needs to be called again return stack->collect(this, ret); } +int RGWCoroutine::wait(utime_t& interval) +{ + return stack->wait(interval); +} + +void RGWCoroutine::wakeup() +{ + stack->wakeup(); +} + int RGWSimpleCoroutine::operate() { int ret = 0; diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 5c4d8d8a43a26..e9130a1d03913 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -13,8 +13,10 @@ #pragma pop_macro("_ASSERT_H") #endif +#include "include/utime.h" #include "common/RefCountedObj.h" #include "common/debug.h" +#include "common/Timer.h" #include "rgw_common.h" @@ -24,21 +26,44 @@ class RGWCoroutinesStack; class RGWCoroutinesManager; class RGWCompletionManager { + CephContext *cct; list complete_reqs; Mutex lock; Cond cond; + SafeTimer timer; + atomic_t going_down; + map waiters; + + class WaitContext : public Context { + RGWCompletionManager *manager; + void *opaque; + public: + WaitContext(RGWCompletionManager *_cm, void *_opaque) : manager(_cm), opaque(_opaque) {} + void finish(int r) { + manager->wakeup(opaque); + } + }; + + void _complete(void *user_info); public: - RGWCompletionManager() : lock("RGWCompletionManager::lock") {} + RGWCompletionManager(CephContext *_cct); + ~RGWCompletionManager(); void complete(void *user_info); int get_next(void **user_info); bool try_get_next(void **user_info); void go_down(); + + /* + * wait for interval length to complete user_info + */ + void wait_interval(void *opaque, utime_t& interval, void *user_info); + void wakeup(void *opaque); }; /* a single use librados aio completion notifier that hooks into the RGWCompletionManager */ @@ -133,6 +158,9 @@ public: int call(RGWCoroutine *op); /* call at the same stack we're in */ void spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ bool collect(int *ret); /* returns true if needs to be called again */ + + int wait(utime_t& interval); + void wakeup(); }; template @@ -248,6 +276,9 @@ public: void spawn(RGWCoroutine *next_op, bool wait); int unwind(int retcode); + int wait(utime_t& interval); + void wakeup(); + bool collect(int *ret); /* returns true if needs to be called again */ RGWAioCompletionNotifier *create_completion_notifier(); @@ -294,7 +325,7 @@ protected: void put_completion_notifier(RGWAioCompletionNotifier *cn); public: - RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} + RGWCoroutinesManager(CephContext *_cct) : cct(_cct), completion_mgr(cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {} virtual ~RGWCoroutinesManager() {} int run(list& ops); -- 2.39.5