RGWMetadataLogInfo shard_info;
rgw_mdlog_shard_data data;
- enum State {
- Init = 0,
- ReadShardStatus = 1,
- ReadShardStatusComplete = 2,
- SendRESTRequest = 3,
- ReceiveRESTResponse = 4,
- StoreMDLogEntries = 5,
- StoreMDLogEntriesComplete = 6,
- Done = 100,
- Error = 200,
- } state;
-
- int set_state(State s, int ret = 0) {
- state = s;
- return ret;
- }
public:
RGWCloneMetaLogCoroutine(RGWRados *_store, RGWHTTPManager *_mgr,
int _id, const string& _marker) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr), shard_id(_id),
marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
http_op(NULL), md_op_notifier(NULL),
- req_ret(0),
- state(RGWCloneMetaLogCoroutine::Init) {}
+ req_ret(0) {}
int operate();
int state_receive_rest_response();
int state_store_mdlog_entries();
int state_store_mdlog_entries_complete();
-
- bool is_done() { return (state == Done || state == Error); }
- bool is_error() { return (state == Error); }
};
int RGWRemoteMetaLog::clone_shards()
int RGWCloneMetaLogCoroutine::operate()
{
- switch (state) {
- case Init:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
- return state_init();
- case ReadShardStatus:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
- return state_read_shard_status();
- case ReadShardStatusComplete:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
- return state_read_shard_status_complete();
- case SendRESTRequest:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
- return state_send_rest_request();
- case ReceiveRESTResponse:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
- return state_receive_rest_response();
- case StoreMDLogEntries:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
- return state_store_mdlog_entries();
- case StoreMDLogEntriesComplete:
+ reenter(this) {
+ do {
+ yield {
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
+ return state_init();
+ }
+ yield {
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
+ return state_read_shard_status();
+ }
+ yield {
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
+ return state_read_shard_status_complete();
+ }
+ yield {
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
+ return state_send_rest_request();
+ }
+ yield {
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
+ return state_receive_rest_response();
+ }
+ yield {
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
+ return state_store_mdlog_entries();
+ }
+ } while (truncated);
+ yield {
ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
return state_store_mdlog_entries_complete();
- case Done:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl;
- break;
- case Error:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": error" << dendl;
- break;
+ }
}
return 0;
{
data = rgw_mdlog_shard_data();
- return set_state(ReadShardStatus);
+ return 0;
}
int RGWCloneMetaLogCoroutine::state_read_shard_status()
int ret = mdlog->get_info_async(shard_id, &shard_info, env->stack->get_completion_mgr(), (void *)env->stack, &req_ret);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
- return set_state(Error, ret);
+ return set_state(RGWCoroutine_Error, ret);
}
- return block(set_state(ReadShardStatusComplete));
+ return block(0);
}
int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
marker = shard_info.marker;
- return set_state(SendRESTRequest);
+ return 0;
}
int RGWCloneMetaLogCoroutine::state_send_rest_request()
return ret;
}
- return block(set_state(ReceiveRESTResponse));
+ return block(0);
}
int RGWCloneMetaLogCoroutine::state_receive_rest_response()
error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
ldout(store->ctx(), 0) << "ERROR: failed to wait for op, ret=" << ret << dendl;
http_op->put();
- return set_state(Error, ret);
+ return set_state(RGWCoroutine_Error, ret);
}
http_op->put();
truncated = ((int)data.entries.size() == max_entries);
if (data.entries.empty()) {
- return set_state(Done);
+ return set_state(RGWCoroutine_Done);
}
- return set_state(StoreMDLogEntries);
+ return 0;
}
if (ret < 0) {
cn->put();
ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
- return set_state(Error, ret);
+ return set_state(RGWCoroutine_Error, ret);
}
- return block(set_state(StoreMDLogEntriesComplete));
+ return block(0);
}
int RGWCloneMetaLogCoroutine::state_store_mdlog_entries_complete()
{
- if (truncated) {
- return state_init();
- }
- return set_state(Done);
+ return set_state(RGWCoroutine_Done);
}