]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: user master region's version of bucket
authorYehuda Sadeh <yehuda@inktank.com>
Wed, 29 May 2013 15:11:59 +0000 (08:11 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Wed, 29 May 2013 15:11:59 +0000 (08:11 -0700)
When creating a bucket, retrieve the bucket object
version from the master region and use it.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
12 files changed:
src/rgw/rgw_bucket.cc
src/rgw/rgw_common.h
src/rgw/rgw_json_enc.cc
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h
src/rgw/rgw_rest_client.cc
src/rgw/rgw_rest_client.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_rest_s3.cc

index 257bf960d1ddf8e2fa0eb58474836f84185d05fe..de0d7b6c0c9d335171cdf7388a7c648d0af1f396 100644 (file)
@@ -146,7 +146,7 @@ int RGWBucket::create_bucket(string bucket_str, string& user_id, string& region_
 
   rgw_bucket& bucket = bucket_info.bucket;
 
-  ret = store->create_bucket(user_id, bucket, region_name, attrs, objv_tracker);
+  ret = store->create_bucket(user_id, bucket, region_name, attrs, objv_tracker, NULL);
   if (ret && ret != -EEXIST)
     goto done;
 
index f7c0799bab425ce8aba0b6a712573cb9493cb5b9..219fb504f2c9a82ce83621c797d168c2ac0a046a 100644 (file)
@@ -390,8 +390,11 @@ public:
 };
 WRITE_CLASS_ENCODER(RGWUserCaps);
 
+void encode_json(const char *name, const obj_version& v, Formatter *f);
 void encode_json(const char *name, const RGWUserCaps& val, Formatter *f);
 
+void decode_json_obj(obj_version& v, JSONObj *obj);
+
 struct RGWUserInfo
 {
   uint64_t auid;
index 7b72e3c5ac523e17f6c9db66dd528486ff29f142..fa155043fd1bd7f47f74b334a06c141a5fc5dd9c 100644 (file)
@@ -9,11 +9,26 @@
 #include "common/ceph_json.h"
 #include "common/Formatter.h"
 
+void encode_json(const char *name, const obj_version& v, Formatter *f)
+{
+  f->open_object_section(name);
+  f->dump_string("tag", v.tag);
+  f->dump_unsigned("ver", v.ver);
+  f->close_section();
+}
+
+void decode_json_obj(obj_version& v, JSONObj *obj)
+{
+  JSONDecoder::decode_json("tag", v.tag, obj);
+  JSONDecoder::decode_json("ver", v.ver, obj);
+}
+
 void encode_json(const char *name, const RGWUserCaps& val, Formatter *f)
 {
   val.dump(f, name);
 }
 
+
 void RGWObjManifestPart::dump(Formatter *f) const
 {
   f->open_object_section("loc");
index 0e313ecbb33e009cf709676ba7bb23790f445fb3..f14d57b37f85498a34ec1db53a2192cc1ad7aa86 100644 (file)
@@ -8,6 +8,7 @@
 #include "common/armor.h"
 #include "common/mime.h"
 #include "common/utf8.h"
+#include "common/ceph_json.h"
 
 #include "rgw_rados.h"
 #include "rgw_op.h"
@@ -853,22 +854,17 @@ void RGWCreateBucket::execute()
   bool existed;
   int r;
   rgw_obj obj(store->zone.domain_root, s->bucket_name_str);
+  obj_version objv, *pobjv = NULL;
 
   ret = get_params();
   if (ret < 0)
     return;
 
-  if (!store->region.is_master) {
-    if (store->region.api_name != location_constraint) {
-      ldout(s->cct, 0) << "location constraint (" << location_constraint << ") doesn't match region" << " (" << store->region.api_name << ")" << dendl;
-      ret = -EINVAL;
-      return;
-    }
-
-    ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl;
-    ret = store->rest_conn->forward(s->user.user_id, s->info, &in_data);
-    if (ret < 0)
-      return;
+  if (!store->region.is_master &&
+      store->region.api_name != location_constraint) {
+    ldout(s->cct, 0) << "location constraint (" << location_constraint << ") doesn't match region" << " (" << store->region.api_name << ")" << dendl;
+    ret = -EINVAL;
+    return;
   }
 
   s->bucket_owner.set_id(s->user.user_id);
@@ -880,6 +876,28 @@ void RGWCreateBucket::execute()
       return;
     }
   }
+
+  if (!store->region.is_master) {
+    ldout(s->cct, 0) << "sending create_bucket request to master region" << dendl;
+    bufferlist response;
+#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
+    ret = store->rest_conn->forward(s->user.user_id, s->info, MAX_REST_RESPONSE, &in_data, &response);
+    if (ret < 0)
+      return;
+
+    ldout(s->cct, 20) << "response: " << response.c_str() << dendl;
+    JSONParser jp;
+    ret = jp.parse(response.c_str(), response.length());
+    if (ret < 0) {
+      ldout(s->cct, 0) << "failed parsing response from master region" << dendl;
+      return;
+    }
+    JSONDecoder::decode_json("object_ver", objv, &jp);
+    pobjv = &objv;
+
+    ldout(s->cct, 20) << "parsed: objv.tag=" << objv.tag << " objv.ver=" << objv.ver << dendl;
+  }
+
   string region_name;
 
   if (s->system_request) {
@@ -896,8 +914,7 @@ void RGWCreateBucket::execute()
   attrs[RGW_ATTR_ACL] = aclbl;
 
   s->bucket.name = s->bucket_name_str;
-  RGWObjVersionTracker objv_tracker;
-  ret = store->create_bucket(s->user.user_id, s->bucket, region_name, attrs, objv_tracker, true);
+  ret = store->create_bucket(s->user.user_id, s->bucket, region_name, attrs, objv_tracker, pobjv, true);
   /* continue if EEXIST and create_bucket will fail below.  this way we can recover
    * from a partial create by retrying it. */
   ldout(s->cct, 20) << "rgw_create_bucket returned ret=" << ret << " bucket=" << s->bucket << dendl;
index ec802d7893e26f2ef70c43ced4df324f7c919400..41b8511d9fd8e86600619b992f581cfee566c8a6 100644 (file)
@@ -233,6 +233,7 @@ protected:
   int ret;
   RGWAccessControlPolicy policy;
   string location_constraint;
+  RGWObjVersionTracker objv_tracker;
 
   bufferlist in_data;
 
index 88b4f4fc1c25f0c0a12c44fe704d50e1a3d4657a..72c8d33970bcaa01428a2a68fd6b7e87cefb5daa 100644 (file)
@@ -1346,57 +1346,77 @@ int RGWRados::create_bucket(string& owner, rgw_bucket& bucket,
                             const string& region_name,
                            map<std::string, bufferlist>& attrs,
                             RGWObjVersionTracker& objv_tracker,
+                            obj_version *pobjv,
                            bool exclusive)
 {
-  int ret = 0;
-  ret = select_bucket_placement(bucket.name, bucket);
-  if (ret < 0)
-    return ret;
-  librados::IoCtx index_ctx; // context for new bucket
+#define MAX_CREATE_RETRIES 20 /* need to bound retries */
+  for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
+    int ret = 0;
+    ret = select_bucket_placement(bucket.name, bucket);
+    if (ret < 0)
+      return ret;
+    librados::IoCtx index_ctx; // context for new bucket
 
-  int r = open_bucket_index_ctx(bucket, index_ctx);
-  if (r < 0)
-    return r;
+    int r = open_bucket_index_ctx(bucket, index_ctx);
+    if (r < 0)
+      return r;
 
-  bufferlist bl;
-  uint32_t nop = 0;
-  ::encode(nop, bl);
+    bufferlist bl;
+    uint32_t nop = 0;
+    ::encode(nop, bl);
 
-  const string& pool = zone.domain_root.name;
-  const char *pool_str = pool.c_str();
-  librados::IoCtx id_io_ctx;
-  r = rados->ioctx_create(pool_str, id_io_ctx);
-  if (r < 0)
-    return r;
+    const string& pool = zone.domain_root.name;
+    const char *pool_str = pool.c_str();
+    librados::IoCtx id_io_ctx;
+    r = rados->ioctx_create(pool_str, id_io_ctx);
+    if (r < 0)
+      return r;
 
-  uint64_t iid = instance_id();
-  uint64_t bid = next_bucket_id();
-  char buf[32];
-  snprintf(buf, sizeof(buf), "%llu.%llu", (long long)iid, (long long)bid); 
-  bucket.marker = buf;
-  bucket.bucket_id = bucket.marker;
+    uint64_t iid = instance_id();
+    uint64_t bid = next_bucket_id();
+    char buf[32];
+    snprintf(buf, sizeof(buf), "%llu.%llu", (long long)iid, (long long)bid); 
+    bucket.marker = buf;
+    bucket.bucket_id = bucket.marker;
 
-  string dir_oid =  dir_oid_prefix;
-  dir_oid.append(bucket.marker);
+    string dir_oid =  dir_oid_prefix;
+    dir_oid.append(bucket.marker);
 
-  librados::ObjectWriteOperation op;
-  op.create(true);
-  r = cls_rgw_init_index(index_ctx, op, dir_oid);
-  if (r < 0 && r != -EEXIST)
-    return r;
+    librados::ObjectWriteOperation op;
+    op.create(true);
+    r = cls_rgw_init_index(index_ctx, op, dir_oid);
+    if (r < 0 && r != -EEXIST)
+      return r;
 
-  objv_tracker.generate_new_write_ver(cct);
+    if (pobjv) {
+      objv_tracker.write_version = *pobjv;
+    } else {
+      objv_tracker.generate_new_write_ver(cct);
+    }
 
-  RGWBucketInfo info;
-  info.bucket = bucket;
-  info.owner = owner;
-  info.region = region_name;
-  ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, &attrs);
-  if (ret == -EEXIST) {
-    index_ctx.remove(dir_oid);
+    RGWBucketInfo info;
+    info.bucket = bucket;
+    info.owner = owner;
+    info.region = region_name;
+    ret = put_bucket_info(bucket.name, info, exclusive, &objv_tracker, &attrs);
+    if (ret == -EEXIST) {
+      index_ctx.remove(dir_oid);
+      /* we need this for this objv_tracker */
+      int r = get_bucket_info(NULL, bucket.name, info, &objv_tracker, NULL);
+      if (r < 0) {
+        if (r == -ENOENT) {
+          continue;
+        }
+        ldout(cct, 0) << "get_bucket_info returned " << r << dendl;
+        return r;
+      }
+    }
+    return ret;
   }
 
-  return ret;
+  /* this is highly unlikely */
+  ldout(cct, 0) << "ERROR: could not create bucket, continuously raced with bucket creation and removal" << dendl;
+  return -ENOENT;
 }
 
 int RGWRados::select_bucket_placement(string& bucket_name, rgw_bucket& bucket)
index 2997c3c0d5e035b5712bacaf25596dfa6581dd28..aea9bb7ada955d6d7272922249c4d82964e38527 100644 (file)
@@ -646,6 +646,7 @@ public:
                             const string& region_name,
                             map<std::string,bufferlist>& attrs,
                             RGWObjVersionTracker& objv_tracker,
+                            obj_version *pobjv,
                             bool exclusive = true);
   virtual int add_bucket_placement(std::string& new_pool);
   virtual int remove_bucket_placement(std::string& new_pool);
index 6d530a8c16745d98f18e0ceb66b9dd8c16e8d1b1..4c4c43d3415a2f61ebf436465972ce5caf7e74a8 100644 (file)
@@ -115,6 +115,18 @@ int RGWRESTClient::send_data(void *ptr, size_t len)
   return len;
 }
 
