}
}
if (need_rewrite) {
- ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(bucket->get_info(), obj.get(), dpp(), null_yield);
+ ret = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(obj.get(), dpp(), null_yield);
if (ret < 0) {
cerr << "ERROR: object rewrite returned: " << cpp_strerror(-ret) << std::endl;
return -ret;
if (!need_rewrite) {
formatter->dump_string("status", "Skipped");
} else {
- r = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(bucket->get_info(), obj.get(), dpp(), null_yield);
+ r = static_cast<rgw::sal::RadosStore*>(store)->getRados()->rewrite_obj(obj.get(), dpp(), null_yield);
if (r == 0) {
formatter->dump_string("status", "Success");
} else {
std::vector<compression_block> blocks;
public:
RGWPutObj_Compress(CephContext* cct_, CompressorRef compressor,
- rgw::putobj::DataProcessor *next)
+ rgw::sal::DataProcessor *next)
: Pipe(next), cct(cct_), compressor(compressor) {}
int process(bufferlist&& data, uint64_t logical_offset) override;
}
RGWPutObj_BlockEncrypt::RGWPutObj_BlockEncrypt(CephContext* cct,
- rgw::putobj::DataProcessor *next,
+ rgw::sal::DataProcessor *next,
std::unique_ptr<BlockCrypt> crypt)
: Pipe(next),
cct(cct),
const size_t block_size; /**< snapshot of \ref BlockCrypt.get_block_size() */
public:
RGWPutObj_BlockEncrypt(CephContext* cct,
- rgw::putobj::DataProcessor *next,
+ rgw::sal::DataProcessor *next,
std::unique_ptr<BlockCrypt> crypt);
int process(bufferlist&& data, uint64_t logical_offset) override;
namespace rgw::putobj {
int create_etag_verifier(const DoutPrefixProvider *dpp,
- CephContext* cct, DataProcessor* filter,
+ CephContext* cct, rgw::sal::DataProcessor* filter,
const bufferlist& manifest_bl,
const std::optional<RGWCompressionInfo>& compression,
etag_verifier_ptr& verifier)
string calculated_etag;
public:
- ETagVerifier(CephContext* cct_, rgw::putobj::DataProcessor *next)
+ ETagVerifier(CephContext* cct_, rgw::sal::DataProcessor *next)
: Pipe(next), cct(cct_) {}
virtual void calculate_etag() = 0;
class ETagVerifier_Atomic : public ETagVerifier
{
public:
- ETagVerifier_Atomic(CephContext* cct_, rgw::putobj::DataProcessor *next)
+ ETagVerifier_Atomic(CephContext* cct_, rgw::sal::DataProcessor *next)
: ETagVerifier(cct_, next) {}
int process(bufferlist&& data, uint64_t logical_offset) override;
public:
ETagVerifier_MPU(CephContext* cct,
std::vector<uint64_t> part_ofs,
- rgw::putobj::DataProcessor *next)
+ rgw::sal::DataProcessor *next)
: ETagVerifier(cct, next),
part_ofs(std::move(part_ofs))
{}
using etag_verifier_ptr = ceph::static_ptr<ETagVerifier, max_etag_verifier_size>;
int create_etag_verifier(const DoutPrefixProvider *dpp,
- CephContext* cct, DataProcessor* next,
+ CephContext* cct, rgw::sal::DataProcessor* next,
const bufferlist& manifest_bl,
const std::optional<RGWCompressionInfo>& compression,
etag_verifier_ptr& verifier);
version_id = state->object->get_instance();
}
}
- processor.emplace(&*aio, get_store(), state->bucket.get(),
- &state->dest_placement,
- state->bucket_owner.get_id(),
- *static_cast<RGWObjectCtx *>(state->obj_ctx),
- state->object->clone(), olh_epoch, state->req_id,
- this, state->yield);
+ processor = get_store()->get_atomic_writer(this, state->yield, state->object->clone(),
+ state->bucket_owner.get_id(), *state->obj_ctx,
+ &state->dest_placement, 0, state->req_id);
op_ret = processor->prepare(state->yield);
if (op_ret < 0) {
const std::string& obj_name;
RGWFileHandle* rgw_fh;
std::optional<rgw::BlockingAioThrottle> aio;
- std::optional<rgw::putobj::AtomicObjectProcessor> processor;
- rgw::putobj::DataProcessor* filter;
+ std::unique_ptr<rgw::sal::Writer> processor;
+ rgw::sal::DataProcessor* filter;
boost::optional<RGWPutObj_Compress> compressor;
CompressorRef plugin;
buffer::list data;
// create the object processor
auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
s->yield);
- using namespace rgw::putobj;
- constexpr auto max_processor_size = std::max({sizeof(MultipartObjectProcessor),
- sizeof(AtomicObjectProcessor),
- sizeof(AppendObjectProcessor)});
- ceph::static_ptr<ObjectProcessor, max_processor_size> processor;
+ std::unique_ptr<rgw::sal::Writer> processor;
- rgw_placement_rule *pdest_placement;
+ rgw_placement_rule *pdest_placement = &s->dest_placement;
if (multipart) {
std::unique_ptr<rgw::sal::MultipartUpload> upload;
s->dest_placement = *pdest_placement;
pdest_placement = &s->dest_placement;
ldpp_dout(this, 20) << "dest_placement for part=" << *pdest_placement << dendl;
- processor.emplace<MultipartObjectProcessor>(
- &*aio, store, s->bucket.get(), pdest_placement,
- s->owner.get_id(), obj_ctx, s->object->clone(),
- multipart_upload_id, multipart_part_num, multipart_part_str,
- this, s->yield);
+ processor = upload->get_writer(this, s->yield, s->object->clone(),
+ s->user->get_id(), obj_ctx, pdest_placement,
+ multipart_part_num, multipart_part_str);
} else if(append) {
if (s->bucket->versioned()) {
op_ret = -ERR_INVALID_BUCKET_STATE;
return;
}
- pdest_placement = &s->dest_placement;
- processor.emplace<AppendObjectProcessor>(
- &*aio, store, s->bucket.get(), pdest_placement, s->bucket_owner.get_id(),
- obj_ctx, s->object->clone(),
- s->req_id, position, &cur_accounted_size, this, s->yield);
+ processor = store->get_append_writer(this, s->yield, s->object->clone(),
+ s->bucket_owner.get_id(), obj_ctx,
+ pdest_placement, s->req_id, position,
+ &cur_accounted_size);
} else {
if (s->bucket->versioning_enabled()) {
if (!version_id.empty()) {
version_id = s->object->get_instance();
}
}
- pdest_placement = &s->dest_placement;
- processor.emplace<AtomicObjectProcessor>(
- &*aio, store, s->bucket.get(), pdest_placement,
- s->bucket_owner.get_id(), obj_ctx, s->object->clone(),
- olh_epoch, s->req_id, this, s->yield);
+ processor = store->get_atomic_writer(this, s->yield, s->object->clone(),
+ s->bucket_owner.get_id(), obj_ctx,
+ pdest_placement, olh_epoch, s->req_id);
}
op_ret = processor->prepare(s->yield);
fst = copy_source_range_fst;
// no filters by default
- DataProcessor *filter = processor.get();
+ rgw::sal::DataProcessor *filter = processor.get();
const auto& compression_type = store->get_zone()->get_params().get_compression_type(*pdest_placement);
CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;
- std::unique_ptr<DataProcessor> encrypt;
+ std::unique_ptr<rgw::sal::DataProcessor> encrypt;
if (!append) { // compression and encryption only apply to full object uploads
op_ret = get_encrypt_filter(&encrypt, filter);
auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
s->yield);
- using namespace rgw::putobj;
- AtomicObjectProcessor processor(&*aio, store, s->bucket.get(),
- &s->dest_placement,
- s->bucket_owner.get_id(),
- *static_cast<RGWObjectCtx*>(s->obj_ctx),
- std::move(obj), 0, s->req_id, this, s->yield);
- op_ret = processor.prepare(s->yield);
+ std::unique_ptr<rgw::sal::Writer> processor;
+ processor = store->get_atomic_writer(this, s->yield, std::move(obj),
+ s->bucket_owner.get_id(), *s->obj_ctx,
+ &s->dest_placement, 0, s->req_id);
+ op_ret = processor->prepare(s->yield);
if (op_ret < 0) {
return;
}
/* No filters by default. */
- DataProcessor *filter = &processor;
+ rgw::sal::DataProcessor *filter = processor.get();
- std::unique_ptr<DataProcessor> encrypt;
+ std::unique_ptr<rgw::sal::DataProcessor> encrypt;
op_ret = get_encrypt_filter(&encrypt, filter);
if (op_ret < 0) {
return;
emplace_attr(RGW_ATTR_COMPRESSION, std::move(tmp));
}
- op_ret = processor.complete(s->obj_size, etag, nullptr, real_time(), attrs,
+ op_ret = processor->complete(s->obj_size, etag, nullptr, real_time(), attrs,
(delete_at ? *delete_at : real_time()),
nullptr, nullptr, nullptr, nullptr, nullptr,
s->yield);
rgw_placement_rule dest_placement = s->dest_placement;
dest_placement.inherit_from(bucket->get_placement_rule());
- auto aio = rgw::make_throttle(s->cct->_conf->rgw_put_obj_min_window_size,
- s->yield);
-
- using namespace rgw::putobj;
- AtomicObjectProcessor processor(&*aio, store, bucket.get(), &s->dest_placement, bowner.get_id(),
- obj_ctx, std::move(obj), 0, s->req_id, this, s->yield);
-
- op_ret = processor.prepare(s->yield);
+ std::unique_ptr<rgw::sal::Writer> processor;
+ processor = store->get_atomic_writer(this, s->yield, std::move(obj),
+ bowner.get_id(), obj_ctx,
+ &s->dest_placement, 0, s->req_id);
+ op_ret = processor->prepare(s->yield);
if (op_ret < 0) {
ldpp_dout(this, 20) << "cannot prepare processor due to ret=" << op_ret << dendl;
return op_ret;
}
/* No filters by default. */
- DataProcessor *filter = &processor;
+ rgw::sal::DataProcessor *filter = processor.get();
const auto& compression_type = store->get_zone()->get_params().get_compression_type(
dest_placement);
}
/* Complete the transaction. */
- op_ret = processor.complete(size, etag, nullptr, ceph::real_time(),
+ op_ret = processor->complete(size, etag, nullptr, ceph::real_time(),
attrs, ceph::real_time() /* delete_at */,
nullptr, nullptr, nullptr, nullptr, nullptr,
s->yield);
*filter = nullptr;
return 0;
}
- virtual int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
- rgw::putobj::DataProcessor *cb) {
+ virtual int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+ rgw::sal::DataProcessor *cb) {
return 0;
}
void pre_exec() override;
void execute(optional_yield y) override;
- virtual int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
- rgw::putobj::DataProcessor *cb) {
+ virtual int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+ rgw::sal::DataProcessor *cb) {
return 0;
}
virtual int get_params(optional_yield y) = 0;
#pragma once
#include "include/buffer.h"
+#include "rgw_sal.h"
namespace rgw::putobj {
-// a simple streaming data processing abstraction
-class DataProcessor {
- public:
- virtual ~DataProcessor() {}
-
- // consume a bufferlist in its entirety at the given object offset. an
- // empty bufferlist is given to request that any buffered data be flushed,
- // though this doesn't wait for completions
- virtual int process(bufferlist&& data, uint64_t offset) = 0;
-};
-
// for composing data processors into a pipeline
-class Pipe : public DataProcessor {
- DataProcessor *next;
+class Pipe : public rgw::sal::DataProcessor {
+ rgw::sal::DataProcessor *next;
public:
- explicit Pipe() : next(nullptr) {}
- explicit Pipe(DataProcessor *next) : next(next) {}
+ explicit Pipe(rgw::sal::DataProcessor *next) : next(next) {}
// passes the data on to the next processor
int process(bufferlist&& data, uint64_t offset) override {
uint64_t chunk_size;
bufferlist chunk; // leftover bytes from the last call to process()
public:
- ChunkProcessor() {}
- ChunkProcessor(DataProcessor *next, uint64_t chunk_size)
+ ChunkProcessor(rgw::sal::DataProcessor *next, uint64_t chunk_size)
: Pipe(next), chunk_size(chunk_size)
{}
StripeGenerator *gen;
std::pair<uint64_t, uint64_t> bounds; // bounds of current stripe
public:
- StripeProcessor() {}
- StripeProcessor(DataProcessor *next, StripeGenerator *gen,
+ StripeProcessor(rgw::sal::DataProcessor *next, StripeGenerator *gen,
uint64_t first_stripe_size)
: Pipe(next), gen(gen), bounds(0, first_stripe_size)
{}
return processor->process(std::move(data), write_offset);
}
+
+static int process_completed(const AioResultList& completed, RawObjSet *written)
+{
+ std::optional<int> error;
+ for (auto& r : completed) {
+ if (r.result >= 0) {
+ written->insert(r.obj.get_ref().obj);
+ } else if (!error) { // record first error code
+ error = r.result;
+ }
+ }
+ return error.value_or(0);
+}
+
+int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
+{
+ stripe_obj = store->svc()->rados->obj(raw_obj);
+ return stripe_obj.open(dpp);
+}
+
+int RadosWriter::process(bufferlist&& bl, uint64_t offset)
+{
+ bufferlist data = std::move(bl);
+ const uint64_t cost = data.length();
+ if (cost == 0) { // no empty writes, use aio directly for creates
+ return 0;
+ }
+ librados::ObjectWriteOperation op;
+ if (offset == 0) {
+ op.write_full(data);
+ } else {
+ op.write(offset, data);
+ }
+ constexpr uint64_t id = 0; // unused
+ auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
+ return process_completed(c, &written);
+}
+
+int RadosWriter::write_exclusive(const bufferlist& data)
+{
+ const uint64_t cost = data.length();
+
+ librados::ObjectWriteOperation op;
+ op.create(true); // exclusive create
+ op.write_full(data);
+
+ constexpr uint64_t id = 0; // unused
+ auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
+ auto d = aio->drain();
+ c.splice(c.end(), d);
+ return process_completed(c, &written);
+}
+
+int RadosWriter::drain()
+{
+ return process_completed(aio->drain(), &written);
+}
+
+RadosWriter::~RadosWriter()
+{
+ // wait on any outstanding aio completions
+ process_completed(aio->drain(), &written);
+
+ bool need_to_remove_head = false;
+ std::optional<rgw_raw_obj> raw_head;
+ if (!rgw::sal::Object::empty(head_obj.get())) {
+ raw_head.emplace();
+ rgw::sal::RadosObject* obj = dynamic_cast<rgw::sal::RadosObject*>(head_obj.get());
+ obj->get_raw_obj(&*raw_head);
+ }
+
+ /**
+ * We should delete the object in the "multipart" namespace to avoid race condition.
+ * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
+ * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
+ * written by the second upload may be deleted by the first upload.
+ * details is describled on #11749
+ *
+ * The above comment still stands, but instead of searching for a specific object in the multipart
+ * namespace, we just make sure that we remove the object that is marked as the head object after
+ * we remove all the other raw objects. Note that we use different call to remove the head object,
+ * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
+ */
+ for (const auto& obj : written) {
+ if (raw_head && obj == *raw_head) {
+ ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
+ need_to_remove_head = true;
+ continue;
+ }
+
+ int r = store->delete_raw_obj(dpp, obj);
+ if (r < 0 && r != -ENOENT) {
+ ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+ }
+ }
+
+ if (need_to_remove_head) {
+ std::string version_id;
+ ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
+ int r = head_obj->delete_object(dpp, &obj_ctx, null_yield);
+ if (r < 0 && r != -ENOENT) {
+ ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
+ }
+ }
+}
+
+
// advance to the next stripe
int ManifestObjectProcessor::next(uint64_t offset, uint64_t *pstripe_size)
{
if (r < 0) {
return r;
}
- r = writer->set_stripe_obj(stripe_obj);
+ r = writer.set_stripe_obj(stripe_obj);
if (r < 0) {
return r;
}
- chunk = ChunkProcessor(writer.get(), chunk_size);
+ chunk = ChunkProcessor(&writer, chunk_size);
*pstripe_size = manifest_gen.cur_stripe_max_size();
return 0;
}
uint64_t chunk_size = 0;
uint64_t alignment;
- int r = head_obj->get_max_chunk_size(dpp, bucket->get_placement_rule(),
+ int r = head_obj->get_max_chunk_size(dpp, head_obj->get_bucket()->get_placement_rule(),
&max_head_chunk_size, &alignment);
if (r < 0) {
return r;
}
bool same_pool = true;
- if (bucket->get_placement_rule() != tail_placement_rule) {
- if (!head_obj->placement_rules_match(bucket->get_placement_rule(), tail_placement_rule)) {
+ if (head_obj->get_bucket()->get_placement_rule() != tail_placement_rule) {
+ if (!head_obj->placement_rules_match(head_obj->get_bucket()->get_placement_rule(), tail_placement_rule)) {
same_pool = false;
r = head_obj->get_max_chunk_size(dpp, tail_placement_rule, &chunk_size);
if (r < 0) {
rgw_obj obj = head_obj->get_obj();
r = manifest_gen.create_begin(store->ctx(), &manifest,
- bucket->get_placement_rule(),
+ head_obj->get_bucket()->get_placement_rule(),
&tail_placement_rule,
obj.bucket, obj);
if (r < 0) {
rgw_raw_obj stripe_obj = manifest_gen.get_cur_obj(store);
- r = writer->set_stripe_obj(stripe_obj);
+ r = writer.set_stripe_obj(stripe_obj);
if (r < 0) {
return r;
}
set_head_chunk_size(head_max_size);
// initialize the processors
- chunk = ChunkProcessor(writer.get(), chunk_size);
+ chunk = ChunkProcessor(&writer, chunk_size);
stripe = StripeProcessor(&chunk, this, head_max_size);
return 0;
}
rgw_zone_set *zones_trace,
bool *pcanceled, optional_yield y)
{
- int r = writer->drain();
+ int r = writer.drain();
if (r < 0) {
return r;
}
std::unique_ptr<rgw::sal::Object::WriteOp> obj_op = head_obj->get_write_op(&obj_ctx);
/* some object types shouldn't be versioned, e.g., multipart parts */
- obj_op->params.versioning_disabled = !bucket->versioning_enabled();
+ obj_op->params.versioning_disabled = !head_obj->get_bucket()->versioning_enabled();
obj_op->params.data = &first_chunk;
obj_op->params.manifest = &manifest;
obj_op->params.ptag = &unique_tag; /* use req_id as operation tag */
}
if (!obj_op->params.canceled) {
// on success, clear the set of objects for deletion
- writer->clear_written();
+ writer.clear_written();
}
if (pcanceled) {
*pcanceled = obj_op->params.canceled;
{
// write the first chunk of the head object as part of an exclusive create,
// then drain to wait for the result in case of EEXIST
- int r = writer->write_exclusive(data);
+ int r = writer.write_exclusive(data);
if (r == -EEXIST) {
// randomize the oid prefix and reprepare the head/manifest
std::string oid_rand = gen_rand_alphanumeric(store->ctx(), 32);
return r;
}
// resubmit the write op on the new head object
- r = writer->write_exclusive(data);
+ r = writer.write_exclusive(data);
}
if (r < 0) {
return r;
manifest.set_multipart_part_rule(stripe_size, part_num);
r = manifest_gen.create_begin(store->ctx(), &manifest,
- bucket->get_placement_rule(),
+ head_obj->get_bucket()->get_placement_rule(),
&tail_placement_rule,
target_obj->get_bucket()->get_key(),
target_obj->get_obj());
head_obj->raw_obj_to_obj(stripe_obj);
head_obj->set_hash_source(target_obj->get_name());
- r = writer->set_stripe_obj(stripe_obj);
+ r = writer.set_stripe_obj(stripe_obj);
if (r < 0) {
return r;
}
stripe_size = manifest_gen.cur_stripe_max_size();
set_head_chunk_size(stripe_size);
- chunk = ChunkProcessor(writer.get(), chunk_size);
+ chunk = ChunkProcessor(&writer, chunk_size);
stripe = StripeProcessor(&chunk, this, stripe_size);
return 0;
}
rgw_zone_set *zones_trace,
bool *pcanceled, optional_yield y)
{
- int r = writer->drain();
+ int r = writer.drain();
if (r < 0) {
return r;
}
encode(info, bl);
std::unique_ptr<rgw::sal::Object> meta_obj =
- bucket->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
+ head_obj->get_bucket()->get_object(rgw_obj_key(mp.get_meta(), std::string(), RGW_OBJ_NS_MULTIPART));
meta_obj->set_in_extra_data(true);
r = meta_obj->omap_set_val_by_key(dpp, p, bl, true, null_yield);
if (!obj_op->params.canceled) {
// on success, clear the set of objects for deletion
- writer->clear_written();
+ writer.clear_written();
}
if (pcanceled) {
*pcanceled = obj_op->params.canceled;
return 0;
}
-int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::putobj::DataProcessor **processor)
+int AppendObjectProcessor::process_first_chunk(bufferlist &&data, rgw::sal::DataProcessor **processor)
{
- int r = writer->write_exclusive(data);
+ int r = writer.write_exclusive(data);
if (r < 0) {
return r;
}
rgw_obj obj = head_obj->get_obj();
- r = manifest_gen.create_begin(store->ctx(), &manifest, bucket->get_placement_rule(), &tail_placement_rule, obj.bucket, obj);
+ r = manifest_gen.create_begin(store->ctx(), &manifest, head_obj->get_bucket()->get_placement_rule(), &tail_placement_rule, obj.bucket, obj);
if (r < 0) {
return r;
}
if (r < 0) {
return r;
}
- r = writer->set_stripe_obj(std::move(stripe_obj));
+ r = writer.set_stripe_obj(std::move(stripe_obj));
if (r < 0) {
return r;
}
set_head_chunk_size(max_head_size);
// initialize the processors
- chunk = ChunkProcessor(writer.get(), chunk_size);
+ chunk = ChunkProcessor(&writer, chunk_size);
stripe = StripeProcessor(&chunk, this, stripe_size);
return 0;
const string *user_data, rgw_zone_set *zones_trace, bool *pcanceled,
optional_yield y)
{
- int r = writer->drain();
+ int r = writer.drain();
if (r < 0)
return r;
const uint64_t actual_size = get_actual_size();
}
if (!obj_op->params.canceled) {
// on success, clear the set of objects for deletion
- writer->clear_written();
+ writer.clear_written();
}
if (pcanceled) {
*pcanceled = obj_op->params.canceled;
namespace rgw {
+namespace sal {
+ class RadosStore;
+}
+
class Aio;
namespace putobj {
-// a data consumer that writes an object in a bucket
-class ObjectProcessor : public DataProcessor {
- public:
- // prepare to start processing object data
- virtual int prepare(optional_yield y) = 0;
-
- // complete the operation and make its result visible to clients
- virtual int complete(size_t accounted_size, const std::string& etag,
- ceph::real_time *mtime, ceph::real_time set_mtime,
- std::map<std::string, bufferlist>& attrs,
- ceph::real_time delete_at,
- const char *if_match, const char *if_nomatch,
- const std::string *user_data,
- rgw_zone_set *zones_trace, bool *canceled,
- optional_yield y) = 0;
-};
-
// an object processor with special handling for the first chunk of the head.
// the virtual process_first_chunk() function returns a processor to handle the
// rest of the object
-class HeadObjectProcessor : public ObjectProcessor {
+class HeadObjectProcessor : public rgw::sal::ObjectProcessor {
uint64_t head_chunk_size;
// buffer to capture the first chunk of the head object
bufferlist head_data;
// initialized after process_first_chunk() to process everything else
- DataProcessor *processor = nullptr;
+ rgw::sal::DataProcessor *processor = nullptr;
uint64_t data_offset = 0; // maximum offset of data written (ie compressed)
protected:
uint64_t get_actual_size() const { return data_offset; }
// process the first chunk of data and return a processor for the rest
virtual int process_first_chunk(bufferlist&& data,
- DataProcessor **processor) = 0;
+ rgw::sal::DataProcessor **processor) = 0;
public:
HeadObjectProcessor(uint64_t head_chunk_size)
: head_chunk_size(head_chunk_size)
int process(bufferlist&& data, uint64_t logical_offset) final override;
};
+using RawObjSet = std::set<rgw_raw_obj>;
+
+// a data sink that writes to rados objects and deletes them on cancelation
+class RadosWriter : public rgw::sal::DataProcessor {
+ Aio *const aio;
+ rgw::sal::RadosStore *const store;
+ RGWObjectCtx& obj_ctx;
+ std::unique_ptr<rgw::sal::Object> head_obj;
+ RGWSI_RADOS::Obj stripe_obj; // current stripe object
+ RawObjSet written; // set of written objects for deletion
+ const DoutPrefixProvider *dpp;
+ optional_yield y;
+
+ public:
+ RadosWriter(Aio *aio, rgw::sal::RadosStore *store,
+ RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
+ const DoutPrefixProvider *dpp, optional_yield y)
+ : aio(aio), store(store),
+ obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y)
+ {}
+ RadosWriter(RadosWriter&& r)
+ : aio(r.aio), store(r.store),
+ obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y)
+ {}
+
+ ~RadosWriter();
+
+ // change the current stripe object
+ int set_stripe_obj(const rgw_raw_obj& obj);
+
+ // write the data at the given offset of the current stripe object
+ int process(bufferlist&& data, uint64_t stripe_offset) override;
+
+ // write the data as an exclusive create and wait for it to complete
+ int write_exclusive(const bufferlist& data);
+
+ int drain();
+
+ // when the operation completes successfully, clear the set of written objects
+ // so they aren't deleted on destruction
+ void clear_written() { written.clear(); }
+
+};
+
// a rados object processor that stripes according to RGWObjManifest
class ManifestObjectProcessor : public HeadObjectProcessor,
public StripeGenerator {
protected:
- rgw::sal::Store* const store;
- rgw::sal::Bucket* bucket;
+ rgw::sal::RadosStore* const store;
rgw_placement_rule tail_placement_rule;
rgw_user owner;
RGWObjectCtx& obj_ctx;
std::unique_ptr<rgw::sal::Object> head_obj;
- std::unique_ptr<rgw::sal::Writer> writer;
+ RadosWriter writer;
RGWObjManifest manifest;
RGWObjManifest::generator manifest_gen;
ChunkProcessor chunk;
int next(uint64_t offset, uint64_t *stripe_size) override;
public:
- ManifestObjectProcessor(Aio *aio, rgw::sal::Store* store,
- rgw::sal::Bucket* bucket,
+ ManifestObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& obj_ctx,
std::unique_ptr<rgw::sal::Object> _head_obj,
const DoutPrefixProvider* dpp, optional_yield y)
: HeadObjectProcessor(0),
- store(store), bucket(bucket),
+ store(store),
owner(owner),
obj_ctx(obj_ctx), head_obj(std::move(_head_obj)),
- dpp(dpp) {
- writer = store->get_writer(aio, bucket, obj_ctx, head_obj->clone(), dpp, y);
- chunk = ChunkProcessor(writer.get(), 0);
- stripe = StripeProcessor(&chunk, this, 0);
+ writer(aio, store, obj_ctx, head_obj->clone(), dpp, y),
+ chunk(&writer, 0), stripe(&chunk, this, 0), dpp(dpp) {
if (ptail_placement_rule) {
tail_placement_rule = *ptail_placement_rule;
}
const std::string unique_tag;
bufferlist first_chunk; // written with the head in complete()
- int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+ int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
public:
- AtomicObjectProcessor(Aio *aio, rgw::sal::Store* store,
- rgw::sal::Bucket* bucket,
+ AtomicObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner,
RGWObjectCtx& obj_ctx,
std::optional<uint64_t> olh_epoch,
const std::string& unique_tag,
const DoutPrefixProvider *dpp, optional_yield y)
- : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule,
+ : ManifestObjectProcessor(aio, store, ptail_placement_rule,
owner, obj_ctx, std::move(_head_obj), dpp, y),
olh_epoch(olh_epoch), unique_tag(unique_tag)
{}
// write the first chunk and wait on aio->drain() for its completion.
// on EEXIST, retry with random prefix
- int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+ int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
// prepare the head stripe and manifest
int prepare_head();
public:
- MultipartObjectProcessor(Aio *aio, rgw::sal::Store* store,
- rgw::sal::Bucket* bucket,
+ MultipartObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& obj_ctx,
std::unique_ptr<rgw::sal::Object> _head_obj,
const std::string& upload_id, uint64_t part_num,
const std::string& part_num_str,
const DoutPrefixProvider *dpp, optional_yield y)
- : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule,
+ : ManifestObjectProcessor(aio, store, ptail_placement_rule,
owner, obj_ctx, std::move(_head_obj), dpp, y),
target_obj(head_obj->clone()), upload_id(upload_id),
part_num(part_num), part_num_str(part_num_str),
RGWObjManifest *cur_manifest;
- int process_first_chunk(bufferlist&& data, DataProcessor **processor) override;
+ int process_first_chunk(bufferlist&& data, rgw::sal::DataProcessor **processor) override;
public:
- AppendObjectProcessor(Aio *aio, rgw::sal::Store* store,
- rgw::sal::Bucket* bucket,
+ AppendObjectProcessor(Aio *aio, rgw::sal::RadosStore* store,
const rgw_placement_rule *ptail_placement_rule,
const rgw_user& owner, RGWObjectCtx& obj_ctx,
std::unique_ptr<rgw::sal::Object> _head_obj,
const std::string& unique_tag, uint64_t position,
uint64_t *cur_accounted_size,
const DoutPrefixProvider *dpp, optional_yield y)
- : ManifestObjectProcessor(aio, store, bucket, ptail_placement_rule,
+ : ManifestObjectProcessor(aio, store, ptail_placement_rule,
owner, obj_ctx, std::move(_head_obj), dpp, y),
position(position), cur_size(0), cur_accounted_size(cur_accounted_size),
unique_tag(unique_tag), cur_manifest(nullptr)
const DoutPrefixProvider *dpp;
CephContext* cct;
rgw_obj obj;
- rgw::putobj::DataProcessor *filter;
+ rgw::sal::DataProcessor *filter;
boost::optional<RGWPutObj_Compress>& compressor;
bool try_etag_verify;
rgw::putobj::etag_verifier_ptr etag_verifier;
boost::optional<rgw::putobj::ChunkProcessor> buffering;
CompressorRef& plugin;
- rgw::putobj::ObjectProcessor *processor;
+ rgw::sal::ObjectProcessor *processor;
void (*progress_cb)(off_t, void *);
void *progress_data;
bufferlist extra_data_bl, manifest_bl;
CephContext* cct,
CompressorRef& plugin,
boost::optional<RGWPutObj_Compress>& compressor,
- rgw::putobj::ObjectProcessor *p,
+ rgw::sal::ObjectProcessor *p,
void (*_progress_cb)(off_t, void *),
void *_progress_data,
std::function<int(map<string, bufferlist>&)> _attrs_handler) :
}
}
-int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y)
+int RGWRados::rewrite_obj(rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y)
{
RGWObjectCtx rctx(this->store);
- rgw::sal::RadosBucket bucket(store, dest_bucket_info);
- return obj->copy_obj_data(rctx, &bucket, obj, 0, NULL, dpp, y);
+ return obj->copy_obj_data(rctx, obj->get_bucket(), obj, 0, NULL, dpp, y);
}
struct obj_time_weight {
rgw::BlockingAioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
using namespace rgw::putobj;
- AtomicObjectProcessor processor(&aio, this->store, dest_bucket, nullptr, user_id,
+ AtomicObjectProcessor processor(&aio, this->store, nullptr, user_id,
obj_ctx, dest_obj->clone(), olh_epoch,
tag, dpp, null_yield);
RGWRESTConn *conn;
using namespace rgw::putobj;
// do not change the null_yield in the initialization of this AtomicObjectProcessor
// it causes crashes in the ragweed tests
- AtomicObjectProcessor processor(&aio, this->store, bucket, &dest_placement,
+ AtomicObjectProcessor processor(&aio, this->store, &dest_placement,
bucket->get_info().owner, obj_ctx,
dest_obj->clone(), olh_epoch, tag,
dpp, null_yield);
return 0;
}
RGWRados *store = target->get_store();
- BucketShard *bs;
+ BucketShard *bs = nullptr;
int ret = get_bucket_shard(&bs, dpp);
if (ret < 0) {
D3nDataCache* d3n_data_cache{nullptr};
- int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y);
+ int rewrite_obj(rgw::sal::Object* obj, const DoutPrefixProvider *dpp, optional_yield y);
int stat_remote_obj(const DoutPrefixProvider *dpp,
RGWObjectCtx& obj_ctx,
}
int RGWPutObj_ObjStore_S3::get_encrypt_filter(
- std::unique_ptr<rgw::putobj::DataProcessor> *filter,
- rgw::putobj::DataProcessor *cb)
+ std::unique_ptr<rgw::sal::DataProcessor> *filter,
+ rgw::sal::DataProcessor *cb)
{
int res = 0;
if (!multipart_upload_id.empty()) {
}
int RGWPostObj_ObjStore_S3::get_encrypt_filter(
- std::unique_ptr<rgw::putobj::DataProcessor> *filter,
- rgw::putobj::DataProcessor *cb)
+ std::unique_ptr<rgw::sal::DataProcessor> *filter,
+ rgw::sal::DataProcessor *cb)
{
std::unique_ptr<BlockCrypt> block_crypt;
int res = rgw_s3_prepare_encrypt(s, attrs, &parts, &block_crypt,
int get_data(bufferlist& bl) override;
void send_response() override;
- int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
- rgw::putobj::DataProcessor *cb) override;
+ int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+ rgw::sal::DataProcessor *cb) override;
int get_decrypt_filter(std::unique_ptr<RGWGetObj_Filter>* filter,
RGWGetObj_Filter* cb,
map<string, bufferlist>& attrs,
void send_response() override;
int get_data(ceph::bufferlist& bl, bool& again) override;
- int get_encrypt_filter(std::unique_ptr<rgw::putobj::DataProcessor> *filter,
- rgw::putobj::DataProcessor *cb) override;
+ int get_encrypt_filter(std::unique_ptr<rgw::sal::DataProcessor> *filter,
+ rgw::sal::DataProcessor *cb) override;
};
class RGWDeleteObj_ObjStore_S3 : public RGWDeleteObj_ObjStore {
#include "rgw_user.h"
#include "rgw_notify_event_type.h"
-#include "rgw_putobj.h"
class RGWGetDataCB;
struct RGWObjState;
ATTRSMOD_MERGE = 2
};
+// a simple streaming data processing abstraction
+class DataProcessor {
+ public:
+ virtual ~DataProcessor() {}
+
+ // consume a bufferlist in its entirety at the given object offset. an
+ // empty bufferlist is given to request that any buffered data be flushed,
+ // though this doesn't wait for completions
+ virtual int process(bufferlist&& data, uint64_t offset) = 0;
+};
+
+// a data consumer that writes an object in a bucket
+class ObjectProcessor : public DataProcessor {
+ public:
+ // prepare to start processing object data
+ virtual int prepare(optional_yield y) = 0;
+
+ // complete the operation and make its result visible to clients
+ virtual int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y) = 0;
+};
+
/**
* Base class for AIO completions
*/
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) = 0;
- virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
- const DoutPrefixProvider* dpp, optional_yield y) = 0;
virtual RGWLC* get_rgwlc(void) = 0;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() = 0;
virtual int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) = 0;
const std::string& tenant,
vector<std::unique_ptr<RGWOIDCProvider>>& providers) = 0;
virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) = 0;
+ virtual std::unique_ptr<Writer> get_append_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ const std::string& unique_tag,
+ uint64_t position,
+ uint64_t *cur_accounted_size) = 0;
+ virtual std::unique_ptr<Writer> get_atomic_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t olh_epoch,
+ const std::string& unique_tag) = 0;
virtual void finalize(void) = 0;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) = 0;
+ virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t part_num,
+ const std::string& part_num_str) = 0;
+
friend inline ostream& operator<<(ostream& out, const MultipartUpload& u) {
out << u.get_meta();
if (!u.get_upload_id().empty())
virtual void delete_inline(const DoutPrefixProvider *dpp, const std::string& tag) = 0;
};
-using RawObjSet = std::set<rgw_raw_obj>;
-
-class Writer : public rgw::putobj::DataProcessor {
+class Writer : public ObjectProcessor {
protected:
- Aio* const aio;
- rgw::sal::Bucket* bucket;
- RGWObjectCtx& obj_ctx;
- std::unique_ptr<rgw::sal::Object> head_obj;
- RawObjSet written; // set of written objects for deletion
const DoutPrefixProvider* dpp;
- optional_yield y;
-
- public:
- Writer(Aio* aio,
- rgw::sal::Bucket* bucket,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
- const DoutPrefixProvider* dpp, optional_yield y)
- : aio(aio), bucket(bucket),
- obj_ctx(obj_ctx), head_obj(std::move(_head_obj)), dpp(dpp), y(y)
- {}
- Writer(Writer&& r)
- : aio(r.aio), bucket(r.bucket),
- obj_ctx(r.obj_ctx), head_obj(std::move(r.head_obj)), dpp(r.dpp), y(r.y)
- {}
-
- ~Writer() = default;
-
- // change the current stripe object
- virtual int set_stripe_obj(const rgw_raw_obj& obj) = 0;
-
- // write the data as an exclusive create and wait for it to complete
- virtual int write_exclusive(const bufferlist& data) = 0;
-
- virtual int drain() = 0;
-
- // when the operation completes successfully, clear the set of written objects
- // so they aren't deleted on destruction
- virtual void clear_written() { written.clear(); }
+public:
+ Writer(const DoutPrefixProvider *_dpp, optional_yield y) : dpp(_dpp) {}
+ virtual ~Writer() = default;
+
+ // prepare to start processing object data
+ virtual int prepare(optional_yield y) = 0;
+
+ // Process a bufferlist
+ virtual int process(bufferlist&& data, uint64_t offset) = 0;
+
+ // complete the operation and make its result visible to clients
+ virtual int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y) = 0;
};
class Zone {
#include "rgw_multi.h"
#include "rgw_acl_s3.h"
#include "rgw_aio.h"
+#include "rgw_aio_throttle.h"
#include "rgw_zone.h"
#include "rgw_rest_conn.h"
return 0;
}
-static int process_completed(const AioResultList& completed, RawObjSet* written)
-{
- std::optional<int> error;
- for (auto& r : completed) {
- if (r.result >= 0) {
- written->insert(r.obj.get_ref().obj);
- } else if (!error) { // record first error code
- error = r.result;
- }
- }
- return error.value_or(0);
-}
-
int RadosCompletions::drain()
{
int ret = 0;
return std::unique_ptr<GCChain>(new RadosGCChain(this, obj));
}
-std::unique_ptr<Writer> RadosStore::get_writer(Aio* aio, rgw::sal::Bucket* bucket,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
- const DoutPrefixProvider* dpp, optional_yield y)
-{
- return std::unique_ptr<Writer>(new RadosWriter(aio, this, bucket, obj_ctx, std::move(_head_obj), dpp, y));
-}
-
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
{
return rados->delete_raw_obj(dpp, obj);
return std::unique_ptr<MultipartUpload>(new RadosMultipartUpload(this, bucket, oid, upload_id, mtime));
}
+std::unique_ptr<Writer> RadosStore::get_append_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ const std::string& unique_tag,
+ uint64_t position,
+ uint64_t *cur_accounted_size)
+{
+ auto aio = rgw::make_throttle(ctx()->_conf->rgw_put_obj_min_window_size, y);
+ return std::unique_ptr<Writer>(new RadosAppendWriter(dpp, y,
+ std::move(_head_obj),
+ this, std::move(aio), owner, obj_ctx,
+ ptail_placement_rule,
+ unique_tag, position,
+ cur_accounted_size));
+}
+
+std::unique_ptr<Writer> RadosStore::get_atomic_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t olh_epoch,
+ const std::string& unique_tag)
+{
+ auto aio = rgw::make_throttle(ctx()->_conf->rgw_put_obj_min_window_size, y);
+ return std::unique_ptr<Writer>(new RadosAtomicWriter(dpp, y,
+ std::move(_head_obj),
+ this, std::move(aio), owner, obj_ctx,
+ ptail_placement_rule,
+ olh_epoch, unique_tag));
+}
+
int RadosStore::get_obj_head_ioctx(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, const rgw_obj& obj, librados::IoCtx* ioctx)
{
return rados->get_obj_head_ioctx(dpp, bucket_info, obj, ioctx);
return 0;
}
+std::unique_ptr<Writer> RadosMultipartUpload::get_writer(
+ const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t part_num,
+ const std::string& part_num_str)
+{
+ auto aio = rgw::make_throttle(store->ctx()->_conf->rgw_put_obj_min_window_size, y);
+ return std::unique_ptr<Writer>(new RadosMultipartWriter(dpp, y, this,
+ std::move(_head_obj), store, std::move(aio), owner,
+ obj_ctx, ptail_placement_rule, part_num, part_num_str));
+}
+
MPRadosSerializer::MPRadosSerializer(const DoutPrefixProvider *dpp, RadosStore* store, RadosObject* obj, const std::string& lock_name) :
lock(lock_name)
{
store->getRados()->delete_objs_inline(dpp, chain, tag);
}
-int RadosWriter::set_stripe_obj(const rgw_raw_obj& raw_obj)
+int RadosAtomicWriter::prepare(optional_yield y)
{
- stripe_obj = store->svc()->rados->obj(raw_obj);
- return stripe_obj.open(dpp);
+ return processor.prepare(y);
}
-int RadosWriter::process(bufferlist&& bl, uint64_t offset)
+int RadosAtomicWriter::process(bufferlist&& data, uint64_t offset)
{
- bufferlist data = std::move(bl);
- const uint64_t cost = data.length();
- if (cost == 0) { // no empty writes, use aio directly for creates
- return 0;
- }
- librados::ObjectWriteOperation op;
- if (offset == 0) {
- op.write_full(data);
- } else {
- op.write(offset, data);
- }
- constexpr uint64_t id = 0; // unused
- auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
- return process_completed(c, &written);
+ return processor.process(std::move(data), offset);
}
-int RadosWriter::write_exclusive(const bufferlist& data)
+int RadosAtomicWriter::complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y)
{
- const uint64_t cost = data.length();
-
- librados::ObjectWriteOperation op;
- op.create(true); // exclusive create
- op.write_full(data);
-
- constexpr uint64_t id = 0; // unused
- auto c = aio->get(stripe_obj, Aio::librados_op(std::move(op), y), cost, id);
- auto d = aio->drain();
- c.splice(c.end(), d);
- return process_completed(c, &written);
+ return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+ if_match, if_nomatch, user_data, zones_trace, canceled, y);
}
-int RadosWriter::drain()
+int RadosAppendWriter::prepare(optional_yield y)
{
- return process_completed(aio->drain(), &written);
+ return processor.prepare(y);
}
-RadosWriter::~RadosWriter()
+int RadosAppendWriter::process(bufferlist&& data, uint64_t offset)
{
- // wait on any outstanding aio completions
- process_completed(aio->drain(), &written);
-
- bool need_to_remove_head = false;
- std::optional<rgw_raw_obj> raw_head;
- if (!rgw::sal::Object::empty(head_obj.get())) {
- raw_head.emplace();
- dynamic_cast<RadosObject*>(head_obj.get())->get_raw_obj(&*raw_head);
- }
+ return processor.process(std::move(data), offset);
+}
- /**
- * We should delete the object in the "multipart" namespace to avoid race condition.
- * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
- * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
- * written by the second upload may be deleted by the first upload.
- * details is describled on #11749
- *
- * The above comment still stands, but instead of searching for a specific object in the multipart
- * namespace, we just make sure that we remove the object that is marked as the head object after
- * we remove all the other raw objects. Note that we use different call to remove the head object,
- * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
- */
- for (const auto& obj : written) {
- if (raw_head && obj == *raw_head) {
- ldpp_dout(dpp, 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
- need_to_remove_head = true;
- continue;
- }
+int RadosAppendWriter::complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y)
+{
+ return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+ if_match, if_nomatch, user_data, zones_trace, canceled, y);
+}
- int r = store->delete_raw_obj(dpp, obj);
- if (r < 0 && r != -ENOENT) {
- ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
- }
- }
+int RadosMultipartWriter::prepare(optional_yield y)
+{
+ return processor.prepare(y);
+}
- if (need_to_remove_head) {
- ldpp_dout(dpp, 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
- std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = head_obj->get_delete_op(&obj_ctx);
- del_op->params.bucket_owner = bucket->get_acl_owner();
+int RadosMultipartWriter::process(bufferlist&& data, uint64_t offset)
+{
+ return processor.process(std::move(data), offset);
+}
- int r = del_op->delete_obj(dpp, y);
- if (r < 0 && r != -ENOENT) {
- ldpp_dout(dpp, 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
- }
- }
+int RadosMultipartWriter::complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y)
+{
+ return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+ if_match, if_nomatch, user_data, zones_trace, canceled, y);
}
const RGWZoneGroup& RadosZone::get_zonegroup()
#include "rgw_oidc_provider.h"
#include "rgw_role.h"
#include "rgw_multi.h"
+#include "rgw_putobj_processor.h"
#include "services/svc_tier_rados.h"
#include "cls/lock/cls_lock_client.h"
virtual std::unique_ptr<Completions> get_completions(void) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) override;
- virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
- const DoutPrefixProvider* dpp, optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
virtual int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) override;
const std::string& tenant,
vector<std::unique_ptr<RGWOIDCProvider>>& providers) override;
virtual std::unique_ptr<MultipartUpload> get_multipart_upload(Bucket* bucket, const std::string& oid, std::optional<std::string> upload_id=std::nullopt, ceph::real_time mtime=real_clock::now()) override;
+ virtual std::unique_ptr<Writer> get_append_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ const std::string& unique_tag,
+ uint64_t position,
+ uint64_t *cur_accounted_size) override;
+ virtual std::unique_ptr<Writer> get_atomic_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t olh_epoch,
+ const std::string& unique_tag) override;
virtual void finalize(void) override;
uint64_t& accounted_size, bool& compressed,
RGWCompressionInfo& cs_info, off_t& ofs) override;
virtual int get_info(const DoutPrefixProvider *dpp, optional_yield y, RGWObjectCtx* obj_ctx, rgw_placement_rule** rule, rgw::sal::Attrs* attrs = nullptr) override;
+ virtual std::unique_ptr<Writer> get_writer(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t part_num,
+ const std::string& part_num_str) override;
};
class MPRadosSerializer : public MPSerializer {
virtual void delete_inline(const DoutPrefixProvider *dpp, const std::string& tag) override;
};
-class RadosWriter : public Writer {
+class RadosAtomicWriter : public Writer {
+protected:
rgw::sal::RadosStore* store;
- RGWSI_RADOS::Obj stripe_obj; // current stripe object
-
- public:
- RadosWriter(Aio* aio, rgw::sal::RadosStore* _store,
- rgw::sal::Bucket* bucket,
- RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
- const DoutPrefixProvider* dpp, optional_yield y)
- : Writer(aio, bucket, obj_ctx, std::move(_head_obj), dpp, y), store(_store)
- {}
+ std::unique_ptr<Aio> aio;
+ rgw::putobj::AtomicObjectProcessor processor;
- ~RadosWriter();
+public:
+ RadosAtomicWriter(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ RadosStore* _store, std::unique_ptr<Aio> _aio,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t olh_epoch,
+ const std::string& unique_tag) :
+ Writer(dpp, y),
+ store(_store),
+ aio(std::move(_aio)),
+ processor(&*aio, store,
+ ptail_placement_rule, owner, obj_ctx,
+ std::move(_head_obj), olh_epoch, unique_tag,
+ dpp, y)
+ {}
+ ~RadosAtomicWriter() = default;
+
+ // prepare to start processing object data
+ virtual int prepare(optional_yield y) override;
+
+ // Process a bufferlist
+ virtual int process(bufferlist&& data, uint64_t offset) override;
+
+ // complete the operation and make its result visible to clients
+ virtual int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y) override;
+};
- // change the current stripe object
- virtual int set_stripe_obj(const rgw_raw_obj& obj) override;
+class RadosAppendWriter : public Writer {
+protected:
+ rgw::sal::RadosStore* store;
+ std::unique_ptr<Aio> aio;
+ rgw::putobj::AppendObjectProcessor processor;
- // write the data at the given offset of the current stripe object
- virtual int process(bufferlist&& data, uint64_t stripe_offset) override;
+public:
+ RadosAppendWriter(const DoutPrefixProvider *dpp,
+ optional_yield y,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ RadosStore* _store, std::unique_ptr<Aio> _aio,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ const std::string& unique_tag,
+ uint64_t position,
+ uint64_t *cur_accounted_size) :
+ Writer(dpp, y),
+ store(_store),
+ aio(std::move(_aio)),
+ processor(&*aio, store,
+ ptail_placement_rule, owner, obj_ctx,
+ std::move(_head_obj), unique_tag, position,
+ cur_accounted_size, dpp, y)
+ {}
+ ~RadosAppendWriter() = default;
+
+ // prepare to start processing object data
+ virtual int prepare(optional_yield y) override;
+
+ // Process a bufferlist
+ virtual int process(bufferlist&& data, uint64_t offset) override;
+
+ // complete the operation and make its result visible to clients
+ virtual int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y) override;
+};
- // write the data as an exclusive create and wait for it to complete
- virtual int write_exclusive(const bufferlist& data) override;
+class RadosMultipartWriter : public Writer {
+protected:
+ rgw::sal::RadosStore* store;
+ std::unique_ptr<Aio> aio;
+ rgw::putobj::MultipartObjectProcessor processor;
- virtual int drain() override;
+public:
+ RadosMultipartWriter(const DoutPrefixProvider *dpp,
+ optional_yield y, MultipartUpload* upload,
+ std::unique_ptr<rgw::sal::Object> _head_obj,
+ RadosStore* _store, std::unique_ptr<Aio> _aio,
+ const rgw_user& owner, RGWObjectCtx& obj_ctx,
+ const rgw_placement_rule *ptail_placement_rule,
+ uint64_t part_num, const std::string& part_num_str) :
+ Writer(dpp, y),
+ store(_store),
+ aio(std::move(_aio)),
+ processor(&*aio, store,
+ ptail_placement_rule, owner, obj_ctx,
+ std::move(_head_obj), upload->get_upload_id(),
+ part_num, part_num_str, dpp, y)
+ {}
+ ~RadosMultipartWriter() = default;
+
+ // prepare to start processing object data
+ virtual int prepare(optional_yield y) override;
+
+ // Process a bufferlist
+ virtual int process(bufferlist&& data, uint64_t offset) override;
+
+ // complete the operation and make its result visible to clients
+ virtual int complete(size_t accounted_size, const std::string& etag,
+ ceph::real_time *mtime, ceph::real_time set_mtime,
+ std::map<std::string, bufferlist>& attrs,
+ ceph::real_time delete_at,
+ const char *if_match, const char *if_nomatch,
+ const std::string *user_data,
+ rgw_zone_set *zones_trace, bool *canceled,
+ optional_yield y) override;
};
class RadosLuaScriptManager : public LuaScriptManager {
string req_id = store->zone_unique_id(store->get_new_req_id());
- using namespace rgw::putobj;
- AtomicObjectProcessor processor(&aio, store, b.get(), nullptr,
- owner.get_id(), obj_ctx, std::move(obj), olh_epoch,
- req_id, dpp, y);
+ std::unique_ptr<rgw::sal::Writer> processor;
+ processor = store->get_atomic_writer(dpp, y, std::move(obj),
+ owner.get_id(), obj_ctx,
+ nullptr, olh_epoch, req_id);
- int ret = processor.prepare(y);
+ int ret = processor->prepare(y);
if (ret < 0)
return ret;
- DataProcessor *filter = &processor;
+ rgw::sal::DataProcessor *filter = processor.get();
CompressorRef plugin;
boost::optional<RGWPutObj_Compress> compressor;
puser_data = &(*user_data);
}
- return processor.complete(obj_size, etag,
+ return processor->complete(obj_size, etag,
&mtime, mtime,
attrs, delete_at,
nullptr, nullptr,
ceph::real_time mtime;
string etag;
- std::optional<uint64_t> olh_epoch;
+ uint64_t olh_epoch{0};
ceph::real_time delete_at;
std::optional<string> user_data;
}
};
-class ut_put_sink: public rgw::putobj::DataProcessor
+class ut_put_sink: public rgw::sal::DataProcessor
{
bufferlist sink;
public:
}
};
-class ut_put_sink: public rgw::putobj::DataProcessor
+class ut_put_sink: public rgw::sal::DataProcessor
{
std::stringstream sink;
public:
return out << "{off=" << op.offset << " data='" << op.data << "'}";
}
-struct MockProcessor : rgw::putobj::DataProcessor {
+struct MockProcessor : rgw::sal::DataProcessor {
std::vector<Op> ops;
int process(bufferlist&& data, uint64_t offset) override {