string upload_id;
protected:
- int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
+ int prepare(RGWRados *store, string *oid_rand);
int do_complete(string& etag, time_t *mtime, time_t set_mtime,
map<string, bufferlist>& attrs,
const char *if_match = NULL, const char *if_nomatch = NULL);
public:
bool immutable_head() { return true; }
- RGWPutObjProcessor_Multipart(const string& bucket_owner, uint64_t _p, req_state *_s) :
- RGWPutObjProcessor_Atomic(bucket_owner, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
+ RGWPutObjProcessor_Multipart(RGWRados::ObjectCtx& obj_ctx, const string& bucket_owner, uint64_t _p, req_state *_s) :
+ RGWPutObjProcessor_Atomic(obj_ctx, bucket_owner, _s->bucket, _s->object.name, _p, _s->req_id, false), s(_s) {}
};
-int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
+int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, string *oid_rand)
{
- int r = prepare_init(store, obj_ctx, NULL);
+ int r = prepare_init(store, NULL);
if (r < 0) {
return r;
}
params.mtime = mtime;
params.owner = s->owner.get_id();
- int r = store->put_obj_meta(obj_ctx, head_obj, s->obj_size, attrs, RGW_OBJ_CATEGORY_MAIN, 0, params);
+ int r = store->put_obj_meta(&obj_ctx, head_obj, s->obj_size, attrs, RGW_OBJ_CATEGORY_MAIN, 0, params);
if (r < 0)
return r;
}
-RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart)
+RGWPutObjProcessor *RGWPutObj::select_processor(RGWRados::ObjectCtx& obj_ctx, bool *is_multipart)
{
RGWPutObjProcessor *processor;
const string& bucket_owner = s->bucket_owner.get_id();
if (!multipart) {
- processor = new RGWPutObjProcessor_Atomic(bucket_owner, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
+ processor = new RGWPutObjProcessor_Atomic(obj_ctx, bucket_owner, s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
} else {
- processor = new RGWPutObjProcessor_Multipart(bucket_owner, part_size, s);
+ processor = new RGWPutObjProcessor_Multipart(obj_ctx, bucket_owner, part_size, s);
}
if (is_multipart) {
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}
- processor = select_processor(&multipart);
+ processor = select_processor(*(RGWRados::ObjectCtx *)s->obj_ctx, &multipart);
- ret = processor->prepare(store, s->obj_ctx, NULL);
+ ret = processor->prepare(store, NULL);
if (ret < 0)
goto done;
/* restart processing with different oid suffix */
dispose_processor(processor);
- processor = select_processor(&multipart);
+ processor = select_processor(*(RGWRados::ObjectCtx *)s->obj_ctx, &multipart);
string oid_rand;
char buf[33];
gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
oid_rand.append(buf);
- ret = processor->prepare(store, s->obj_ctx, &oid_rand);
+ ret = processor->prepare(store, &oid_rand);
if (ret < 0) {
ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
goto done;
return 0;
}
-RGWPutObjProcessor *RGWPostObj::select_processor()
+RGWPutObjProcessor *RGWPostObj::select_processor(RGWRados::ObjectCtx& obj_ctx)
{
RGWPutObjProcessor *processor;
uint64_t part_size = s->cct->_conf->rgw_obj_stripe_size;
- processor = new RGWPutObjProcessor_Atomic(s->bucket_owner.get_id(), s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
+ processor = new RGWPutObjProcessor_Atomic(obj_ctx, s->bucket_owner.get_id(), s->bucket, s->object.name, part_size, s->req_id, s->bucket_info.versioning_enabled());
return processor;
}
goto done;
}
- processor = select_processor();
+ processor = select_processor(*(RGWRados::ObjectCtx *)s->obj_ctx);
- ret = processor->prepare(store, s->obj_ctx, NULL);
+ ret = processor->prepare(store, NULL);
if (ret < 0)
goto done;
/* no need to track object versioning, need it for bucket's data only */
RGWObjVersionTracker *ptracker = (!s->object.empty() ? NULL : &s->bucket_info.objv_tracker);
- if (s->object) {
+ if (!s->object.empty()) {
/* check if obj exists, read orig attrs */
ret = get_obj_attrs(store, s, obj, orig_attrs, NULL, ptracker);
if (ret < 0)
orig_attrs = s->bucket_attrs;
}
- if (!s->object && !placement_rule.empty()) {
+ if (s->object.empty() && !placement_rule.empty()) {
if (placement_rule != s->bucket_info.placement_rule) {
ret = -EEXIST;
return;
rgw_obj src_obj(src_bucket, src_object);
rgw_obj dst_obj(dest_bucket, dest_object);
- store->set_atomic(s->obj_ctx, src_obj);
- store->set_atomic(s->obj_ctx, dst_obj);
+ RGWRados::ObjectCtx& obj_ctx = *(RGWRados::ObjectCtx *)s->obj_ctx;
+ obj_ctx.set_atomic(src_obj);
+ obj_ctx.set_atomic(dst_obj);
- ret = store->copy_obj(s->obj_ctx,
+ ret = store->copy_obj(obj_ctx,
s->user.user_id,
client_id,
op_id,
store->gen_rand_obj_instance_name(&target_obj);
}
- store->set_atomic(s->obj_ctx, target_obj);
+ RGWRados::ObjectCtx& obj_ctx = *(RGWRados::ObjectCtx *)s->obj_ctx;
+
+ obj_ctx.set_atomic(target_obj);
RGWRados::PutObjMetaExtraParams extra_params;
extra_params.ptag = &s->req_id; /* use req_id as operation tag */
extra_params.owner = s->owner.get_id();
- ret = store->put_obj_meta(s->obj_ctx, target_obj, ofs, attrs,
+ ret = store->put_obj_meta(&obj_ctx, target_obj, ofs, attrs,
RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
extra_params);
if (ret < 0)
return;
if (versioned_object) {
- ret = store->set_olh(s->obj_ctx, s->bucket_owner.get_id(), target_obj, false);
+ ret = store->set_olh(obj_ctx, s->bucket_owner.get_id(), target_obj, false);
if (ret < 0) {
return;
}
}
// remove the upload obj
- store->delete_obj(s->obj_ctx, s->bucket_owner.get_id(), meta_obj, false);
+ store->delete_obj(&obj_ctx, s->bucket_owner.get_id(), meta_obj, false);
}
int RGWAbortMultipart::verify_permission()
policy.set_ctx(s->cct);
}
- RGWPutObjProcessor *select_processor(bool *is_multipart);
+ RGWPutObjProcessor *select_processor(RGWRados::ObjectCtx& obj_ctx, bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);
int verify_permission();
void pre_exec();
void execute();
- RGWPutObjProcessor *select_processor();
+ RGWPutObjProcessor *select_processor(RGWRados::ObjectCtx& obj_ctx);
void dispose_processor(RGWPutObjProcessor *processor);
virtual int get_params() = 0;
list<rgw_obj>::iterator iter;
for (iter = written_objs.begin(); iter != written_objs.end(); ++iter) {
rgw_obj& obj = *iter;
- int r = store->delete_obj(obj_ctx, bucket_owner, obj, false);
+ int r = store->delete_obj(&obj_ctx, bucket_owner, obj, false);
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
}
}
-int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, void *obj_ctx, string *oid_rand)
+int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
+ RGWPutObjProcessor::prepare(store, oid_rand);
int r = store->get_max_chunk_size(bucket, &max_chunk_size);
if (r < 0) {
return 0;
}
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
+int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, string *oid_rand)
{
- int r = prepare_init(store, obj_ctx, oid_rand);
+ int r = prepare_init(store, oid_rand);
if (r < 0) {
return r;
}
if (r < 0)
return r;
- store->set_atomic(obj_ctx, head_obj);
+ obj_ctx.set_atomic(head_obj);
RGWRados::PutObjMetaExtraParams extra_params;
extra_params.set_mtime = set_mtime;
extra_params.owner = bucket_owner;
- RGWRados::ObjectCtx *rctx = static_cast<RGWRados::ObjectCtx *>(obj_ctx);
-
bool is_olh = false;
if (head_obj.get_instance().empty()) {
RGWObjState *astate = NULL;
- r = store->get_obj_state(rctx, head_obj, &astate, NULL, false); /* don't follow olh */
+ r = store->get_obj_state(&obj_ctx, head_obj, &astate, NULL, false); /* don't follow olh */
if (r < 0) {
return r;
}
is_olh = astate->is_olh;
}
- r = store->put_obj_meta(obj_ctx, head_obj, obj_len, attrs,
+ r = store->put_obj_meta(&obj_ctx, head_obj, obj_len, attrs,
RGW_OBJ_CATEGORY_MAIN, PUT_OBJ_CREATE,
extra_params);
if (r < 0) {
return ret;
}
- return copy_obj_data((void *)&rctx, dest_bucket_info, &handle, end, obj, obj, max_chunk_size, NULL, mtime, attrset, RGW_OBJ_CATEGORY_MAIN, NULL, NULL, NULL);
+ return copy_obj_data(rctx, dest_bucket_info, &handle, end, obj, obj, max_chunk_size, NULL, mtime, attrset, RGW_OBJ_CATEGORY_MAIN, NULL, NULL, NULL);
}
/**
* err: stores any errors resulting from the get of the original object
* Returns: 0 on success, -ERR# otherwise.
*/
-int RGWRados::copy_obj(void *ctx,
+int RGWRados::copy_obj(ObjectCtx& obj_ctx,
const string& user_id,
const string& client_id,
const string& op_id,
off_t ofs = 0;
off_t end = -1;
if (!remote_src && source_zone.empty()) {
- ret = prepare_get_obj(ctx, src_obj, &ofs, &end, &src_attrs,
+ ret = prepare_get_obj(&obj_ctx, src_obj, &ofs, &end, &src_attrs,
mod_ptr, unmod_ptr, &lastmod, if_match, if_nomatch, &total_len, &obj_size, NULL, &handle, err);
if (ret < 0)
return ret;
string tag;
append_rand_alpha(cct, tag, tag, 32);
- RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(),
+ RGWPutObjProcessor_Atomic processor(obj_ctx,
+ dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(),
cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
- ret = processor.prepare(this, ctx, NULL);
+ ret = processor.prepare(this, NULL);
if (ret < 0)
return ret;
RGWObjManifest manifest;
RGWObjState *astate = NULL;
- ObjectCtx *rctx = static_cast<ObjectCtx *>(ctx);
- ret = get_obj_state(rctx, src_obj, &astate, NULL);
+ ret = get_obj_state(&obj_ctx, src_obj, &astate, NULL);
if (ret < 0)
return ret;
if (ret < 0)
return ret;
- ret = get_obj_iterate(ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
+ ret = get_obj_iterate(&obj_ctx, &handle, src_obj, 0, astate->size - 1, out_stream_req->get_out_cb());
if (ret < 0)
return ret;
}
if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
- return copy_obj_data(ctx, dest_bucket_info, &handle, end, dest_obj, src_obj,
+ return copy_obj_data(obj_ctx, dest_bucket_info, &handle, end, dest_obj, src_obj,
max_chunk_size, mtime, 0, src_attrs, category, ptag, petag, err);
}
}
if (copy_first) {
- ret = get_obj(ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size, NULL);
+ ret = get_obj(&obj_ctx, NULL, &handle, src_obj, first_chunk, 0, max_chunk_size, NULL);
if (ret < 0)
goto done_ret;
ep.owner = dest_bucket_info.owner;
ep.mtime = mtime;
- ret = put_obj_meta(ctx, dest_obj, end + 1, src_attrs, category, PUT_OBJ_CREATE, ep);
+ ret = put_obj_meta(&obj_ctx, dest_obj, end + 1, src_attrs, category, PUT_OBJ_CREATE, ep);
if (petag) {
map<string, bufferlist>::iterator iter = src_attrs.find(RGW_ATTR_ETAG);
}
-int RGWRados::copy_obj_data(void *ctx,
+int RGWRados::copy_obj_data(ObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
void **handle, off_t end,
rgw_obj& dest_obj,
string tag;
append_rand_alpha(cct, tag, tag, 32);
- RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(),
+ RGWPutObjProcessor_Atomic processor(obj_ctx,
+ dest_bucket_info.owner, dest_obj.bucket, dest_obj.get_object(),
cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
- int ret = processor.prepare(this, ctx, NULL);
+ int ret = processor.prepare(this, NULL);
if (ret < 0)
return ret;
do {
bufferlist bl;
- ret = get_obj(ctx, NULL, handle, src_obj, bl, ofs, end, NULL);
+ ret = get_obj(&obj_ctx, NULL, handle, src_obj, bl, ofs, end, NULL);
if (ret < 0)
return ret;
rgw_obj marker = obj;
gen_rand_obj_instance_name(&marker);
- r = set_olh(ctx, bucket_owner, marker, true);
+ r = set_olh(*rctx, bucket_owner, marker, true);
if (r < 0) {
return r;
}
return (iter != attrs.end());
}
-int RGWRados::get_olh_target_state(ObjectCtx *rctx, rgw_obj& obj, RGWObjState *olh_state,
+int RGWRados::get_olh_target_state(ObjectCtx& obj_ctx, rgw_obj& obj, RGWObjState *olh_state,
RGWObjState **target_state, RGWObjVersionTracker *objv_tracker)
{
assert(olh_state->is_olh);
rgw_obj target;
- int r = RGWRados::follow_olh((void *)rctx, olh_state, obj, &target); /* might return -EAGAIN */
+ int r = RGWRados::follow_olh(obj_ctx, olh_state, obj, &target); /* might return -EAGAIN */
if (r < 0) {
return r;
}
- r = get_obj_state(rctx, target, target_state, objv_tracker, false);
+ r = get_obj_state(&obj_ctx, target, target_state, objv_tracker, false);
if (r < 0) {
return r;
}
*state = s;
if (s->has_attrs) {
if (s->is_olh && follow_olh) {
- return get_olh_target_state(rctx, obj, s, state, objv_tracker);
+ return get_olh_target_state(*rctx, obj, s, state, objv_tracker);
}
return 0;
}
s->is_olh = true;
if (follow_olh) {
- return get_olh_target_state(rctx, obj, s, state, objv_tracker);
+ return get_olh_target_state(*rctx, obj, s, state, objv_tracker);
}
}
uint64_t epoch = ref.ioctx.get_last_version();
int64_t poolid = ref.ioctx.get_id();
utime_t mtime = ceph_clock_now(cct);
- r = complete_update_index(bucket, obj.object, tag, poolid, epoch, state->size,
+ r = complete_update_index(bucket, obj, tag, poolid, epoch, state->size,
mtime, etag, content_type, &acl_bl, RGW_OBJ_CATEGORY_MAIN, NULL);
} else {
- int ret = complete_update_index_cancel(bucket, obj.object, tag);
+ int ret = complete_update_index_cancel(bucket, obj, tag);
if (ret < 0) {
ldout(cct, 0) << "ERROR: comlete_update_index_cancel() returned r=" << r << dendl;
}
op.setxattr(name, bl);
}
-int RGWRados::apply_olh_log(void *ctx, const string& bucket_owner, rgw_obj& obj,
+int RGWRados::apply_olh_log(ObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& obj,
bufferlist& obj_tag, map<uint64_t, rgw_bucket_olh_log_entry>& log,
uint64_t *plast_ver)
{
cls_rgw_obj_key& key = *liter;
rgw_obj obj_instance(bucket, key.name);
obj_instance.set_instance(key.instance);
- int ret = delete_obj(ctx, bucket_owner, obj_instance, false);
+ int ret = delete_obj(&obj_ctx, bucket_owner, obj_instance, false);
if (ret < 0 && ret != -ENOENT) {
ldout(cct, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
return ret;
/*
* read olh log and apply it
*/
-int RGWRados::update_olh(void *ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj)
+int RGWRados::update_olh(ObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj)
{
map<uint64_t, rgw_bucket_olh_log_entry> log;
bool is_truncated;
if (ret < 0) {
return ret;
}
- ret = apply_olh_log(ctx, bucket_owner, obj, state->obj_tag, log, &ver_marker);
+ ret = apply_olh_log(obj_ctx, bucket_owner, obj, state->obj_tag, log, &ver_marker);
if (ret < 0) {
return ret;
}
return 0;
}
-int RGWRados::set_olh(void *ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker)
+int RGWRados::set_olh(ObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker)
{
string op_tag;
string obj_tag;
olh_obj.clear_instance();
RGWObjState *state = NULL;
- ObjectCtx *rctx = static_cast<ObjectCtx *>(ctx);
int ret;
do {
- ret = get_obj_state(rctx, olh_obj, &state, NULL, false); /* don't follow olh */
+ ret = get_obj_state(&obj_ctx, olh_obj, &state, NULL, false); /* don't follow olh */
if (ret < 0)
return ret;
ret = olh_init_modification(state, olh_obj, &obj_tag, &op_tag);
if (ret == -ECANCELED) {
- rctx->invalidate(olh_obj);
+ obj_ctx.invalidate(olh_obj);
continue;
}
if (ret < 0) {
return ret;
}
- ret = update_olh(ctx, state, bucket_owner, olh_obj);
+ ret = update_olh(obj_ctx, state, bucket_owner, olh_obj);
if (ret < 0) {
ldout(cct, 20) << "update_olh() target_obj=" << target_obj << " returned " << ret << dendl;
return ret;
return 0;
}
-int RGWRados::follow_olh(void *ctx, RGWObjState *state, rgw_obj& olh_obj, rgw_obj *target)
+int RGWRados::follow_olh(ObjectCtx& obj_ctx, RGWObjState *state, rgw_obj& olh_obj, rgw_obj *target)
{
map<string, bufferlist> pending_entries;
filter_attrset(state->attrset, RGW_ATTR_OLH_PENDING_PREFIX, &pending_entries);
if (!pending_entries.empty()) {
#warning FIXME: bucket_owner
string bucket_owner;
- int ret = update_olh(ctx, state, bucket_owner, olh_obj);
+ int ret = update_olh(obj_ctx, state, bucket_owner, olh_obj);
if (ret < 0) {
return ret;
}
};
WRITE_CLASS_ENCODER(RGWUploadPartInfo)
-class RGWPutObjProcessor
-{
-protected:
- RGWRados *store;
- void *obj_ctx;
- bool is_complete;
- string bucket_owner;
-
- virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
- map<string, bufferlist>& attrs,
- const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
-
-public:
- RGWPutObjProcessor(const string& _bo) : store(NULL), obj_ctx(NULL), is_complete(false), bucket_owner(_bo) {}
- virtual ~RGWPutObjProcessor() {}
- virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) {
- store = _store;
- obj_ctx = _o;
- return 0;
- }
- virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0;
- virtual int throttle_data(void *handle, bool need_to_wait) = 0;
- virtual void complete_hash(MD5 *hash) {
- assert(0);
- }
- virtual int complete(string& etag, time_t *mtime, time_t set_mtime,
- map<string, bufferlist>& attrs,
- const char *if_match = NULL, const char *if_nomatch = NULL);
-
- CephContext *ctx();
-};
-
-struct put_obj_aio_info {
- void *handle;
-};
-
-class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
-{
- list<struct put_obj_aio_info> pending;
- size_t max_chunks;
-
- struct put_obj_aio_info pop_pending();
- int wait_pending_front();
- bool pending_has_completed();
-
- rgw_obj last_written_obj;
-
-protected:
- uint64_t obj_len;
-
- list<rgw_obj> written_objs;
-
- void add_written_obj(const rgw_obj& obj) {
- written_objs.push_back(obj);
- }
-
- int drain_pending();
- int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
-
-public:
- int throttle_data(void *handle, bool need_to_wait);
-
- RGWPutObjProcessor_Aio(const string& bucket_owner) : RGWPutObjProcessor(bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
- virtual ~RGWPutObjProcessor_Aio();
-};
-
-class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
-{
- bufferlist first_chunk;
- uint64_t part_size;
- off_t cur_part_ofs;
- off_t next_part_ofs;
- int cur_part_id;
- off_t data_ofs;
-
- uint64_t extra_data_len;
- bufferlist extra_data_bl;
- bufferlist pending_data_bl;
- uint64_t max_chunk_size;
-
- bool versioned_object;
-
-protected:
- rgw_bucket bucket;
- string obj_str;
-
- string unique_tag;
-
- rgw_obj head_obj;
- rgw_obj cur_obj;
- RGWObjManifest manifest;
- RGWObjManifest::generator manifest_gen;
-
- int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
- virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
- map<string, bufferlist>& attrs,
- const char *if_match = NULL, const char *if_nomatch = NULL);
-
- int prepare_next_part(off_t ofs);
- int complete_parts();
- int complete_writing_data();
-
- int prepare_init(RGWRados *store, void *obj_ctx, string *oid_rand);
-
-public:
- ~RGWPutObjProcessor_Atomic() {}
- RGWPutObjProcessor_Atomic(const string& bucket_owner, rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) :
- RGWPutObjProcessor_Aio(bucket_owner),
- part_size(_p),
- cur_part_ofs(0),
- next_part_ofs(_p),
- cur_part_id(0),
- data_ofs(0),
- extra_data_len(0),
- max_chunk_size(0),
- versioned_object(versioned),
- bucket(_b),
- obj_str(_o),
- unique_tag(_t) {}
- int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
- virtual bool immutable_head() { return false; }
- void set_extra_data_len(uint64_t len) {
- extra_data_len = len;
- }
- virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again);
- virtual void complete_hash(MD5 *hash);
- bufferlist& get_extra_data() { return extra_data_bl; }
-};
-
-
struct RGWObjState {
rgw_obj obj;
bool is_atomic;
int get_obj_ref(const rgw_obj& obj, rgw_rados_ref *ref, rgw_bucket *bucket, bool ref_system_obj = false);
uint64_t max_bucket_id;
- int get_olh_target_state(ObjectCtx *rctx, rgw_obj& obj, RGWObjState *olh_state,
+ int get_olh_target_state(ObjectCtx& rctx, rgw_obj& obj, RGWObjState *olh_state,
RGWObjState **target_state, RGWObjVersionTracker *objv_tracker);
int get_obj_state_impl(ObjectCtx *rctx, rgw_obj& obj, RGWObjState **state, RGWObjVersionTracker *objv_tracker, bool follow_olh);
int append_atomic_test(ObjectCtx *rctx, rgw_obj& obj,
int get_state(RGWObjState **pstate);
int prepare_atomic_modification(librados::ObjectWriteOperation& op, bool reset_obj, const string *ptag,
- const string *ifmatch, const string *ifnomatch);
+ const char *ifmatch, const char *ifnomatch);
int complete_atomic_modification();
public:
string owner;
RGWObjCategory category;
int flags;
- const string *if_match;
- const string *if_nomatch;
+ const char *if_match;
+ const char *if_nomatch;
MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
remove_objs(NULL), set_mtime(0), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
* err: stores any errors resulting from the get of the original object
* Returns: 0 on success, -ERR# otherwise.
*/
- virtual int copy_obj(void *ctx,
+ virtual int copy_obj(ObjectCtx& obj_ctx,
const string& user_id,
const string& client_id,
const string& op_id,
void (*progress_cb)(off_t, void *),
void *progress_data);
- int copy_obj_data(void *ctx,
+ int copy_obj_data(ObjectCtx& obj_ctx,
RGWBucketInfo& dest_bucket_info,
void **handle, off_t end,
rgw_obj& dest_obj,
int bucket_index_read_olh_log(RGWObjState *state, rgw_obj& obj_instance, uint64_t ver_marker,
map<uint64_t, rgw_bucket_olh_log_entry> *log, bool *is_truncated);
int bucket_index_trim_olh_log(rgw_obj& obj_instance, uint64_t ver);
- int apply_olh_log(void *ctx, const string& bucket_owner, rgw_obj& obj,
+ int apply_olh_log(ObjectCtx& ctx, const string& bucket_owner, rgw_obj& obj,
bufferlist& obj_tag, map<uint64_t, rgw_bucket_olh_log_entry>& log,
uint64_t *plast_ver);
- int update_olh(void *ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj);
- int set_olh(void *ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker);
+ int update_olh(ObjectCtx& obj_ctx, RGWObjState *state, const string& bucket_owner, rgw_obj& obj);
+ int set_olh(ObjectCtx& obj_ctx, const string& bucket_owner, rgw_obj& target_obj, bool delete_marker);
- int follow_olh(void *ctx, RGWObjState *state, rgw_obj& olh_obj, rgw_obj *target);
+ int follow_olh(ObjectCtx& ctx, RGWObjState *state, rgw_obj& olh_obj, rgw_obj *target);
int get_olh(rgw_obj& obj, RGWOLHInfo *olh);
void gen_rand_obj_instance_name(rgw_obj *target);
}
};
+class RGWPutObjProcessor
+{
+protected:
+ RGWRados *store;
+ RGWRados::ObjectCtx& obj_ctx;
+ bool is_complete;
+ string bucket_owner;
+
+ virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
+ map<string, bufferlist>& attrs,
+ const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
+
+public:
+ RGWPutObjProcessor(RGWRados::ObjectCtx& _obj_ctx, const string& _bo) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_owner(_bo) {}
+ virtual ~RGWPutObjProcessor() {}
+ virtual int prepare(RGWRados *_store, string *oid_rand) {
+ store = _store;
+ return 0;
+ }
+ virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again) = 0;
+ virtual int throttle_data(void *handle, bool need_to_wait) = 0;
+ virtual void complete_hash(MD5 *hash) {
+ assert(0);
+ }
+ virtual int complete(string& etag, time_t *mtime, time_t set_mtime,
+ map<string, bufferlist>& attrs,
+ const char *if_match = NULL, const char *if_nomatch = NULL);
+
+ CephContext *ctx();
+};
+
+struct put_obj_aio_info {
+ void *handle;
+};
+
+class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
+{
+ list<struct put_obj_aio_info> pending;
+ size_t max_chunks;
+
+ struct put_obj_aio_info pop_pending();
+ int wait_pending_front();
+ bool pending_has_completed();
+
+ rgw_obj last_written_obj;
+
+protected:
+ uint64_t obj_len;
+
+ list<rgw_obj> written_objs;
+
+ void add_written_obj(const rgw_obj& obj) {
+ written_objs.push_back(obj);
+ }
+
+ int drain_pending();
+ int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
+
+public:
+ int throttle_data(void *handle, bool need_to_wait);
+
+ RGWPutObjProcessor_Aio(RGWRados::ObjectCtx& obj_ctx, const string& bucket_owner) : RGWPutObjProcessor(obj_ctx, bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
+ virtual ~RGWPutObjProcessor_Aio();
+};
+
+class RGWPutObjProcessor_Atomic : public RGWPutObjProcessor_Aio
+{
+ bufferlist first_chunk;
+ uint64_t part_size;
+ off_t cur_part_ofs;
+ off_t next_part_ofs;
+ int cur_part_id;
+ off_t data_ofs;
+
+ uint64_t extra_data_len;
+ bufferlist extra_data_bl;
+ bufferlist pending_data_bl;
+ uint64_t max_chunk_size;
+
+ bool versioned_object;
+
+protected:
+ rgw_bucket bucket;
+ string obj_str;
+
+ string unique_tag;
+
+ rgw_obj head_obj;
+ rgw_obj cur_obj;
+ RGWObjManifest manifest;
+ RGWObjManifest::generator manifest_gen;
+
+ int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
+ virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
+ map<string, bufferlist>& attrs,
+ const char *if_match = NULL, const char *if_nomatch = NULL);
+
+ int prepare_next_part(off_t ofs);
+ int complete_parts();
+ int complete_writing_data();
+
+ int prepare_init(RGWRados *store, string *oid_rand);
+
+public:
+ ~RGWPutObjProcessor_Atomic() {}
+ RGWPutObjProcessor_Atomic(RGWRados::ObjectCtx& obj_ctx, const string& bucket_owner,
+ rgw_bucket& _b, const string& _o, uint64_t _p, const string& _t, bool versioned) :
+ RGWPutObjProcessor_Aio(obj_ctx, bucket_owner),
+ part_size(_p),
+ cur_part_ofs(0),
+ next_part_ofs(_p),
+ cur_part_id(0),
+ data_ofs(0),
+ extra_data_len(0),
+ max_chunk_size(0),
+ versioned_object(versioned),
+ bucket(_b),
+ obj_str(_o),
+ unique_tag(_t) {}
+ int prepare(RGWRados *store, string *oid_rand);
+ virtual bool immutable_head() { return false; }
+ void set_extra_data_len(uint64_t len) {
+ extra_data_len = len;
+ }
+ virtual int handle_data(bufferlist& bl, off_t ofs, MD5 *hash, void **phandle, bool *again);
+ virtual void complete_hash(MD5 *hash);
+ bufferlist& get_extra_data() { return extra_data_bl; }
+};
#endif