]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: keep track of written_objs correctly 9154/head
authorYehuda Sadeh <yehuda@redhat.com>
Mon, 16 May 2016 21:35:12 +0000 (14:35 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Mon, 16 May 2016 22:44:31 +0000 (15:44 -0700)
Fixes: http://tracker.ceph.com/issues/15886
Only add a rados object to the written_objs list if the write
was successful. Otherwise if the write will be canceled for some
reason, we'd remove an object that we didn't write to. This was
a problem in a case where there's multiple writes that went to
the same part. The second writer should fail the write, since
we do an exclusive write. However, we added the object's name
to the written_objs list anyway, which was a real problem when
the old processor was disposed (as it was clearing the objects).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_op.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index c8b1cd9c8c9af3dc8d6e846efd9f8e8eb62571b4..d0884ea5b3b09fca71f0844a7c4e1a97c025b959 100644 (file)
@@ -1372,12 +1372,13 @@ static inline int put_data_and_throttle(RGWPutObjProcessor *processor,
 
   do {
     void *handle;
+    rgw_obj obj;
 
-    int ret = processor->handle_data(data, ofs, hash, &handle, &again);
+    int ret = processor->handle_data(data, ofs, hash, &handle, &obj, &again);
     if (ret < 0)
       return ret;
 
-    ret = processor->throttle_data(handle, need_to_wait);
+    ret = processor->throttle_data(handle, obj, need_to_wait);
     if (ret < 0)
       return ret;
 
index 7121045c3647683875fe922f5f1b350c98e24038..e258896250fdb37455da87717bac94f1ac06b987 100644 (file)
@@ -2145,7 +2145,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
   if (is_complete)
     return;
 
-  list<rgw_obj>::iterator iter;
+  set<rgw_obj>::iterator iter;
   bool is_multipart_obj = false;
   rgw_obj multipart_obj;
 
@@ -2157,7 +2157,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio()
    * details is describled on #11749
    */ 
   for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
-    rgw_obj &obj = *iter;
+    const rgw_obj &obj = *iter;
     if (RGW_OBJ_NS_MULTIPART == obj.ns) {
       ldout(store->ctx(), 5) << "NOTE: we should not process the multipart object (" << obj << ") here" << dendl;
       multipart_obj = *iter;
@@ -2186,7 +2186,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
     obj_len = abs_ofs + bl.length();
 
   if (!(obj == last_written_obj)) {
-    add_written_obj(obj);
     last_written_obj = obj;
   }
 
@@ -2196,7 +2195,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t
                                      bl,
                                      ((ofs != 0) ? ofs : -1),
                                      exclusive, phandle);
-
   return r;
 }
 
@@ -2215,6 +2213,11 @@ int RGWPutObjProcessor_Aio::wait_pending_front()
   }
   struct put_obj_aio_info info = pop_pending();
   int ret = store->aio_wait(info.handle);
+
+  if (ret >= 0) {
+    add_written_obj(info.obj);
+  }
+
   return ret;
 }
 
@@ -2238,13 +2241,14 @@ int RGWPutObjProcessor_Aio::drain_pending()
   return ret;
 }
 
-int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
 {
   bool _wait = need_to_wait;
 
   if (handle) {
     struct put_obj_aio_info info;
     info.handle = handle;
+    info.obj = obj;
     pending.push_back(info);
   }
   size_t orig_size = pending.size();
@@ -2272,7 +2276,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
   return 0;
 }
 
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive)
 {
   if (ofs >= next_part_ofs) {
     int r = prepare_next_part(ofs);
@@ -2281,10 +2285,12 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan
     }
   }
 
