]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: keep track of written_objs correctly 9240/head
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 16 May 2016 21:35:12 +0000 (14:35 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 20 May 2016 21:47:08 +0000 (14:47 -0700)
Fixes: http://tracker.ceph.com/issues/15886
Only add a rados object to the written_objs list if the write
was successful. Otherwise if the write will be canceled for some
reason, we'd remove an object that we didn't write to. This was
a problem in a case where there's multiple writes that went to
the same part. The second writer should fail the write, since
we do an exclusive write. However, we added the object's name
to the written_objs list anyway, which was a real problem when
the old processor was disposed (as it was clearing the objects).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
(cherry picked from commit 8fd74d11682f9d0c9085d2dc445fc3eb5631f6e0)

src/rgw/rgw_op.cc
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index cd8785f19dea8b8591d1e2b859cc9c63229f0fcd..dd413dd3c8c04ac59deb3d0526249cd41d401b0c 100644 (file)
@@ -1684,12 +1684,13 @@ static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data
 
   do {
     void *handle;
+    rgw_obj obj;
 
-    int ret = processor->handle_data(data, ofs, hash, &handle, &again);
+    int ret = processor->handle_data(data, ofs, hash, &handle, &obj, &again);
     if (ret < 0)
       return ret;
 
-    ret = processor->throttle_data(handle, need_to_wait);
+    ret = processor->throttle_data(handle, obj, need_to_wait);
     if (ret < 0)
       return ret;
 
index 05c41ef4c33db9174f6f02d0b50b55848a3abb31..91f7ce1e0e28e5b506db5d36100b37e731e6335e 100644 (file)
@@ -914,7 +914,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
   if (is_complete)
     return;
 
-  list<rgw_obj>::iterator iter;
+  set<rgw_obj>::iterator iter;
   bool is_multipart_obj = false;
   rgw_obj multipart_obj;
 
@@ -926,7 +926,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
    * details is describled on #11749
    */ 
   for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
-    rgw_obj &obj = *iter;
+    const rgw_obj &obj = *iter;
     if (RGW_OBJ_NS_MULTIPART == obj.ns) {
       ldout(store->ctx(), 5) << "NOTE: we should not process the multipart object (" << obj << ") here" << dendl;
       multipart_obj = *iter;
@@ -955,7 +955,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
     obj_len = abs_ofs + bl.length();
 
   if (!(obj == last_written_obj)) {
-    add_written_obj(obj);
     last_written_obj = obj;
   }
 
@@ -965,7 +964,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
                                      bl,
                                      ((ofs != 0) ? ofs : -1),
                                      exclusive, phandle);
-
   return r;
 }
 
@@ -984,6 +982,11 @@ int RGWPutObjProcessor_Aio::wait_pending_front()
   }
   struct put_obj_aio_info info = pop_pending();
   int ret = store->aio_wait(info.handle);
+
+  if (ret >= 0) {
+    add_written_obj(info.obj);
+  }
+
   return ret;
 }
 
@@ -1007,11 +1010,12 @@ int RGWPutObjProcessor_Aio::drain_pending()
   return ret;
 }
 
-int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
 {
   if (handle) {
     struct put_obj_aio_info info;
     info.handle = handle;
+    info.obj = obj;
     pending.push_back(info);
   }
   size_t orig_size = pending.size();
@@ -1042,7 +1046,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
   return 0;
 }
 
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive)
 {
   if (ofs >= next_part_ofs) {
     int r = prepare_next_part(ofs);
@@ -1051,10 +1055,12 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
     }
   }
 
