From: Yehuda Sadeh Date: Mon, 16 May 2016 21:35:12 +0000 (-0700) Subject: rgw: keep track of written_objs correctly X-Git-Tag: v10.2.2~53^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ffd545bf173c5a076c47541cbe3889175188d73e;p=ceph.git rgw: keep track of written_objs correctly Fixes: http://tracker.ceph.com/issues/15886 Only add a rados object to the written_objs list if the write was successful. Otherwise if the write will be canceled for some reason, we'd remove an object that we didn't write to. This was a problem in a case where there's multiple writes that went to the same part. The second writer should fail the write, since we do an exclusive write. However, we added the object's name to the written_objs list anyway, which was a real problem when the old processor was disposed (as it was clearing the objects). Signed-off-by: Yehuda Sadeh (cherry picked from commit 8fd74d11682f9d0c9085d2dc445fc3eb5631f6e0) --- diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index c8b1cd9c8c9a..d0884ea5b3b0 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -1372,12 +1372,13 @@ static inline int put_data_and_throttle(RGWPutObjProcessor *processor, do { void *handle; + rgw_obj obj; - int ret = processor->handle_data(data, ofs, hash, &handle, &again); + int ret = processor->handle_data(data, ofs, hash, &handle, &obj, &again); if (ret < 0) return ret; - ret = processor->throttle_data(handle, need_to_wait); + ret = processor->throttle_data(handle, obj, need_to_wait); if (ret < 0) return ret; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8a3420c05c0b..e6cb00170151 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2146,7 +2146,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio() if (is_complete) return; - list::iterator iter; + set::iterator iter; bool is_multipart_obj = false; rgw_obj multipart_obj; @@ -2158,7 +2158,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio() * details is describled on #11749 */ for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) { - rgw_obj &obj = *iter; + const rgw_obj &obj = *iter; if (RGW_OBJ_NS_MULTIPART == obj.ns) { ldout(store->ctx(), 5) << "NOTE: we should not process the multipart object (" << obj << ") here" << dendl; multipart_obj = *iter; @@ -2187,7 +2187,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t obj_len = abs_ofs + bl.length(); if (!(obj == last_written_obj)) { - add_written_obj(obj); last_written_obj = obj; } @@ -2197,7 +2196,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t bl, ((ofs != 0) ? ofs : -1), exclusive, phandle); - return r; } @@ -2216,6 +2214,11 @@ int RGWPutObjProcessor_Aio::wait_pending_front() } struct put_obj_aio_info info = pop_pending(); int ret = store->aio_wait(info.handle); + + if (ret >= 0) { + add_written_obj(info.obj); + } + return ret; } @@ -2239,13 +2242,14 @@ int RGWPutObjProcessor_Aio::drain_pending() return ret; } -int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait) +int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) { bool _wait = need_to_wait; if (handle) { struct put_obj_aio_info info; info.handle = handle; + info.obj = obj; pending.push_back(info); } size_t orig_size = pending.size(); @@ -2273,7 +2277,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait) return 0; } -int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive) +int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive) { if (ofs >= next_part_ofs) { int r = prepare_next_part(ofs); @@ -2282,10 +2286,12 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan } } + *pobj = cur_obj; + return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) { *again = false; @@ -2334,7 +2340,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there we could be racing with another upload, to the same object and cleanup can be messy */ - int ret = write_data(bl, write_ofs, phandle, exclusive); + int ret = write_data(bl, write_ofs, phandle, pobj, exclusive); if (ret >= 0) { /* we might return, need to clear bl as it was already sent */ if (hash) { hash->Update((const byte *)bl.c_str(), bl.length()); @@ -2422,19 +2428,20 @@ int RGWPutObjProcessor_Atomic::complete_writing_data() } while (pending_data_bl.length()) { void *handle; + rgw_obj obj; uint64_t max_write_size = MIN(max_chunk_size, (uint64_t)next_part_ofs - data_ofs); if (max_write_size > pending_data_bl.length()) { max_write_size = pending_data_bl.length(); } bufferlist bl; pending_data_bl.splice(0, max_write_size, &bl); - int r = write_data(bl, data_ofs, &handle, false); + int r = write_data(bl, data_ofs, &handle, &obj, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl; return r; } data_ofs += bl.length(); - r = throttle_data(handle, false); + r = throttle_data(handle, obj, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl; return r; @@ -6190,7 +6197,8 @@ public: do { void *handle; - int ret = processor->handle_data(bl, ofs, NULL, &handle, &again); + rgw_obj obj; + int ret = processor->handle_data(bl, ofs, NULL, &handle, &obj, &again); if (ret < 0) return ret; @@ -6201,7 +6209,7 @@ public: ret = opstate->renew_state(); if (ret < 0) { ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl; - int r = processor->throttle_data(handle, false); + int r = processor->throttle_data(handle, obj, false); if (r < 0) { ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl; } @@ -6212,7 +6220,7 @@ public: need_opstate = false; } - ret = processor->throttle_data(handle, false); + ret = processor->throttle_data(handle, obj, false); if (ret < 0) return ret; } while (again); @@ -6963,12 +6971,13 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, do { void *handle; + rgw_obj obj; - ret = processor.handle_data(bl, ofs, NULL, &handle, &again); + ret = processor.handle_data(bl, ofs, NULL, &handle, &obj, &again); if (ret < 0) { return ret; } - ret = processor.throttle_data(handle, false); + ret = processor.throttle_data(handle, obj, false); if (ret < 0) return ret; } while (again); @@ -7592,7 +7601,7 @@ int RGWRados::Object::Delete::delete_obj() int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, - rgw_obj& obj, + const rgw_obj& obj, int versioning_status, uint16_t bilog_flags, const real_time& expiration_time) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 0ced215a7c37..af290cb44def 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -2153,7 +2153,7 @@ public: int complete_atomic_modification(); public: - Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info), + Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info), ctx(_ctx), obj(_obj), bs(store), state(NULL), versioning_disabled(false), bs_initialized(false) {} @@ -2564,7 +2564,7 @@ public: /** Delete an object.*/ virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, - rgw_obj& src_obj, + const rgw_obj& src_obj, int versioning_status, uint16_t bilog_flags = 0, const ceph::real_time& expiration_time = ceph::real_time()); @@ -3075,8 +3075,8 @@ public: store = _store; return 0; } - virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0; - virtual int throttle_data(void *handle, bool need_to_wait) = 0; + virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) = 0; + virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0; virtual void complete_hash(MD5 *hash) { assert(0); } @@ -3091,6 +3091,7 @@ public: struct put_obj_aio_info { void *handle; + rgw_obj obj; }; class RGWPutObjProcessor_Aio : public RGWPutObjProcessor @@ -3107,17 +3108,17 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor protected: uint64_t obj_len; - list written_objs; + set written_objs; void add_written_obj(const rgw_obj& obj) { - written_objs.push_back(obj); + written_objs.insert(obj); } int drain_pending(); int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive); public: - int throttle_data(void *handle, bool need_to_wait); + int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait); RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {} virtual ~RGWPutObjProcessor_Aio(); @@ -3152,7 +3153,7 @@ protected: RGWObjManifest manifest; RGWObjManifest::generator manifest_gen; - int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive); + int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive); virtual int do_complete(string& etag, ceph::real_time *mtime, ceph::real_time set_mtime, map& attrs, ceph::real_time delete_at, const char *if_match = NULL, const char *if_nomatch = NULL); @@ -3185,7 +3186,7 @@ public: void set_extra_data_len(uint64_t len) { extra_data_len = len; } - virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again); + virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again); virtual void complete_hash(MD5 *hash); bufferlist& get_extra_data() { return extra_data_bl; }