]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: clean up mdlog clone states, other fixes
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 23 Jul 2015 21:31:21 +0000 (14:31 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 9 Feb 2016 20:58:59 +0000 (12:58 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_sync.cc
src/rgw/rgw_sync.h

index 455adaee6585b7aec7d0b7271c27a6cea648e4d9..3b2bea264705e6c0ba7f624ce2d31020f6d4eef6 100644 (file)
@@ -194,30 +194,39 @@ class RGWCloneMetaLogOp : public RGWAsyncOp {
 
   AioCompletionNotifier *md_op_notifier;
 
-  bool finished;
+  rgw_mdlog_shard_data data;
 
   enum State {
     Init = 0,
-    SentRESTRequest = 1,
-    ReceivedRESTResponse = 2,
-    StoringMDLogEntries = 3,
-    Done = 4,
-    Error = 5,
+    ReadShardStatus = 1,
+    SendRESTRequest = 2,
+    ReceiveRESTResponse = 3,
+    StoreMDLogEntries = 4,
+    StoreMDLogEntriesComplete = 5,
+    Done = 6,
+    Error = 7,
   } state;
+
+  int set_state(State s, int ret = 0) {
+    state = s;
+    return ret;
+  }
 public:
   RGWCloneMetaLogOp(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncOpsManager *_ops_mgr,
                    int _id, const string& _marker) : RGWAsyncOp(_ops_mgr), store(_store),
                                                       http_manager(_mgr), shard_id(_id),
                                                       marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
                                                      http_op(NULL), md_op_notifier(NULL),
-                                                     finished(false),
                                                       state(RGWCloneMetaLogOp::Init) {}
 
   int operate();
 
   int state_init();
-  int state_sent_rest_request();
-  int state_storing_mdlog_entries();
+  int state_read_shard_status();
+  int state_send_rest_request();
+  int state_receive_rest_response();
+  int state_store_mdlog_entries();
+  int state_store_mdlog_entries_complete();
 
   bool is_done() { return (state == Done || state == Error); }
   bool is_error() { return (state == Error); }
@@ -239,25 +248,31 @@ int RGWAsyncOpsManager::run(list<RGWAsyncOp *>& ops)
       ldout(cct, 0) << "ERROR: op->operate() returned ret=" << ret << dendl;
     }
 
+    if (op->is_error()) {
+      report_error(op);
+    }
+
     if (op->is_blocked()) {
       waiting_count++;
+    } else if (op->is_done()) {
+      delete op;
+    } else {
+      ops.push_back(op);
     }
 
-    if (op->is_error()) {
-      report_error(op);
-    }
     if (waiting_count >= ops_window) {
-      RGWCloneMetaLogOp *op;
-      int ret = completion_mgr.get_next((void **)&op);
+      RGWCloneMetaLogOp *blocked_op;
+      int ret = completion_mgr.get_next((void **)&blocked_op);
       if (ret < 0) {
        ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
       } else {
         waiting_count--;
       }
-      if (!op->is_done()) {
-       ops.push_back(op);
+      blocked_op->set_blocked(false);
+      if (!blocked_op->is_done()) {
+       ops.push_back(blocked_op);
       } else {
-       delete op;
+       delete blocked_op;
       }
     }
   }
@@ -296,16 +311,20 @@ int RGWCloneMetaLogOp::operate()
 {
   switch (state) {
     case Init:
-      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending request" << dendl;
+      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
       return state_init();
-    case SentRESTRequest:
-      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": handling response" << dendl;
-      return state_sent_rest_request();
-    case ReceivedRESTResponse:
-      assert(0);
-      break; /* unreachable */
-    case StoringMDLogEntries:
-      return state_storing_mdlog_entries();
+    case SendRESTRequest:
+      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
+      return state_send_rest_request();
+    case ReceiveRESTResponse:
+      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
+      return state_receive_rest_response();
+    case StoreMDLogEntries:
+      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
+      return state_store_mdlog_entries();
+    case StoreMDLogEntriesComplete:
+      ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
+      return state_store_mdlog_entries_complete();
     case Done:
       ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": done" << dendl;
       break;
@@ -318,6 +337,13 @@ int RGWCloneMetaLogOp::operate()
 }
 
 int RGWCloneMetaLogOp::state_init()
+{
+  data = rgw_mdlog_shard_data();
+
+  return set_state(SendRESTRequest);
+}
+
+int RGWCloneMetaLogOp::state_send_rest_request()
 {
   RGWRESTConn *conn = store->rest_master_conn;
 
@@ -347,38 +373,34 @@ int RGWCloneMetaLogOp::state_init()
     return ret;
   }
 
-  set_blocked(true);
-  state = SentRESTRequest;
-
-  return 0;
+  return yield(set_state(ReceiveRESTResponse));
 }
 
-int RGWCloneMetaLogOp::state_sent_rest_request()
+int RGWCloneMetaLogOp::state_receive_rest_response()
 {
-  rgw_mdlog_shard_data data;
-
   int ret = http_op->wait(&data);
-  set_blocked(false);
   if (ret < 0) {
     error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
     ldout(store->ctx(), 0) << "ERROR: failed to wait for op, ret=" << ret << dendl;
-    state = Error;
     http_op->put();
-    return ret;
+    return set_state(Error, ret);
   }
   http_op->put();
 
-  state = ReceivedRESTResponse;
-
   ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
 
   truncated = ((int)data.entries.size() == max_entries);
 
   if (data.entries.empty()) {
-    state = Done;
-    return 0;
+    return set_state(Done);
   }
 
+  return set_state(StoreMDLogEntries);
+}
+
+
+int RGWCloneMetaLogOp::state_store_mdlog_entries()
+{
   list<cls_log_entry> dest_entries;
 
   vector<rgw_mdlog_entry>::iterator iter;
@@ -399,31 +421,23 @@ int RGWCloneMetaLogOp::state_sent_rest_request()
     marker = entry.id;
   }
 
-  state = StoringMDLogEntries;
-
   AioCompletionNotifier *cn = ops_mgr->create_completion_notifier(this);
 
-  ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
+  int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
   if (ret < 0) {
     cn->put();
-    state = Error;
     ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
-    return ret;
+    return set_state(Error, ret);
   }
-  set_blocked(true);
-  return 0;
+  return yield(set_state(StoreMDLogEntriesComplete));
 }
 
-int RGWCloneMetaLogOp::state_storing_mdlog_entries()
+int RGWCloneMetaLogOp::state_store_mdlog_entries_complete()
 {
-  set_blocked(false);
   if (truncated) {
     return state_init();
-  } else {
-    state = Done;
   }
-
-  return 0;
+  return set_state(Done);
 }
 
 
index 65680c99abb480eaf2b9724a62b6ccc71b996334..084ec85062e1601ee0f7e760a6a42b3a8d83ea69 100644 (file)
@@ -26,6 +26,7 @@ class RGWAsyncOpsManager;
 class AioCompletionNotifier;
 
 class RGWAsyncOp {
+  friend class RGWAsyncOpsManager;
 protected:
   RGWAsyncOpsManager *ops_mgr;
 
@@ -34,6 +35,10 @@ protected:
   stringstream error_stream;
 
   void set_blocked(int flag) { blocked = flag; }
+  int yield(int ret) {
+    set_blocked(true);
+    return ret;
+  }
 
 public:
   RGWAsyncOp(RGWAsyncOpsManager *_ops_mgr) : ops_mgr(_ops_mgr), blocked(false) {}