+  *pobj = cur_obj;
+
   return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
 }
 
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again)
 {
   *again = false;
 
@@ -2333,7 +2339,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash,
   bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
                                                         we could be racing with another upload, to the same
                                                         object and cleanup can be messy */
-  int ret = write_data(bl, write_ofs, phandle, exclusive);
+  int ret = write_data(bl, write_ofs, phandle, pobj, exclusive);
   if (ret >= 0) { /* we might return, need to clear bl as it was already sent */
     if (hash) {
       hash->Update((const byte *)bl.c_str(), bl.length());
@@ -2421,19 +2427,20 @@ int RGWPutObjProcessor_Atomic::complete_writing_data()
   }
   while (pending_data_bl.length()) {
     void *handle;
+    rgw_obj obj;
     uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs);
     if (max_write_size > pending_data_bl.length()) {
       max_write_size = pending_data_bl.length();
     }
     bufferlist bl;
     pending_data_bl.splice(0, max_write_size, &bl);
-    int r = write_data(bl, data_ofs, &handle, false);
+    int r = write_data(bl, data_ofs, &handle, &obj, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
       return r;
     }
     data_ofs += bl.length();
-    r = throttle_data(handle, false);
+    r = throttle_data(handle, obj, false);
     if (r < 0) {
       ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
       return r;
@@ -6190,7 +6197,8 @@ public:
 
     do {
       void *handle;
-      int ret = processor->handle_data(bl, ofs, NULL, &handle, &again);
+      rgw_obj obj;
+      int ret = processor->handle_data(bl, ofs, NULL, &handle, &obj, &again);
       if (ret < 0)
         return ret;
 
@@ -6201,7 +6209,7 @@ public:
         ret = opstate->renew_state();
         if (ret < 0) {
           ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
-          int r = processor->throttle_data(handle, false);
+          int r = processor->throttle_data(handle, obj, false);
           if (r < 0) {
             ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
           }
@@ -6212,7 +6220,7 @@ public:
         need_opstate = false;
       }
 
-      ret = processor->throttle_data(handle, false);
+      ret = processor->throttle_data(handle, obj, false);
       if (ret < 0)
         return ret;
     } while (again);
@@ -6963,12 +6971,13 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
 
     do {
       void *handle;
+      rgw_obj obj;
 
-      ret = processor.handle_data(bl, ofs, NULL, &handle, &again);
+      ret = processor.handle_data(bl, ofs, NULL, &handle, &obj, &again);
       if (ret < 0) {
         return ret;
       }
-      ret = processor.throttle_data(handle, false);
+      ret = processor.throttle_data(handle, obj, false);
       if (ret < 0)
         return ret;
     } while (again);
@@ -7592,7 +7601,7 @@ int RGWRados::Object::Delete::delete_obj()
 
 int RGWRados::delete_obj(RGWObjectCtx& obj_ctx,
                          RGWBucketInfo& bucket_info,
-                         rgw_obj& obj,
+                         const rgw_obj& obj,
                          int versioning_status,
                          uint16_t bilog_flags,
                          const real_time& expiration_time)
index 0ced215a7c3719e9d8d16128a2782b6328f2d92d..af290cb44def01a5144df81ed10d450dea7ce4a9 100644 (file)
@@ -2153,7 +2153,7 @@ public:
     int complete_atomic_modification();
 
   public:
-    Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
+    Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info),
                                                                                                ctx(_ctx), obj(_obj), bs(store),
                                                                                                state(NULL), versioning_disabled(false),
                                                                                                bs_initialized(false) {}
@@ -2564,7 +2564,7 @@ public:
   /** Delete an object.*/
   virtual int delete_obj(RGWObjectCtx& obj_ctx,
                          RGWBucketInfo& bucket_owner,
-                         rgw_obj& src_obj,
+                         const rgw_obj& src_obj,
                          int versioning_status,
                          uint16_t bilog_flags = 0,
                          const ceph::real_time& expiration_time = ceph::real_time());
@@ -3075,8 +3075,8 @@ public:
     store = _store;
     return 0;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0;
-  virtual int throttle_data(void *handle, bool need_to_wait) = 0;
+  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) = 0;
+  virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
   virtual void complete_hash(MD5 *hash) {
     assert(0);
   }
@@ -3091,6 +3091,7 @@ public:
 
 struct put_obj_aio_info {
   void *handle;
+  rgw_obj obj;
 };
 
 class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
@@ -3107,17 +3108,17 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
 protected:
   uint64_t obj_len;
 
-  list<rgw_obj> written_objs;
+  set<rgw_obj> written_objs;
 
   void add_written_obj(const rgw_obj& obj) {
-    written_objs.push_back(obj);
+    written_objs.insert(obj);
   }
 
   int drain_pending();
   int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
 
 public:
-  int throttle_data(void *handle, bool need_to_wait);
+  int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);
 
   RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
   virtual ~RGWPutObjProcessor_Aio();
@@ -3152,7 +3153,7 @@ protected:
   RGWObjManifest manifest;
   RGWObjManifest::generator manifest_gen;
 
-  int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
+  int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive);
   virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime,
                           map<string, bufferlist>& attrs, ceph::real_time delete_at,
                           const char *if_match = NULL, const char *if_nomatch = NULL);
@@ -3185,7 +3186,7 @@ public:
   void set_extra_data_len(uint64_t len) {
     extra_data_len = len;
   }
-  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again);
+  virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again);
   virtual void complete_hash(MD5 *hash);
   bufferlist& get_extra_data() { return extra_data_bl; }