From 8659ff197d3baded954648695545c9a79fefaafd Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Fri, 18 Sep 2015 16:35:55 -0700
Subject: [PATCH] rgw: bucket sync fetches remote objects

Signed-off-by: Yehuda Sadeh
---
Review notes (text between the "---" line and the diffstat is not part of
the commit message):

* What the patch adds: RGWAsyncGetBucketInstanceInfo and
  RGWAsyncFetchRemoteObj wrap RGWRados::get_bucket_instance_info() and
  RGWRados::fetch_remote_obj() as async rados requests;
  RGWGetBucketInstanceInfoCR and RGWFetchRemoteObjCR queue those requests
  from a coroutine and report the request's return status.
  RGWRunBucketSyncCoroutine::operate() now reads the bucket instance info
  and, during full sync, issues one RGWFetchRemoteObjCR per listed entry.
* This copy was rebuilt from a whitespace-collapsed version of the patch.
  Hunk line counts are unchanged, but indentation and the position of blank
  context lines are reconstructed and may not match the tree byte for byte
  (try "git apply --ignore-whitespace"); the "index" blob ids are stale.
* Template arguments that had been stripped from the collapsed copy were
  restored.  list<bucket_list_entry>, pair<string, string> and
  map<string, string> follow from the patch text; map<string, bufferlist>
  and vector<RGWObjEnt> do not -- confirm them against the tree.
* RGWAsyncFetchRemoteObj::_send_request(): buf grew from 16 to 32 bytes and
  the instance id is printed with %llu, as RGWRados::unique_id() does.
  Sixteen bytes cannot hold '.' plus a full 64-bit value.
* RGWAsyncFetchRemoteObj / RGWFetchRemoteObjCR: src_mtime is now
  initialised to 0.  Nothing in this patch reads it.
* RGWRunBucketSyncCoroutine::operate(): the message logged when
  RGWGetBucketInstanceInfoCR cannot be started no longer says that the
  sync status fetch failed.
* The author e-mail address is absent from the From: and Signed-off-by:
  lines of this copy and was not reconstructed.

 src/rgw/rgw_cr_rados.cc    |  63 +++++++++++++++++++
 src/rgw/rgw_cr_rados.h     | 124 +++++++++++++++++++++++++++++++++++++
 src/rgw/rgw_data_sync.cc   |  34 ++++++++++
 src/rgw/rgw_rados.cc       |   6 +-
 src/rgw/rgw_rados.h        |   5 +-
 src/rgw/rgw_rest_client.cc |   2 +-
 src/rgw/rgw_rest_conn.cc   |   5 +-
 7 files changed, 233 insertions(+), 6 deletions(-)

diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc
index 9a5828bbdf6b1..c5a0d33d8f294 100644
--- a/src/rgw/rgw_cr_rados.cc
+++ b/src/rgw/rgw_cr_rados.cc
@@ -346,4 +346,67 @@ void RGWOmapAppend::finish() {
   set_sleeping(false);
 }
 
+int RGWAsyncGetBucketInstanceInfo::_send_request()
+{
+  string id = bucket_name + ":" + bucket_id;
+  RGWObjectCtx obj_ctx(store);
+
+  int r = store->get_bucket_instance_info(obj_ctx, id, *bucket_info, NULL, NULL);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for bucket id=" << id << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWAsyncFetchRemoteObj::_send_request()
+{
+  RGWObjectCtx obj_ctx(store);
+
+  string user_id;
+  char buf[32]; /* '.' + up to 20 digits + NUL; same size unique_id() uses */
+  snprintf(buf, sizeof(buf), ".%llu", (unsigned long long)store->instance_id());
+  string client_id = store->zone_id() + buf;
+  string op_id = store->unique_id(store->get_new_req_id());
+  map<string, bufferlist> attrs;
+
+  rgw_obj src_obj(bucket_info.bucket, obj_name);
+  src_obj.set_instance(obj_version_id);
+
+  rgw_obj dest_obj(src_obj);
+
+  int r = store->fetch_remote_obj(obj_ctx,
+                       user_id,
+                       client_id,
+                       op_id,
+                       NULL, /* req_info */
+                       source_zone,
+                       dest_obj,
+                       src_obj,
+                       bucket_info, /* dest */
+                       bucket_info, /* source */
+                       NULL, /* time_t *src_mtime, */
+                       NULL, /* time_t *mtime, */
+                       NULL, /* const time_t *mod_ptr, */
+                       NULL, /* const time_t *unmod_ptr, */
+                       NULL, /* const char *if_match, */
+                       NULL, /* const char *if_nomatch, */
+                       RGWRados::ATTRSMOD_NONE,
+                       copy_if_newer,
+                       attrs,
+                       RGW_OBJ_CATEGORY_MAIN,
+                       versioned_epoch,
+                       NULL, /* string *version_id, */
+                       NULL, /* string *ptag, */
+                       NULL, /* string *petag, */
+                       NULL, /* struct rgw_err *err, */
+                       NULL, /* void (*progress_cb)(off_t, void *), */
+                       NULL); /* void *progress_data*); */
+
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
+  }
+  return r;
+}
 
diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h
index e84419236ffdf..5efdd7d238611 100644
--- a/src/rgw/rgw_cr_rados.h
+++ b/src/rgw/rgw_cr_rados.h
@@ -438,4 +438,128 @@ public:
   }
 };
 
+class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string bucket_name;
+  string bucket_id;
+  RGWBucketInfo *bucket_info;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncGetBucketInstanceInfo(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                                const string& _bucket_name, const string& _bucket_id,
+                                RGWBucketInfo *_bucket_info) : RGWAsyncRadosRequest(cn), store(_store),
+                                                              bucket_name(_bucket_name), bucket_id(_bucket_id),
+                                                              bucket_info(_bucket_info) {}
+};
+
+class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string bucket_name;
+  string bucket_id;
+  RGWBucketInfo *bucket_info;
+
+  RGWAsyncGetBucketInstanceInfo *req;
+
+public:
+  RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                             const string& _bucket_name, const string& _bucket_id,
+                             RGWBucketInfo *_bucket_info) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+                                                           bucket_name(_bucket_name), bucket_id(_bucket_id),
+                                                           bucket_info(_bucket_info), req(NULL) {}
+  ~RGWGetBucketInstanceInfoCR() {
+    delete req;
+  }
+
+  int send_request() {
+    req = new RGWAsyncGetBucketInstanceInfo(stack->create_completion_notifier(), store, bucket_name, bucket_id, bucket_info);
+    async_rados->queue(req);
+    return 0;
+  }
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
+class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  string obj_name;
+  string obj_version_id;
+  uint64_t versioned_epoch;
+
+  time_t src_mtime;
+
+  bool copy_if_newer;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncFetchRemoteObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                         const string& _source_zone,
+                         RGWBucketInfo& _bucket_info,
+                         const string& _obj_name, const string& _version_id,
+                         uint64_t _versioned_epoch,
+                         bool _if_newer) : RGWAsyncRadosRequest(cn), store(_store),
+                                           source_zone(_source_zone),
+                                           bucket_info(_bucket_info),
+                                           obj_name(_obj_name), obj_version_id(_version_id),
+                                           versioned_epoch(_versioned_epoch), src_mtime(0),
+                                           copy_if_newer(_if_newer) {}
+};
+
+class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  string obj_name;
+  string obj_version_id;
+  uint64_t versioned_epoch;
+
+  time_t src_mtime;
+
+  bool copy_if_newer;
+
+  RGWAsyncFetchRemoteObj *req;
+
+public:
+  RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      const string& _source_zone,
+                      RGWBucketInfo& _bucket_info,
+                      const string& _obj_name, const string& _version_id,
+                      uint64_t _versioned_epoch,
+                      bool _if_newer) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                                        async_rados(_async_rados), store(_store),
+                                        source_zone(_source_zone),
+                                        bucket_info(_bucket_info),
+                                        obj_name(_obj_name), obj_version_id(_version_id),
+                                        versioned_epoch(_versioned_epoch), src_mtime(0),
+                                        copy_if_newer(_if_newer), req(NULL) {}
+
+
+  ~RGWFetchRemoteObjCR() {
+    delete req;
+  }
+
+  int send_request() {
+    req = new RGWAsyncFetchRemoteObj(stack->create_completion_notifier(), store, source_zone, bucket_info,
+                                     obj_name, obj_version_id, versioned_epoch, copy_if_newer);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
 #endif
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index 85ef2ea5f2b78..249a0e8650664 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -885,8 +885,10 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   string source_zone;
   string bucket_name;
   string bucket_id;
+  RGWBucketInfo bucket_info;
   int shard_id;
   bucket_list_result list_result;
+  list<bucket_list_entry>::iterator entries_iter;
   rgw_bucket_shard_sync_info sync_status;
 
 public:
@@ -899,6 +901,7 @@ public:
                                                       obj_ctx(_obj_ctx), source_zone(_source_zone),
                                                       bucket_name(_bucket_name),
                                                       bucket_id(_bucket_id), shard_id(_shard_id) {}
+
   int operate();
 };
 
