+------------------------+-----------+------------------------------------------+
| ``StorageClass`` | String | Should always return ``STANDARD``. |
+------------------------+-----------+------------------------------------------+
+| ``Type`` | String | ``Appendable`` or ``Normal``. |
++------------------------+-----------+------------------------------------------+
Get Bucket Location
-------------------
::
DELETE /{bucket}/{object}?uploadId= HTTP/1.1
+
+
+Append Object
+----------
+Append data to an object. You must have write permissions on the bucket to perform this operation.
+It is used to upload files in appending mode.The type of the objects created by the Append Object
+operation is Appendable Object, and the type of the objects uploaded with the Put Object operation is Normal Object.
+**Append Object can't be used if bucket versioning is enabled.**
+
+
+Syntax
+~~~~~~
+
+::
+
+ PUT /{bucket}/{object}?append&position= HTTP/1.1
+
+Request Headers
+~~~~~~~~~~~~~~~
+
++----------------------+--------------------------------------------+-------------------------------------------------------------------------------+------------+
+| Name | Description | Valid Values | Required |
++======================+============================================+===============================================================================+============+
+| **content-md5** | A base64 encoded MD-5 hash of the message. | A string. No defaults or constraints. | No |
++----------------------+--------------------------------------------+-------------------------------------------------------------------------------+------------+
+| **content-type** | A standard MIME type. | Any MIME type. Default: ``binary/octet-stream`` | No |
++----------------------+--------------------------------------------+-------------------------------------------------------------------------------+------------+
+| **x-amz-meta-<...>** | User metadata. Stored with the object. | A string up to 8kb. No defaults. | No |
++----------------------+--------------------------------------------+-------------------------------------------------------------------------------+------------+
+| **x-amz-acl** | A canned ACL. | ``private``, ``public-read``, ``public-read-write``, ``authenticated-read`` | No |
++----------------------+--------------------------------------------+-------------------------------------------------------------------------------+------------+
+
+Response Headers
+~~~~~~~~~~~~~~~~
+
++--------------------------------+------------------------------------------------------------------+
+| Name | Description |
++================================+==================================================================+
+| **x-rgw-next-append-position** | Next position to append object |
++--------------------------------+------------------------------------------------------------------+
+
+HTTP Response
+~~~~~~~~~~~~~
+
+The following HTTP response may be returned:
+
++---------------+----------------------------+---------------------------------------------------+
+| HTTP Status | Status Code | Description |
++===============+============================+===================================================+
+| **409** | PositionNotEqualToLength | Specified position does not match object length |
++---------------+----------------------------+---------------------------------------------------+
+| **409** | ObjectNotAppendable | Specified object can not be appended |
++---------------+----------------------------+---------------------------------------------------+
\ No newline at end of file
encode_json("content_type", content_type, f);
encode_json("accounted_size", accounted_size, f);
encode_json("user_data", user_data, f);
+ encode_json("appendable", appendable, f);
}
void rgw_bucket_dir_entry_meta::decode_json(JSONObj *obj) {
JSONDecoder::decode_json("content_type", content_type, obj);
JSONDecoder::decode_json("accounted_size", accounted_size, obj);
JSONDecoder::decode_json("user_data", user_data, obj);
+ JSONDecoder::decode_json("appendable", appendable, obj);
}
void rgw_bucket_dir_entry::generate_test_instances(list<rgw_bucket_dir_entry*>& o)
uint64_t accounted_size;
string user_data;
string storage_class;
+ bool appendable;
rgw_bucket_dir_entry_meta() :
- category(RGWObjCategory::None), size(0), accounted_size(0) { }
+ category(RGWObjCategory::None), size(0), accounted_size(0), appendable(false) { }
void encode(bufferlist &bl) const {
- ENCODE_START(6, 3, bl);
+ ENCODE_START(7, 3, bl);
encode(category, bl);
encode(size, bl);
encode(mtime, bl);
encode(accounted_size, bl);
encode(user_data, bl);
encode(storage_class, bl);
+ encode(appendable, bl);
ENCODE_FINISH(bl);
}
+
void decode(bufferlist::const_iterator &bl) {
DECODE_START_LEGACY_COMPAT_LEN(6, 3, 3, bl);
decode(category, bl);
decode(user_data, bl);
if (struct_v >= 6)
decode(storage_class, bl);
+ if (struct_v >= 7)
+ decode(appendable, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
{ ERR_TAG_CONFLICT, {409, "OperationAborted"}},
{ ERR_ROLE_EXISTS, {409, "EntityAlreadyExists"}},
{ ERR_DELETE_CONFLICT, {409, "DeleteConflict"}},
+ { ERR_POSITION_NOT_EQUAL_TO_LENGTH, {409, "PositionNotEqualToLength"}},
+ { ERR_OBJECT_NOT_APPENDABLE, {409, "ObjectNotAppendable"}},
{ ERR_INVALID_SECRET_KEY, {400, "InvalidSecretKey"}},
{ ERR_INVALID_KEY_TYPE, {400, "InvalidKeyType"}},
{ ERR_INVALID_CAP, {400, "InvalidCapability"}},
(name.compare("website") == 0) ||
(name.compare("requestPayment") == 0) ||
(name.compare("torrent") == 0) ||
- (name.compare("tagging") == 0)) {
+ (name.compare("tagging") == 0) ||
+ (name.compare("append") == 0) ||
+ (name.compare("position") == 0)) {
sub_resources[name] = val;
} else if (name[0] == 'r') { // root of all evil
if ((name.compare("response-content-type") == 0) ||
#define RGW_ATTR_COMPRESSION RGW_ATTR_PREFIX "compression"
+#define RGW_ATTR_APPEND_PART_NUM RGW_ATTR_PREFIX "append_part_num"
+
/* IAM Policy */
#define RGW_ATTR_IAM_POLICY RGW_ATTR_PREFIX "iam-policy"
#define RGW_ATTR_USER_POLICY RGW_ATTR_PREFIX "user-policy"
#define ERR_NO_CORS_FOUND 2216
#define ERR_INVALID_WEBSITE_ROUTING_RULES_ERROR 2217
#define ERR_RATE_LIMITED 2218
+#define ERR_POSITION_NOT_EQUAL_TO_LENGTH 2219
+#define ERR_OBJECT_NOT_APPENDABLE 2220
#define ERR_BUSY_RESHARDING 2300
#define ERR_NO_SUCH_ENTITY 2301
return 0;
}
+
void RGWPutObj::pre_exec()
{
rgw_bucket_object_pre_exec(s);
rgw::AioThrottle aio(store->ctx()->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
constexpr auto max_processor_size = std::max(sizeof(MultipartObjectProcessor),
- sizeof(AtomicObjectProcessor));
+ sizeof(AtomicObjectProcessor),
+ sizeof(AppendObjectProcessor));
ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
rgw_placement_rule *pdest_placement;
&aio, store, s->bucket_info, pdest_placement,
s->owner.get_id(), obj_ctx, obj,
multipart_upload_id, multipart_part_num, multipart_part_str);
+ } else if(append) {
+ processor.emplace<AppendObjectProcessor>(
+ &aio, store, s->bucket_info, s->bucket_owner.get_id(),obj_ctx, obj,
+ s->req_id, position, &cur_accounted_size);
} else {
if (s->bucket_info.versioning_enabled()) {
if (!version_id.empty()) {
int multipart_part_num = 0;
boost::optional<ceph::real_time> delete_at;
+ //append obj
+ bool append;
+ uint64_t position;
+ uint64_t cur_accounted_size;
public:
RGWPutObj() : ofs(0),
chunked_upload(0),
dlo_manifest(NULL),
slo_info(NULL),
- olh_epoch(0) {}
+ olh_epoch(0),
+ append(false),
+ position(0),
+ cur_accounted_size(0) {}
~RGWPutObj() override {
delete slo_info;
return 0;
}
+int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor)
+{
+ int r = writer.write_exclusive(data);
+ if (r < 0) {
+ return r;
+ }
+ *processor = &stripe;
+ return 0;
+}
+
+int AppendObjectProcessor::prepare()
+{
+ RGWObjState *astate;
+ int r = store->get_obj_state(&obj_ctx, bucket_info, head_obj, &astate);
+ if (r < 0) {
+ return r;
+ }
+ cur_size = astate->size;
+ *cur_accounted_size = astate->accounted_size;
+ if (!astate->exists) {
+ if (position != 0) {
+ ldout(store->ctx(), 5) << "ERROR: Append position should be zero" << dendl;
+ return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
+ } else {
+ cur_part_num = 1;
+ //set the prefix
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ string oid_prefix = head_obj.key.name;
+ oid_prefix.append(".");
+ oid_prefix.append(buf);
+ oid_prefix.append("_");
+ manifest.set_prefix(oid_prefix);
+ }
+ } else {
+ // check whether the object appendable
+ map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
+ if (iter == astate->attrset.end()) {
+ ldout(store->ctx(), 5) << "ERROR: The object is not appendable" << dendl;
+ return -ERR_OBJECT_NOT_APPENDABLE;
+ }
+ if (position != *cur_accounted_size) {
+ ldout(store->ctx(), 5) << "ERROR: Append position should be equal to the obj size" << dendl;
+ return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
+ }
+ try {
+ decode(cur_part_num, iter->second);
+ } catch (buffer::error& err) {
+ ldout(store->ctx(), 5) << "ERROR: failed to decode part num" << dendl;
+ return -EIO;
+ }
+ cur_part_num++;
+ //get the current obj etag
+ iter = astate->attrset.find(RGW_ATTR_ETAG);
+ if (iter != astate->attrset.end()) {
+ string s = rgw_string_unquote(iter->second.c_str());
+ size_t pos = s.find("-");
+ cur_etag = s.substr(0, pos);
+ }
+ cur_manifest = &astate->manifest;
+ manifest.set_prefix(cur_manifest->get_prefix());
+ }
+ manifest.set_multipart_part_rule(store->ctx()->_conf->rgw_obj_stripe_size, cur_part_num);
+
+ r = manifest_gen.create_begin(store->ctx(), &manifest, bucket_info.placement_rule, head_obj.bucket, head_obj);
+ if (r < 0) {
+ return r;
+ }
+ rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
+
+ uint64_t chunk_size = 0;
+ r = store->get_max_chunk_size(stripe_obj.pool, &chunk_size);
+ if (r < 0) {
+ return r;
+ }
+ r = writer.set_stripe_obj(std::move(stripe_obj));
+ if (r < 0) {
+ return r;
+ }
+
+ uint64_t stripe_size = manifest_gen.cur_stripe_max_size();
+
+ uint64_t max_head_size = std::min(chunk_size, stripe_size);
+ set_head_chunk_size(max_head_size);
+
+ // initialize the processors
+ chunk = ChunkProcessor(&writer, chunk_size);
+ stripe = StripeProcessor(&chunk, this, stripe_size);
+
+ return 0;
+}
+
+int AppendObjectProcessor::complete(size_t accounted_size, const string &etag, ceph::real_time *mtime,
+ ceph::real_time set_mtime, map <string, bufferlist> &attrs,
+ ceph::real_time delete_at, const char *if_match, const char *if_nomatch,
+ const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled)
+{
+ int r = writer.drain();
+ if (r < 0)
+ return r;
+ const uint64_t actual_size = get_actual_size();
+ r = manifest_gen.create_next(actual_size);
+ if (r < 0) {
+ return r;
+ }
+ obj_ctx.obj.set_atomic(head_obj);
+ RGWRados::Object op_target(store, bucket_info, obj_ctx, head_obj);
+ //For Append obj, disable versioning
+ op_target.set_versioning_disabled(true);
+ RGWRados::Object::Write obj_op(&op_target);
+ if (cur_manifest) {
+ cur_manifest->append(manifest, store);
+ obj_op.meta.manifest = cur_manifest;
+ } else {
+ obj_op.meta.manifest = &manifest;
+ }
+ obj_op.meta.ptag = &unique_tag; /* use req_id as operation tag */
+ obj_op.meta.mtime = mtime;
+ obj_op.meta.set_mtime = set_mtime;
+ obj_op.meta.owner = owner;
+ obj_op.meta.flags = PUT_OBJ_CREATE;
+ obj_op.meta.delete_at = delete_at;
+ obj_op.meta.user_data = user_data;
+ obj_op.meta.zones_trace = zones_trace;
+ obj_op.meta.modify_tail = true;
+ obj_op.meta.appendable = true;
+ //Add the append part number
+ bufferlist cur_part_num_bl;
+ encode(cur_part_num, cur_part_num_bl);
+ attrs[RGW_ATTR_APPEND_PART_NUM] = cur_part_num_bl;
+ //calculate the etag
+ if (!cur_etag.empty()) {
+ MD5 hash;
+ char petag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ char final_etag[CEPH_CRYPTO_MD5_DIGESTSIZE];
+ char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+ hex_to_buf(cur_etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
+ hash.Update((const unsigned char *)petag, sizeof(petag));
+ hex_to_buf(etag.c_str(), petag, CEPH_CRYPTO_MD5_DIGESTSIZE);
+ hash.Update((const unsigned char *)petag, sizeof(petag));
+ hash.Final((unsigned char *)final_etag);
+ buf_to_hex((unsigned char *)final_etag, sizeof(final_etag), final_etag_str);
+ snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
+ "-%lld", (long long)cur_part_num);
+ bufferlist etag_bl;
+ etag_bl.append(final_etag_str, strlen(final_etag_str) + 1);
+ attrs[RGW_ATTR_ETAG] = etag_bl;
+ }
+ r = obj_op.write_meta(actual_size + cur_size, accounted_size + *cur_accounted_size, attrs);
+ if (r < 0) {
+ return r;
+ }
+ if (!obj_op.meta.canceled) {
+ // on success, clear the set of objects for deletion
+ writer.clear_written();
+ }
+ if (pcanceled) {
+ *pcanceled = obj_op.meta.canceled;
+ }
+ *cur_accounted_size += accounted_size;
+
+ return 0;
+}
+
} // namespace rgw::putobj
rgw_zone_set *zones_trace, bool *canceled) override;
};
+ class AppendObjectProcessor : public ManifestObjectProcessor {
+ uint64_t cur_part_num;
+ uint64_t position;
+ uint64_t cur_size;
+ uint64_t *cur_accounted_size;
+ string cur_etag;
+ const std::string unique_tag;
+
+ RGWObjManifest *cur_manifest;
+
+ int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+
+ public:
+ AppendObjectProcessor(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,const rgw_obj& head_obj,
+ const std::string& unique_tag, uint64_t position, uint64_t *cur_accounted_size)
+ : ManifestObjectProcessor(aio, store, bucket_info, owner, obj_ctx, head_obj),
+ position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
+ unique_tag(unique_tag), cur_manifest(nullptr)
+ {}
+ int prepare() override;
+ int complete(size_t accounted_size, const string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ map<string, bufferlist>& attrs, ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch, const string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled) override;
+ };
+
} // namespace putobj
} // namespace rgw
+
void RGWRados::finalize()
{
cct->get_admin_socket()->unregister_commands(this);
-
if (run_sync_thread) {
Mutex::Locker l(meta_sync_thread_lock);
meta_sync_processor_thread->stop();
r = index_op->complete(poolid, epoch, size, accounted_size,
meta.set_mtime, etag, content_type,
storage_class, &acl_bl,
- meta.category, meta.remove_objs, meta.user_data);
+ meta.category, meta.remove_objs, meta.user_data, meta.appendable);
tracepoint(rgw_rados, complete_exit, req_id.c_str());
if (r < 0)
goto done_cancel;
const string& content_type, const string& storage_class,
bufferlist *acl_bl,
RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, const string *user_data)
+ list<rgw_obj_index_key> *remove_objs, const string *user_data,
+ bool appendable)
{
if (blind) {
return 0;
ent.meta.owner = owner.get_id().to_str();
ent.meta.owner_display_name = owner.get_display_name();
ent.meta.content_type = content_type;
+ ent.meta.appendable = appendable;
ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
rgw_zone_set *zones_trace;
bool modify_tail;
bool completeMultipart;
+ bool appendable;
MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
remove_objs(NULL), category(RGWObjCategory::Main), flags(0),
if_match(NULL), if_nomatch(NULL), canceled(false), user_data(nullptr), zones_trace(nullptr),
- modify_tail(false), completeMultipart(false) {}
+ modify_tail(false), completeMultipart(false), appendable(false) {}
} meta;
explicit Write(RGWRados::Object *_target) : target(_target) {}
const string& etag, const string& content_type,
const string& storage_class,
bufferlist *acl_bl, RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, const string *user_data = nullptr);
+ list<rgw_obj_index_key> *remove_objs, const string *user_data = nullptr, bool appendable = false);
int complete_del(int64_t poolid, uint64_t epoch,
ceph::real_time& removed_mtime, /* mtime of removed object */
list<rgw_obj_index_key> *remove_objs);
dump_content_length(s, total_len);
dump_last_modified(s, lastmod);
dump_header_if_nonempty(s, "x-amz-version-id", version_id);
-
+ if (attrs.find(RGW_ATTR_APPEND_PART_NUM) != attrs.end()) {
+ dump_header(s, "x-rgw-object-type", "Appendable");
+ dump_header(s, "x-rgw-next-append-position", s->obj_size);
+ } else {
+ dump_header(s, "x-rgw-object-type", "Normal");
+ }
if (! op_ret) {
if (! lo_etag.empty()) {
/* Handle etag of Swift API's large objects (DLO/SLO). It's entirerly
s->formatter->dump_string("StorageClass", storage_class.c_str());
}
dump_owner(s, iter->meta.owner, iter->meta.owner_display_name);
+ if (iter->meta.appendable) {
+ s->formatter->dump_string("Type", "Appendable");
+ } else {
+ s->formatter->dump_string("Type", "Normal");
+ }
s->formatter->close_section();
}
if (objs_container) {
if (s->system_request) {
s->formatter->dump_string("RgwxTag", iter->tag);
}
+ if (iter->meta.appendable) {
+ s->formatter->dump_string("Type", "Appendable");
+ } else {
+ s->formatter->dump_string("Type", "Normal");
+ }
s->formatter->close_section();
}
if (!common_prefixes.empty()) {
return -EINVAL;
}
+ append = s->info.args.exists("append");
+ if (append) {
+ string pos_str = s->info.args.get("position");
+ if (pos_str.empty()) {
+ return -EINVAL;
+ } else {
+ position = strtoull(pos_str.c_str(), NULL, 10);
+ }
+ }
+
return RGWPutObj_ObjStore::get_params();
}
return;
}
}
+ if (append) {
+ if (op_ret == 0 || op_ret == -ERR_POSITION_NOT_EQUAL_TO_LENGTH) {
+ dump_header(s, "x-rgw-next-append-position", cur_accounted_size);
+ }
+ }
if (s->system_request && !real_clock::is_zero(mtime)) {
dump_epoch_header(s, "Rgwx-Mtime", mtime);
}