+  *pobj = cur_obj;
+
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again)
 {
   *again = false;
 
@@ -1103,7 +1109,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
                                                         we could be racing with another upload, to the same
                                                         object and cleanup can be messy */
-  int ret = write_data(bl, write_ofs, phandle, exclusive);
+  int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
   if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
     if (hash) {
       hash->Update((const byte *)bl.c_str(), bl.length());
@@ -1185,12 +1191,13 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
   }
   if (pending_data_bl.length()) {
     void *handle;
-    int r = write_data(pending_data_bl, data_ofs, &handle, false);
+    rgw_obj obj;
+    int r = write_data(pending_data_bl, data_ofs, &handle, &obj, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
       return r;
     }
-    r = throttle_data(handle, false);
+    r = throttle_data(handle, obj, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
       return r;
@@ -3658,7 +3665,8 @@ public:
 
     do {
       void *handle;
-      int ret = processor->handle_data(bl, ofs, NULL, &handle, &again);
+      rgw_obj obj;
+      int ret = processor->handle_data(bl, ofs, NULL, &handle, &obj, &again);
       if (ret < 0)
         return ret;
 
@@ -3669,7 +3677,7 @@ public:
         ret = opstate->renew_state();
         if (ret < 0) {
           ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
-          int r = processor->throttle_data(handle, false);
+          int r = processor->throttle_data(handle, obj, false);
           if (r < 0) {
             ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
           }
@@ -3680,7 +3688,7 @@ public:
         need_opstate = false;
       }
 
-      ret = processor->throttle_data(handle, false);
+      ret = processor->throttle_data(handle, obj, false);
       if (ret < 0)
         return ret;
     } while (again);
@@ -4240,12 +4248,13 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
 
     do {
       void *handle;
+      rgw_obj obj;
 
-      ret = processor.handle_data(bl, ofs, NULL, &handle, &again);
+      ret = processor.handle_data(bl, ofs, NULL, &handle, &obj, &again);
       if (ret < 0) {
         return ret;
       }
-      ret = processor.throttle_data(handle, false);
+      ret = processor.throttle_data(handle, obj, false);
       if (ret < 0)
         return ret;
     } while (again);
@@ -4794,7 +4803,7 @@ int RGWRados::Object::Delete::delete_obj()
   return 0;
 }
 
-int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj,
+int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& obj,
                          int versioning_status, uint16_t bilog_flags)
 {
   RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
index 37c7e8a731d3bea79c6cb418353172333d0cdca2..38d7c29f71a82302636e5c1f26b023125aa276e1 100644 (file)
@@ -1487,7 +1487,7 @@ public:
     int complete_atomic_modification();
 
   public:
-    Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
+    Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
                                                                                                ctx(_ctx), obj(_obj), bs(store),
                                                                                                state(NULL), versioning_disabled(false),
                                                                                                bs_initialized(false) {}
@@ -1857,7 +1857,7 @@ public:
   int bucket_suspended(rgw_bucket& bucket, bool *suspended);
 
   /** Delete an object.*/
-  virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj,
+  virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, const rgw_obj& src_obj,
                          int versioning_status, uint16_t bilog_flags = 0);
 
   /* Delete a system object */
@@ -2317,8 +2317,8 @@ public:
     store = _store;
     return 0;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0;
-  virtual int throttle_data(void *handle, bool need_to_wait) = 0;
+  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) = 0;
+  virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
   virtual void complete_hash(MD5 *hash) {
     assert(0);
   }
@@ -2331,6 +2331,7 @@ public:
 
 struct put_obj_aio_info {
   void *handle;
+  rgw_obj obj;
 };
 
 class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
@@ -2347,17 +2348,17 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
 protected:
   uint64_t obj_len;
 
-  list<rgw_obj> written_objs;
+  set<rgw_obj> written_objs;
 
   void add_written_obj(const rgw_obj& obj) {
-    written_objs.push_back(obj);
+    written_objs.insert(obj);
   }
 
   int drain_pending();
   int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
 
 public:
-  int throttle_data(void *handle, bool need_to_wait);
+  int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);
 
   RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
   virtual ~RGWPutObjProcessor_Aio();
@@ -2392,7 +2393,7 @@ protected:
   RGWObjManifest manifest;
   RGWObjManifest::generator manifest_gen;
 
-  int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
+  int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive);
   virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
                           map<string, bufferlist>& attrs,
                           const char *if_match = NULL, const char *if_nomatch = NULL);
@@ -2425,7 +2426,7 @@ public:
   void set_extra_data_len(uint64_t len) {
     extra_data_len = len;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again);
+  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again);
   virtual void complete_hash(MD5 *hash);
   bufferlist& get_extra_data() { return extra_data_bl; }