]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: bucket sync fetches remote objects
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 18 Sep 2015 23:35:55 +0000 (16:35 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:52 +0000 (16:12 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_conn.cc

index 9a5828bbdf6b198c3a9f60403a37f7e132b9bcbc..c5a0d33d8f2940430e7955ddc338fe6143627422 100644 (file)
@@ -346,4 +346,67 @@ void RGWOmapAppend::finish() {
   set_sleeping(false);
 }
 
+int RGWAsyncGetBucketInstanceInfo::_send_request()
+{
+  string id = bucket_name + ":" + bucket_id;
+  RGWObjectCtx obj_ctx(store);
+
+  int r = store->get_bucket_instance_info(obj_ctx, id, *bucket_info, NULL, NULL);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to get bucket instance info for bucket id=" << id << dendl;
+    return r;
+  }
+
+  return 0;
+}
+
+int RGWAsyncFetchRemoteObj::_send_request()
+{
+  RGWObjectCtx obj_ctx(store);
+
+  string user_id;
+  char buf[16];
+  snprintf(buf, sizeof(buf), ".%lld", (long long)store->instance_id());
+  string client_id = store->zone_id() + buf;
+  string op_id = store->unique_id(store->get_new_req_id());
+  map<string, bufferlist> attrs;
+
+  rgw_obj src_obj(bucket_info.bucket, obj_name);
+  src_obj.set_instance(obj_version_id);
+
+  rgw_obj dest_obj(src_obj);
+
+  int r = store->fetch_remote_obj(obj_ctx,
+                       user_id,
+                       client_id,
+                       op_id,
+                       NULL, /* req_info */
+                       source_zone,
+                       dest_obj,
+                       src_obj,
+                       bucket_info, /* dest */
+                       bucket_info, /* source */
+                       NULL, /* time_t *src_mtime, */
+                       NULL, /* time_t *mtime, */
+                       NULL, /* const time_t *mod_ptr, */
+                       NULL, /* const time_t *unmod_ptr, */
+                       NULL, /* const char *if_match, */
+                       NULL, /* const char *if_nomatch, */
+                       RGWRados::ATTRSMOD_NONE,
+                       copy_if_newer,
+                       attrs,
+                       RGW_OBJ_CATEGORY_MAIN,
+                       versioned_epoch,
+                       NULL, /* string *version_id, */
+                       NULL, /* string *ptag, */
+                       NULL, /* string *petag, */
+                       NULL, /* struct rgw_err *err, */
+                       NULL, /* void (*progress_cb)(off_t, void *), */
+                       NULL); /* void *progress_data*); */
+
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
+  }
+  return r;
+}
 
index e84419236ffdf2b90b899b9fb351306fa9f422ad..5efdd7d238611f5c0577a4871c5f86b19a289562 100644 (file)
@@ -438,4 +438,128 @@ public:
   }
 };
 
