]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: revive io window
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 5 Oct 2015 16:31:53 +0000 (09:31 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:56 +0000 (16:12 -0800)
and several other fixes. Don't send unbounded number of IO requests,
send back http related errors (that didn't have a specific http status).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_coroutine.cc
src/rgw/rgw_data_sync.cc
src/rgw/rgw_http_client.cc
src/rgw/rgw_sync.cc

index 0d9977ebd1700e3904fc626d87468868a9c8a102..ddebd6646c07bddebe87063d1103be124c09780d 100644 (file)
@@ -365,6 +365,14 @@ int RGWCoroutinesManager::run(list<RGWCoroutinesStack *>& stacks)
       handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
     }
 
+    while (blocked_count >= ops_window) {
+      int ret = completion_mgr.get_next((void **)&blocked_stack);
+      if (ret < 0) {
+       ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl;
+      }
+      handle_unblocked_stack(stacks, blocked_stack, &blocked_count);
+    }
+
     ++iter;
     stacks.pop_front();
 
index 66cee6bbf8ff648ef5604b14ab21944f76f5ac5a..c1ffde2a9720216d5e7f79df45e5b31314f8b08d 100644 (file)
@@ -1640,12 +1640,14 @@ int RGWBucketShardFullSyncCR::operate()
   int ret;
   reenter(this) {
     list_marker = full_marker.position;
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl;
     marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, 
                                                            RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
                                                            full_marker);
     do {
       yield {
         ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl;
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " list_marker=" << list_marker << dendl;
         int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
                                               list_marker, &list_result));
         if (r < 0) {
@@ -1656,6 +1658,7 @@ int RGWBucketShardFullSyncCR::operate()
       if (retcode < 0 && retcode != -ENOENT) {
         return set_state(RGWCoroutine_Error, retcode);
       }
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " list_result.size()=" << list_result.entries.size() << dendl;
       entries_iter = list_result.entries.begin();
       for (; entries_iter != list_result.entries.end(); ++entries_iter) {
         ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
@@ -1750,6 +1753,7 @@ int RGWBucketShardIncrementalSyncCR::operate()
                                                           inc_marker);
     do {
       yield {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " inc_marker.position=" << inc_marker.position << dendl;
         ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
         int r = call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
                                               inc_marker.position, &list_result));
@@ -1810,6 +1814,7 @@ int RGWRunBucketSyncCoroutine::operate()
       }
     }
 
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " shard_id=" << shard_id << " sync_status.inc_marker=" << sync_status.inc_marker.position << dendl;
     if (retcode < 0 && retcode != -ENOENT) {
       ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
       return set_state(RGWCoroutine_Error, retcode);
@@ -1828,6 +1833,23 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_state(RGWCoroutine_Error, retcode);
     }
 
+    yield {
+      if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
+        int r = call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone,
+                                                               conn, bucket_name, bucket_id, shard_id));
+        if (r < 0) {
+          ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
+          return r;
+        }
+      }
+
+      sync_status.state = rgw_bucket_shard_sync_info::StateFullSync;
+    }
+
+    if (retcode < 0) {
+      ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+      return set_state(RGWCoroutine_Error, retcode);
+    }
     yield {
       if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
         int r = call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store,
@@ -1838,6 +1860,7 @@ int RGWRunBucketSyncCoroutine::operate()
           return r;
         }
       }
+      sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
     }
 
     if (retcode < 0) {
@@ -1846,6 +1869,7 @@ int RGWRunBucketSyncCoroutine::operate()
     }
 
     yield {
+ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " sync_status.inc_marker=" << sync_status.inc_marker.position << dendl;
       if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
         int r = call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store,
                                                          source_zone, bucket_name, bucket_id, shard_id,
index f554ac8daf7eff4fb7d2693e563b1f8ada21bb70..0ad9f1cf93368e1be95602c12c92c556bc68420d 100644 (file)
@@ -584,12 +584,16 @@ void *RGWHTTPManager::reqs_thread_entry()
        curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
 
        int status = rgw_http_error_to_errno(http_status);
+        if (result != CURLE_OK && status == 0) {
+          status = -EAGAIN;
+        }
+        int id = req_data->id;
        finish_request(req_data, status);
         switch (result) {
           case CURLE_OK:
             break;
           default:
-            dout(20) << "ERROR: msg->data.result=" << result << dendl;
+            dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
            break;
         }
       }
index 7e632cafa06040c9b72ea344a11f03b3174e2293..b722fe55e41a67ea6176cc48bec4bcc9b7291579 100644 (file)
@@ -685,7 +685,7 @@ protected:
   int _send_request() {
     int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS);
     if (ret < 0) {
-      ldout(store->ctx(), 0) << "ERROR: can't put key: ret=" << ret << dendl;
+      ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl;
       return ret;
     }
     return 0;