return (s.empty() || (unsigned char)s[0] != BI_PREFIX_CHAR);
}
-int bi_entry_type(const string& s)
+static int bi_entry_type(const string& s)
{
if (bi_is_plain_entry(s)) {
return BI_BUCKET_OBJS_INDEX;
return 0;
}
-int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+static int rgw_user_usage_log_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
return 0;
}
+/**
+ * Object class method: store the info of one multipart part in the omap and
+ * carry forward every manifest prefix previously recorded for that part.
+ *
+ * Decodes a cls_rgw_mp_upload_part_info_update_op from @p in, reads whatever
+ * is currently stored under op.part_key, adds the stored manifest prefix and
+ * the stored past prefixes to op.info.past_prefixes, and writes op.info back
+ * under op.part_key.
+ *
+ * @param hctx  method context of the object being operated on
+ * @param in    encoded cls_rgw_mp_upload_part_info_update_op
+ * @param out   not written by this method
+ * @return -EINVAL if the op cannot be decoded or if the new manifest prefix
+ *         is already among the past prefixes; a negative error from
+ *         read_omap_entry() other than -ENOENT; otherwise the result of
+ *         cls_cxx_map_set_val()
+ */
+static int rgw_mp_upload_part_info_update(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
+{
+  CLS_LOG(10, "entered %s", __func__);
+  cls_rgw_mp_upload_part_info_update_op op;
+  auto in_iter = in->cbegin();
+  try {
+    decode(op, in_iter);
+  } catch (ceph::buffer::error& err) {
+    // Use __func__ so the message always names this function.
+    CLS_LOG(1, "ERROR: %s(): failed to decode op\n", __func__);
+    return -EINVAL;
+  }
+
+  RGWUploadPartInfo stored_info;
+
+  // -ENOENT is tolerated: nothing is stored under the key yet, so
+  // stored_info simply stays default-constructed.
+  int ret = read_omap_entry(hctx, op.part_key, &stored_info);
+  if (ret < 0 and ret != -ENOENT) {
+    return ret;
+  }
+
+  /* merge all the prior (stored) manifest prefixes to carry forward */
+  if (not stored_info.manifest.empty()) {
+    op.info.past_prefixes.insert(stored_info.manifest.get_prefix());
+  }
+  op.info.past_prefixes.merge(stored_info.past_prefixes);
+
+  if (op.info.past_prefixes.contains(op.info.manifest.get_prefix())) {
+    // Somehow the current chosen prefix collides with one of previous ones.
+    // Better fail this part upload so it can pick a different one in the next.
+    const object_info_t& oi = cls_get_object_info(hctx);
+    CLS_LOG(1, "ERROR: oid [%s]: Current prefix %s is also a past prefix for part %s",
+            oi.soid.oid.name.c_str(),
+            op.info.manifest.get_prefix().c_str(),
+            op.part_key.c_str());
+    return -EINVAL;
+  }
+
+  // Persist the merged info; the outcome is logged and handed back as-is.
+  bufferlist bl;
+  encode(op.info, bl);
+  ret = cls_cxx_map_set_val(hctx, op.part_key, &bl);
+  CLS_LOG(10, "part info update on key [%s]: %zu past prefixes, ret %d", op.part_key.c_str(), op.info.past_prefixes.size(), ret);
+  return ret;
+}
+
static int rgw_reshard_add(cls_method_context_t hctx, bufferlist *in, bufferlist *out)
{
CLS_LOG(10, "entered %s", __func__);
cls_method_handle_t h_rgw_lc_put_head;
cls_method_handle_t h_rgw_lc_get_head;
cls_method_handle_t h_rgw_lc_list_entries;
+ cls_method_handle_t h_rgw_mp_upload_part_info_update;
cls_method_handle_t h_rgw_reshard_add;
cls_method_handle_t h_rgw_reshard_list;
cls_method_handle_t h_rgw_reshard_get;
cls_register_cxx_method(h_class, RGW_LC_GET_HEAD, CLS_METHOD_RD, rgw_cls_lc_get_head, &h_rgw_lc_get_head);
cls_register_cxx_method(h_class, RGW_LC_LIST_ENTRIES, CLS_METHOD_RD, rgw_cls_lc_list_entries, &h_rgw_lc_list_entries);
+ /* multipart */
+ cls_register_cxx_method(h_class, RGW_MP_UPLOAD_PART_INFO_UPDATE, CLS_METHOD_RD | CLS_METHOD_WR, rgw_mp_upload_part_info_update, &h_rgw_mp_upload_part_info_update);
+
/* resharding */
cls_register_cxx_method(h_class, RGW_RESHARD_ADD, CLS_METHOD_RD | CLS_METHOD_WR, rgw_reshard_add, &h_rgw_reshard_add);
cls_register_cxx_method(h_class, RGW_RESHARD_LIST, CLS_METHOD_RD, rgw_reshard_list, &h_rgw_reshard_list);
return r;
}
+/**
+ * Client-side helper: ask the RGW object class to update the stored info of
+ * one multipart part.
+ *
+ * Packs @p part_key and @p info into a cls_rgw_mp_upload_part_info_update_op
+ * and executes RGW_MP_UPLOAD_PART_INFO_UPDATE on object @p oid.
+ *
+ * @return the result of io_ctx.exec()
+ */
+int cls_rgw_mp_upload_part_info_update(librados::IoCtx& io_ctx,
+                                       const std::string& oid,
+                                       const std::string& part_key,
+                                       const RGWUploadPartInfo& info)
+{
+  cls_rgw_mp_upload_part_info_update_op request;
+  request.part_key = part_key;
+  request.info = info;
+
+  buffer::list request_bl;
+  encode(request, request_bl);
+
+  // The reply buffer is never decoded: no encoded result is needed for now.
+  // A future version might return a copy of the final RGWUploadPartInfo.
+  buffer::list reply_bl;
+  return io_ctx.exec(oid, RGW_CLASS, RGW_MP_UPLOAD_PART_INFO_UPDATE, request_bl, reply_bl);
+}
+
void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry)
{
bufferlist in;
std::vector<cls_rgw_lc_entry>& entries);
#endif
+/* multipart */
+int cls_rgw_mp_upload_part_info_update(librados::IoCtx& io_ctx, const std::string& oid, const std::string& part_key, const RGWUploadPartInfo& info); // encodes part_key + info into an op, exec()s RGW_MP_UPLOAD_PART_INFO_UPDATE on oid and returns exec()'s result
+
/* resharding */
void cls_rgw_reshard_add(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
void cls_rgw_reshard_remove(librados::ObjectWriteOperation& op, const cls_rgw_reshard_entry& entry);
#define RGW_LC_GET_HEAD "lc_get_head"
#define RGW_LC_LIST_ENTRIES "lc_list_entries"
+/* multipart */
+#define RGW_MP_UPLOAD_PART_INFO_UPDATE "mp_upload_part_info_update"
+
/* resharding */
#define RGW_RESHARD_ADD "reshard_add"
#define RGW_RESHARD_LIST "reshard_list"
ls.back()->truncated = true;
}
+// Appends two heap-allocated sample ops to ls, keyed "part1" and "part2";
+// every other field keeps its default-constructed value.
+void cls_rgw_mp_upload_part_info_update_op::generate_test_instances(std::list<cls_rgw_mp_upload_part_info_update_op*>& ls)
+{
+  for (const char* sample_key : {"part1", "part2"}) {
+    auto* sample = new cls_rgw_mp_upload_part_info_update_op;
+    sample->part_key = sample_key;
+    ls.push_back(sample);
+  }
+}
+
+// Emits the op's key, the part number and the manifest prefix of the carried
+// part info to the formatter.
+void cls_rgw_mp_upload_part_info_update_op::dump(Formatter* f) const
+{
+  const auto& part_manifest = info.manifest;
+
+  encode_json("part_key", part_key, f);
+  encode_json("part_num", info.num, f);
+  encode_json("part_prefix", part_manifest.get_prefix(), f);
+}
+
void cls_rgw_reshard_add_op::generate_test_instances(list<cls_rgw_reshard_add_op*>& ls)
{
ls.push_back(new cls_rgw_reshard_add_op);
};
WRITE_CLASS_ENCODER(cls_rgw_lc_list_entries_ret)
+struct cls_rgw_mp_upload_part_info_update_op { // input of the RGW_MP_UPLOAD_PART_INFO_UPDATE class method
+ std::string part_key; // omap key the part info is stored under
+ RGWUploadPartInfo info; // part info to store under part_key
+
+ cls_rgw_mp_upload_part_info_update_op() {}
+
+ void encode(buffer::list& bl) const { // serialises part_key, then info
+ ENCODE_START(1, 1, bl);
+ encode(part_key, bl);
+ encode(info, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) { // reads the fields in the order encode() wrote them
+ DECODE_START(1, bl);
+ decode(part_key, bl);
+ decode(info, bl);
+ DECODE_FINISH(bl);
+ }
+
+ static void generate_test_instances(std::list<cls_rgw_mp_upload_part_info_update_op*>& ls); // appends sample instances to ls
+ void dump(Formatter* f) const; // writes part_key, part number and manifest prefix to f
+};
+WRITE_CLASS_ENCODER(cls_rgw_mp_upload_part_info_update_op)
+
struct cls_rgw_reshard_add_op {
cls_rgw_reshard_entry entry;
return r;
}
- encode(info, bl);
-
std::unique_ptr<rgw::sal::Object> meta_obj =
head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
meta_obj->set_in_extra_data(true);
- r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield);
+ rgw_raw_obj meta_raw_obj;
+ dynamic_cast<rgw::sal::RadosObject*>(meta_obj.get())->get_raw_obj(&meta_raw_obj);
+
+ rgw_rados_ref meta_obj_ref;
+ r = store->getRados()->get_raw_obj_ref(dpp, meta_raw_obj, &meta_obj_ref);
+ if (r < 0) {
+ ldpp_dout(dpp, -1) << "ERROR: failed to get obj ref of meta obj with ret=" << r << dendl;
+ return r;
+ }
+
+ r = cls_rgw_mp_upload_part_info_update(meta_obj_ref.pool.ioctx(), meta_obj_ref.obj.oid, p, info);
+ ldpp_dout(dpp, 20) << "Update meta: " << meta_obj_ref.obj.oid << " part " << p << " prefix " << info.manifest.get_prefix() << " return " << r << dendl;
+
if (r < 0) {
return r == -ENOENT ? -ERR_NO_SUCH_UPLOAD : r;
}
y);
}
+/**
+ * Dispose of the objects left behind by earlier uploads of @p part.
+ *
+ * For every prefix in part->get_past_prefixes() this appends the index key of
+ * the object named "<prefix>.<part number>" (namespace mp_ns) to
+ * @p remove_objs, and collects the raw objects of the part's manifest,
+ * re-targeted at that prefix, into one chain. The chain is handed to GC under
+ * the upload id as tag; it is deleted inline instead when GC is not
+ * initialised, or when GC rejects it and returns a leftover chain.
+ *
+ * @return -ERR_NO_SUCH_UPLOAD when sending to GC fails with -ENOENT and a
+ *         leftover chain is returned; 0 in every other case
+ *
+ * NOTE(review): y is not referenced in the body, and the results of
+ * delete_objs_inline() are not checked -- confirm both are intentional.
+ */
+int RadosMultipartUpload::cleanup_part_history(const DoutPrefixProvider* dpp,
+                                               optional_yield y,
+                                               RadosMultipartPart *part,
+                                               list<rgw_obj_index_key>& remove_objs)
+{
+  cls_rgw_obj_chain stale_chain;
+
+  for (const auto& old_prefix : part->get_past_prefixes()) {
+    // Remove past upload part objects from index, too.
+    rgw_obj stale_obj;
+    stale_obj.init_ns(bucket->get_key(), old_prefix + "." + std::to_string(part->info.num), mp_ns);
+    rgw_obj_index_key stale_index_key;
+    stale_obj.key.get_index_key(&stale_index_key);
+    remove_objs.push_back(stale_index_key);
+
+    // Work on a copy of the manifest so the part's own prefix is untouched.
+    RGWObjManifest old_manifest = part->get_manifest();
+    old_manifest.set_prefix(old_prefix);
+    for (auto it = old_manifest.obj_begin(dpp); it != old_manifest.obj_end(dpp); ++it) {
+      rgw_raw_obj stale_raw = it.get_location().get_raw_obj(store);
+      cls_rgw_obj_key stale_raw_key(stale_raw.oid);
+      stale_chain.push_obj(stale_raw.pool.to_str(), stale_raw_key, stale_raw.loc);
+    }
+  }
+
+  auto rados = store->getRados();
+  const std::string& gc_tag = mp_obj.get_upload_id();
+
+  if (rados->get_gc() == nullptr) {
+    // Delete objects inline if gc hasn't been initialised (in case when bypass gc is specified)
+    rados->delete_objs_inline(dpp, stale_chain, gc_tag);
+    return 0;
+  }
+
+  // use upload id as tag and do it synchronously
+  auto [ret, leftover_chain] = rados->send_chain_to_gc(stale_chain, gc_tag);
+  if (ret >= 0 || !leftover_chain) {
+    return 0;
+  }
+
+  ldpp_dout(dpp, 5) << __func__ << ": gc->send_chain() returned " << ret << dendl;
+  if (ret == -ENOENT) {
+    return -ERR_NO_SUCH_UPLOAD;
+  }
+  // Delete objects inline if send chain to gc fails
+  rados->delete_objs_inline(dpp, *leftover_chain, gc_tag);
+  return 0;
+}
+
+
int RadosMultipartUpload::abort(const DoutPrefixProvider *dpp, CephContext *cct)
{
std::unique_ptr<rgw::sal::Object> meta_obj = get_meta_obj();
rgw_obj_index_key key;
head->get_key().get_index_key(&key);
remove_objs.push_back(key);
+
+ cleanup_part_history(dpp, null_yield, obj_part, remove_objs);
}
}
parts_accounted_size += obj_part->info.accounted_size;
return ret;
} else {
manifest.append(dpp, obj_part.manifest, store->svc()->zone->get_zonegroup(), store->svc()->zone->get_zone_params());
+ auto manifest_prefix = part->info.manifest.get_prefix();
+ if (not manifest_prefix.empty()) {
+ // It has an explicit prefix. Override the default one.
+ src_obj.init_ns(bucket->get_key(), manifest_prefix + "." + std::to_string(part->info.num), mp_ns);
+ }
}
bool part_compressed = (obj_part.cs_info.compression_type != "none");
remove_objs.push_back(remove_key);
+ cleanup_part_history(dpp, y, part, remove_objs);
+
ofs += obj_part.size;
accounted_size += obj_part.accounted_size;
}
/* For RadosStore code */
RGWObjManifest& get_manifest() { return info.manifest; }
+ const std::set<std::string>& get_past_prefixes() const { return info.past_prefixes; }
friend class RadosMultipartUpload;
};
const rgw_placement_rule *ptail_placement_rule,
uint64_t part_num,
const std::string& part_num_str) override;
+protected:
+ int cleanup_part_history(const DoutPrefixProvider* dpp,
+ optional_yield y,
+ RadosMultipartPart* part,
+ std::list<rgw_obj_index_key>& remove_objs);
};
class MPRadosSerializer : public StoreMPSerializer {
RGWObjManifest manifest;
RGWCompressionInfo cs_info;
+ // Previous part obj prefixes. Recorded here for later cleanup.
+ std::set<std::string> past_prefixes;
+
RGWUploadPartInfo() : num(0), size(0) {}
void encode(bufferlist& bl) const {
- ENCODE_START(4, 2, bl);
+ ENCODE_START(5, 2, bl); // version raised to 5 because past_prefixes is now encoded
encode(num, bl);
encode(size, bl);
encode(etag, bl);
encode(manifest, bl);
encode(cs_info, bl);
encode(accounted_size, bl);
+ encode(past_prefixes, bl); // decode() reads this only when struct_v >= 5
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START_LEGACY_COMPAT_LEN(4, 2, 2, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(5, 2, 2, bl);
decode(num, bl);
decode(size, bl);
decode(etag, bl);
} else {
accounted_size = size;
}
+ if (struct_v >= 5) {
+ decode(past_prefixes, bl);
+ }
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
encode_json("etag", etag, f);
utime_t ut(modified);
encode_json("modified", ut, f);
+ encode_json("past_prefixes", past_prefixes, f);
}