+int RGWRESTClient::receive_data(void *ptr, size_t len)
+{
+  if (response.length() > max_response)
+    return 0; /* don't read extra data */
+
+  bufferptr p((char *)ptr, len);
+
+  response.append(p);
+
+  return 0;
+
+}
 void RGWRESTClient::append_param(string& dest, const string& name, const string& val)
 {
   if (dest.empty()) {
@@ -142,7 +154,7 @@ void RGWRESTClient::get_params_str(map<string, string>& extra_args, string& dest
   }
 }
 
-int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl)
+int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
 {
 
   string date_str;
@@ -213,5 +225,11 @@ int RGWRESTClient::forward_request(RGWAccessKey& key, req_info& info, bufferlist
   if (r < 0)
     return r;
 
+  response.append((char)0); /* NULL terminate response */
+
+  if (outbl) {
+    outbl->claim(response);
+  }
+
   return rgw_http_error_to_errno(status);
 }
index 0adfba2b9520f6dfc2dfb747c058edbaa8954bc0..5e871bbdd96cc77ecc5683696a916c5e9ca3a92a 100644 (file)
@@ -18,11 +18,15 @@ protected:
 
   bufferlist::iterator *send_iter;
 
+  size_t max_response; /* we need this as we don't stream out response */
+  bufferlist response;
+
   void append_param(string& dest, const string& name, const string& val);
   void get_params_str(map<string, string>& extra_args, string& dest);
 public:
   RGWRESTClient(CephContext *_cct, string& _url, list<pair<string, string> > *_headers,
-                list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL) {
+                list<pair<string, string> > *_params) : cct(_cct), status(0), url(_url), send_iter(NULL),
+                                                        max_response(0) {
     if (_headers)
       headers = *_headers;
 
@@ -31,10 +35,13 @@ public:
   }
 
   int receive_header(void *ptr, size_t len);
+  virtual int receive_data(void *ptr, size_t len);
   int send_data(void *ptr, size_t len);
 
+  bufferlist& get_response() { return response; }
+
   int execute(RGWAccessKey& key, const char *method, const char *resource);
-  int forward_request(RGWAccessKey& key, req_info& info, bufferlist *inbl);
+  int forward_request(RGWAccessKey& key, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
 };
 
 
index cbbabe5e475eeb6142873219cb154a4abc6ddf8f..66d5d9bf48444f1b21fcbcdafbb82eb9f6c0f14c 100644 (file)
@@ -27,7 +27,7 @@ int RGWRegionConnection::get_url(string& endpoint)
   return 0;
 }
 
