AioCompletionNotifier *md_op_notifier;
- bool finished;
+ rgw_mdlog_shard_data data;
enum State {
Init = 0,
- SentRESTRequest = 1,
- ReceivedRESTResponse = 2,
- StoringMDLogEntries = 3,
- Done = 4,
- Error = 5,
+ ReadShardStatus = 1,
+ SendRESTRequest = 2,
+ ReceiveRESTResponse = 3,
+ StoreMDLogEntries = 4,
+ StoreMDLogEntriesComplete = 5,
+ Done = 6,
+ Error = 7,
} state;
+
+ int set_state(State s, int ret = 0) {
+ state = s;
+ return ret;
+ }
public:
RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncOpsManager *_ops_mgr,
int _id, const string& _marker) : RGWAsyncOp(_ops_mgr), store(_store),
http_manager(_mgr), shard_id(_id),
marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
http_op(NULL), md_op_notifier(NULL),
- finished(false),
state(RGWCloneMetaLogOp::Init) {}
int operate();
int state_init();
- int state_sent_rest_request();
- int state_storing_mdlog_entries();
+ int state_read_shard_status();
+ int state_send_rest_request();
+ int state_receive_rest_response();
+ int state_store_mdlog_entries();
+ int state_store_mdlog_entries_complete();
bool is_done() { return (state == Done || state == Error); }
bool is_error() { return (state == Error); }
ldout(cct, 0) << "ERROR: op->operate() returned ret=" << ret << dendl;
}
+ if (op->is_error()) {
+ report_error(op);
+ }
+
if (op->is_blocked()) {
waiting_count++;
+ } else if (op->is_done()) {
+ delete op;
+ } else {
+ ops.push_back(op);
}
- if (op->is_error()) {
- report_error(op);
- }
if (waiting_count >= ops_window) {
- RGWCloneMetaLogOp *op;
- int ret = completion_mgr.get_next((void **)&op);
+ RGWCloneMetaLogOp *blocked_op;
+ int ret = completion_mgr.get_next((void **)&blocked_op);
if (ret < 0) {
ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
} else {
waiting_count--;
}
- if (!op->is_done()) {
- ops.push_back(op);
+ blocked_op->set_blocked(false);
+ if (!blocked_op->is_done()) {
+ ops.push_back(blocked_op);
} else {
- delete op;
+ delete blocked_op;
}
}
}
{
switch (state) {
case Init:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending request" << dendl;
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
return state_init();
- case SentRESTRequest:
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": handling response" << dendl;
- return state_sent_rest_request();
- case ReceivedRESTResponse:
- assert(0);
- break; /* unreachable */
- case StoringMDLogEntries:
- return state_storing_mdlog_entries();
+ case SendRESTRequest:
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
+ return state_send_rest_request();
+ case ReceiveRESTResponse:
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
+ return state_receive_rest_response();
+ case StoreMDLogEntries:
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
+ return state_store_mdlog_entries();
+ case StoreMDLogEntriesComplete:
+ ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
+ return state_store_mdlog_entries_complete();
case Done:
ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl;
break;
}
int RGWCloneMetaLogOp::state_init()
+{
+ data = rgw_mdlog_shard_data();
+
+ return set_state(SendRESTRequest);
+}
+
+int RGWCloneMetaLogOp::state_send_rest_request()
{
RGWRESTConn *conn = store->rest_master_conn;
return ret;
}
- set_blocked(true);
- state = SentRESTRequest;
-
- return 0;
+ return yield(set_state(ReceiveRESTResponse));
}
-int RGWCloneMetaLogOp::state_sent_rest_request()
+int RGWCloneMetaLogOp::state_receive_rest_response()
{
- rgw_mdlog_shard_data data;
-
int ret = http_op->wait(&data);
- set_blocked(false);
if (ret < 0) {
error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
ldout(store->ctx(), 0) << "ERROR: failed to wait for op, ret=" << ret << dendl;
- state = Error;
http_op->put();
- return ret;
+ return set_state(Error, ret);
}
http_op->put();
- state = ReceivedRESTResponse;
-
ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
truncated = ((int)data.entries.size() == max_entries);
if (data.entries.empty()) {
- state = Done;
- return 0;
+ return set_state(Done);
}
+ return set_state(StoreMDLogEntries);
+}
+
+
+int RGWCloneMetaLogOp::state_store_mdlog_entries()
+{
list<cls_log_entry> dest_entries;
vector<rgw_mdlog_entry>::iterator iter;
marker = entry.id;
}
- state = StoringMDLogEntries;
-
AioCompletionNotifier *cn = ops_mgr->create_completion_notifier(this);
- ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
+ int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
if (ret < 0) {
cn->put();
- state = Error;
ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
- return ret;
+ return set_state(Error, ret);
}
- set_blocked(true);
- return 0;
+ return yield(set_state(StoreMDLogEntriesComplete));
}
-int RGWCloneMetaLogOp::state_storing_mdlog_entries()
+int RGWCloneMetaLogOp::state_store_mdlog_entries_complete()
{
- set_blocked(false);
if (truncated) {
return state_init();
- } else {
- state = Done;
}
-
- return 0;
+ return set_state(Done);
}