]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: don't allow multiple writers to same multiobject part
authorYehuda Sadeh <yehuda@inktank.com>
Sat, 3 May 2014 00:06:05 +0000 (17:06 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Tue, 6 May 2014 20:18:11 +0000 (13:18 -0700)
Fixes: #8269
Backport: firefly, dumpling

A client might need to retry a multipart part write. The original thread
might race with the new one, trying to clean up after it, clobbering the
part's data.
The fix is to detect whether an original part already existed, and if so
use a different part name for it.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_op.cc
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index 920b7ae58e7364a16b348965bcbbd299fbc5f64e..7fc81cc0d65cc055c873ca7ff224d8848a7ea441 100644 (file)
@@ -1357,22 +1357,26 @@ class RGWPutObjProcessor_Multipart : public RGWPutObjProcessor_Atomic
   string upload_id;
 
 protected:
-  bool immutable_head() { return true; }
-  int prepare(RGWRados *store, void *obj_ctx);
+  int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
   int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 
 public:
+  bool immutable_head() { return true; }
   RGWPutObjProcessor_Multipart(const string& bucket_owner, uint64_t _p, req_state *_s) :
                    RGWPutObjProcessor_Atomic(bucket_owner, _s->bucket, _s->object_str, _p, _s->req_id), s(_s) {}
 };
 
-int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
 {
-  RGWPutObjProcessor::prepare(store, obj_ctx);
+  RGWPutObjProcessor::prepare(store, obj_ctx, NULL);
 
   string oid = obj_str;
   upload_id = s->info.args.get("uploadId");
-  mp.init(oid, upload_id);
+  if (!oid_rand) {
+    mp.init(oid, upload_id);
+  } else {
+    mp.init(oid, upload_id, *oid_rand);
+  }
 
   part_num = s->info.args.get("partNumber");
   if (part_num.empty()) {
@@ -1388,7 +1392,13 @@ int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
     return -EINVAL;
   }
 
-  string upload_prefix = oid + "." + upload_id;
+  string upload_prefix = oid + ".";
+
+  if (!oid_rand) {
+    upload_prefix.append(upload_id);
+  } else {
+    upload_prefix.append(*oid_rand);
+  }
 
   rgw_obj target_obj;
   target_obj.init(bucket, oid);
@@ -1466,7 +1476,7 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, time_t *mtime, time_
 }
 
 
-RGWPutObjProcessor *RGWPutObj::select_processor()
+RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart)
 {
   RGWPutObjProcessor *processor;
 
@@ -1482,6 +1492,10 @@ RGWPutObjProcessor *RGWPutObj::select_processor()
     processor = new RGWPutObjProcessor_Multipart(bucket_owner, part_size, s);
   }
 
+  if (is_multipart) {
+    *is_multipart = multipart;
+  }
+
   return processor;
 }
 
@@ -1507,6 +1521,7 @@ void RGWPutObj::execute()
   map<string, bufferlist> attrs;
   int len;
   map<string, string>::iterator iter;
+  bool multipart;
 
 
   perfcounter->inc(l_rgw_put);
@@ -1547,9 +1562,9 @@ void RGWPutObj::execute()
     supplied_md5[sizeof(supplied_md5) - 1] = '\0';
   }
 
-  processor = select_processor();
+  processor = select_processor(&multipart);
 
-  ret = processor->prepare(store, s->obj_ctx);
+  ret = processor->prepare(store, s->obj_ctx, NULL);
   if (ret < 0)
     goto done;
 
@@ -1572,9 +1587,48 @@ void RGWPutObj::execute()
 
     hash.Update(data_ptr, len);
 
-    ret = processor->throttle_data(handle);
-    if (ret < 0)
-      goto done;
+    /* do we need this operation to be synchronous? if we're dealing with an object with immutable
+     * head, e.g., multipart object we need to make sure we're the first one writing to this object
+     */
+    bool need_to_wait = (ofs == 0) && multipart;
+
+    ret = processor->throttle_data(handle, need_to_wait);
+    if (ret < 0) {
+      if (!need_to_wait || ret != -EEXIST) {
+        ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
+        goto done;
+      }
+
+      ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
+
+      /* restart processing with different oid suffix */
+
+      dispose_processor(processor);
+      processor = select_processor(&multipart);
+
+      string oid_rand;
+      char buf[33];
+      gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+      oid_rand.append(buf);
+
+      ret = processor->prepare(store, s->obj_ctx, &oid_rand);
+      if (ret < 0) {
+        ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
+        goto done;
+      }
+
+      ret = processor->handle_data(data, ofs, &handle);
+      if (ret < 0) {
+        ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
+        goto done;
+      }
+
+      ret = processor->throttle_data(handle, false);
+      if (ret < 0) {
+        ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
+        goto done;
+      }
+    }
 
     ofs += len;
   } while (len > 0);