-int RGWRegionConnection::forward(const string& uid, req_info& info, bufferlist *inbl)
+int RGWRegionConnection::forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl)
 {
   string url;
   int ret = get_url(url);
@@ -37,18 +37,6 @@ int RGWRegionConnection::forward(const string& uid, req_info& info, bufferlist *
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "uid", uid));
   params.push_back(make_pair<string, string>(RGW_SYS_PARAM_PREFIX "region", region));
   RGWRESTClient client(cct, url, NULL, &params);
-  return client.forward_request(key, info, inbl);
+  return client.forward_request(key, info, max_response, inbl, outbl);
 }
 
-int RGWRegionConnection::create_bucket(const string& uid, const string& bucket)
-{
-  list<pair<string, string> > params;
-  params.push_back(make_pair<string, string>("uid", uid));
-  params.push_back(make_pair<string, string>("bucket", bucket));
-  string url;
-  int ret = get_url(url);
-  if (ret < 0)
-    return ret;
-  RGWRESTClient client(cct, url, NULL, &params);
-  return client.execute(key, "PUT", "/admin/bucket");
-}
index 2ad9c8f659f69d5c92e2814076400c9485802457..6b296342d93a5b8911fde3d2d5d3b050179b99c1 100644 (file)
@@ -19,9 +19,7 @@ public:
   RGWRegionConnection(CephContext *_cct, RGWRados *store, RGWRegion& upstream);
   int get_url(string& endpoint);
 
-  int forward(const string& uid, req_info& info, bufferlist *inbl);
-  int create_bucket(const string& uid, const string& bucket);
-
+  int forward(const string& uid, req_info& info, size_t max_response, bufferlist *inbl, bufferlist *outbl);
 };
 
 #endif
index c82dd17aa4ff258fa520324c452e0b74ec80f3dc..8ba60c1abefa0f57f12e39a60f701e7ca4c7238b 100644 (file)
@@ -408,6 +408,18 @@ void RGWCreateBucket_ObjStore_S3::send_response()
     set_req_state_err(s, ret);
   dump_errno(s);
   end_header(s);
+
+  if (ret < 0)
+    return;
+
+  if (s->system_request) {
+    JSONFormatter f; /* use json formatter for system requests output */
+
+    f.open_object_section("info");
+    encode_json("object_ver", objv_tracker.read_version, &f);
+    f.close_section();
+    rgw_flush_formatter_and_reset(s, &f);
+  }
 }
 
 void RGWDeleteBucket_ObjStore_S3::send_response()