From 3dff6252e2758de18e12c56cf6757da1cfdb9595 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 5 Oct 2015 09:31:53 -0700 Subject: [PATCH] rgw: revive io window and several other fixes. Don't send unbounded number of IO requests, send back http related errors (that didn't have a specific http status). Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_coroutine.cc | 8 ++++++++ src/rgw/rgw_data_sync.cc | 24 ++++++++++++++++++++++++ src/rgw/rgw_http_client.cc | 6 +++++- src/rgw/rgw_sync.cc | 2 +- 4 files changed, 38 insertions(+), 2 deletions(-) diff --git a/src/rgw/rgw_coroutine.cc b/src/rgw/rgw_coroutine.cc index 0d9977ebd1700..ddebd6646c07b 100644 --- a/src/rgw/rgw_coroutine.cc +++ b/src/rgw/rgw_coroutine.cc @@ -365,6 +365,14 @@ int RGWCoroutinesManager::run(list& stacks) handle_unblocked_stack(stacks, blocked_stack, &blocked_count); } + while (blocked_count >= ops_window) { + int ret = completion_mgr.get_next((void **)&blocked_stack); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to clone shard, completion_mgr.get_next() returned ret=" << ret << dendl; + } + handle_unblocked_stack(stacks, blocked_stack, &blocked_count); + } + ++iter; stacks.pop_front(); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 66cee6bbf8ff6..c1ffde2a97202 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1640,12 +1640,14 @@ int RGWBucketShardFullSyncCR::operate() int ret; reenter(this) { list_marker = full_marker.position; +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << dendl; marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados, RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id), full_marker); do { yield { ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl; +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " list_marker=" << list_marker << dendl; int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, list_marker, &list_result)); if (r < 0) { @@ -1656,6 +1658,7 @@ int RGWBucketShardFullSyncCR::operate() if (retcode < 0 && retcode != -ENOENT) { return set_state(RGWCoroutine_Error, retcode); } +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " list_result.size()=" << list_result.entries.size() << dendl; entries_iter = list_result.entries.begin(); for (; entries_iter != list_result.entries.end(); ++entries_iter) { ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; @@ -1750,6 +1753,7 @@ int RGWBucketShardIncrementalSyncCR::operate() inc_marker); do { yield { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " inc_marker.position=" << inc_marker.position << dendl; ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl; int r = call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, inc_marker.position, &list_result)); @@ -1810,6 +1814,7 @@ int RGWRunBucketSyncCoroutine::operate() } } +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " shard_id=" << shard_id << " sync_status.inc_marker=" << sync_status.inc_marker.position << dendl; if (retcode < 0 && retcode != -ENOENT) { ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl; return set_state(RGWCoroutine_Error, retcode); @@ -1828,6 +1833,23 @@ int RGWRunBucketSyncCoroutine::operate() return set_state(RGWCoroutine_Error, retcode); } + yield { + if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) { + int r = call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, + conn, bucket_name, bucket_id, shard_id)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl; + return r; + } + } + + sync_status.state = rgw_bucket_shard_sync_info::StateFullSync; + } + + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { int r = call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store, @@ -1838,6 +1860,7 @@ int RGWRunBucketSyncCoroutine::operate() return r; } } + sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; } if (retcode < 0) { @@ -1846,6 +1869,7 @@ int RGWRunBucketSyncCoroutine::operate() } yield { +ldout(store->ctx(), 0) << __FILE__ << ":" << __LINE__ << " sync_status.inc_marker=" << sync_status.inc_marker.position << dendl; if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { int r = call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_id, shard_id, diff --git a/src/rgw/rgw_http_client.cc b/src/rgw/rgw_http_client.cc index f554ac8daf7ef..0ad9f1cf93368 100644 --- a/src/rgw/rgw_http_client.cc +++ b/src/rgw/rgw_http_client.cc @@ -584,12 +584,16 @@ void *RGWHTTPManager::reqs_thread_entry() curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status); int status = rgw_http_error_to_errno(http_status); + if (result != CURLE_OK && status == 0) { + status = -EAGAIN; + } + int id = req_data->id; finish_request(req_data, status); switch (result) { case CURLE_OK: break; default: - dout(20) << "ERROR: msg->data.result=" << result << dendl; + dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl; break; } } diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index 7e632cafa0604..b722fe55e41a6 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -685,7 +685,7 @@ protected: int _send_request() { int ret = store->meta_mgr->put(raw_key, bl, RGWMetadataHandler::APPLY_ALWAYS); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: can't put key: ret=" << ret << dendl; + ldout(store->ctx(), 0) << "ERROR: can't store key: " << raw_key << " ret=" << ret << dendl; return ret; } return 0; -- 2.39.5