+class RGWAsyncGetBucketInstanceInfo : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string bucket_name;
+  string bucket_id;
+  RGWBucketInfo *bucket_info;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncGetBucketInstanceInfo(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                       const string& _bucket_name, const string& _bucket_id,
+                        RGWBucketInfo *_bucket_info) : RGWAsyncRadosRequest(cn), store(_store),
+                                                       bucket_name(_bucket_name), bucket_id(_bucket_id),
+                                                       bucket_info(_bucket_info) {}
+};
+
+class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string bucket_name;
+  string bucket_id;
+  RGWBucketInfo *bucket_info;
+
+  RGWAsyncGetBucketInstanceInfo *req;
+  
+public:
+  RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                       const string& _bucket_name, const string& _bucket_id,
+                        RGWBucketInfo *_bucket_info) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+                                                       bucket_name(_bucket_name), bucket_id(_bucket_id),
+                                                       bucket_info(_bucket_info), req(NULL) {}
+  ~RGWGetBucketInstanceInfoCR() {
+    delete req;
+  }
+
+  int send_request() {
+    req = new RGWAsyncGetBucketInstanceInfo(stack->create_completion_notifier(), store, bucket_name, bucket_id, bucket_info);
+    async_rados->queue(req);
+    return 0;
+  }
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
+class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  string obj_name;
+  string obj_version_id;
+  uint64_t versioned_epoch;
+
+  time_t src_mtime;
+
+  bool copy_if_newer;
+
+protected:
+  int _send_request();
+public:
+  RGWAsyncFetchRemoteObj(RGWAioCompletionNotifier *cn, RGWRados *_store,
+                         const string& _source_zone,
+                         RGWBucketInfo& _bucket_info,
+                         const string& _obj_name, const string& _version_id,
+                         uint64_t _versioned_epoch,
+                         bool _if_newer) : RGWAsyncRadosRequest(cn), store(_store),
+                                                      source_zone(_source_zone),
+                                                      bucket_info(_bucket_info),
+                                                      obj_name(_obj_name), obj_version_id(_version_id),
+                                                      versioned_epoch(_versioned_epoch),
+                                                      copy_if_newer(_if_newer) {}
+};
+
+class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
+  CephContext *cct;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  string source_zone;
+
+  RGWBucketInfo bucket_info;
+
+  string obj_name;
+  string obj_version_id;
+  uint64_t versioned_epoch;
+
+  time_t src_mtime;
+
+  bool copy_if_newer;
+
+  RGWAsyncFetchRemoteObj *req;
+
+public:
+  RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+                      const string& _source_zone,
+                      RGWBucketInfo& _bucket_info,
+                      const string& _obj_name, const string& _version_id,
+                      uint64_t _versioned_epoch,
+                      bool _if_newer) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
+                                       async_rados(_async_rados), store(_store),
+                                       source_zone(_source_zone),
+                                       bucket_info(_bucket_info),
+                                       obj_name(_obj_name), obj_version_id(_version_id),
+                                       versioned_epoch(_versioned_epoch),
+                                       copy_if_newer(_if_newer), req(NULL) {}
+
+
+  ~RGWFetchRemoteObjCR() {
+    delete req;
+  }
+
+  int send_request() {
+    req = new RGWAsyncFetchRemoteObj(stack->create_completion_notifier(), store, source_zone, bucket_info,
+                                     obj_name, obj_version_id, versioned_epoch, copy_if_newer);
+    async_rados->queue(req);
+    return 0;
+  }
+
+  int request_complete() {
+    return req->get_ret_status();
+  }
+};
+
 #endif
index 85ef2ea5f2b7840840ef1503981d4623fb55e500..249a0e8650664d7f75b5c41af1400705f041d355 100644 (file)
@@ -885,8 +885,10 @@ class RGWRunBucketSyncCoroutine : public RGWCoroutine {
   string source_zone;
   string bucket_name;
   string bucket_id;
+  RGWBucketInfo bucket_info;
   int shard_id;
   bucket_list_result list_result;
+  list<bucket_list_entry>::iterator entries_iter;
   rgw_bucket_shard_sync_info sync_status;
 
 public:
@@ -899,6 +901,7 @@ public:
                                                                            obj_ctx(_obj_ctx), source_zone(_source_zone),
                                                                             bucket_name(_bucket_name),
                                                                            bucket_id(_bucket_id), shard_id(_shard_id) {}
+
   int operate();
 };
 
@@ -919,6 +922,19 @@ int RGWRunBucketSyncCoroutine::operate()
       return set_state(RGWCoroutine_Error, retcode);
     }
 
