From 9e8c86173a4980d6fa0514497b852659f66d0adb Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 27 Nov 2012 11:50:38 -0800 Subject: [PATCH] rgw: unaccount multipart parts when completing upload We need to do it atomically, so info about parts is sent to the bucket index objclass. Signed-off-by: Yehuda Sadeh --- src/cls/rgw/cls_rgw.cc | 85 ++++++++++++++++++++++++----------- src/cls/rgw/cls_rgw_client.cc | 5 ++- src/cls/rgw/cls_rgw_client.h | 3 +- src/cls/rgw/cls_rgw_ops.h | 10 ++++- src/rgw/rgw_cache.h | 6 +-- src/rgw/rgw_op.cc | 21 +++++---- src/rgw/rgw_rados.cc | 29 ++++++------ src/rgw/rgw_rados.h | 11 ++--- 8 files changed, 111 insertions(+), 59 deletions(-) diff --git a/src/cls/rgw/cls_rgw.cc b/src/cls/rgw/cls_rgw.cc index 835521d941e10..577d6637b31cd 100644 --- a/src/cls/rgw/cls_rgw.cc +++ b/src/cls/rgw/cls_rgw.cc @@ -301,6 +301,34 @@ int rgw_bucket_prepare_op(cls_method_context_t hctx, bufferlist *in, bufferlist return rc; } +static void unaccount_entry(struct rgw_bucket_dir_header& header, struct rgw_bucket_dir_entry& entry) +{ + struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category]; + stats.num_entries--; + stats.total_size -= entry.meta.size; + stats.total_size_rounded -= get_rounded_size(entry.meta.size); +} + +static int read_index_entry(cls_method_context_t hctx, string& name, struct rgw_bucket_dir_entry *entry) +{ + bufferlist current_entry; + int rc = cls_cxx_map_get_val(hctx, name, ¤t_entry); + if (rc < 0) { + return rc; + } + + bufferlist::iterator cur_iter = current_entry.begin(); + try { + ::decode(*entry, cur_iter); + } catch (buffer::error& err) { + CLS_LOG(1, "ERROR: read_index_entry(): failed to decode entry\n"); + return -EIO; + } + + CLS_LOG(1, "read_index_entry(): existing entry: epoch=%lld name=%s locator=%s\n", entry->epoch, entry->name.c_str(), entry->locator.c_str()); + return 0; +} + int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist *out) { // decode request @@ -327,29 +355,18 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist return -EINVAL; } - bufferlist current_entry; struct rgw_bucket_dir_entry entry; bool ondisk = true; - rc = cls_cxx_map_get_val(hctx, op.name, ¤t_entry); - if (rc < 0) { - if (rc != -ENOENT) { - return rc; - } else { - rc = 0; - entry.name = op.name; - entry.epoch = op.epoch; - entry.meta = op.meta; - entry.locator = op.locator; - ondisk = false; - } - } else { - bufferlist::iterator cur_iter = current_entry.begin(); - try { - ::decode(entry, cur_iter); - CLS_LOG(1, "rgw_bucket_complete_op(): existing entry: epoch=%lld name=%s locator=%s\n", entry.epoch, entry.name.c_str(), entry.locator.c_str()); - } catch (buffer::error& err) { - CLS_LOG(1, "ERROR: rgw_bucket_complete_op(): failed to decode entry\n"); - } + + rc = read_index_entry(hctx, op.name, &entry); + if (rc == -ENOENT) { + entry.name = op.name; + entry.epoch = op.epoch; + entry.meta = op.meta; + entry.locator = op.locator; + ondisk = false; + } else if (rc < 0) { + return rc; } if (op.tag.size()) { @@ -384,10 +401,7 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist } if (entry.exists) { - struct rgw_bucket_category_stats& stats = header.stats[entry.meta.category]; - stats.num_entries--; - stats.total_size -= entry.meta.size; - stats.total_size_rounded -= get_rounded_size(entry.meta.size); + unaccount_entry(header, entry); } switch (op.op) { @@ -429,6 +443,27 @@ int rgw_bucket_complete_op(cls_method_context_t hctx, bufferlist *in, bufferlist break; } + list::iterator remove_iter; + CLS_LOG(0, "rgw_bucket_complete_op(): remove_objs.size()=%d\n", op.remove_objs.size()); + for (remove_iter = op.remove_objs.begin(); remove_iter != op.remove_objs.end(); ++remove_iter) { + string& remove_oid_name = *remove_iter; + CLS_LOG(1, "rgw_bucket_complete_op(): removing entries, read_index_entry name=%s\n", remove_oid_name.c_str()); + struct rgw_bucket_dir_entry remove_entry; + int ret = read_index_entry(hctx, remove_oid_name, &remove_entry); + if (ret < 0) { + CLS_LOG(1, "rgw_bucket_complete_op(): removing entries, read_index_entry name=%s ret=%d\n", remove_oid_name.c_str(), rc); + continue; + } + CLS_LOG(0, "rgw_bucket_complete_op(): entry.name=%s entry.meta.category=%d\n", remove_entry.name.c_str(), remove_entry.meta.category); + unaccount_entry(header, remove_entry); + + ret = cls_cxx_map_remove_key(hctx, remove_oid_name); + if (ret < 0) { + CLS_LOG(1, "rgw_bucket_complete_op(): cls_cxx_map_remove_key, failed to remove entry, name=%s read_index_entry ret=%d\n", remove_oid_name.c_str(), rc); + continue; + } + } + bufferlist new_header_bl; ::encode(header, new_header_bl); return cls_cxx_map_write_header(hctx, &new_header_bl); diff --git a/src/cls/rgw/cls_rgw_client.cc b/src/cls/rgw/cls_rgw_client.cc index b873a86690384..aa140bd49b829 100644 --- a/src/cls/rgw/cls_rgw_client.cc +++ b/src/cls/rgw/cls_rgw_client.cc @@ -35,7 +35,8 @@ void cls_rgw_bucket_prepare_op(ObjectWriteOperation& o, uint8_t op, string& tag, } void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, uint8_t op, string& tag, - uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta) + uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta, + list *remove_objs) { bufferlist in; @@ -45,6 +46,8 @@ void cls_rgw_bucket_complete_op(ObjectWriteOperation& o, uint8_t op, string& tag call.name = name; call.epoch = epoch; call.meta = dir_meta; + if (remove_objs) + call.remove_objs = *remove_objs; ::encode(call, in); o.exec("rgw", "bucket_complete_op", in); } diff --git a/src/cls/rgw/cls_rgw_client.h b/src/cls/rgw/cls_rgw_client.h index d2218b9de7b87..4ab8e902d4f7f 100644 --- a/src/cls/rgw/cls_rgw_client.h +++ b/src/cls/rgw/cls_rgw_client.h @@ -14,7 +14,8 @@ void cls_rgw_bucket_prepare_op(librados::ObjectWriteOperation& o, uint8_t op, st string& name, string& locator); void cls_rgw_bucket_complete_op(librados::ObjectWriteOperation& o, uint8_t op, string& tag, - uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta); + uint64_t epoch, string& name, rgw_bucket_dir_entry_meta& dir_meta, + list *remove_objs); int cls_rgw_list_op(librados::IoCtx& io_ctx, string& oid, string& start_obj, string& filter_prefix, uint32_t num_entries, diff --git a/src/cls/rgw/cls_rgw_ops.h b/src/cls/rgw/cls_rgw_ops.h index 09c908fccd1d5..2ba72299fac07 100644 --- a/src/cls/rgw/cls_rgw_ops.h +++ b/src/cls/rgw/cls_rgw_ops.h @@ -66,18 +66,21 @@ struct rgw_cls_obj_complete_op struct rgw_bucket_dir_entry_meta meta; string tag; + list remove_objs; + void encode(bufferlist &bl) const { - ENCODE_START(3, 3, bl); + ENCODE_START(4, 3, bl); ::encode(op, bl); ::encode(name, bl); ::encode(epoch, bl); ::encode(meta, bl); ::encode(tag, bl); ::encode(locator, bl); + ::encode(remove_objs, bl); ENCODE_FINISH(bl); } void decode(bufferlist::iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(3, 3, 3, bl); + DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, bl); ::decode(op, bl); ::decode(name, bl); ::decode(epoch, bl); @@ -86,6 +89,9 @@ struct rgw_cls_obj_complete_op if (struct_v >= 2) { ::decode(locator, bl); } + if (struct_v >= 4) { + ::decode(remove_objs, bl); + } DECODE_FINISH(bl); } void dump(Formatter *f) const; diff --git a/src/rgw/rgw_cache.h b/src/rgw/rgw_cache.h index c2ac64d7b0b51..fb6ca10692d83 100644 --- a/src/rgw/rgw_cache.h +++ b/src/rgw/rgw_cache.h @@ -192,7 +192,7 @@ public: int put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mtime, map& attrs, RGWObjCategory category, int flags, map* rmattrs, const bufferlist *data, - RGWObjManifest *manifest, const string *ptag); + RGWObjManifest *manifest, const string *ptag, list *remove_objs); int put_obj_data(void *ctx, rgw_obj& obj, const char *data, off_t ofs, size_t len, bool exclusive); @@ -347,7 +347,7 @@ template int RGWCache::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mtime, map& attrs, RGWObjCategory category, int flags, map* rmattrs, const bufferlist *data, - RGWObjManifest *manifest, const string *ptag) + RGWObjManifest *manifest, const string *ptag, list *remove_objs) { rgw_bucket bucket; string oid; @@ -364,7 +364,7 @@ int RGWCache::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mt info.flags |= CACHE_FLAG_DATA; } } - int ret = T::put_obj_meta(ctx, obj, size, mtime, attrs, category, flags, rmattrs, data, manifest, ptag); + int ret = T::put_obj_meta(ctx, obj, size, mtime, attrs, category, flags, rmattrs, data, manifest, ptag, remove_objs); if (cacheable) { string name = normal_name(bucket, oid); if (ret >= 0) { diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 49863c3046d94..40bae78d72804 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -945,7 +945,7 @@ int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **pha int RGWPutObjProcessor_Plain::do_complete(string& etag, map& attrs) { int r = store->put_obj_meta(s->obj_ctx, obj, data.length(), NULL, attrs, - RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, NULL, &data, NULL, NULL); + RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, NULL, &data, NULL, NULL, NULL); return r; } @@ -1154,7 +1154,7 @@ int RGWPutObjProcessor_Atomic::do_complete(string& etag, map store->set_atomic(s->obj_ctx, head_obj); int r = store->put_obj_meta(s->obj_ctx, head_obj, obj_len, NULL, attrs, - RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, NULL, &first_chunk, &manifest, NULL); + RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, NULL, &first_chunk, &manifest, NULL, NULL); return r; } @@ -1199,7 +1199,7 @@ int RGWPutObjProcessor_Multipart::do_complete(string& etag, mapput_obj_meta(s->obj_ctx, head_obj, s->obj_size, NULL, attrs, RGW_OBJ_CATEGORY_MAIN, 0, NULL, NULL, NULL, NULL); + int r = store->put_obj_meta(s->obj_ctx, head_obj, s->obj_size, NULL, attrs, RGW_OBJ_CATEGORY_MAIN, 0, NULL, NULL, NULL, NULL, NULL); if (r < 0) return r; @@ -1859,7 +1859,7 @@ void RGWInitMultipart::execute() obj.init_ns(s->bucket, tmp_obj_name, mp_ns); // the meta object will be indexed with 0 size, we c - ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, NULL, NULL, NULL, NULL); + ret = store->put_obj_meta(s->obj_ctx, obj, 0, NULL, attrs, RGW_OBJ_CATEGORY_MULTIMETA, PUT_OBJ_CREATE_EXCL, NULL, NULL, NULL, NULL, NULL); } while (ret == -EEXIST); } @@ -2007,15 +2007,16 @@ void RGWCompleteMultipart::execute() attrs[RGW_ATTR_ETAG] = etag_bl; target_obj.init(s->bucket, s->object_str); + + list remove_objs; /* objects to be removed from index listing */ for (obj_iter = obj_parts.begin(); obj_iter != obj_parts.end(); ++obj_iter) { RGWUploadPartInfo& obj_part = obj_iter->second; + string oid = mp.get_part(obj_iter->second.num); + rgw_obj src_obj; + src_obj.init_ns(s->bucket, oid, mp_ns); if (obj_part.manifest.empty()) { - string oid = mp.get_part(obj_iter->second.num); - rgw_obj src_obj; - src_obj.init_ns(s->bucket, oid, mp_ns); - RGWObjManifestPart& part = manifest.objs[ofs]; part.loc = src_obj; @@ -2025,6 +2026,8 @@ void RGWCompleteMultipart::execute() manifest.append(obj_part.manifest); } + remove_objs.push_back(src_obj.object); + ofs += obj_part.size; } @@ -2033,7 +2036,7 @@ void RGWCompleteMultipart::execute() store->set_atomic(s->obj_ctx, target_obj); ret = store->put_obj_meta(s->obj_ctx, target_obj, ofs, NULL, attrs, - RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, NULL, NULL, &manifest, NULL); + RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE, NULL, NULL, &manifest, NULL, &remove_objs); if (ret < 0) return; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index aa25d507dfb02..c09fffba4408e 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -1017,7 +1017,8 @@ int RGWRados::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, map* rmattrs, const bufferlist *data, RGWObjManifest *manifest, - const string *ptag) + const string *ptag, + list *remove_objs) { rgw_bucket bucket; std::string oid, key; @@ -1117,7 +1118,7 @@ int RGWRados::put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, ut = ceph_clock_now(cct); r = complete_update_index(bucket, obj.object, index_tag, epoch, size, - ut, etag, content_type, &acl_bl, category); + ut, etag, content_type, &acl_bl, category, remove_objs); if (r < 0) goto done_cancel; @@ -1340,7 +1341,7 @@ int RGWRados::copy_obj(void *ctx, manifest.obj_size = total_len; - ret = put_obj_meta(ctx, dest_obj, end + 1, NULL, attrset, category, PUT_OBJ_CREATE, NULL, &first_chunk, &manifest, &tag); + ret = put_obj_meta(ctx, dest_obj, end + 1, NULL, attrset, category, PUT_OBJ_CREATE, NULL, &first_chunk, &manifest, &tag, NULL); if (mtime) obj_stat(ctx, dest_obj, NULL, mtime, NULL, NULL, NULL); @@ -1429,7 +1430,7 @@ int RGWRados::copy_obj_data(void *ctx, } manifest.obj_size = ofs; - ret = put_obj_meta(ctx, dest_obj, end + 1, NULL, attrs, category, PUT_OBJ_CREATE, NULL, &first_chunk, &manifest, NULL); + ret = put_obj_meta(ctx, dest_obj, end + 1, NULL, attrs, category, PUT_OBJ_CREATE, NULL, &first_chunk, &manifest, NULL, NULL); if (mtime) obj_stat(ctx, dest_obj, NULL, mtime, NULL, NULL, NULL); @@ -2349,7 +2350,8 @@ int RGWRados::prepare_update_index(RGWObjState *state, rgw_bucket& bucket, } int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch, uint64_t size, - utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category) + utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, + list *remove_objs) { if (bucket_is_system(bucket)) return 0; @@ -2370,7 +2372,7 @@ int RGWRados::complete_update_index(rgw_bucket& bucket, string& oid, string& tag ent.owner_display_name = owner.get_display_name(); ent.content_type = content_type; - int ret = cls_obj_complete_add(bucket, tag, epoch, ent, category); + int ret = cls_obj_complete_add(bucket, tag, epoch, ent, category, remove_objs); return ret; } @@ -2489,7 +2491,7 @@ done: if (update_index) { if (ret >= 0) { ret = complete_update_index(bucket, dst_obj.object, tag, epoch, size, - ut, etag, content_type, &acl_bl, category); + ut, etag, content_type, &acl_bl, category, NULL); } else { int r = complete_update_index_cancel(bucket, dst_obj.object, tag); if (r < 0) { @@ -3016,7 +3018,8 @@ int RGWRados::cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, return r; } -int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category) +int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, + list *remove_objs) { librados::IoCtx io_ctx; string oid; @@ -3034,7 +3037,7 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, u dir_meta.owner_display_name = ent.owner_display_name; dir_meta.content_type = ent.content_type; dir_meta.category = category; - cls_rgw_bucket_complete_op(o, op, tag, epoch, ent.name, dir_meta); + cls_rgw_bucket_complete_op(o, op, tag, epoch, ent.name, dir_meta, remove_objs); AioCompletion *c = librados::Rados::aio_create_completion(NULL, NULL, NULL); r = io_ctx.aio_operate(oid, c, &o); @@ -3042,23 +3045,23 @@ int RGWRados::cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, u return r; } -int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category) +int RGWRados::cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs) { - return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, epoch, ent, category); + return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, epoch, ent, category, remove_objs); } int RGWRados::cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epoch, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, epoch, ent, RGW_OBJ_CATEGORY_NONE); + return cls_obj_complete_op(bucket, CLS_RGW_OP_DEL, tag, epoch, ent, RGW_OBJ_CATEGORY_NONE, NULL); } int RGWRados::cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name) { RGWObjEnt ent; ent.name = name; - return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, 0, ent, RGW_OBJ_CATEGORY_NONE); + return cls_obj_complete_op(bucket, CLS_RGW_OP_ADD, tag, 0, ent, RGW_OBJ_CATEGORY_NONE, NULL); } int RGWRados::cls_bucket_list(rgw_bucket& bucket, string start, string prefix, diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index d8963e2a91e1b..0209977763fb2 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -462,7 +462,7 @@ public: virtual int put_obj_meta(void *ctx, rgw_obj& obj, uint64_t size, time_t *mtime, map& attrs, RGWObjCategory category, int flags, map* rmattrs, const bufferlist *data, - RGWObjManifest *manifest, const string *ptag); + RGWObjManifest *manifest, const string *ptag, list *remove_objs); virtual int put_obj_data(void *ctx, rgw_obj& obj, const char *data, off_t ofs, size_t len, bool exclusive); virtual int aio_put_obj_data(void *ctx, rgw_obj& obj, bufferlist& bl, @@ -475,7 +475,7 @@ public: int flags = PUT_OBJ_CREATE; if (exclusive) flags |= PUT_OBJ_EXCL; - int ret = put_obj_meta(ctx, obj, len, mtime, attrs, RGW_OBJ_CATEGORY_NONE, flags, NULL, &bl, NULL, NULL); + int ret = put_obj_meta(ctx, obj, len, mtime, attrs, RGW_OBJ_CATEGORY_NONE, flags, NULL, &bl, NULL, NULL, NULL); return ret; } virtual int aio_wait(void *handle); @@ -673,8 +673,8 @@ public: int cls_obj_prepare_op(rgw_bucket& bucket, uint8_t op, string& tag, string& name, string& locator); int cls_obj_complete_op(rgw_bucket& bucket, uint8_t op, string& tag, uint64_t epoch, - RGWObjEnt& ent, RGWObjCategory category); - int cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category); + RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); + int cls_obj_complete_add(rgw_bucket& bucket, string& tag, uint64_t epoch, RGWObjEnt& ent, RGWObjCategory category, list *remove_objs); int cls_obj_complete_del(rgw_bucket& bucket, string& tag, uint64_t epoch, string& name); int cls_obj_complete_cancel(rgw_bucket& bucket, string& tag, string& name); int cls_bucket_list(rgw_bucket& bucket, string start, string prefix, uint32_t num, @@ -684,7 +684,8 @@ public: int prepare_update_index(RGWObjState *state, rgw_bucket& bucket, rgw_obj& oid, string& tag); int complete_update_index(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch, uint64_t size, - utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category); + utime_t& ut, string& etag, string& content_type, bufferlist *acl_bl, RGWObjCategory category, + list *remove_objs); int complete_update_index_del(rgw_bucket& bucket, string& oid, string& tag, uint64_t epoch) { return cls_obj_complete_del(bucket, tag, epoch, oid); } -- 2.39.5