ldpp_dout(dpp, 5) << "Copy object " << src_obj.bucket << ":" << src_obj.get_oid() << " => " << dest_obj.bucket << ":" << dest_obj.get_oid() << dendl;
if (remote_src || !source_zone.empty()) {
- // null_yield resolves a crash when calling progress_cb(), because the beast
- // frontend tried to use this same yield context to write the progress
- // response to the frontend socket. call fetch_remote_obj() synchronously so
- // that only one thread tries to suspend that coroutine
- const req_context rctx{dpp, null_yield, nullptr};
- const rgw_owner remote_user_owner(remote_user);
- return fetch_remote_obj(dest_obj_ctx, &remote_user_owner, &remote_user, info, source_zone,
- dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
- dest_placement, src_mtime, mtime, mod_ptr,
- unmod_ptr, high_precision_time,
- if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
- olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, rctx,
- nullptr /* filter */, stat_follow_olh, stat_dest_obj, std::nullopt);
+ RGWCopyObj *op = static_cast<RGWCopyObj *>(progress_data);
+ auto& progress_tracker = op->get_progress_tracker();
+
+ int ret = 0;
+ boost::asio::spawn(driver->get_io_context(),
+ [&](boost::asio::yield_context yield) {
+ const req_context rctx{dpp, yield, nullptr};
+ const rgw_owner remote_user_owner(remote_user);
+ ret = fetch_remote_obj(dest_obj_ctx, &remote_user_owner, &remote_user, info, source_zone,
+ dest_obj, src_obj, dest_bucket_info, &src_bucket_info,
+ dest_placement, src_mtime, mtime, mod_ptr,
+ unmod_ptr, high_precision_time,
+ if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
+ olh_epoch, delete_at, ptag, petag, progress_cb, progress_data, rctx,
+ nullptr /* filter */, stat_follow_olh, stat_dest_obj, std::nullopt);
+
+ progress_tracker.done = true;
+ progress_tracker.cv.notify_one();
+ }, [] (std::exception_ptr eptr) {
+ if (eptr) std::rethrow_exception(eptr);
+ });
+
+ op->progress_cb_handler();
+
+ return ret;
}
map<string, bufferlist> src_attrs;
return;
}
- send_partial_response(ofs);
+ std::lock_guard<std::mutex> l(progress_tracker->mtx);
+ progress_tracker->ofs_queue.push(ofs);
+ progress_tracker->cv.notify_one();
last_ofs = ofs;
}
+void RGWCopyObj::progress_cb_handler()
+{
+ std::unique_lock<std::mutex> l(progress_tracker->mtx);
+ while(!progress_tracker->done || !progress_tracker->ofs_queue.empty()) {
+ progress_tracker->cv.wait(l, [&]() {
+ return progress_tracker->done || !progress_tracker->ofs_queue.empty();
+ });
+
+ while (!progress_tracker->ofs_queue.empty()) {
+ auto ofs = progress_tracker->ofs_queue.front();
+ progress_tracker->ofs_queue.pop();
+ l.unlock();
+ send_partial_response(ofs);
+ l.lock();
+ }
+ }
+}
+
void RGWCopyObj::pre_exec()
{
rgw_bucket_object_pre_exec(s);
RGWObjectRetention *obj_retention;
RGWObjectLegalHold *obj_legal_hold;
+ // remote copy progress helper
+ struct ProgressTracker {
+ std::mutex mtx;
+ std::condition_variable cv;
+ std::queue<off_t> ofs_queue;
+ std::atomic<bool> done{false};
+
+ ProgressTracker() {}
+ };
+ std::optional<ProgressTracker> progress_tracker;
+
int init_common();
public:
void pre_exec() override;
void execute(optional_yield y) override;
void progress_cb(off_t ofs);
+ void progress_cb_handler();
+ ProgressTracker& get_progress_tracker() {
+ if (!progress_tracker) {
+ progress_tracker.emplace();
+ }
+ return *progress_tracker;
+ }
virtual int check_storage_class(const rgw_placement_rule& src_placement) {
return 0;