From 78192d3c49611c63ded36a514fa58a6816c4276c Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 14 Aug 2015 17:34:47 -0700 Subject: [PATCH] rgw: fix async sleep and wakeup logic lot's of cleanups and fixes Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 62 +++++++++++++++++++++------------ src/rgw/rgw_coroutine.h | 74 ++++++++++++++++++++++++++++------------ src/rgw/rgw_sync.cc | 62 ++++++++++++++++++++++++++------- 3 files changed, 143 insertions(+), 55 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 9afaa5fcb55bf..afaa94877a9ee 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -9,9 +9,22 @@ +void RGWCoroutine::set_io_blocked(bool flag) { + stack->set_io_blocked(flag); +} + +void RGWCoroutine::set_sleeping(bool flag) { + stack->set_sleeping(flag); +} + +int RGWCoroutine::io_block(int ret) { + set_io_blocked(true); + return ret; +} + RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr), done_flag(false), error_flag(false), blocked_flag(false), - sleep_flag(false), + sleep_flag(false), is_scheduled(false), retcode(0), env(NULL) { @@ -32,8 +45,6 @@ int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env) } error_flag = op->is_error(); - blocked_flag = op->is_io_blocked(); - sleep_flag = op->is_sleeping(); if (op->is_done()) { int op_retcode = op->get_ret_status(); @@ -102,27 +113,26 @@ int RGWCoroutinesStack::unwind(int retcode) return 0; } -void RGWCoroutinesStack::set_io_blocked(bool flag) +bool RGWCoroutinesStack::collect(int *ret) /* returns true if needs to be called again */ { - blocked_flag = flag; - if (pos != ops.end()) { - (*pos)->set_io_blocked(flag); - } -} + *ret = 0; + list new_list; -int RGWCoroutinesStack::complete_spawned() -{ - int ret = 0; for (list::iterator iter = spawned_stacks.begin(); iter != spawned_stacks.end(); ++iter) { - int r = (*iter)->get_ret_status(); + RGWCoroutinesStack *stack = *iter; + if (!stack->is_done()) { + new_list.push_back(stack); + continue; + } + int r = stack->get_ret_status(); if (r < 0) { - ret = r; + *ret = r; } - (*iter)->put(); + stack->put(); } - spawned_stacks.clear(); - return ret; + spawned_stacks.swap(new_list); + return (!new_list.empty()); } static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); @@ -188,7 +198,9 @@ int RGWCoroutinesManager::run(list& stacks) for (list::iterator iter = stacks.begin(); iter != stacks.end();) { RGWCoroutinesStack *stack = *iter; env.stack = stack; + int ret = stack->operate(&env); + stack->set_is_scheduled(false); if (ret < 0) { ldout(cct, 0) << "ERROR: stack->operate() returned ret=" << ret << dendl; } @@ -201,22 +213,26 @@ int RGWCoroutinesManager::run(list& stacks) /* do nothing, we'll re-add the stack when the blocking stack is done, * or when we're awaken */ + ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is_blocked_by_stack()=" << stack->is_blocked_by_stack() + << " is_sleeping=" << stack->is_sleeping() << dendl; } else if (stack->is_io_blocked()) { + ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is io blocked" << dendl; blocked_count++; } else if (stack->is_done()) { + ldout(cct, 20) << __func__ << ":" << " stack=" << (void *)stack << " is done" << dendl; RGWCoroutinesStack *s; while (stack->unblock_stack(&s)) { if (!s->is_blocked_by_stack() && !s->is_done()) { if (s->is_io_blocked()) { blocked_count++; } else { - stacks.push_back(s); + s->schedule(); } } } stack->put(); } else { - stacks.push_back(stack); + stack->schedule(); } RGWCoroutinesStack *blocked_stack; @@ -258,7 +274,7 @@ int RGWCoroutinesManager::run(RGWCoroutine *op) return r; } - stacks.push_back(stack); + stack->schedule(&stacks); r = run(stacks); if (r < 0) { @@ -289,14 +305,18 @@ void RGWCoroutine::spawn(RGWCoroutine *op, bool wait) int RGWSimpleCoroutine::operate() { + int ret = 0; reenter(this) { yield return state_init(); yield return state_send_request(); yield return state_request_complete(); yield return state_all_complete(); + while (stack->collect(&ret)) { + yield; + } } - return set_state(RGWCoroutine_Done); + return set_state(RGWCoroutine_Done, ret); } int RGWSimpleCoroutine::state_init() diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index 43dc1b22bb291..7c57146fc6c63 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -67,8 +67,6 @@ protected: CephContext *cct; RGWCoroutinesStack *stack; - bool io_blocked; /* wait for an actual io to complete, will need manager to wait on event */ - bool sleep; /* was set to sleep manually will be awaken manually, e.g., in producer consumer scenario */ int retcode; int state; @@ -78,18 +76,15 @@ protected: state = s; return ret; } - void set_io_blocked(bool flag) { io_blocked = flag; } - void set_sleeping(bool flag) { sleep = flag; } - int io_block(int ret) { - set_io_blocked(true); - return ret; - } + void set_io_blocked(bool flag); + void set_sleeping(bool flag); + int io_block(int ret); void call(RGWCoroutine *op); /* call at the same stack we're in */ void spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ public: - RGWCoroutine(CephContext *_cct) : cct(_cct), stack(NULL), io_blocked(false), sleep(false), retcode(0), state(RGWCoroutine_Run) {} + RGWCoroutine(CephContext *_cct) : cct(_cct), stack(NULL), retcode(0), state(RGWCoroutine_Run) {} virtual ~RGWCoroutine() {} virtual int operate() = 0; @@ -102,9 +97,6 @@ public: return error_stream.str(); } - bool is_io_blocked() { return io_blocked; } - bool is_sleeping() { return sleep; } - void set_retcode(int r) { retcode = r; } @@ -122,7 +114,7 @@ public: RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {} bool has_product() { - return product.empty(); + return !product.empty(); } void wait_for_product() { @@ -140,12 +132,8 @@ public: return true; } - void receive(const T& p, bool wakeup = true) { - product.push_back(p); - if (wakeup) { - set_sleeping(false); - } - } + void receive(const T& p, bool wakeup = true); + void receive(list& l, bool wakeup = true); }; class RGWCoroutinesStack : public RefCountedObject { @@ -168,6 +156,8 @@ class RGWCoroutinesStack : public RefCountedObject { bool blocked_flag; bool sleep_flag; + bool is_scheduled; + int retcode; protected: @@ -187,26 +177,47 @@ public: bool is_blocked_by_stack() { return !blocked_by_stack.empty(); } + void set_io_blocked(bool flag) { + blocked_flag = flag; + } bool is_io_blocked() { return blocked_flag || is_blocked_by_stack(); } + void set_sleeping(bool flag) { + bool wakeup = sleep_flag & !flag; + sleep_flag = flag; + if (wakeup) { + schedule(); + } + } bool is_sleeping() { return sleep_flag; } + void set_is_scheduled(bool flag) { + is_scheduled = flag; + } + + void schedule(list *stacks = NULL) { + if (!stacks) { + stacks = env->stacks; + } + if (!is_scheduled) { + stacks->push_back(this); + is_scheduled = true; + } + } int get_ret_status() { return retcode; } - void set_io_blocked(bool flag); - string error_str(); int call(RGWCoroutine *next_op, int ret = 0); void spawn(RGWCoroutine *next_op, bool wait); int unwind(int retcode); - int complete_spawned(); + bool collect(int *ret); /* returns true if needs to be called again */ RGWAioCompletionNotifier *create_completion_notifier(); RGWCompletionManager *get_completion_mgr(); @@ -221,6 +232,25 @@ public: RGWCoroutinesEnv *get_env() { return env; } }; +template +void RGWConsumerCR::receive(list& l, bool wakeup) +{ + product.splice(product.end(), l); + if (wakeup) { + set_sleeping(false); + } +} + + +template +void RGWConsumerCR::receive(const T& p, bool wakeup) +{ + product.push_back(p); + if (wakeup) { + set_sleeping(false); + } +} + class RGWCoroutinesManager { CephContext *cct; diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index aa3f08c55d5e8..b8d2e65e13531 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -782,13 +782,14 @@ public: call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid, lock_name, cookie)); } - yield { - int ret = stack->complete_spawned(); + int ret; + while (stack->collect(&ret)) { if (ret < 0) { return set_state(RGWCoroutine_Error); } - return set_state(RGWCoroutine_Done); + yield; } + return set_state(RGWCoroutine_Done); } return 0; } @@ -813,7 +814,6 @@ public: sync_status(_status) {} int handle_data(rgw_meta_sync_info& data); - int finish(); }; int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data) @@ -830,11 +830,6 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data) return 0; } -int RGWReadSyncStatusCoroutine::finish() -{ - return stack->complete_spawned(); -} - class RGWOmapAppend : public RGWConsumerCR { RGWAsyncRadosProcessor *async_rados; RGWRados *store; @@ -843,17 +838,25 @@ class RGWOmapAppend : public RGWConsumerCR { rgw_bucket pool; string oid; + bool going_down; + + int num_pending_entries; + list pending_entries; + map entries; public: RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, rgw_bucket& _pool, const string& _oid) : RGWConsumerCR(_store->ctx()), async_rados(_async_rados), - store(_store), env(_env), pool(_pool), oid(_oid) {} + store(_store), env(_env), pool(_pool), oid(_oid), going_down(false), num_pending_entries(0) {} #define OMAP_APPEND_MAX_ENTRIES 100 int operate() { reenter(this) { for (;;) { + if (!has_product() && going_down) { + break; + } yield wait_for_product(); yield { string entry; @@ -863,16 +866,38 @@ public: break; } } - call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries)); + if (entries.size() >= OMAP_APPEND_MAX_ENTRIES || going_down) { + call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries)); + entries.clear(); + } } if (get_ret_status() < 0) { return set_state(RGWCoroutine_Error); } } + /* done with coroutine */ return set_state(RGWCoroutine_Done); } return 0; } + + void flush_pending() { + receive(pending_entries); + num_pending_entries = 0; + } + + void append(const string& s) { + pending_entries.push_back(s); + if (++num_pending_entries >= OMAP_APPEND_MAX_ENTRIES) { + flush_pending(); + } + } + + void finish() { + going_down = true; + flush_pending(); + set_sleeping(false); + } }; class RGWShardedOmapCRManager { @@ -898,7 +923,12 @@ public: } void append(const string& entry) { static int counter = 0; - shards[++counter % shards.size()]->receive(entry); + shards[++counter % shards.size()]->append(entry); + } + void finish() { + for (vector::iterator iter = shards.begin(); iter != shards.end(); ++iter) { + (*iter)->finish(); + } } }; @@ -961,6 +991,14 @@ public: } } } + yield entries_index->finish(); + int ret; + while (stack->collect(&ret)) { + if (ret < 0) { + return set_state(RGWCoroutine_Error); + } + yield; + } yield return set_state(RGWCoroutine_Done); } return 0; -- 2.39.5