#include <string.h>
+#include <algorithm>
#include <iostream>
+#include <unordered_map>
+#include "common/async/spawn_throttle.h"
#include "common/strtol.h" // for strict_strtoll()
#include "include/types.h"
return obj;
}
+void rgw::multi_delete::dispatch(const std::vector<Item>& items,
+ bool bucket_versioned,
+ uint32_t max_aio,
+ boost::asio::yield_context yield,
+ Exec exec,
+ OnDispatch on_dispatch)
+{
+ auto group = ceph::async::spawn_throttle{yield, std::max<uint32_t>(1, max_aio)};
+
+ if (!bucket_versioned) {
+ for (size_t i = 0; i < items.size(); ++i) {
+ group.spawn([&exec, &items, i] (boost::asio::yield_context y) {
+ exec(items[i], false, y);
+ });
+ if (on_dispatch) {
+ on_dispatch();
+ }
+ }
+ group.wait();
+ return;
+ }
+
+ // Preserve first-seen order within each key group so callers can keep
+ // request/result ordering stable while coalescing intermediate OLH updates.
+ std::vector<std::vector<size_t>> grouped_items;
+ grouped_items.reserve(items.size());
+ std::unordered_map<std::string, size_t> group_index;
+ group_index.reserve(items.size());
+
+ for (size_t i = 0; i < items.size(); ++i) {
+ const auto& name = items[i].key.name;
+ auto [it, inserted] = group_index.emplace(name, grouped_items.size());
+ if (inserted) {
+ grouped_items.emplace_back();
+ }
+ grouped_items[it->second].push_back(i);
+ }
+
+ for (const auto& indexes : grouped_items) {
+ for (size_t i = 0; i + 1 < indexes.size(); ++i) {
+ const auto index = indexes[i];
+ group.spawn([&exec, &items, index] (boost::asio::yield_context y) {
+ exec(items[index], true, y);
+ });
+ if (on_dispatch) {
+ on_dispatch();
+ }
+ }
+ }
+ group.wait();
+
+ for (const auto& indexes : grouped_items) {
+ const auto index = indexes.back();
+ group.spawn([&exec, &items, index] (boost::asio::yield_context y) {
+ exec(items[index], false, y);
+ });
+ if (on_dispatch) {
+ on_dispatch();
+ }
+ }
+ group.wait();
+}
#pragma once
+#include <functional>
#include <vector>
+
+#include <boost/asio/spawn.hpp>
+
#include "rgw_xml.h"
#include "rgw_common.h"
RGWMultiDelXMLParser() {}
~RGWMultiDelXMLParser() override {}
};
+
+namespace rgw::multi_delete {
+
+struct Item {
+ rgw_obj_key key;
+ size_t index{0};
+};
+
+using Exec = std::function<void(const Item& item,
+ bool skip_update_olh,
+ boost::asio::yield_context yield)>;
+using OnDispatch = std::function<void()>;
+
+void dispatch(const std::vector<Item>& items,
+ bool bucket_versioned,
+ uint32_t max_aio,
+ boost::asio::yield_context yield,
+ Exec exec,
+ OnDispatch on_dispatch = {});
+
+} // namespace rgw::multi_delete
send_partial_response(o, del_op->result.delete_marker, del_op->result.version_id, op_ret);
}
-void RGWDeleteMultiObj::handle_versioned_objects(const std::vector<RGWMultiDelObject>& objects,
- uint32_t max_aio,
- boost::asio::yield_context yield)
-{
- auto group = ceph::async::spawn_throttle{yield, max_aio};
- std::map<std::string, std::vector<RGWMultiDelObject>> grouped_objects;
-
- // group objects by their keys
- for (const auto& object : objects) {
- const std::string& key = object.get_key();
- grouped_objects[key].push_back(object);
- }
-
- // for each group of objects, handle all but the last object and skip update_olh
- for (const auto& [_, objects] : grouped_objects) {
- for (size_t i = 0; i + 1 < objects.size(); ++i) { // skip the last element
- group.spawn([this, &objects, i] (boost::asio::yield_context yield) {
- handle_individual_object(objects[i], yield, true /* skip_olh_obj_update */);
- });
-
- rgw_flush_formatter(s, s->formatter);
- }
- }
- group.wait();
-
- // Now handle the last object of each group with update_olh
- for (const auto& [_, objects] : grouped_objects) {
- const auto& object = objects.back();
- group.spawn([this, &object] (boost::asio::yield_context yield) {
- handle_individual_object(object, yield);
- });
-
- rgw_flush_formatter(s, s->formatter);
- }
- group.wait();
-}
-
-void RGWDeleteMultiObj::handle_non_versioned_objects(const std::vector<RGWMultiDelObject>& objects,
- uint32_t max_aio,
- boost::asio::yield_context yield)
-{
- auto group = ceph::async::spawn_throttle{yield, max_aio};
-
- for (const auto& object : objects) {
- group.spawn([this, &object] (boost::asio::yield_context yield) {
- handle_individual_object(object, yield);
- });
-
- rgw_flush_formatter(s, s->formatter);
- }
- group.wait();
-}
-
void RGWDeleteMultiObj::handle_objects(const std::vector<RGWMultiDelObject>& objects,
uint32_t max_aio,
boost::asio::yield_context yield)
{
- if (bucket->versioned()) {
- handle_versioned_objects(objects, max_aio, yield);
- } else {
- handle_non_versioned_objects(objects, max_aio, yield);
+ std::vector<rgw::multi_delete::Item> items;
+ items.reserve(objects.size());
+
+ for (size_t i = 0; i < objects.size(); ++i) {
+ items.push_back(rgw::multi_delete::Item{
+ rgw_obj_key(objects[i].get_key(), objects[i].get_version_id()),
+ i,
+ });
}
+
+ rgw::multi_delete::dispatch(
+ items,
+ bucket->versioned(),
+ max_aio,
+ yield,
+ [this, &objects] (const rgw::multi_delete::Item& item,
+ bool skip_update_olh,
+ boost::asio::yield_context y) {
+ handle_individual_object(objects[item.index], y, skip_update_olh);
+ },
+ [this] {
+ rgw_flush_formatter(s, s->formatter);
+ });
}
void RGWDeleteMultiObj::execute(optional_yield y)
optional_yield y,
const bool skip_olh_obj_update = false);
- void handle_versioned_objects(const std::vector<RGWMultiDelObject>& objects,
- uint32_t max_aio, boost::asio::yield_context yield);
- void handle_non_versioned_objects(const std::vector<RGWMultiDelObject>& objects,
- uint32_t max_aio, boost::asio::yield_context yield);
void handle_objects(const std::vector<RGWMultiDelObject>& objects,
uint32_t max_aio, boost::asio::yield_context yield);