@@ -919,6 +922,19 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_state(RGWCoroutine_Error, retcode);
     }
 
+    yield {
+      int r = call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+      if (r < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to call RGWGetBucketInstanceInfoCR()" << dendl;
+        return r;
+      }
+    }
+
+    if (retcode < 0) {
+      ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
+      return set_state(RGWCoroutine_Error, retcode);
+    }
+
     if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
       do {
         yield {
@@ -933,6 +949,24 @@ int RGWRunBucketSyncCoroutine::operate()
         if (retcode < 0 && retcode != -ENOENT) {
           return set_state(RGWCoroutine_Error, retcode);
         }
+        entries_iter = list_result.entries.begin();
+        for (; entries_iter != list_result.entries.end(); ++entries_iter) {
+          ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl;
+          yield {
+            bucket_list_entry& entry = *entries_iter;
+            int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, bucket_info,
+                                                 entry.key, entry.version_id, entry.versioned_epoch,
+                                                 true));
+            if (r < 0) {
+              ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
+              return r;
+            }
+          }
+          if (retcode < 0 && retcode != -ENOENT) {
+            ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl;
+            return set_state(RGWCoroutine_Error, retcode);
+          }
+        }
       } while (list_result.is_truncated);
     }
   }
diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc
index 3fe90b7bb9440..91752558e8b4c 100644
--- a/src/rgw/rgw_rados.cc
+++ b/src/rgw/rgw_rados.cc
@@ -4927,7 +4927,9 @@ public:
                                                progress_cb(_progress_cb), progress_data(_progress_data) {}
 
   int handle_data(bufferlist& bl, off_t ofs, off_t len) {
-    progress_cb(ofs, progress_data);
+    if (progress_cb) {
+      progress_cb(ofs, progress_data);
+    }
 
     bool again;
 
@@ -5079,7 +5081,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_object(),
+                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_orig_obj(),
                                       cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
   int ret = processor.prepare(this, NULL);
   if (ret < 0) {
diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h
index f7b29cf4418d6..83ac7ef0700c5 100644
--- a/src/rgw/rgw_rados.h
+++ b/src/rgw/rgw_rados.h
@@ -2568,6 +2568,10 @@ public:
   int check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
                   RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size);
 
+  uint64_t instance_id();
+  const string& zone_id() {
+    return zone.get_id();
+  }
   string unique_id(uint64_t unique_num) {
     char buf[32];
     snprintf(buf, sizeof(buf), ".%llu.%llu", (unsigned long long)instance_id(), (unsigned long long)unique_num);
@@ -2684,7 +2688,6 @@ public:
   int pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<RGWObjEnt>& objs,
                    bool *is_truncated, RGWAccessListFilter *filter);
 
-  uint64_t instance_id();
   uint64_t next_bucket_id();
 };
 
diff --git a/src/rgw/rgw_rest_client.cc b/src/rgw/rgw_rest_client.cc
index 8ec7b271cad74..b161e5d5a1f84 100644
--- a/src/rgw/rgw_rest_client.cc
+++ b/src/rgw/rgw_rest_client.cc
@@ -543,7 +543,7 @@ int RGWRESTStreamRWRequest::get_obj(RGWAccessKey& key, map<string, string>& extr
 {
   string urlsafe_bucket, urlsafe_object;
   url_encode(obj.bucket.name, urlsafe_bucket);
-  url_encode(obj.get_object(), urlsafe_object);
+  url_encode(obj.get_orig_obj(), urlsafe_object);
   string resource = urlsafe_bucket + "/" + urlsafe_object;
 
   return get_resource(key, extra_headers, resource);
diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc
index 54259347ec976..0738992f7465c 100644
--- a/src/rgw/rgw_rest_conn.cc
+++ b/src/rgw/rgw_rest_conn.cc
@@ -116,9 +116,10 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
   if (ret < 0)
     return ret;
 
-  string uid_str = uid.to_str();
   list<pair<string, string> > params;
-  params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid_str));
+  if (!uid.empty()) {
+    params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid.to_str()));
+  }
   params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "region", zone_group));
   if (prepend_metadata) {
     params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "prepend-metadata", zone_group));
-- 
2.39.5