From: Yehuda Sadeh Date: Fri, 12 Oct 2018 17:36:18 +0000 (-0700) Subject: rgw: tiering: add support for multipart uploads X-Git-Tag: v14.1.0~314^2~42 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=3354bd48123b4fcec7abfec55b47bf7009dfe4a7;p=ceph.git rgw: tiering: add support for multipart uploads Encode multipart info structure in the upload meta object, store the storage_cass info there. Use the storage class that was set on upload init when uploading all the parts, and when completing the upload. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index c6b52c9486bf..30282babbf23 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -307,7 +307,7 @@ vector get_iam_user_policy_from_attr(CephContext* cct, return policies; } -static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map& attrs) +static int get_obj_attrs(RGWRados *store, struct req_state *s, const rgw_obj& obj, map& attrs) { RGWRados::Object op_target(store, s->bucket_info, *static_cast(s->obj_ctx), obj); RGWRados::Object::Read read_op(&op_target); @@ -317,7 +317,116 @@ static int get_obj_attrs(RGWRados *store, struct req_state *s, rgw_obj& obj, map return read_op.prepare(); } -static int modify_obj_attr(RGWRados *store, struct req_state *s, rgw_obj& obj, const char* attr_name, bufferlist& attr_val) +static int get_obj_head(RGWRados *store, struct req_state *s, + const rgw_obj& obj, + map *attrs, + bufferlist *pbl) +{ + store->set_prefetch_data(s->obj_ctx, obj); + + RGWRados::Object op_target(store, s->bucket_info, *static_cast(s->obj_ctx), obj); + RGWRados::Object::Read read_op(&op_target); + + read_op.params.attrs = attrs; + + int ret = read_op.prepare(); + if (ret < 0) { + return ret; + } + + if (!pbl) { + return 0; + } + + ret = read_op.read(0, s->cct->_conf->rgw_max_chunk_size, *pbl); + + return 0; +} + +struct multipart_upload_info +{ + rgw_placement_rule dest_placement; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + encode(dest_placement, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::const_iterator& bl) { + DECODE_START(1, bl); + decode(dest_placement, bl); + DECODE_FINISH(bl); + } +}; +WRITE_CLASS_ENCODER(multipart_upload_info) + +static int get_multipart_info(RGWRados *store, struct req_state *s, + const rgw_obj& obj, + RGWAccessControlPolicy *policy, + map *attrs, + multipart_upload_info *upload_info) +{ + bufferlist header; + + bufferlist headbl; + bufferlist *pheadbl = (upload_info ? &headbl : nullptr); + + int op_ret = get_obj_head(store, s, obj, attrs, pheadbl); + if (op_ret < 0) { + if (op_ret == -ENOENT) { + return -ERR_NO_SUCH_UPLOAD; + } + return op_ret; + } + + if (upload_info && headbl.length() > 0) { + auto hiter = headbl.cbegin(); + try { + decode(*upload_info, hiter); + } catch (buffer::error& err) { + ldpp_dout(s, 0) << "ERROR: failed to decode multipart upload info" << dendl; + return -EIO; + } + } + + if (policy && attrs) { + for (auto& iter : *attrs) { + string name = iter.first; + if (name.compare(RGW_ATTR_ACL) == 0) { + bufferlist& bl = iter.second; + auto bli = bl.cbegin(); + try { + decode(*policy, bli); + } catch (buffer::error& err) { + ldpp_dout(s, 0) << "ERROR: could not decode policy" << dendl; + return -EIO; + } + break; + } + } + } + + return 0; +} + +static int get_multipart_info(RGWRados *store, struct req_state *s, + string& meta_oid, + RGWAccessControlPolicy *policy, + map *attrs, + multipart_upload_info *upload_info) +{ + map::iterator iter; + bufferlist header; + + rgw_obj meta_obj; + meta_obj.init_ns(s->bucket, meta_oid, mp_ns); + meta_obj.set_in_extra_data(true); + + return get_multipart_info(store, s, meta_obj, policy, attrs, upload_info); +} + +static int modify_obj_attr(RGWRados *store, struct req_state *s, const rgw_obj& obj, const char* attr_name, bufferlist& attr_val) { map attrs; RGWRados::Object op_target(store, s->bucket_info, *static_cast(s->obj_ctx), obj); @@ -3512,8 +3621,21 @@ void RGWPutObj::execute() ceph::static_ptr processor; if (multipart) { + RGWMPObj mp(s->object.name, multipart_upload_id); + + multipart_upload_info upload_info; + op_ret = get_multipart_info(store, s, mp.get_meta(), nullptr, nullptr, &upload_info); + if (op_ret < 0) { + if (op_ret != -ENOENT) { + ldpp_dout(this, 0) << "ERROR: get_multipart_info returned " << op_ret << ": " << cpp_strerror(-op_ret) << dendl; + } else {// -ENOENT: raced with upload complete/cancel, no need to spam log + ldpp_dout(this, 20) << "failed to get multipart info (returned " << op_ret << ": " << cpp_strerror(-op_ret) << "): probably raced with upload complete / cancel" << dendl; + } + return; + } + ldpp_dout(this, 20) << "dest_placement for part=" << upload_info.dest_placement << dendl; processor.emplace( - &aio, store, s->bucket_info, &s->dest_placement, + &aio, store, s->bucket_info, &upload_info.dest_placement, s->owner.get_id(), obj_ctx, obj, multipart_upload_id, multipart_part_num, multipart_part_str); } else { @@ -5349,48 +5471,15 @@ void RGWInitMultipart::execute() obj_op.meta.category = RGWObjCategory::MultiMeta; obj_op.meta.flags = PUT_OBJ_CREATE_EXCL; - op_ret = obj_op.write_meta(0, 0, attrs); - } while (op_ret == -EEXIST); -} + multipart_upload_info upload_info; + upload_info.dest_placement = s->dest_placement; -static int get_multipart_info(RGWRados *store, struct req_state *s, - string& meta_oid, - RGWAccessControlPolicy *policy, - map& attrs) -{ - map::iterator iter; - bufferlist header; - - rgw_obj obj; - obj.init_ns(s->bucket, meta_oid, mp_ns); - obj.set_in_extra_data(true); - - int op_ret = get_obj_attrs(store, s, obj, attrs); - if (op_ret < 0) { - if (op_ret == -ENOENT) { - return -ERR_NO_SUCH_UPLOAD; - } - return op_ret; - } - - if (policy) { - for (iter = attrs.begin(); iter != attrs.end(); ++iter) { - string name = iter->first; - if (name.compare(RGW_ATTR_ACL) == 0) { - bufferlist& bl = iter->second; - auto bli = bl.cbegin(); - try { - decode(*policy, bli); - } catch (buffer::error& err) { - ldpp_dout(s, 0) << "ERROR: could not decode policy" << dendl; - return -EIO; - } - break; - } - } - } + bufferlist bl; + encode(upload_info, bl); + obj_op.meta.data = &bl; - return 0; + op_ret = obj_op.write_meta(bl.length(), 0, attrs); + } while (op_ret == -EEXIST); } int RGWCompleteMultipart::verify_permission() @@ -5769,7 +5858,6 @@ void RGWAbortMultipart::execute() string upload_id; string meta_oid; upload_id = s->info.args.get("uploadId"); - map attrs; rgw_obj meta_obj; RGWMPObj mp; @@ -5779,7 +5867,7 @@ void RGWAbortMultipart::execute() mp.init(s->object.name, upload_id); meta_oid = mp.get_meta(); - op_ret = get_multipart_info(store, s, meta_oid, NULL, attrs); + op_ret = get_multipart_info(store, s, meta_oid, nullptr, nullptr, nullptr); if (op_ret < 0) return; @@ -5802,7 +5890,6 @@ void RGWListMultipart::pre_exec() void RGWListMultipart::execute() { - map xattrs; string meta_oid; RGWMPObj mp; @@ -5813,7 +5900,7 @@ void RGWListMultipart::execute() mp.init(s->object.name, upload_id); meta_oid = mp.get_meta(); - op_ret = get_multipart_info(store, s, meta_oid, &policy, xattrs); + op_ret = get_multipart_info(store, s, meta_oid, &policy, nullptr, nullptr); if (op_ret < 0) return; diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 3114a083d031..91d02e139fb3 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -1105,7 +1105,7 @@ public: assert (!obj.empty()); objs_state[obj].is_atomic = true; } - void set_prefetch_data(rgw_obj& obj) { + void set_prefetch_data(const rgw_obj& obj) { RWLock::WLocker wl(lock); assert (!obj.empty()); objs_state[obj].prefetch_data = true; @@ -2102,7 +2102,7 @@ public: RGWObjectCtx *rctx = static_cast(ctx); rctx->set_atomic(obj); } - void set_prefetch_data(void *ctx, rgw_obj& obj) { + void set_prefetch_data(void *ctx, const rgw_obj& obj) { RGWObjectCtx *rctx = static_cast(ctx); rctx->set_prefetch_data(obj); }