*
*/
+#include "rgw_putobj_aio.h"
#include "rgw_putobj_processor.h"
+#define dout_subsys ceph_subsys_rgw
+
namespace rgw::putobj {
int HeadObjectProcessor::process(bufferlist&& data, uint64_t logical_offset)
return processor->process(std::move(data), write_offset);
}
+
+static int process_completed(const ResultList& completed, RawObjSet *written)
+{
+ std::optional<int> error;
+ for (auto& r : completed) {
+ if (r.result >= 0) {
+ written->insert(r.obj);
+ } else if (!error) { // record first error code
+ error = r.result;
+ }
+ }
+ return error.value_or(0);
+}
+
+int RadosWriter::set_stripe_obj(rgw_raw_obj&& obj)
+{
+ rgw_rados_ref ref;
+ int r = store->get_raw_obj_ref(obj, &ref);
+ if (r < 0) {
+ return r;
+ }
+ stripe_obj = std::move(obj);
+ stripe_ref = std::move(ref);
+ return 0;
+}
+
+int RadosWriter::process(bufferlist&& bl, uint64_t offset)
+{
+ bufferlist data = std::move(bl);
+ const uint64_t cost = data.length();
+ if (cost == 0) { // no empty writes, use aio directly for creates
+ return 0;
+ }
+ librados::ObjectWriteOperation op;
+ if (offset == 0) {
+ op.write_full(data);
+ } else {
+ op.write(offset, data);
+ }
+ auto c = aio->submit(stripe_ref, stripe_obj, &op, cost);
+ return process_completed(c, &written);
+}
+
+int RadosWriter::write_exclusive(const bufferlist& data)
+{
+ const uint64_t cost = data.length();
+
+ librados::ObjectWriteOperation op;
+ op.create(true); // exclusive create
+ op.write_full(data);
+
+ auto c = aio->submit(stripe_ref, stripe_obj, &op, cost);
+ auto d = aio->drain();
+ c.splice(c.end(), d);
+ return process_completed(c, &written);
+}
+
+int RadosWriter::drain()
+{
+ return process_completed(aio->drain(), &written);
+}
+
+RadosWriter::~RadosWriter()
+{
+ // wait on any outstanding aio completions
+ process_completed(aio->drain(), &written);
+
+ bool need_to_remove_head = false;
+ std::optional<rgw_raw_obj> raw_head;
+ if (!head_obj.empty()) {
+ raw_head.emplace();
+ store->obj_to_raw(bucket_info.placement_rule, head_obj, &*raw_head);
+ }
+
+ /**
+ * We should delete the object in the "multipart" namespace to avoid race condition.
+ * Such race condition is caused by the fact that the multipart object is the gatekeeper of a multipart
+ * upload, when it is deleted, a second upload would start with the same suffix("2/"), therefore, objects
+ * written by the second upload may be deleted by the first upload.
+ * details is describled on #11749
+ *
+ * The above comment still stands, but instead of searching for a specific object in the multipart
+ * namespace, we just make sure that we remove the object that is marked as the head object after
+ * we remove all the other raw objects. Note that we use different call to remove the head object,
+ * as this one needs to go via the bucket index prepare/complete 2-phase commit scheme.
+ */
+ for (const auto& obj : written) {
+ if (raw_head && obj == *raw_head) {
+ ldout(store->ctx(), 5) << "NOTE: we should not process the head object (" << obj << ") here" << dendl;
+ need_to_remove_head = true;
+ continue;
+ }
+
+ int r = store->delete_raw_obj(obj);
+ if (r < 0 && r != -ENOENT) {
+ ldout(store->ctx(), 5) << "WARNING: failed to remove obj (" << obj << "), leaked" << dendl;
+ }
+ }
+
+ if (need_to_remove_head) {
+ ldout(store->ctx(), 5) << "NOTE: we are going to process the head obj (" << *raw_head << ")" << dendl;
+ int r = store->delete_obj(obj_ctx, bucket_info, head_obj, 0, 0);
+ if (r < 0 && r != -ENOENT) {
+ ldout(store->ctx(), 0) << "WARNING: failed to remove obj (" << *raw_head << "), leaked" << dendl;
+ }
+ }
+}
+
} // namespace rgw::putobj
int process(bufferlist&& data, uint64_t logical_offset) final override;
};
+
+class Aio;
+using RawObjSet = std::set<rgw_raw_obj>;
+
+// a data sink that writes to rados objects and deletes them on cancelation
+class RadosWriter : public DataProcessor {
+ Aio *const aio;
+ RGWRados *const store;
+ const RGWBucketInfo& bucket_info;
+ RGWObjectCtx& obj_ctx;
+ const rgw_obj& head_obj;
+ rgw_rados_ref stripe_ref; // current stripe ref
+ rgw_raw_obj stripe_obj; // current stripe object
+ RawObjSet written; // set of written objects for deletion
+
+ public:
+ RadosWriter(Aio *aio, RGWRados *store, const RGWBucketInfo& bucket_info,
+ RGWObjectCtx& obj_ctx, const rgw_obj& head_obj)
+ : aio(aio), store(store), bucket_info(bucket_info),
+ obj_ctx(obj_ctx), head_obj(head_obj)
+ {}
+ ~RadosWriter();
+
+ // change the current stripe object
+ int set_stripe_obj(rgw_raw_obj&& obj);
+
+ // write the data at the given offset of the current stripe object
+ int process(bufferlist&& data, uint64_t stripe_offset) override;
+
+ // write the data as an exclusive create and wait for it to complete
+ int write_exclusive(const bufferlist& data);
+
+ int drain();
+
+ // when the operation completes successfully, clear the set of written objects
+ // so they aren't deleted on destruction
+ void clear_written() { written.clear(); }
+};
+
} // namespace rgw::putobj