set_sleeping(false);
}
+int RGWAsyncGetBucketInstanceInfo::_send_request()
+{
+ string id = bucket_name + ":" + bucket_id;
+ RGWObjectCtx obj_ctx(store);
+
+ int r = store->get_bucket_instance_info(obj_ctx, id, *bucket_info, NULL, NULL);
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for bucket id=" << id << dendl;
+ return r;
+ }
+
+ return 0;
+}
+
+int RGWAsyncFetchRemoteObj::_send_request()
+{
+ RGWObjectCtx obj_ctx(store);
+
+ string user_id;
+ char buf[16];
+ snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
+ string client_id = store->zone_id() + buf;
+ string op_id = store->unique_id(store->get_new_req_id());
+ map<string, bufferlist> attrs;
+
+ rgw_obj src_obj(bucket_info.bucket, obj_name);
+ src_obj.set_instance(obj_version_id);
+
+ rgw_obj dest_obj(src_obj);
+
+ int r = store->fetch_remote_obj(obj_ctx,
+ user_id,
+ client_id,
+ op_id,
+ NULL, /* req_info */
+ source_zone,
+ dest_obj,
+ src_obj,
+ bucket_info, /* dest */
+ bucket_info, /* source */
+ NULL, /* time_t *src_mtime, */
+ NULL, /* time_t *mtime, */
+ NULL, /* const time_t *mod_ptr, */
+ NULL, /* const time_t *unmod_ptr, */
+ NULL, /* const char *if_match, */
+ NULL, /* const char *if_nomatch, */
+ RGWRados::ATTRSMOD_NONE,
+ copy_if_newer,
+ attrs,
+ RGW_OBJ_CATEGORY_MAIN,
+ versioned_epoch,
+ NULL, /* string *version_id, */
+ NULL, /* string *ptag, */
+ NULL, /* string *petag, */
+ NULL, /* struct rgw_err *err, */
+ NULL, /* void (*progress_cb)(off_t, void *), */
+ NULL); /* void *progress_data*); */
+
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
+ }
+ return r;
+}
}
};
+class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string bucket_name;
+ string bucket_id;
+ RGWBucketInfo *bucket_info;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncGetBucketInstanceInfo(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _bucket_name, const string& _bucket_id,
+ RGWBucketInfo *_bucket_info) : RGWAsyncRadosRequest(cn), store(_store),
+ bucket_name(_bucket_name), bucket_id(_bucket_id),
+ bucket_info(_bucket_info) {}
+};
+
+class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string bucket_name;
+ string bucket_id;
+ RGWBucketInfo *bucket_info;
+
+ RGWAsyncGetBucketInstanceInfo *req;
+
+public:
+ RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const string& _bucket_name, const string& _bucket_id,
+ RGWBucketInfo *_bucket_info) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+ bucket_name(_bucket_name), bucket_id(_bucket_id),
+ bucket_info(_bucket_info), req(NULL) {}
+ ~RGWGetBucketInstanceInfoCR() {
+ delete req;
+ }
+
+ int send_request() {
+ req = new RGWAsyncGetBucketInstanceInfo(stack->create_completion_notifier(), store, bucket_name, bucket_id, bucket_info);
+ async_rados->queue(req);
+ return 0;
+ }
+ int request_complete() {
+ return req->get_ret_status();
+ }
+};
+
+class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ string obj_name;
+ string obj_version_id;
+ uint64_t versioned_epoch;
+
+ time_t src_mtime;
+
+ bool copy_if_newer;
+
+protected:
+ int _send_request();
+public:
+ RGWAsyncFetchRemoteObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const string& _obj_name, const string& _version_id,
+ uint64_t _versioned_epoch,
+ bool _if_newer) : RGWAsyncRadosRequest(cn), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ obj_name(_obj_name), obj_version_id(_version_id),
+ versioned_epoch(_versioned_epoch),
+ copy_if_newer(_if_newer) {}
+};
+
+class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
+ CephContext *cct;
+ RGWAsyncRadosProcessor *async_rados;
+ RGWRados *store;
+ string source_zone;
+
+ RGWBucketInfo bucket_info;
+
+ string obj_name;
+ string obj_version_id;
+ uint64_t versioned_epoch;
+
+ time_t src_mtime;
+
+ bool copy_if_newer;
+
+ RGWAsyncFetchRemoteObj *req;
+
+public:
+ RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ const string& _source_zone,
+ RGWBucketInfo& _bucket_info,
+ const string& _obj_name, const string& _version_id,
+ uint64_t _versioned_epoch,
+ bool _if_newer) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+ async_rados(_async_rados), store(_store),
+ source_zone(_source_zone),
+ bucket_info(_bucket_info),
+ obj_name(_obj_name), obj_version_id(_version_id),
+ versioned_epoch(_versioned_epoch),
+ copy_if_newer(_if_newer), req(NULL) {}
+
+
+ ~RGWFetchRemoteObjCR() {
+ delete req;
+ }
+
+ int send_request() {
+ req = new RGWAsyncFetchRemoteObj(stack->create_completion_notifier(), store, source_zone, bucket_info,
+ obj_name, obj_version_id, versioned_epoch, copy_if_newer);
+ async_rados->queue(req);
+ return 0;
+ }
+
+ int request_complete() {
+ return req->get_ret_status();
+ }
+};
+
#endif
string source_zone;
string bucket_name;
string bucket_id;
+ RGWBucketInfo bucket_info;
int shard_id;
bucket_list_result list_result;
+ list<bucket_list_entry>::iterator entries_iter;
rgw_bucket_shard_sync_info sync_status;
public:
obj_ctx(_obj_ctx), source_zone(_source_zone),
bucket_name(_bucket_name),
bucket_id(_bucket_id), shard_id(_shard_id) {}
+
int operate();
};
return set_state(RGWCoroutine_Error, retcode);
}
+ yield {
+ int r = call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
+ return r;
+ }
+ }
+
+ if (retcode < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
do {
yield {
if (retcode < 0 && retcode != -ENOENT) {
return set_state(RGWCoroutine_Error, retcode);
}
+ entries_iter = list_result.entries.begin();
+ for (; entries_iter != list_result.entries.end(); ++entries_iter) {
+ ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl;
+ yield {
+ bucket_list_entry& entry = *entries_iter;
+ int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, bucket_info,
+ entry.key, entry.version_id, entry.versioned_epoch,
+ true));
+ if (r < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
+ return r;
+ }
+ }
+ if (retcode < 0 && retcode != -ENOENT) {
+ ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl;
+ return set_state(RGWCoroutine_Error, retcode);
+ }
+ }
} while (list_result.is_truncated);
}
}
progress_cb(_progress_cb),
progress_data(_progress_data) {}
int handle_data(bufferlist& bl, off_t ofs, off_t len) {
- progress_cb(ofs, progress_data);
+ if (progress_cb) {
+ progress_cb(ofs, progress_data);
+ }
bool again;
append_rand_alpha(cct, tag, tag, 32);
RGWPutObjProcessor_Atomic processor(obj_ctx,
- dest_bucket_info, dest_obj.bucket, dest_obj.get_object(),
+ dest_bucket_info, dest_obj.bucket, dest_obj.get_orig_obj(),
cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
int ret = processor.prepare(this, NULL);
if (ret < 0) {
int check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size);
+ uint64_t instance_id();
+ const string& zone_id() {
+ return zone.get_id();
+ }
string unique_id(uint64_t unique_num) {
char buf[32];
snprintf(buf, sizeof(buf), ".%llu.%llu", (unsigned long long)instance_id(), (unsigned long long)unique_num);
int pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<RGWObjEnt>& objs,
bool *is_truncated, RGWAccessListFilter *filter);
- uint64_t instance_id();
uint64_t next_bucket_id();
};
{
string urlsafe_bucket, urlsafe_object;
url_encode(obj.bucket.name, urlsafe_bucket);
- url_encode(obj.get_object(), urlsafe_object);
+ url_encode(obj.get_orig_obj(), urlsafe_object);
string resource = urlsafe_bucket + "/" + urlsafe_object;
return get_resource(key, extra_headers, resource);
if (ret < 0)
return ret;
- string uid_str = uid.to_str();
list<pair<string, string> > params;
- params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid_str));
+ if (!uid.empty()) {
+ params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid.to_str()));
+ }
params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "region", zone_group));
if (prepend_metadata) {
params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "prepend-metadata", zone_group));