+    yield {
+      int r = call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+      if (r < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl;
+        return r;
+      }
+    }
+
+    if (retcode < 0) {
+      ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
+      return set_state(RGWCoroutine_Error, retcode);
+    }
+
     if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
       do {
         yield {
@@ -933,6 +949,24 @@ int RGWRunBucketSyncCoroutine::operate()
         if (retcode < 0 && retcode != -ENOENT) {
           return set_state(RGWCoroutine_Error, retcode);
         }
+        entries_iter = list_result.entries.begin();
+        for (; entries_iter != list_result.entries.end(); ++entries_iter) {
+          ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl;
+          yield {
+            bucket_list_entry& entry = *entries_iter;
+            int r = call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, bucket_info,
+                                                 entry.key, entry.version_id, entry.versioned_epoch,
+                                                 true));
+            if (r < 0) {
+              ldout(store->ctx(), 0) << "ERROR: failed to call RGWFetchRemoteObjCR()" << dendl;
+              return r;
+            }
+          }
+          if (retcode < 0 && retcode != -ENOENT) {
+            ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << " [" << entries_iter->version_id << "]" << dendl;
+            return set_state(RGWCoroutine_Error, retcode);
+          }
+        }
       } while (list_result.is_truncated);
     }
   }
index 3fe90b7bb9440584485c162958c06a10ffd60478..91752558e8b4c398e8dfcf6f6a0824a71e90fd39 100644 (file)
@@ -4927,7 +4927,9 @@ public:
                                                                        progress_cb(_progress_cb),
                                                                        progress_data(_progress_data) {}
   int handle_data(bufferlist& bl, off_t ofs, off_t len) {
-    progress_cb(ofs, progress_data);
+    if (progress_cb) {
+      progress_cb(ofs, progress_data);
+    }
 
     bool again;
 
@@ -5079,7 +5081,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
 
   RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_object(),
+                                      dest_bucket_info, dest_obj.bucket, dest_obj.get_orig_obj(),
                                       cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
   int ret = processor.prepare(this, NULL);
   if (ret < 0) {
index f7b29cf4418d675c4dc6cf91fdc590585048522b..83ac7ef0700c5cb3c7212e5671f20ea6a1aa84da 100644 (file)
@@ -2568,6 +2568,10 @@ public:
   int check_quota(const rgw_user& bucket_owner, rgw_bucket& bucket,
                   RGWQuotaInfo& user_quota, RGWQuotaInfo& bucket_quota, uint64_t obj_size);
 
+  uint64_t instance_id();
+  const string& zone_id() {
+    return zone.get_id();
+  }
   string unique_id(uint64_t unique_num) {
     char buf[32];
     snprintf(buf, sizeof(buf), ".%llu.%llu", (unsigned long long)instance_id(), (unsigned long long)unique_num);
@@ -2684,7 +2688,6 @@ public:
   int pool_iterate(RGWPoolIterCtx& ctx, uint32_t num, vector<RGWObjEnt>& objs,
                    bool *is_truncated, RGWAccessListFilter *filter);
 
-  uint64_t instance_id();
   uint64_t next_bucket_id();
 };
 
index 8ec7b271cad74f93f220052dca10adb74421bc6b..b161e5d5a1f843bebf7c5d40c9b85c3b06ca3c8e 100644 (file)
@@ -543,7 +543,7 @@ int RGWRESTStreamRWRequest::get_obj(RGWAccessKey& key, map<string, string>& extr
 {
   string urlsafe_bucket, urlsafe_object;
   url_encode(obj.bucket.name, urlsafe_bucket);
-  url_encode(obj.get_object(), urlsafe_object);
+  url_encode(obj.get_orig_obj(), urlsafe_object);
   string resource = urlsafe_bucket + "/" + urlsafe_object;
 
   return get_resource(key, extra_headers, resource);
index 54259347ec976983aef112a49322210df505b216..0738992f7465ca7858bae6f55d4a07695393b37e 100644 (file)
@@ -116,9 +116,10 @@ int RGWRESTConn::get_obj(const rgw_user& uid, req_info *info /* optional */, rgw
   if (ret < 0)
     return ret;
 
-  string uid_str = uid.to_str();
   list<pair<string, string> > params;
-  params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid_str));
+  if (!uid.empty()) {
+    params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid.to_str()));
+  }
   params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "region", zone_group));
   if (prepend_metadata) {
     params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "prepend-metadata", zone_group));