@@ -1683,7 +1737,7 @@ void RGWPostObj::execute()
 
   processor = select_processor();
 
-  ret = processor->prepare(store, s->obj_ctx);
+  ret = processor->prepare(store, s->obj_ctx, NULL);
   if (ret < 0)
     goto done;
 
@@ -1708,7 +1762,7 @@ void RGWPostObj::execute()
 
      hash.Update(data_ptr, len);
 
-     ret = processor->throttle_data(handle);
+     ret = processor->throttle_data(handle, false);
      if (ret < 0)
        goto done;
 
index 9c1fa5333a901f7b0c58ec88fa64edb40cac4659..c28485dcf93a605d27495af7cbe094ad0217c311 100644 (file)
@@ -340,7 +340,7 @@ public:
     policy.set_ctx(s->cct);
   }
 
-  RGWPutObjProcessor *select_processor();
+  RGWPutObjProcessor *select_processor(bool *is_multipart);
   void dispose_processor(RGWPutObjProcessor *processor);
 
   int verify_permission();
@@ -754,21 +754,22 @@ class RGWMPObj {
   string upload_id;
 public:
   RGWMPObj() {}
-  RGWMPObj(string& _oid, string& _upload_id) {
-    init(_oid, _upload_id);
+  RGWMPObj(const string& _oid, const string& _upload_id) {
+    init(_oid, _upload_id, _upload_id);
+  }
+  void init(const string& _oid, const string& _upload_id) {
+    init(_oid, _upload_id, _upload_id);
   }
-  void init(string& _oid, string& _upload_id) {
+  void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
     if (_oid.empty()) {
       clear();
       return;
     }
     oid = _oid;
     upload_id = _upload_id;
-    prefix = oid;
-    prefix.append(".");
-    prefix.append(upload_id);
-    meta = prefix;
-    meta.append(MP_META_SUFFIX);
+    prefix = oid + ".";
+    meta = prefix + upload_id + MP_META_SUFFIX;
+    prefix.append(part_unique_str);
   }
   string& get_meta() { return meta; }
   string get_part(int num) {
@@ -799,7 +800,7 @@ public:
       return false;
     oid = meta.substr(0, mid_pos);
     upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
-    init(oid, upload_id);
+    init(oid, upload_id, upload_id);
     return true;
   }
   void clear() {
index 0ea7057f5bd584c578b31fc24c2ab84243b4e669..4b1c3559af4ddfe5bf0f8ecd9266dab594f10f33 100644 (file)
@@ -719,10 +719,10 @@ int RGWObjManifest::generator::create_begin(CephContext *cct, RGWObjManifest *_m
   manifest->set_head(_h);
   last_ofs = 0;
 
-  char buf[33];
-  gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
-
   if (manifest->get_prefix().empty()) {
+    char buf[33];
+    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
+
     string oid_prefix = ".";
     oid_prefix.append(buf);
     oid_prefix.append("_");
@@ -1006,9 +1006,9 @@ RGWPutObjProcessor::~RGWPutObjProcessor()
   }
 }
 
-int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
 {
-  RGWPutObjProcessor::prepare(store, obj_ctx);
+  RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
 
   obj.init(bucket, obj_str);
 
@@ -1041,7 +1041,7 @@ int RGWPutObjProcessor_Plain::do_complete(string& etag, time_t *mtime, time_t se
 }
 
 
-int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
+int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive)
 {
   if ((uint64_t)abs_ofs + bl.length() > obj_len)
     obj_len = abs_ofs + bl.length();
@@ -1051,7 +1051,7 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
   int r = store->aio_put_obj_data(NULL, obj,
                                      bl,
                                      ((ofs != 0) ? ofs : -1),
-                                     false, phandle);
+                                     exclusive, phandle);
 
   return r;
 }
@@ -1091,7 +1091,7 @@ int RGWPutObjProcessor_Aio::drain_pending()
   return ret;
 }
 
-int RGWPutObjProcessor_Aio::throttle_data(void *handle)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
 {
   if (handle) {
     struct put_obj_aio_info info;
@@ -1099,10 +1099,13 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
     pending.push_back(info);
   }
   size_t orig_size = pending.size();
-  while (pending_has_completed()) {
+  while (pending_has_completed()
+         || need_to_wait) {
     int r = wait_pending_front();
     if (r < 0)
       return r;
+
+    need_to_wait = false;
   }
 
   /* resize window in case messages are draining too fast */
@@ -1118,7 +1121,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle)
   return 0;
 }
 
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
 {
   if (ofs >= next_part_ofs) {
     int r = prepare_next_part(ofs);
@@ -1127,7 +1130,7 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
     }
   }
 
-  return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+  return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
 int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
@@ -1168,12 +1171,15 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha
   }
   off_t write_ofs = data_ofs;
   data_ofs = write_ofs + bl.length();
