From 647888bb65b04028a590d8eb4e1c33405fb3f109 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 14 Aug 2015 12:33:11 -0700 Subject: [PATCH] rgw: coroutines producer consumers Create a new producer-consumer(s) abstraction for the coroutines infrastructure. Move environment info from the op into the stack. Stack can be sent to sleep and awaken explicitly. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 125 ++++++++++++++++++++++----------------- src/rgw/rgw_coroutine.h | 97 +++++++++++++++++++++++------- src/rgw/rgw_sync.cc | 92 ++++++++++++---------------- 3 files changed, 185 insertions(+), 129 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 22c66cf8e3f8b..9afaa5fcb55bf 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -10,28 +10,39 @@ RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr), - done_flag(false), error_flag(false), blocked_flag(false) { + done_flag(false), error_flag(false), blocked_flag(false), + sleep_flag(false), + retcode(0), + env(NULL) +{ if (start) { ops.push_back(start); } pos = ops.begin(); } -int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env) +int RGWCoroutinesStack::operate(RGWCoroutinesEnv *_env) { + env = _env; RGWCoroutine *op = *pos; - int r = op->do_operate(env); + op->stack = this; + int r = op->operate(); if (r < 0) { ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl; } error_flag = op->is_error(); - blocked_flag = op->is_blocked(); + blocked_flag = op->is_io_blocked(); + sleep_flag = op->is_sleeping(); if (op->is_done()) { + int op_retcode = op->get_ret_status(); op->put(); r = unwind(r); done_flag = (pos == ops.end()); + if (done_flag) { + retcode = op_retcode; + } return r; } @@ -59,6 +70,24 @@ int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) { return ret; } +void RGWCoroutinesStack::spawn(RGWCoroutine *op, bool wait) +{ + op->get(); + + RGWCoroutinesStack *stack = env->manager->allocate_stack(); + spawned_stacks.push_back(stack); + + stack->get(); /* we'll need to collect the stack */ + int r = stack->call(op, 0); + assert(r == 0); + + env->stacks->push_back(stack); + + if (wait) { + set_blocked_by(stack); + } +} + int RGWCoroutinesStack::unwind(int retcode) { if (pos == ops.begin()) { @@ -73,12 +102,27 @@ int RGWCoroutinesStack::unwind(int retcode) return 0; } -void RGWCoroutinesStack::set_blocked(bool flag) +void RGWCoroutinesStack::set_io_blocked(bool flag) { blocked_flag = flag; if (pos != ops.end()) { - (*pos)->set_blocked(flag); + (*pos)->set_io_blocked(flag); + } +} + +int RGWCoroutinesStack::complete_spawned() +{ + int ret = 0; + for (list::iterator iter = spawned_stacks.begin(); iter != spawned_stacks.end(); ++iter) { + int r = (*iter)->get_ret_status(); + if (r < 0) { + ret = r; + } + + (*iter)->put(); } + spawned_stacks.clear(); + return ret; } static void _aio_completion_notifier_cb(librados::completion_t cb, void *arg); @@ -122,20 +166,20 @@ void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op) lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl; } -void RGWCoroutinesManager::handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *waiting_count) +void RGWCoroutinesManager::handle_unblocked_stack(list& stacks, RGWCoroutinesStack *stack, int *blocked_count) { - --(*waiting_count); - stack->set_blocked(false); + --(*blocked_count); + stack->set_io_blocked(false); if (!stack->is_done()) { stacks.push_back(stack); } else { - delete stack; + stack->put(); } } int RGWCoroutinesManager::run(list& stacks) { - int waiting_count = 0; + int blocked_count = 0; RGWCoroutinesEnv env; env.manager = this; @@ -153,47 +197,49 @@ int RGWCoroutinesManager::run(list& stacks) report_error(stack); } - if (stack->is_blocked_by_stack()) { - /* do nothing, we'll re-add the stack when the blocking stack is done */ - } else if (stack->is_blocked()) { - waiting_count++; + if (stack->is_blocked_by_stack() || stack->is_sleeping()) { + /* do nothing, we'll re-add the stack when the blocking stack is done, + * or when we're awaken + */ + } else if (stack->is_io_blocked()) { + blocked_count++; } else if (stack->is_done()) { RGWCoroutinesStack *s; while (stack->unblock_stack(&s)) { if (!s->is_blocked_by_stack() && !s->is_done()) { - if (s->is_blocked()) { - waiting_count++; + if (s->is_io_blocked()) { + blocked_count++; } else { stacks.push_back(s); } } } - delete stack; + stack->put(); } else { stacks.push_back(stack); } RGWCoroutinesStack *blocked_stack; while (completion_mgr.try_get_next((void **)&blocked_stack)) { - handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + handle_unblocked_stack(stacks, blocked_stack, &blocked_count); } - if (waiting_count >= ops_window) { + if (blocked_count >= ops_window) { int ret = completion_mgr.get_next((void **)&blocked_stack); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; } - handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + handle_unblocked_stack(stacks, blocked_stack, &blocked_count); } ++iter; stacks.pop_front(); - while (iter == stacks.end() && waiting_count > 0) { + while (iter == stacks.end() && blocked_count > 0) { int ret = completion_mgr.get_next((void **)&blocked_stack); if (ret < 0) { ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; } - handle_unblocked_stack(stacks, blocked_stack, &waiting_count); + handle_unblocked_stack(stacks, blocked_stack, &blocked_count); iter = stacks.begin(); } } @@ -232,40 +278,13 @@ RGWAioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCo void RGWCoroutine::call(RGWCoroutine *op) { - int r = env->stack->call(op, 0); - assert(r == 0); -} - -void RGWCoroutine::spawn(RGWCoroutine *op, bool wait) -{ - op->get(); - spawned_ops.push_back(op); - - RGWCoroutinesStack *stack = env->manager->allocate_stack(); - int r = stack->call(op, 0); assert(r == 0); - - env->stacks->push_back(stack); - - if (wait) { - env->stack->set_blocked_by(stack); - } } -int RGWCoroutine::complete_spawned() +void RGWCoroutine::spawn(RGWCoroutine *op, bool wait) { - int ret = 0; - for (list::iterator iter = spawned_ops.begin(); iter != spawned_ops.end(); ++iter) { - int r = (*iter)->get_ret_status(); - if (r < 0) { - ret = r; - } - - (*iter)->put(); - } - spawned_ops.clear(); - return ret; + stack->spawn(op, wait); } int RGWSimpleCoroutine::operate() @@ -295,7 +314,7 @@ int RGWSimpleCoroutine::state_send_request() if (ret < 0) { return set_state(RGWCoroutine_Error, ret); } - return block(0); + return io_block(0); } int RGWSimpleCoroutine::state_request_complete() diff --git a/src/rgw/rgw_coroutine.h b/src/rgw/rgw_coroutine.h index a91fae9f203e4..43dc1b22bb291 100644 --- a/src/rgw/rgw_coroutine.h +++ b/src/rgw/rgw_coroutine.h @@ -66,37 +66,30 @@ class RGWCoroutine : public RefCountedObject, public boost::asio::coroutine { protected: CephContext *cct; - RGWCoroutinesEnv *env; - bool blocked; + RGWCoroutinesStack *stack; + bool io_blocked; /* wait for an actual io to complete, will need manager to wait on event */ + bool sleep; /* was set to sleep manually will be awaken manually, e.g., in producer consumer scenario */ int retcode; int state; stringstream error_stream; - list spawned_ops; - int set_state(int s, int ret = 0) { state = s; return ret; } - void set_blocked(bool flag) { blocked = flag; } - int block(int ret) { - set_blocked(true); + void set_io_blocked(bool flag) { io_blocked = flag; } + void set_sleeping(bool flag) { sleep = flag; } + int io_block(int ret) { + set_io_blocked(true); return ret; } - int do_operate(RGWCoroutinesEnv *_env) { - env = _env; - return operate(); - } - - void call(RGWCoroutine *op); - void spawn(RGWCoroutine *op, bool wait); - - int complete_spawned(); + void call(RGWCoroutine *op); /* call at the same stack we're in */ + void spawn(RGWCoroutine *op, bool wait); /* execute on a different stack */ public: - RGWCoroutine(CephContext *_cct) : cct(_cct), env(NULL), blocked(false), retcode(0), state(RGWCoroutine_Run) {} + RGWCoroutine(CephContext *_cct) : cct(_cct), stack(NULL), io_blocked(false), sleep(false), retcode(0), state(RGWCoroutine_Run) {} virtual ~RGWCoroutine() {} virtual int operate() = 0; @@ -109,7 +102,8 @@ public: return error_stream.str(); } - bool is_blocked() { return blocked; } + bool is_io_blocked() { return io_blocked; } + bool is_sleeping() { return sleep; } void set_retcode(int r) { retcode = r; @@ -120,7 +114,43 @@ public: } }; -class RGWCoroutinesStack { +template +class RGWConsumerCR : public RGWCoroutine { + list product; + +public: + RGWConsumerCR(CephContext *_cct) : RGWCoroutine(_cct) {} + + bool has_product() { + return product.empty(); + } + + void wait_for_product() { + if (!has_product()) { + set_sleeping(true); + } + } + + bool consume(T *p) { + if (product.empty()) { + return false; + } + *p = product.front(); + product.pop_front(); + return true; + } + + void receive(const T& p, bool wakeup = true) { + product.push_back(p); + if (wakeup) { + set_sleeping(false); + } + } +}; + +class RGWCoroutinesStack : public RefCountedObject { + friend class RGWCoroutine; + CephContext *cct; RGWCoroutinesManager *ops_mgr; @@ -128,13 +158,20 @@ class RGWCoroutinesStack { list ops; list::iterator pos; + list spawned_stacks; + set blocked_by_stack; set blocking_stacks; - bool done_flag; bool error_flag; bool blocked_flag; + bool sleep_flag; + + int retcode; + +protected: + RGWCoroutinesEnv *env; public: RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL); @@ -150,17 +187,27 @@ public: bool is_blocked_by_stack() { return !blocked_by_stack.empty(); } - bool is_blocked() { + bool is_io_blocked() { return blocked_flag || is_blocked_by_stack(); } + bool is_sleeping() { + return sleep_flag; + } + + int get_ret_status() { + return retcode; + } - void set_blocked(bool flag); + void set_io_blocked(bool flag); string error_str(); int call(RGWCoroutine *next_op, int ret = 0); + void spawn(RGWCoroutine *next_op, bool wait); int unwind(int retcode); + int complete_spawned(); + RGWAioCompletionNotifier *create_completion_notifier(); RGWCompletionManager *get_completion_mgr(); @@ -170,6 +217,8 @@ public: } bool unblock_stack(RGWCoroutinesStack **s); + + RGWCoroutinesEnv *get_env() { return env; } }; class RGWCoroutinesManager { @@ -195,7 +244,9 @@ public: RGWCompletionManager *get_completion_mgr() { return &completion_mgr; } RGWCoroutinesStack *allocate_stack() { - return new RGWCoroutinesStack(cct, this); + RGWCoroutinesStack *stack = new RGWCoroutinesStack(cct, this); + stack->get(); + return stack; } }; diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index a082cc3e55389..aa3f08c55d5e8 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -500,7 +500,7 @@ template int RGWSimpleRadosReadCR::send_request() { rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncGetSystemObj(env->stack->create_completion_notifier(), + req = new RGWAsyncGetSystemObj(stack->create_completion_notifier(), store, &obj_ctx, NULL, obj, &bl, 0, -1); @@ -558,7 +558,7 @@ public: int send_request() { rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncPutSystemObj(env->stack->create_completion_notifier(), + req = new RGWAsyncPutSystemObj(stack->create_completion_notifier(), store, NULL, obj, false, bl); async_rados->queue(req); return 0; @@ -596,7 +596,7 @@ public: int send_request() { rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncWriteOmapKeys(env->stack->create_completion_notifier(), store, obj, entries); + req = new RGWAsyncWriteOmapKeys(stack->create_completion_notifier(), store, obj, entries); async_rados->queue(req); return 0; } @@ -638,7 +638,7 @@ public: int send_request() { rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncLockSystemObj(env->stack->create_completion_notifier(), + req = new RGWAsyncLockSystemObj(stack->create_completion_notifier(), store, NULL, obj, lock_name, cookie, duration); async_rados->queue(req); return 0; @@ -678,7 +678,7 @@ public: int send_request() { rgw_obj obj = rgw_obj(pool, oid); - req = new RGWAsyncUnlockSystemObj(env->stack->create_completion_notifier(), + req = new RGWAsyncUnlockSystemObj(stack->create_completion_notifier(), store, NULL, obj, lock_name, cookie); async_rados->queue(req); return 0; @@ -708,7 +708,7 @@ public: int send_request() { http_op = new RGWRESTReadResource(conn, path, params, NULL, http_manager); - http_op->set_user_info((void *)env->stack); + http_op->set_user_info((void *)stack); int ret = http_op->aio_read(); if (ret < 0) { @@ -783,7 +783,7 @@ public: lock_name, cookie)); } yield { - int ret = complete_spawned(); + int ret = stack->complete_spawned(); if (ret < 0) { return set_state(RGWCoroutine_Error); } @@ -832,10 +832,10 @@ int RGWReadSyncStatusCoroutine::handle_data(rgw_meta_sync_info& data) int RGWReadSyncStatusCoroutine::finish() { - return complete_spawned(); + return stack->complete_spawned(); } -class RGWOmapAppend : public RGWCoroutine { +class RGWOmapAppend : public RGWConsumerCR { RGWAsyncRadosProcessor *async_rados; RGWRados *store; RGWCoroutinesEnv *env; @@ -847,38 +847,27 @@ class RGWOmapAppend : public RGWCoroutine { public: RGWOmapAppend(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWCoroutinesEnv *_env, rgw_bucket& _pool, const string& _oid) - : RGWCoroutine(_store->ctx()), async_rados(_async_rados), + : RGWConsumerCR(_store->ctx()), async_rados(_async_rados), store(_store), env(_env), pool(_pool), oid(_oid) {} - int append(const string& entry) { - entries[entry] = bufferlist(); -#define OMAP_APPEND_MAX_ENTRIES 10 -ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; - if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) { -ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; - int r = flush(); - if (r < 0) { - ldout(cct, 0) << "ERROR: " << __func__ << "(): failed to flush entries" << dendl; - return r; - } - entries.clear(); - } - return 0; - } - - int flush() { -ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; - return env->stack->call(this); - } +#define OMAP_APPEND_MAX_ENTRIES 100 int operate() { -ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; reenter(this) { - yield { -ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; - call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries)); - } - if (get_ret_status() < 0) { - return set_state(RGWCoroutine_Error); + for (;;) { + yield wait_for_product(); + yield { + string entry; + while (consume(&entry)) { + entries[entry] = bufferlist(); + if (entries.size() >= OMAP_APPEND_MAX_ENTRIES) { + break; + } + } + call(new RGWRadosSetOmapKeysCR(async_rados, store, pool, oid, entries)); + } + if (get_ret_status() < 0) { + return set_state(RGWCoroutine_Error); + } } return set_state(RGWCoroutine_Done); } @@ -904,11 +893,12 @@ public: snprintf(buf, sizeof(buf), "%s.%d", oid_prefix.c_str(), i); RGWOmapAppend *shard = new RGWOmapAppend(async_rados, store, env, pool, buf); shards.push_back(shard); + env->stack->spawn(shard, false); } } - int append(const string& entry) { + void append(const string& entry) { static int counter = 0; - return (shards[++counter % shards.size()]->append(entry)); + shards[++counter % shards.size()]->receive(entry); } }; @@ -940,8 +930,7 @@ public: RGWRESTConn *conn = store->rest_master_conn; reenter(this) { -ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; - entries_index = new RGWShardedOmapCRManager(async_rados, store, env, num_shards, + entries_index = new RGWShardedOmapCRManager(async_rados, store, stack->get_env(), num_shards, store->get_zone_params().log_pool, "meta.full-sync.index"); yield { call(new RGWReadRESTResourceCR >(store->ctx(), conn, http_manager, @@ -955,7 +944,7 @@ ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; for (; sections_iter != sections.end(); ++sections_iter) { yield { string entrypoint = string("/admin/metadata/") + *sections_iter; -#warning need a better scaling solution here +#warning need a better scaling solution here, requires streaming output call(new RGWReadRESTResourceCR >(store->ctx(), conn, http_manager, entrypoint, NULL, &result)); } @@ -967,11 +956,8 @@ ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; for (list::iterator iter = result.begin(); iter != result.end(); ++iter) { ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl; string s = *sections_iter + ":" + *iter; - int r = entries_index->append(s); - if (r < 0) { - ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): failed to append entry into index" << dendl; - return set_state(RGWCoroutine_Error); - } + entries_index->append(s); +#warning error handling of shards } } } @@ -1119,13 +1105,13 @@ int RGWCloneMetaLogCoroutine::state_init() int RGWCloneMetaLogCoroutine::state_read_shard_status() { - int ret = mdlog->get_info_async(shard_id, &shard_info, env->stack->get_completion_mgr(), (void *)env->stack, &req_ret); + int ret = mdlog->get_info_async(shard_id, &shard_info, stack->get_completion_mgr(), (void *)stack, &req_ret); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl; return set_state(RGWCoroutine_Error, ret); } - return block(0); + return io_block(0); } int RGWCloneMetaLogCoroutine::state_read_shard_status_complete() @@ -1157,7 +1143,7 @@ int RGWCloneMetaLogCoroutine::state_send_rest_request() http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, http_manager); - http_op->set_user_info((void *)env->stack); + http_op->set_user_info((void *)stack); int ret = http_op->aio_read(); if (ret < 0) { @@ -1167,7 +1153,7 @@ int RGWCloneMetaLogCoroutine::state_send_rest_request() return ret; } - return block(0); + return io_block(0); } int RGWCloneMetaLogCoroutine::state_receive_rest_response() @@ -1215,7 +1201,7 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries() marker = entry.id; } - RGWAioCompletionNotifier *cn = env->stack->create_completion_notifier(); + RGWAioCompletionNotifier *cn = stack->create_completion_notifier(); int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion()); if (ret < 0) { @@ -1223,7 +1209,7 @@ int RGWCloneMetaLogCoroutine::state_store_mdlog_entries() ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl; return set_state(RGWCoroutine_Error, ret); } - return block(0); + return io_block(0); } int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete() -- 2.39.5