From dd29310bd6273ec3643bf90e51f7cae4801629c7 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Mon, 16 May 2016 14:35:12 -0700 Subject: [PATCH] rgw: keep track of written_objs correctly Fixes: http://tracker.ceph.com/issues/15886 Only add a rados object to the written_objs list if the write was successful. Otherwise if the write will be canceled for some reason, we'd remove an object that we didn't write to. This was a problem in a case where there's multiple writes that went to the same part. The second writer should fail the write, since we do an exclusive write. However, we added the object's name to the written_objs list anyway, which was a real problem when the old processor was disposed (as it was clearing the objects). Signed-off-by: Yehuda Sadeh (cherry picked from commit 8fd74d11682f9d0c9085d2dc445fc3eb5631f6e0) --- src/rgw/rgw_op.cc | 5 +++-- src/rgw/rgw_rados.cc | 41 +++++++++++++++++++++++++---------------- src/rgw/rgw_rados.h | 19 ++++++++++--------- 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index cd8785f19dea8..dd413dd3c8c04 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1684,12 +1684,13 @@ static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data do { void *handle; + rgw_obj obj; - int ret = processor->handle_data(data, ofs, hash, &handle, &again); + int ret = processor->handle_data(data, ofs, hash, &handle, &obj, &again); if (ret < 0) return ret; - ret = processor->throttle_data(handle, need_to_wait); + ret = processor->throttle_data(handle, obj, need_to_wait); if (ret < 0) return ret; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 05c41ef4c33db..91f7ce1e0e28e 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -914,7 +914,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio() if (is_complete) return; - list::iterator iter; + set::iterator iter; bool is_multipart_obj = false; rgw_obj multipart_obj; @@ -926,7 +926,7 @@ RGWPutObjProcessor_Aio::~RGWPutObjProcessor_Aio() * details is describled on #11749 */ for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) { - rgw_obj &obj = *iter; + const rgw_obj &obj = *iter; if (RGW_OBJ_NS_MULTIPART == obj.ns) { ldout(store->ctx(), 5) << "NOTE: we should not process the multipart object (" << obj << ") here" << dendl; multipart_obj = *iter; @@ -955,7 +955,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t obj_len = abs_ofs + bl.length(); if (!(obj == last_written_obj)) { - add_written_obj(obj); last_written_obj = obj; } @@ -965,7 +964,6 @@ int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t bl, ((ofs != 0) ? ofs : -1), exclusive, phandle); - return r; } @@ -984,6 +982,11 @@ int RGWPutObjProcessor_Aio::wait_pending_front() } struct put_obj_aio_info info = pop_pending(); int ret = store->aio_wait(info.handle); + + if (ret >= 0) { + add_written_obj(info.obj); + } + return ret; } @@ -1007,11 +1010,12 @@ int RGWPutObjProcessor_Aio::drain_pending() return ret; } -int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait) +int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) { if (handle) { struct put_obj_aio_info info; info.handle = handle; + info.obj = obj; pending.push_back(info); } size_t orig_size = pending.size(); @@ -1042,7 +1046,7 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait) return 0; } -int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive) +int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive) { if (ofs >= next_part_ofs) { int r = prepare_next_part(ofs); @@ -1051,10 +1055,12 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan } } + *pobj = cur_obj; + return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) { *again = false; @@ -1103,7 +1109,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, MD5 *hash, bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there we could be racing with another upload, to the same object and cleanup can be messy */ - int ret = write_data(bl, write_ofs, phandle, exclusive); + int ret = write_data(bl, write_ofs, phandle, pobj, exclusive); if (ret >= 0) { /* we might return, need to clear bl as it was already sent */ if (hash) { hash->Update((const byte *)bl.c_str(), bl.length()); @@ -1185,12 +1191,13 @@ int RGWPutObjProcessor_Atomic::complete_writing_data() } if (pending_data_bl.length()) { void *handle; - int r = write_data(pending_data_bl, data_ofs, &handle, false); + rgw_obj obj; + int r = write_data(pending_data_bl, data_ofs, &handle, &obj, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl; return r; } - r = throttle_data(handle, false); + r = throttle_data(handle, obj, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl; return r; @@ -3658,7 +3665,8 @@ public: do { void *handle; - int ret = processor->handle_data(bl, ofs, NULL, &handle, &again); + rgw_obj obj; + int ret = processor->handle_data(bl, ofs, NULL, &handle, &obj, &again); if (ret < 0) return ret; @@ -3669,7 +3677,7 @@ public: ret = opstate->renew_state(); if (ret < 0) { ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl; - int r = processor->throttle_data(handle, false); + int r = processor->throttle_data(handle, obj, false); if (r < 0) { ldout(processor->ctx(), 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl; } @@ -3680,7 +3688,7 @@ public: need_opstate = false; } - ret = processor->throttle_data(handle, false); + ret = processor->throttle_data(handle, obj, false); if (ret < 0) return ret; } while (again); @@ -4240,12 +4248,13 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, do { void *handle; + rgw_obj obj; - ret = processor.handle_data(bl, ofs, NULL, &handle, &again); + ret = processor.handle_data(bl, ofs, NULL, &handle, &obj, &again); if (ret < 0) { return ret; } - ret = processor.throttle_data(handle, false); + ret = processor.throttle_data(handle, obj, false); if (ret < 0) return ret; } while (again); @@ -4794,7 +4803,7 @@ int RGWRados::Object::Delete::delete_obj() return 0; } -int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, rgw_obj& obj, +int RGWRados::delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& obj, int versioning_status, uint16_t bilog_flags) { RGWRados::Object del_target(this, bucket_info, obj_ctx, obj); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 37c7e8a731d3b..38d7c29f71a82 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1487,7 +1487,7 @@ public: int complete_atomic_modification(); public: - Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, rgw_obj& _obj) : store(_store), bucket_info(_bucket_info), + Object(RGWRados *_store, RGWBucketInfo& _bucket_info, RGWObjectCtx& _ctx, const rgw_obj& _obj) : store(_store), bucket_info(_bucket_info), ctx(_ctx), obj(_obj), bs(store), state(NULL), versioning_disabled(false), bs_initialized(false) {} @@ -1857,7 +1857,7 @@ public: int bucket_suspended(rgw_bucket& bucket, bool *suspended); /** Delete an object.*/ - virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, rgw_obj& src_obj, + virtual int delete_obj(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_owner, const rgw_obj& src_obj, int versioning_status, uint16_t bilog_flags = 0); /* Delete a system object */ @@ -2317,8 +2317,8 @@ public: store = _store; return 0; } - virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0; - virtual int throttle_data(void *handle, bool need_to_wait) = 0; + virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again) = 0; + virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0; virtual void complete_hash(MD5 *hash) { assert(0); } @@ -2331,6 +2331,7 @@ public: struct put_obj_aio_info { void *handle; + rgw_obj obj; }; class RGWPutObjProcessor_Aio : public RGWPutObjProcessor @@ -2347,17 +2348,17 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor protected: uint64_t obj_len; - list written_objs; + set written_objs; void add_written_obj(const rgw_obj& obj) { - written_objs.push_back(obj); + written_objs.insert(obj); } int drain_pending(); int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive); public: - int throttle_data(void *handle, bool need_to_wait); + int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait); RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {} virtual ~RGWPutObjProcessor_Aio(); @@ -2392,7 +2393,7 @@ protected: RGWObjManifest manifest; RGWObjManifest::generator manifest_gen; - int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive); + int write_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool exclusive); virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs, const char *if_match = NULL, const char *if_nomatch = NULL); @@ -2425,7 +2426,7 @@ public: void set_extra_data_len(uint64_t len) { extra_data_len = len; } - virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again); + virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, rgw_obj *pobj, bool *again); virtual void complete_hash(MD5 *hash); bufferlist& get_extra_data() { return extra_data_bl; } -- 2.39.5