-  return write_data(bl, write_ofs, phandle);
+  bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
+                                                        we could be racing with another upload, to the same
+                                                        object and cleanup can be messy */
+  return write_data(bl, write_ofs, phandle, exclusive);
 }
 
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
 {
-  RGWPutObjProcessor::prepare(store, obj_ctx);
+  RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
 
   head_obj.init(bucket, obj_str);
 
@@ -1220,12 +1226,12 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
   }
   if (pending_data_bl.length()) {
     void *handle;
-    int r = write_data(pending_data_bl, data_ofs, &handle);
+    int r = write_data(pending_data_bl, data_ofs, &handle, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
       return r;
     }
-    r = throttle_data(handle);
+    r = throttle_data(handle, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
       return r;
@@ -3014,7 +3020,7 @@ public:
       }
     }
 
-    ret = processor->throttle_data(handle);
+    ret = processor->throttle_data(handle, false);
     if (ret < 0)
       return ret;
 
@@ -3161,7 +3167,7 @@ int RGWRados::copy_obj(void *ctx,
 
     RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.object,
                                         cct->_conf->rgw_obj_stripe_size, tag);
-    ret = processor.prepare(this, ctx);
+    ret = processor.prepare(this, ctx, NULL);
     if (ret < 0)
       return ret;
 
index b19d3bd8a0292a72a2ad93a89ab75abfad952ac8..3f6276b316c0e657751dcaba2cdc7a23859e36b7 100644 (file)
@@ -537,13 +537,13 @@ protected:
 public:
   RGWPutObjProcessor(const string& _bo) : store(NULL), obj_ctx(NULL), is_complete(false), bucket_owner(_bo) {}
   virtual ~RGWPutObjProcessor();
-  virtual int prepare(RGWRados *_store, void *_o) {
+  virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) {
     store = _store;
     obj_ctx = _o;
     return 0;
   };
   virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
-  virtual int throttle_data(void *handle) = 0;
+  virtual int throttle_data(void *handle, bool need_to_wait) = 0;
   virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 };
 
@@ -557,12 +557,12 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor
   off_t ofs;
 
 protected:
-  int prepare(RGWRados *store, void *obj_ctx);
+  int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
   int handle_data(bufferlist& bl, off_t ofs, void **phandle);
   int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 
 public:
-  int throttle_data(void *handle) { return 0; }
+  int throttle_data(void *handle, bool need_to_wait) { return 0; }
   RGWPutObjProcessor_Plain(const string& bucket_owner, rgw_bucket& b, const string& o) : RGWPutObjProcessor(bucket_owner),
                                                                                          bucket(b), obj_str(o), ofs(0) {}
 };
@@ -584,10 +584,10 @@ protected:
   uint64_t obj_len;
 
   int drain_pending();
-  int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
+  int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
 
 public:
-  int throttle_data(void *handle);
+  int throttle_data(void *handle, bool need_to_wait);
 
   RGWPutObjProcessor_Aio(const string& bucket_owner) : RGWPutObjProcessor(bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
   virtual ~RGWPutObjProcessor_Aio() {
@@ -618,9 +618,7 @@ protected:
   RGWObjManifest manifest;
   RGWObjManifest::generator manifest_gen;
 
-  virtual bool immutable_head() { return false; }
-
-  int write_data(bufferlist& bl, off_t ofs, void **phandle);
+  int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
   virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
 
   int prepare_next_part(off_t ofs);
@@ -640,11 +638,12 @@ public:
                                 bucket(_b),
                                 obj_str(_o),
                                 unique_tag(_t) {}
-  int prepare(RGWRados *store, void *obj_ctx);
+  int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
+  virtual bool immutable_head() { return false; }
   void set_extra_data_len(uint64_t len) {
     extra_data_len = len;
   }
-  int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+  virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
   bufferlist& get_extra_data() { return extra_data_bl; }
 };