processor.emplace<MultipartObjectProcessor>(
&aio, store, s->bucket_info, pdest_placement,
s->owner.get_id(), obj_ctx, obj,
- multipart_upload_id, multipart_part_num, multipart_part_str);
+ multipart_upload_id, multipart_part_num, multipart_part_str, this);
} else if(append) {
if (s->bucket_info.versioned()) {
op_ret = -ERR_INVALID_BUCKET_STATE;
pdest_placement = &s->dest_placement;
processor.emplace<AppendObjectProcessor>(
&aio, store, s->bucket_info, pdest_placement, s->bucket_owner.get_id(),obj_ctx, obj,
- s->req_id, position, &cur_accounted_size);
+ s->req_id, position, &cur_accounted_size, this);
} else {
if (s->bucket_info.versioning_enabled()) {
if (!version_id.empty()) {
pdest_placement = &s->dest_placement;
processor.emplace<AtomicObjectProcessor>(
&aio, store, s->bucket_info, pdest_placement,
- s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id);
+ s->bucket_owner.get_id(), obj_ctx, obj, olh_epoch, s->req_id, this);
}
op_ret = processor->prepare();
&s->dest_placement,
s->bucket_owner.get_id(),
*static_cast<RGWObjectCtx*>(s->obj_ctx),
- obj, 0, s->req_id);
+ obj, 0, s->req_id, this);
op_ret = processor.prepare();
if (op_ret < 0) {
return;
using namespace rgw::putobj;
AtomicObjectProcessor processor(&aio, store, binfo, &s->dest_placement, bowner.get_id(),
- obj_ctx, obj, 0, s->req_id);
+ obj_ctx, obj, 0, s->req_id, this);
op_ret = processor.prepare();
if (op_ret < 0) {
*/
for (const auto& obj : written) {
if (raw_head && obj == *raw_head) {
- ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
+ ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
need_to_remove_head = true;
continue;
}
int r = store->delete_raw_obj(obj);
if (r < 0 && r != -ENOENT) {
- ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+ ldpp_dout(dpp, 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
}
}
if (need_to_remove_head) {
- ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
+ ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0);
if (r < 0 && r != -ENOENT) {
- ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
+ ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
}
}
}
}
+
int AtomicObjectProcessor::process_first_chunk(bufferlist&& data,
DataProcessor **processor)
{
int r = store->get_max_chunk_size(tail_placement_rule, target_obj, &chunk_size, &alignment);
if (r < 0) {
- ldout(store->ctx(), 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
+ ldpp_dout(dpp, 0) << "ERROR: unexpected: get_max_chunk_size(): placement_rule=" << tail_placement_rule.to_str() << " obj=" << target_obj << " returned r=" << r << dendl;
return r;
}
store->get_max_aligned_size(default_stripe_size, alignment, &stripe_size);
bool compressed;
r = rgw_compression_info_from_attrset(attrs, compressed, info.cs_info);
if (r < 0) {
- ldout(store->ctx(), 1) << "cannot get compression info" << dendl;
+ ldpp_dout(dpp, 1) << "cannot get compression info" << dendl;
return r;
}
*cur_accounted_size = astate->accounted_size;
if (!astate->exists) {
if (position != 0) {
- ldout(store->ctx(), 5) << "ERROR: Append position should be zero" << dendl;
+ ldpp_dout(dpp, 5) << "ERROR: Append position should be zero" << dendl;
return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
} else {
cur_part_num = 1;
// check whether the object appendable
map<string, bufferlist>::iterator iter = astate->attrset.find(RGW_ATTR_APPEND_PART_NUM);
if (iter == astate->attrset.end()) {
- ldout(store->ctx(), 5) << "ERROR: The object is not appendable" << dendl;
+ ldpp_dout(dpp, 5) << "ERROR: The object is not appendable" << dendl;
return -ERR_OBJECT_NOT_APPENDABLE;
}
if (position != *cur_accounted_size) {
- ldout(store->ctx(), 5) << "ERROR: Append position should be equal to the obj size" << dendl;
+ ldpp_dout(dpp, 5) << "ERROR: Append position should be equal to the obj size" << dendl;
return -ERR_POSITION_NOT_EQUAL_TO_LENGTH;
}
try {
decode(cur_part_num, iter->second);
} catch (buffer::error& err) {
- ldout(store->ctx(), 5) << "ERROR: failed to decode part num" << dendl;
+ ldpp_dout(dpp, 5) << "ERROR: failed to decode part num" << dendl;
return -EIO;
}
cur_part_num++;
const rgw_obj head_obj;
RGWSI_RADOS::Obj stripe_obj; // current stripe object
RawObjSet written; // set of written objects for deletion
+ const DoutPrefixProvider *dpp;
public:
RadosWriter(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info,
- RGWObjectCtx& obj_ctx, const rgw_obj& head_obj)
+ RGWObjectCtx& obj_ctx, const rgw_obj& head_obj, const DoutPrefixProvider *dpp)
: aio(aio), store(store), bucket_info(bucket_info),
- obj_ctx(obj_ctx), head_obj(head_obj)
+ obj_ctx(obj_ctx), head_obj(head_obj), dpp(dpp)
{}
~RadosWriter();
// when the operation completes successfully, clear the set of written objects
// so they aren't deleted on destruction
void clear_written() { written.clear(); }
+
};
// a rados object processor that stripes according to RGWObjManifest
const rgw_user& owner;
RGWObjectCtx& obj_ctx;
rgw_obj head_obj;
+ const DoutPrefixProvider *dpp;
RadosWriter writer;
RGWObjManifest manifest;
const RGWBucketInfo& bucket_info,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& obj_ctx,
- const rgw_obj& head_obj)
+ const rgw_obj& head_obj,
+ const DoutPrefixProvider* dpp)
: HeadObjectProcessor(0),
store(store), bucket_info(bucket_info),
owner(owner),
obj_ctx(obj_ctx), head_obj(head_obj),
- writer(aio, store, bucket_info, obj_ctx, head_obj),
+ writer(aio, store, bucket_info, obj_ctx, head_obj, dpp),
+ dpp(dpp),
chunk(&writer, 0), stripe(&chunk, this, 0) {
if (ptail_placement_rule) {
tail_placement_rule = *ptail_placement_rule;
void set_tail_placement(const rgw_placement_rule&& tpr) {
tail_placement_rule = tpr;
}
+
};
const std::optional<uint64_t> olh_epoch;
const std::string unique_tag;
bufferlist first_chunk; // written with the head in complete()
+ const DoutPrefixProvider *dpp;
int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
public:
const rgw_user& owner,
RGWObjectCtx& obj_ctx, const rgw_obj& head_obj,
std::optional<uint64_t> olh_epoch,
- const std::string& unique_tag)
+ const std::string& unique_tag,
+ const DoutPrefixProvider *dpp)
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
- owner, obj_ctx, head_obj),
- olh_epoch(olh_epoch), unique_tag(unique_tag)
+ owner, obj_ctx, head_obj, dpp),
+ olh_epoch(olh_epoch), unique_tag(unique_tag), dpp(dpp)
{}
// prepare a trivial manifest
const std::string upload_id;
const int part_num;
const std::string part_num_str;
+ const DoutPrefixProvider *dpp;
RGWMPObj mp;
// write the first chunk and wait on aio->drain() for its completion.
const rgw_user& owner, RGWObjectCtx& obj_ctx,
const rgw_obj& head_obj,
const std::string& upload_id, uint64_t part_num,
- const std::string& part_num_str)
+ const std::string& part_num_str,
+ const DoutPrefixProvider *dpp)
: ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule,
- owner, obj_ctx, head_obj),
+ owner, obj_ctx, head_obj, dpp),
target_obj(head_obj), upload_id(upload_id),
- part_num(part_num), part_num_str(part_num_str),
- mp(head_obj.key.name, upload_id)
+ part_num(part_num), part_num_str(part_num_str), dpp(dpp),
+ mp(head_obj.key.name, upload_id)
{}
// prepare a multipart manifest
const char *if_match, const char *if_nomatch,
const std::string *user_data,
rgw_zone_set *zones_trace, bool *canceled) override;
+
};
class AppendObjectProcessor : public ManifestObjectProcessor {
uint64_t *cur_accounted_size;
string cur_etag;
const std::string unique_tag;
+ const DoutPrefixProvider *dpp;
RGWObjManifest *cur_manifest;
AppendObjectProcessor(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& obj_ctx,const rgw_obj& head_obj,
- const std::string& unique_tag, uint64_t position, uint64_t *cur_accounted_size)
- : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, owner, obj_ctx, head_obj),
- position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
+ const std::string& unique_tag, uint64_t position, uint64_t *cur_accounted_size, const DoutPrefixProvider *dpp)
+ : ManifestObjectProcessor(aio, store, bucket_info, ptail_placement_rule, owner, obj_ctx, head_obj, dpp),
+ position(position), cur_size(0), cur_accounted_size(cur_accounted_size), dpp(dpp),
unique_tag(unique_tag), cur_manifest(nullptr)
{}
int prepare() override;