return 0;
}
-void RGWAsyncOp::call(RGWAsyncOp *op)
+void RGWCoroutine::call(RGWCoroutine *op)
{
int r = env->stack->call(op, 0);
assert(r == 0);
}
-void RGWAsyncOp::call_concurrent(RGWAsyncOp *op)
+void RGWCoroutine::spawn(RGWCoroutine *op)
{
- RGWAsyncOpsStack *stack = env->manager->allocate_stack();
+ RGWCoroutinesStack *stack = env->manager->allocate_stack();
int r = stack->call(op, 0);
assert(r == 0);
env->stack->set_blocked_by(stack);
}
-class RGWSimpleAsyncOp : public RGWAsyncOp {
+class RGWSimpleCoroutine : public RGWCoroutine {
enum State {
Init = 0,
SendRequest = 1,
CephContext *cct;
public:
- RGWSimpleAsyncOp(CephContext *_cct) : state(Init), cct(_cct) {}
+ RGWSimpleCoroutine(CephContext *_cct) : state(Init), cct(_cct) {}
virtual int init() { return 0; }
virtual int send_request() = 0;
bool is_error() { return (state == Error); }
};
-int RGWSimpleAsyncOp::operate()
+int RGWSimpleCoroutine::operate()
{
switch (state) {
case Init:
return 0;
}
-int RGWSimpleAsyncOp::state_init()
+int RGWSimpleCoroutine::state_init()
{
int ret = init();
if (ret < 0) {
return set_state(SendRequest);
}
-int RGWSimpleAsyncOp::state_send_request()
+int RGWSimpleCoroutine::state_send_request()
{
int ret = send_request();
if (ret < 0) {
return yield(set_state(RequestComplete));
}
-int RGWSimpleAsyncOp::state_request_complete()
+int RGWSimpleCoroutine::state_request_complete()
{
int ret = request_complete();
if (ret < 0) {
return set_state(AllComplete);
}
-int RGWSimpleAsyncOp::state_all_complete()
+int RGWSimpleCoroutine::state_all_complete()
{
int ret = finish();
if (ret < 0) {
}
template <class T>
-class RGWSimpleRadosAsyncOp : public RGWSimpleAsyncOp {
+class RGWSimpleRadosCoroutine : public RGWSimpleCoroutine {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
RGWObjectCtx& obj_ctx;
RGWAsyncGetSystemObj *req;
public:
- RGWSimpleRadosAsyncOp(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWSimpleRadosCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWObjectCtx& _obj_ctx,
rgw_bucket& _pool, const string& _oid,
- T *_result) : RGWSimpleAsyncOp(_store->ctx()),
+ T *_result) : RGWSimpleCoroutine(_store->ctx()),
async_rados(_async_rados), store(_store),
obj_ctx(_obj_ctx),
pool(_pool), oid(_oid),
result(_result),
req(NULL) { }
- ~RGWSimpleRadosAsyncOp() {
+ ~RGWSimpleRadosCoroutine() {
delete req;
}
};
template <class T>
-int RGWSimpleRadosAsyncOp<T>::send_request()
+int RGWSimpleRadosCoroutine<T>::send_request()
{
rgw_obj obj = rgw_obj(pool, oid);
req = new RGWAsyncGetSystemObj(env->stack->create_completion_notifier(),
}
template <class T>
-int RGWSimpleRadosAsyncOp<T>::request_complete()
+int RGWSimpleRadosCoroutine<T>::request_complete()
{
int ret = req->get_ret_status();
if (ret != -ENOENT) {
return handle_data(*result);
}
-class RGWReadSyncStatusOp : public RGWSimpleRadosAsyncOp<RGWMetaSyncGlobalStatus> {
+class RGWReadSyncStatusCoroutine : public RGWSimpleRadosCoroutine<RGWMetaSyncGlobalStatus> {
RGWAsyncRadosProcessor *async_rados;
RGWRados *store;
RGWObjectCtx& obj_ctx;
rgw_sync_marker sync_marker;
public:
- RGWReadSyncStatusOp(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWReadSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
RGWObjectCtx& _obj_ctx,
- RGWMetaSyncGlobalStatus *_gs) : RGWSimpleRadosAsyncOp(_async_rados, _store, _obj_ctx,
+ RGWMetaSyncGlobalStatus *_gs) : RGWSimpleRadosCoroutine(_async_rados, _store, _obj_ctx,
_store->get_zone_params().log_pool,
"mdlog.state.global",
_gs),
int handle_data(RGWMetaSyncGlobalStatus& data);
};
-int RGWReadSyncStatusOp::handle_data(RGWMetaSyncGlobalStatus& data)
+int RGWReadSyncStatusCoroutine::handle_data(RGWMetaSyncGlobalStatus& data)
{
- call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ spawn(new RGWSimpleRadosCoroutine<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
"mdlog.state.0", &sync_marker));
- call_concurrent(new RGWSimpleRadosAsyncOp<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ spawn(new RGWSimpleRadosCoroutine<rgw_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
"mdlog.state.1", &sync_marker));
return 0;
}
-class RGWMetaSyncOp : public RGWAsyncOp {
+class RGWMetaSyncCoroutine : public RGWCoroutine {
RGWRados *store;
RGWMetadataLog *mdlog;
RGWHTTPManager *http_manager;
return ret;
}
public:
- RGWMetaSyncOp(RGWRados *_store, RGWHTTPManager *_mgr, int _id) : RGWAsyncOp(), store(_store),
+ RGWMetaSyncCoroutine(RGWRados *_store, RGWHTTPManager *_mgr, int _id) : RGWCoroutine(), store(_store),
mdlog(store->meta_mgr->get_log()),
http_manager(_mgr),
shard_id(_id),
max_entries(CLONE_MAX_ENTRIES),
http_op(NULL),
- state(RGWMetaSyncOp::Init) {}
+ state(RGWMetaSyncCoroutine::Init) {}
int operate();
bool is_error() { return (state == Error); }
};
-int RGWMetaSyncOp::operate()
+int RGWMetaSyncCoroutine::operate()
{
switch (state) {
case Init:
return 0;
}
-int RGWMetaSyncOp::state_init()
+int RGWMetaSyncCoroutine::state_init()
{
return 0;
}
-int RGWMetaSyncOp::state_read_sync_status()
+int RGWMetaSyncCoroutine::state_read_sync_status()
{
return 0;
}
-int RGWMetaSyncOp::state_read_sync_status_complete()
+int RGWMetaSyncCoroutine::state_read_sync_status_complete()
{
return 0;
}
-class RGWCloneMetaLogOp : public RGWAsyncOp {
+class RGWCloneMetaLogCoroutine : public RGWCoroutine {
RGWRados *store;
RGWMetadataLog *mdlog;
RGWHTTPManager *http_manager;
return ret;
}
public:
- RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr,
- int _id, const string& _marker) : RGWAsyncOp(), store(_store),
+ RGWCloneMetaLogCoroutine(RGWRados *_store, RGWHTTPManager *_mgr,
+ int _id, const string& _marker) : RGWCoroutine(), store(_store),
mdlog(store->meta_mgr->get_log()),
http_manager(_mgr), shard_id(_id),
marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
http_op(NULL), md_op_notifier(NULL),
req_ret(0),
- state(RGWCloneMetaLogOp::Init) {}
+ state(RGWCloneMetaLogCoroutine::Init) {}
int operate();
bool is_error() { return (state == Error); }
};
-RGWAsyncOpsStack::RGWAsyncOpsStack(CephContext *_cct, RGWAsyncOpsManager *_ops_mgr, RGWAsyncOp *start) : cct(_cct), ops_mgr(_ops_mgr),
+RGWCoroutinesStack::RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start) : cct(_cct), ops_mgr(_ops_mgr),
done_flag(false), error_flag(false), blocked_flag(false) {
if (start) {
ops.push_back(start);
pos = ops.begin();
}
-int RGWAsyncOpsStack::operate(RGWAsyncOpsEnv *env)
+int RGWCoroutinesStack::operate(RGWCoroutinesEnv *env)
{
- RGWAsyncOp *op = *pos;
+ RGWCoroutine *op = *pos;
int r = op->do_operate(env);
if (r < 0) {
ldout(cct, 0) << "ERROR: op->operate() returned r=" << r << dendl;
return 0;
}
-string RGWAsyncOpsStack::error_str()
+string RGWCoroutinesStack::error_str()
{
if (pos != ops.end()) {
return (*pos)->error_str();
return string();
}
-int RGWAsyncOpsStack::call(RGWAsyncOp *next_op, int ret) {
+int RGWCoroutinesStack::call(RGWCoroutine *next_op, int ret) {
ops.push_back(next_op);
if (pos != ops.end()) {
++pos;
return ret;
}
-int RGWAsyncOpsStack::unwind(int retcode)
+int RGWCoroutinesStack::unwind(int retcode)
{
if (pos == ops.begin()) {
pos = ops.end();
--pos;
ops.pop_back();
- RGWAsyncOp *op = *pos;
+ RGWCoroutine *op = *pos;
op->set_retcode(retcode);
return 0;
}
-void RGWAsyncOpsStack::set_blocked(bool flag)
+void RGWCoroutinesStack::set_blocked(bool flag)
{
blocked_flag = flag;
if (pos != ops.end()) {
}
}
-AioCompletionNotifier *RGWAsyncOpsStack::create_completion_notifier()
+AioCompletionNotifier *RGWCoroutinesStack::create_completion_notifier()
{
return ops_mgr->create_completion_notifier(this);
}
-RGWCompletionManager *RGWAsyncOpsStack::get_completion_mgr()
+RGWCompletionManager *RGWCoroutinesStack::get_completion_mgr()
{
return ops_mgr->get_completion_mgr();
}
-bool RGWAsyncOpsStack::unblock_stack(RGWAsyncOpsStack **s)
+bool RGWCoroutinesStack::unblock_stack(RGWCoroutinesStack **s)
{
if (blocking_stacks.empty()) {
return false;
}
- set<RGWAsyncOpsStack *>::iterator iter = blocking_stacks.begin();
+ set<RGWCoroutinesStack *>::iterator iter = blocking_stacks.begin();
*s = *iter;
blocking_stacks.erase(iter);
(*s)->blocked_by_stack.erase(this);
return true;
}
-void RGWAsyncOpsManager::report_error(RGWAsyncOpsStack *op)
+void RGWCoroutinesManager::report_error(RGWCoroutinesStack *op)
{
#warning need to have error logging infrastructure that logs on backend
lderr(cct) << "ERROR: failed operation: " << op->error_str() << dendl;
}
-void RGWAsyncOpsManager::handle_unblocked_stack(list<RGWAsyncOpsStack *>& stacks, RGWAsyncOpsStack *stack, int *waiting_count)
+void RGWCoroutinesManager::handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count)
{
--(*waiting_count);
stack->set_blocked(false);
}
}
-int RGWAsyncOpsManager::run(list<RGWAsyncOpsStack *>& stacks)
+int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
{
int waiting_count = 0;
- RGWAsyncOpsEnv env;
+ RGWCoroutinesEnv env;
env.manager = this;
env.stacks = &stacks;
- for (list<RGWAsyncOpsStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
- RGWAsyncOpsStack *stack = *iter;
+ for (list<RGWCoroutinesStack *>::iterator iter = stacks.begin(); iter != stacks.end();) {
+ RGWCoroutinesStack *stack = *iter;
env.stack = stack;
int ret = stack->operate(&env);
if (ret < 0) {
} else if (stack->is_blocked()) {
waiting_count++;
} else if (stack->is_done()) {
- RGWAsyncOpsStack *s;
+ RGWCoroutinesStack *s;
while (stack->unblock_stack(&s)) {
if (!s->is_blocked_by_stack() && !s->is_done()) {
if (s->is_blocked()) {
stacks.push_back(stack);
}
- RGWAsyncOpsStack *blocked_stack;
+ RGWCoroutinesStack *blocked_stack;
while (completion_mgr.try_get_next((void **)&blocked_stack)) {
handle_unblocked_stack(stacks, blocked_stack, &waiting_count);
}
return 0;
}
-int RGWAsyncOpsManager::run(RGWAsyncOp *op)
+int RGWCoroutinesManager::run(RGWCoroutine *op)
{
- list<RGWAsyncOpsStack *> stacks;
- RGWAsyncOpsStack *stack = allocate_stack();
+ list<RGWCoroutinesStack *> stacks;
+ RGWCoroutinesStack *stack = allocate_stack();
int r = stack->call(op);
if (r < 0) {
ldout(cct, 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
}
-AioCompletionNotifier *RGWAsyncOpsManager::create_completion_notifier(RGWAsyncOpsStack *stack)
+AioCompletionNotifier *RGWCoroutinesManager::create_completion_notifier(RGWCoroutinesStack *stack)
{
return new AioCompletionNotifier(&completion_mgr, (void *)stack);
}
int RGWRemoteMetaLog::clone_shards()
{
- list<RGWAsyncOpsStack *> stacks;
+ list<RGWCoroutinesStack *> stacks;
for (int i = 0; i < (int)log_info.num_shards; i++) {
- RGWAsyncOpsStack *stack = new RGWAsyncOpsStack(store->ctx(), this);
- int r = stack->call(new RGWCloneMetaLogOp(store, &http_manager, i, clone_markers[i]));
+ RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
+ int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i]));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
int RGWRemoteMetaLog::fetch()
{
- list<RGWAsyncOpsStack *> stacks;
+ list<RGWCoroutinesStack *> stacks;
for (int i = 0; i < (int)log_info.num_shards; i++) {
- RGWAsyncOpsStack *stack = new RGWAsyncOpsStack(store->ctx(), this);
- int r = stack->call(new RGWCloneMetaLogOp(store, &http_manager, i, clone_markers[i]));
+ RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
+ int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i]));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
int RGWRemoteMetaLog::get_sync_status(RGWMetaSyncGlobalStatus *sync_status)
{
RGWObjectCtx obj_ctx(store, NULL);
- return run(new RGWReadSyncStatusOp(async_rados, store, obj_ctx, sync_status));
+ return run(new RGWReadSyncStatusCoroutine(async_rados, store, obj_ctx, sync_status));
}
int RGWRemoteMetaLog::get_shard_sync_marker(int shard_id, rgw_sync_marker *shard_status)
return 0;
}
-int RGWCloneMetaLogOp::operate()
+int RGWCloneMetaLogCoroutine::operate()
{
switch (state) {
case Init:
return 0;
}
-int RGWCloneMetaLogOp::state_init()
+int RGWCloneMetaLogCoroutine::state_init()
{
data = rgw_mdlog_shard_data();
return set_state(ReadShardStatus);
}
-int RGWCloneMetaLogOp::state_read_shard_status()
+int RGWCloneMetaLogCoroutine::state_read_shard_status()
{
int ret = mdlog->get_info_async(shard_id, &shard_info, env->stack->get_completion_mgr(), (void *)env->stack, &req_ret);
if (ret < 0) {
return yield(set_state(ReadShardStatusComplete));
}
-int RGWCloneMetaLogOp::state_read_shard_status_complete()
+int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
{
ldout(store->ctx(), 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
return set_state(SendRESTRequest);
}
-int RGWCloneMetaLogOp::state_send_rest_request()
+int RGWCloneMetaLogCoroutine::state_send_rest_request()
{
RGWRESTConn *conn = store->rest_master_conn;
return yield(set_state(ReceiveRESTResponse));
}
-int RGWCloneMetaLogOp::state_receive_rest_response()
+int RGWCloneMetaLogCoroutine::state_receive_rest_response()
{
int ret = http_op->wait(&data);
if (ret < 0) {
}
-int RGWCloneMetaLogOp::state_store_mdlog_entries()
+int RGWCloneMetaLogCoroutine::state_store_mdlog_entries()
{
list<cls_log_entry> dest_entries;
return yield(set_state(StoreMDLogEntriesComplete));
}
-int RGWCloneMetaLogOp::state_store_mdlog_entries_complete()
+int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
{
if (truncated) {
return state_init();
#define RGW_ASYNC_OPS_MGR_WINDOW 16
-class RGWAsyncOpsStack;
-class RGWAsyncOpsManager;
+class RGWCoroutinesStack;
+class RGWCoroutinesManager;
class AioCompletionNotifier;
-struct RGWAsyncOpsEnv {
- RGWAsyncOpsManager *manager;
- list<RGWAsyncOpsStack *> *stacks;
- RGWAsyncOpsStack *stack;
+struct RGWCoroutinesEnv {
+ RGWCoroutinesManager *manager;
+ list<RGWCoroutinesStack *> *stacks;
+ RGWCoroutinesStack *stack;
- RGWAsyncOpsEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
+ RGWCoroutinesEnv() : manager(NULL), stacks(NULL), stack(NULL) {}
};
-class RGWAsyncOp : public RefCountedObject {
- friend class RGWAsyncOpsStack;
+class RGWCoroutine : public RefCountedObject {
+ friend class RGWCoroutinesStack;
protected:
- RGWAsyncOpsEnv *env;
+ RGWCoroutinesEnv *env;
bool blocked;
int retcode;
return ret;
}
- int do_operate(RGWAsyncOpsEnv *_env) {
+ int do_operate(RGWCoroutinesEnv *_env) {
env = _env;
return operate();
}
- void call(RGWAsyncOp *op);
- void call_concurrent(RGWAsyncOp *op);
+ void call(RGWCoroutine *op);
+ void spawn(RGWCoroutine *op);
public:
- RGWAsyncOp() : env(NULL), blocked(false), retcode(0) {}
- virtual ~RGWAsyncOp() {}
+ RGWCoroutine() : env(NULL), blocked(false), retcode(0) {}
+ virtual ~RGWCoroutine() {}
virtual int operate() = 0;
}
};
-class RGWAsyncOpsStack {
+class RGWCoroutinesStack {
CephContext *cct;
- RGWAsyncOpsManager *ops_mgr;
+ RGWCoroutinesManager *ops_mgr;
- list<RGWAsyncOp *> ops;
- list<RGWAsyncOp *>::iterator pos;
+ list<RGWCoroutine *> ops;
+ list<RGWCoroutine *>::iterator pos;
- set<RGWAsyncOpsStack *> blocked_by_stack;
- set<RGWAsyncOpsStack *> blocking_stacks;
+ set<RGWCoroutinesStack *> blocked_by_stack;
+ set<RGWCoroutinesStack *> blocking_stacks;
bool done_flag;
bool blocked_flag;
public:
- RGWAsyncOpsStack(CephContext *_cct, RGWAsyncOpsManager *_ops_mgr, RGWAsyncOp *start = NULL);
+ RGWCoroutinesStack(CephContext *_cct, RGWCoroutinesManager *_ops_mgr, RGWCoroutine *start = NULL);
- int operate(RGWAsyncOpsEnv *env);
+ int operate(RGWCoroutinesEnv *env);
bool is_done() {
return done_flag;
string error_str();
- int call(RGWAsyncOp *next_op, int ret = 0);
+ int call(RGWCoroutine *next_op, int ret = 0);
int unwind(int retcode);
AioCompletionNotifier *create_completion_notifier();
RGWCompletionManager *get_completion_mgr();
- void set_blocked_by(RGWAsyncOpsStack *s) {
+ void set_blocked_by(RGWCoroutinesStack *s) {
blocked_by_stack.insert(s);
s->blocking_stacks.insert(this);
}
- bool unblock_stack(RGWAsyncOpsStack **s);
+ bool unblock_stack(RGWCoroutinesStack **s);
};
-class RGWAsyncOpsManager {
+class RGWCoroutinesManager {
CephContext *cct;
- void handle_unblocked_stack(list<RGWAsyncOpsStack *>& stacks, RGWAsyncOpsStack *stack, int *waiting_count);
+ void handle_unblocked_stack(list<RGWCoroutinesStack *>& stacks, RGWCoroutinesStack *stack, int *waiting_count);
protected:
RGWCompletionManager completion_mgr;
void put_completion_notifier(AioCompletionNotifier *cn);
public:
- RGWAsyncOpsManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
- virtual ~RGWAsyncOpsManager() {}
+ RGWCoroutinesManager(CephContext *_cct) : cct(_cct), ops_window(RGW_ASYNC_OPS_MGR_WINDOW) {}
+ virtual ~RGWCoroutinesManager() {}
- int run(list<RGWAsyncOpsStack *>& ops);
- int run(RGWAsyncOp *op);
+ int run(list<RGWCoroutinesStack *>& ops);
+ int run(RGWCoroutine *op);
- virtual void report_error(RGWAsyncOpsStack *op);
+ virtual void report_error(RGWCoroutinesStack *op);
- AioCompletionNotifier *create_completion_notifier(RGWAsyncOpsStack *stack);
+ AioCompletionNotifier *create_completion_notifier(RGWCoroutinesStack *stack);
RGWCompletionManager *get_completion_mgr() { return &completion_mgr; }
- RGWAsyncOpsStack *allocate_stack() {
- return new RGWAsyncOpsStack(cct, this);
+ RGWCoroutinesStack *allocate_stack() {
+ return new RGWCoroutinesStack(cct, this);
}
};
class RGWAsyncRadosProcessor;
-class RGWRemoteMetaLog : public RGWAsyncOpsManager {
+class RGWRemoteMetaLog : public RGWCoroutinesManager {
RGWRados *store;
RGWRESTConn *conn;
RGWAsyncRadosProcessor *async_rados;
RGWMetaSyncStatusManager status_manager;
public:
- RGWRemoteMetaLog(RGWRados *_store) : RGWAsyncOpsManager(_store->ctx()), store(_store),
+ RGWRemoteMetaLog(RGWRados *_store) : RGWCoroutinesManager(_store->ctx()), store(_store),
conn(NULL), ts_to_shard_lock("ts_to_shard_lock"),
http_manager(store->ctx(), &completion_mgr),
status_manager(store) {}