The only ones remaining are in calls from top-level threads.
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
#include <vector>
+#include "common/async/yield_context.h"
#include "common/debug.h"
#include "common/containers.h"
#include "common/errno.h"
cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
std::get<centries>(out).push_back(std::move(e));
}
- int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
+ int push(const DoutPrefixProvider *dpp, int index, entries&& items, optional_yield y) override {
lr::ObjectWriteOperation op;
cls_log_add(op, std::get<centries>(items), true);
- auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
+ auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": failed to push to " << oids[index] << cpp_strerror(-r)
return r;
}
int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
- const std::string& key,
- ceph::buffer::list&& bl) override {
+ const std::string& key, ceph::buffer::list&& bl,
+ optional_yield y) override {
lr::ObjectWriteOperation op;
cls_log_add(op, utime_t(now), {}, key, bl);
- auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
+ auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": failed to push to " << oids[index]
int list(const DoutPrefixProvider *dpp, int index, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
std::optional<std::string_view> marker,
- std::string* out_marker, bool* truncated) override {
+ std::string* out_marker, bool* truncated,
+ optional_yield y) override {
std::list<cls_log_entry> log_entries;
lr::ObjectReadOperation op;
cls_log_list(op, {}, {}, std::string(marker.value_or("")),
max_entries, log_entries, out_marker, truncated);
- auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
+ auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, y);
if (r == -ENOENT) {
*truncated = false;
return 0;
}
return 0;
}
- int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
+ int get_info(const DoutPrefixProvider *dpp, int index,
+ RGWDataChangesLogInfo *info, optional_yield y) override {
cls_log_header header;
lr::ObjectReadOperation op;
cls_log_info(op, &header);
- auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
+ auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, y);
if (r == -ENOENT) r = 0;
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
}
return r;
}
- int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
+ int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+ optional_yield y) override {
lr::ObjectWriteOperation op;
cls_log_trim(op, {}, {}, {}, std::string(marker));
- auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
+ auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
if (r == -ENOENT) r = -ENODATA;
if (r < 0 && r != -ENODATA) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
std::string_view max_marker() const override {
return "99999999";
}
- int is_empty(const DoutPrefixProvider *dpp) override {
+ int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override {
for (auto shard = 0u; shard < oids.size(); ++shard) {
std::list<cls_log_entry> log_entries;
lr::ObjectReadOperation op;
std::string out_marker;
bool truncated;
cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
- auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield);
+ auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, y);
if (r == -ENOENT) {
continue;
}
}
std::get<centries>(out).push_back(std::move(entry));
}
- int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
- auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield);
+ int push(const DoutPrefixProvider *dpp, int index, entries&& items,
+ optional_yield y) override {
+ auto r = fifos[index].push(dpp, std::get<centries>(items), y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to push to FIFO: " << get_oid(index)
return r;
}
int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
- const std::string&,
- ceph::buffer::list&& bl) override {
- auto r = fifos[index].push(dpp, std::move(bl), null_yield);
+ const std::string&, ceph::buffer::list&& bl,
+ optional_yield y) override {
+ auto r = fifos[index].push(dpp, std::move(bl), y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to push to FIFO: " << get_oid(index)
}
int list(const DoutPrefixProvider *dpp, int index, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
- std::optional<std::string_view> marker,
- std::string* out_marker, bool* truncated) override {
+ std::optional<std::string_view> marker, std::string* out_marker,
+ bool* truncated, optional_yield y) override {
std::vector<rgw::cls::fifo::list_entry> log_entries;
bool more = false;
auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
- null_yield);
+ y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to list FIFO: " << get_oid(index)
}
return 0;
}
- int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
+ int get_info(const DoutPrefixProvider *dpp, int index,
+ RGWDataChangesLogInfo *info, optional_yield y) override {
auto& fifo = fifos[index];
- auto r = fifo.read_meta(dpp, null_yield);
+ auto r = fifo.read_meta(dpp, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to get FIFO metadata: " << get_oid(index)
return r;
}
rados::cls::fifo::info m;
- fifo.meta(dpp, m, null_yield);
+ fifo.meta(dpp, m, y);
auto p = m.head_part_num;
if (p < 0) {
info->marker = "";
return 0;
}
rgw::cls::fifo::part_info h;
- r = fifo.get_part_info(dpp, p, &h, null_yield);
+ r = fifo.get_part_info(dpp, p, &h, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to get part info: " << get_oid(index) << "/" << p
info->last_update = h.max_time;
return 0;
}
- int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
- auto r = fifos[index].trim(dpp, marker, false, null_yield);
+ int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+ optional_yield y) override {
+ auto r = fifos[index].trim(dpp, marker, false, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to trim FIFO: " << get_oid(index)
if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
rgw_complete_aio_completion(c, -ENODATA);
} else {
+ // This null_yield is used for lazily opening FIFOs.
+ //
+ // It shouldn't exist, but it can't be eliminated,
+ // since the caller is an RGWCoroutine in the data sync code.
+ //
+ // It can be eliminated after Reef when we can get rid of
+ // AioCompletion entirely.
fifos[index].trim(dpp, marker, false, c, null_yield);
}
return r;
rgw::cls::fifo::marker::max().to_string();
return std::string_view(mm);
}
- int is_empty(const DoutPrefixProvider *dpp) override {
+ int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override {
std::vector<rgw::cls::fifo::list_entry> log_entries;
bool more = false;
for (auto shard = 0u; shard < fifos.size(); ++shard) {
- auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more,
- null_yield);
+ auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more, y);
if (r < 0) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": unable to list FIFO: " << get_oid(shard)
return -r;
}
+ // This null_yield is in startup code, so it doesn't matter that much.
auto besr = logback_generations::init<DataLogBackends>(
dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
return get_oid(gen_id, shard);
auto now = real_clock::now();
- auto ret = be->push(dpp, index, std::move(entries));
+ // This null_yield can stay (for now) as we're in our own thread.
+ auto ret = be->push(dpp, index, std::move(entries), null_yield);
if (ret < 0) {
/* we don't really need to have a special handling for failed cases here,
* as this is just an optimization. */
int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& gen,
- int shard_id)
+ int shard_id, optional_yield y)
{
auto& bucket = bucket_info.bucket;
- if (!filter_bucket(dpp, bucket, null_yield)) {
+ if (!filter_bucket(dpp, bucket, y)) {
return 0;
}
ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
auto be = bes->head();
- ret = be->push(dpp, index, now, change.key, std::move(bl));
+ ret = be->push(dpp, index, now, change.key, std::move(bl), y);
now = real_clock::now();
int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
- std::string_view marker,
- std::string* out_marker,
- bool* truncated)
+ std::string_view marker, std::string* out_marker,
+ bool* truncated, optional_yield y)
{
const auto [start_id, start_cursor] = cursorgen(marker);
auto gen_id = start_id;
gen_id = be->gen_id;
auto r = be->list(dpp, shard, max_entries, gentries,
gen_id == start_id ? start_cursor : std::string{},
- &out_cursor, truncated);
+ &out_cursor, truncated, y);
if (r < 0)
return r;
int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
std::string_view marker,
- std::string* out_marker, bool* truncated)
+ std::string* out_marker, bool* truncated,
+ optional_yield y)
{
assert(shard < num_shards);
- return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated);
+ return bes->list(dpp, shard, max_entries, entries, marker, out_marker,
+ truncated, y);
}
int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
- LogMarker& marker, bool *ptruncated)
+ LogMarker& marker, bool *ptruncated,
+ optional_yield y)
{
bool truncated;
entries.clear();
for (; marker.shard < num_shards && int(entries.size()) < max_entries;
marker.shard++, marker.marker.clear()) {
int ret = list_entries(dpp, marker.shard, max_entries - entries.size(),
- entries, marker.marker, NULL, &truncated);
+ entries, marker.marker, NULL, &truncated, y);
if (ret == -ENOENT) {
continue;
}
return 0;
}
-int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info)
+int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id,
+ RGWDataChangesLogInfo *info, optional_yield y)
{
assert(shard_id < num_shards);
auto be = bes->head();
- auto r = be->get_info(dpp, shard_id, info);
+ auto r = be->get_info(dpp, shard_id, info, y);
if (!info->marker.empty()) {
info->marker = gencursor(be->gen_id, info->marker);
}
return r;
}
-int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
+int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker, optional_yield y)
{
auto [target_gen, cursor] = cursorgen(marker);
std::unique_lock l(m);
be = upper_bound(be->gen_id)->second) {
l.unlock();
auto c = be->gen_id == target_gen ? cursor : be->max_marker();
- r = be->trim(dpp, shard_id, c);
+ r = be->trim(dpp, shard_id, c, y);
if (r == -ENOENT)
r = -ENODATA;
if (r == -ENODATA && be->gen_id < target_gen)
return r;
}
-int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
+int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker, optional_yield y)
{
assert(shard_id < num_shards);
- return bes->trim_entries(dpp, shard_id, marker);
+ return bes->trim_entries(dpp, shard_id, marker, y);
}
class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
be->trim(dpp, shard_id, cc, GenTrim::call(std::move(gt)));
}
-int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
+int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp,
+ std::optional<uint64_t>& through,
+ optional_yield y) {
if (size() != 1) {
std::vector<mapped_type> candidates;
{
std::optional<uint64_t> highest;
for (auto& be : candidates) {
- auto r = be->is_empty(dpp);
+ auto r = be->is_empty(dpp, y);
if (r < 0) {
return r;
} else if (r == 1) {
if (!highest) {
return 0;
}
- auto ec = empty_to(dpp, *highest, null_yield);
+ auto ec = empty_to(dpp, *highest, y);
if (ec) {
return ceph::from_error_code(ec);
}
}
- return ceph::from_error_code(remove_empty(dpp, null_yield));
+ return ceph::from_error_code(remove_empty(dpp, y));
}
if (run == runs_per_prune) {
std::optional<uint64_t> through;
ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
- trim_generations(&dp, through);
+ // This null_yield can stay, for now, as it's in its own thread.
+ trim_generations(&dp, through, null_yield);
if (r < 0) {
derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
<< r << dendl;
return ceph::from_error_code(bes->new_backing(dpp, type, y));
}
-int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
- return bes->trim_generations(dpp, through);
+int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
+ std::optional<uint64_t>& through,
+ optional_yield y) {
+ return bes->trim_generations(dpp, through, y);
}
void RGWDataChangesLogInfo::dump(Formatter *f) const
#include <fmt/format.h>
+#include "common/async/yield_context.h"
#include "include/buffer.h"
#include "include/encoding.h"
#include "include/function2.hpp"
}
int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
- std::string_view marker,
- std::string* out_marker, bool* truncated);
- int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker);
+ std::string_view marker, std::string* out_marker, bool* truncated,
+ optional_yield y);
+ int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker, optional_yield y);
void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
librados::AioCompletion* c);
void set_zero(RGWDataChangesBE* be) {
bs::error_code handle_new_gens(entries_t e) noexcept override;
bs::error_code handle_empty_to(uint64_t new_tail) noexcept override;
- int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
+ int trim_generations(const DoutPrefixProvider *dpp,
+ std::optional<uint64_t>& through,
+ optional_yield y);
};
struct BucketGen {
librados::Rados* lr);
int choose_oid(const rgw_bucket_shard& bs);
int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
- const rgw::bucket_log_layout_generation& gen, int shard_id);
+ const rgw::bucket_log_layout_generation& gen, int shard_id,
+ optional_yield y);
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
- std::string_view marker,
- std::string* out_marker, bool* truncated);
- int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker);
+ std::string_view marker, std::string* out_marker,
+ bool* truncated, optional_yield y);
+ int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+ std::string_view marker, optional_yield y);
int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
librados::AioCompletion* c); // :(
- int get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info);
+ int get_info(const DoutPrefixProvider *dpp, int shard_id,
+ RGWDataChangesLogInfo *info, optional_yield y);
using LogMarker = RGWDataChangesLogMarker;
int list_entries(const DoutPrefixProvider *dpp, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
- LogMarker& marker, bool* ptruncated);
+ LogMarker& marker, bool* ptruncated,
+ optional_yield y);
void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
auto read_clear_modified() {
int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y);
- int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
+ int trim_generations(const DoutPrefixProvider *dpp,
+ std::optional<uint64_t>& through,
+ optional_yield y);
};
class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
const std::string& key,
ceph::buffer::list&& entry,
entries& out) = 0;
- virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items) = 0;
+ virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items,
+ optional_yield y) = 0;
virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
- const std::string& key,
- ceph::buffer::list&& bl) = 0;
+ const std::string& key, ceph::buffer::list&& bl,
+ optional_yield y) = 0;
virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
std::vector<rgw_data_change_log_entry>& entries,
std::optional<std::string_view> marker,
- std::string* out_marker, bool* truncated) = 0;
- virtual int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) = 0;
- virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) = 0;
- virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
- librados::AioCompletion* c) = 0;
+ std::string* out_marker, bool* truncated,
+ optional_yield y) = 0;
+ virtual int get_info(const DoutPrefixProvider *dpp, int index,
+ RGWDataChangesLogInfo *info, optional_yield y) = 0;
+ virtual int trim(const DoutPrefixProvider *dpp, int index,
+ std::string_view marker, optional_yield y) = 0;
+ virtual int trim(const DoutPrefixProvider *dpp, int index,
+ std::string_view marker, librados::AioCompletion* c) = 0;
virtual std::string_view max_marker() const = 0;
// 1 on empty, 0 on non-empty, negative on error.
- virtual int is_empty(const DoutPrefixProvider *dpp) = 0;
+ virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
};
void add_datalog_entry(const DoutPrefixProvider* dpp,
RGWDataChangesLog* datalog,
const RGWBucketInfo& bucket_info,
- uint32_t shard_id)
+ uint32_t shard_id, optional_yield y)
{
const auto& logs = bucket_info.layout.logs;
if (logs.empty()) {
return;
}
- int r = datalog->add_entry(dpp, bucket_info, logs.back(), shard_id);
+ int r = datalog->add_entry(dpp, bucket_info, logs.back(), shard_id, y);
if (r < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
} // datalog error is not fatal
continue;
}
- add_datalog_entry(&dpp, store->svc.datalog_rados, bucket_info, bs.shard_id);
+ // This null_yield can stay, for now, since we're in our own thread
+ add_datalog_entry(&dpp, store->svc.datalog_rados, bucket_info,
+ bs.shard_id, null_yield);
}
}
}
r = index_op->complete(dpp, poolid, epoch, size, accounted_size,
meta.set_mtime, etag, content_type,
storage_class, &acl_bl,
- meta.category, meta.remove_objs, meta.user_data, meta.appendable);
+ meta.category, meta.remove_objs, y,
+ meta.user_data, meta.appendable);
tracepoint(rgw_rados, complete_exit, req_id.c_str());
if (r < 0)
goto done_cancel;
}
else {
store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
- accounted_size, orig_size);
+ accounted_size, orig_size);
}
return 0;
done_cancel:
- int ret = index_op->cancel(dpp, meta.remove_objs);
+ int ret = index_op->cancel(dpp, meta.remove_objs, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
}
}
add_datalog_entry(dpp, store->svc.datalog_rados,
- target->get_bucket_info(), bs->shard_id);
+ target->get_bucket_info(), bs->shard_id, y);
return 0;
}
RGWRados::Bucket bop(store, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
-
+
index_op.set_zones_trace(params.zones_trace);
index_op.set_bilog_flags(params.bilog_flags);
tombstone_entry entry{*state};
obj_tombstone_cache->add(obj, entry);
}
- r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs);
-
+ r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y);
+
int ret = target->complete_atomic_modification(dpp);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: complete_atomic_modification returned ret=" << ret << dendl;
}
/* other than that, no need to propagate error */
} else {
- int ret = index_op.cancel(dpp, params.remove_objs);
+ int ret = index_op.cancel(dpp, params.remove_objs, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
}
return 0;
}
-int RGWRados::delete_obj_index(const rgw_obj& obj, ceph::real_time mtime, const DoutPrefixProvider *dpp)
+int RGWRados::delete_obj_index(const rgw_obj& obj, ceph::real_time mtime,
+ const DoutPrefixProvider *dpp, optional_yield y)
{
std::string oid, key;
get_obj_bucket_and_oid_loc(obj, oid, key);
RGWRados::Bucket bop(this, bucket_info);
RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
- return index_op.complete_del(dpp, -1 /* pool */, 0, mtime, NULL);
+ return index_op.complete_del(dpp, -1 /* pool */, 0, mtime, nullptr, y);
}
static void generate_fake_tag(const DoutPrefixProvider *dpp, rgw::sal::Driver* store, map<string, bufferlist>& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl)
int64_t poolid = ioctx.get_id();
r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
mtime, etag, content_type, storage_class, &acl_bl,
- RGWObjCategory::Main, NULL);
+ RGWObjCategory::Main, nullptr, y);
} else {
- int ret = index_op.cancel(dpp, nullptr);
+ int ret = index_op.cancel(dpp, nullptr, y);
if (ret < 0) {
ldpp_dout(dpp, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
}
const string& content_type, const string& storage_class,
bufferlist *acl_bl,
RGWObjCategory category,
- list<rgw_obj_index_key> *remove_objs, const string *user_data,
+ list<rgw_obj_index_key> *remove_objs,
+ optional_yield y,
+ const string *user_data,
bool appendable)
{
if (blind) {
ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
add_datalog_entry(dpp, store->svc.datalog_rados,
- target->bucket_info, bs->shard_id);
+ target->bucket_info, bs->shard_id, y);
return ret;
}
int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
int64_t poolid, uint64_t epoch,
real_time& removed_mtime,
- list<rgw_obj_index_key> *remove_objs)
+ list<rgw_obj_index_key> *remove_objs,
+ optional_yield y)
{
if (blind) {
return 0;
ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
add_datalog_entry(dpp, store->svc.datalog_rados,
- target->bucket_info, bs->shard_id);
+ target->bucket_info, bs->shard_id, y);
return ret;
}
int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp,
- list<rgw_obj_index_key> *remove_objs)
+ list<rgw_obj_index_key> *remove_objs,
+ optional_yield y)
{
- if (blind) {
+ if (blind) {
return 0;
}
RGWRados *store = target->get_store();
* have no way to tell that they're all caught up
*/
add_datalog_entry(dpp, store->svc.datalog_rados,
- target->bucket_info, bs->shard_id);
+ target->bucket_info, bs->shard_id, y);
return ret;
}
struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
real_time unmod_since, bool high_precision_time,
+ optional_yield y,
rgw_zone_set *_zones_trace, bool log_data_change)
{
rgw_rados_ref ref;
return r;
}
- add_datalog_entry(dpp, svc.datalog_rados, bucket_info, bs.shard_id);
+ add_datalog_entry(dpp, svc.datalog_rados, bucket_info, bs.shard_id, y);
return 0;
}
}
ret = bucket_index_link_olh(dpp, bucket_info, *state, target_obj->get_obj(),
delete_marker, op_tag, meta, olh_epoch, unmod_since,
- high_precision_time, zones_trace, log_data_change);
+ high_precision_time, y, zones_trace, log_data_change);
if (ret < 0) {
ldpp_dout(dpp, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
if (ret == -ECANCELED) {
if (loc.key.ns == RGW_OBJ_NS_MULTIPART) {
ldout_bitx(bitx, dpp, 10) << "INFO: " << __func__ << " removing manifest part from index loc=" << loc << dendl_bitx;
- r = delete_obj_index(loc, astate->mtime, dpp);
+ r = delete_obj_index(loc, astate->mtime, dpp, y);
if (r < 0) {
ldout_bitx(bitx, dpp, 0) <<
"WARNING: " << __func__ << ": delete_obj_index returned r=" << r << dendl_bitx;
handles.push_back(c);
if (keep_index_consistent) {
- ret = delete_obj_index(obj, astate->mtime, dpp);
+ ret = delete_obj_index(obj, astate->mtime, dpp, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed to delete obj index with ret=" << ret << dendl;
return ret;
const std::string& etag, const std::string& content_type,
const std::string& storage_class,
bufferlist *acl_bl, RGWObjCategory category,
- std::list<rgw_obj_index_key> *remove_objs, const std::string *user_data = nullptr, bool appendable = false);
+ std::list<rgw_obj_index_key> *remove_objs,
+ optional_yield y,
+ const std::string *user_data = nullptr,
+ bool appendable = false);
int complete_del(const DoutPrefixProvider *dpp,
int64_t poolid, uint64_t epoch,
ceph::real_time& removed_mtime, /* mtime of removed object */
- std::list<rgw_obj_index_key> *remove_objs);
+ std::list<rgw_obj_index_key> *remove_objs,
+ optional_yield y);
int cancel(const DoutPrefixProvider *dpp,
- std::list<rgw_obj_index_key> *remove_objs);
+ std::list<rgw_obj_index_key> *remove_objs,
+ optional_yield y);
const std::string *get_optag() { return &optag; }
int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj);
/** Remove an object from the bucket index */
- int delete_obj_index(const rgw_obj& obj, ceph::real_time mtime, const DoutPrefixProvider *dpp);
+ int delete_obj_index(const rgw_obj& obj, ceph::real_time mtime,
+ const DoutPrefixProvider *dpp, optional_yield y);
/**
* Set an attr on an object.
const std::string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
uint64_t olh_epoch,
ceph::real_time unmod_since, bool high_precision_time,
+ optional_yield y,
rgw_zone_set *zones_trace = nullptr,
bool log_data_change = false);
int bucket_index_unlink_instance(const DoutPrefixProvider *dpp,
// generation, and eventually transition to the next
// TODO: use a log layout to support types other than BucketLogType::InIndex
for (uint32_t shard_id = 0; shard_id < prev.current_index.layout.normal.num_shards; ++shard_id) {
- ret = store->svc()->datalog_rados->add_entry(dpp, bucket_info, prev.logs.back(), shard_id);
+ // This null_yield can stay, for now, since we're in our own thread
+ ret = store->svc()->datalog_rados->add_entry(dpp, bucket_info, prev.logs.back(), shard_id,
+ null_yield);
if (ret < 0) {
ldpp_dout(dpp, 1) << "WARNING: failed writing data log (bucket_info.bucket="
<< bucket_info.bucket << ", shard_id=" << shard_id << "of generation="
// Note that last_marker is updated to be the marker of the last
// entry listed
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->list_entries(this, shard_id,
- max_entries, entries,
- marker, &last_marker,
- &truncated);
+ op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+ datalog_rados->list_entries(this, shard_id, max_entries, entries,
+ marker, &last_marker, &truncated, y);
}
void RGWOp_DATALog_List::send_response() {
return;
}
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->get_info(this, shard_id, &info);
+ op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+ datalog_rados->get_info(this, shard_id, &info, y);
}
void RGWOp_DATALog_ShardInfo::send_response() {
return;
}
- op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->trim_entries(this, shard_id, marker);
+ op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+ datalog_rados->trim_entries(this, shard_id, marker, y);
}
// not in header to avoid pulling in rgw_sync.h
if (specified_shard_id) {
ret = datalog_svc->list_entries(dpp(), shard_id, max_entries - count,
entries, marker,
- &marker, &truncated);
+ &marker, &truncated,
+ null_yield);
} else {
ret = datalog_svc->list_entries(dpp(), max_entries - count, entries,
- log_marker, &truncated);
+ log_marker, &truncated, null_yield);
}
if (ret < 0) {
cerr << "ERROR: datalog_svc->list_entries(): " << cpp_strerror(-ret) << std::endl;
list<cls_log_entry> entries;
RGWDataChangesLogInfo info;
- static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->get_info(dpp(), i, &info);
+ static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+ datalog_rados->get_info(dpp(), i, &info, null_yield);
::encode_json("info", info, formatter.get());
}
auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
- ret = datalog->trim_entries(dpp(), shard_id, marker);
+ ret = datalog->trim_entries(dpp(), shard_id, marker, null_yield);
if (ret < 0 && ret != -ENODATA) {
cerr << "ERROR: trim_entries(): " << cpp_strerror(-ret) << std::endl;
if (opt_cmd == OPT::DATALOG_PRUNE) {
auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
std::optional<uint64_t> through;
- ret = datalog->trim_generations(dpp(), through);
+ ret = datalog->trim_generations(dpp(), through, null_yield);
if (ret < 0) {
cerr << "ERROR: trim_generations(): " << cpp_strerror(-ret) << std::endl;
virtual int handle_overwrite(const DoutPrefixProvider *dpp,
const RGWBucketInfo& info,
- const RGWBucketInfo& orig_info) = 0;
+ const RGWBucketInfo& orig_info,
+ optional_yield y) = 0;
};
-
return 0;
}
-int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
+int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
const RGWBucketInfo& info,
- const RGWBucketInfo& orig_info)
+ const RGWBucketInfo& orig_info,
+ optional_yield y)
{
bool new_sync_enabled = info.datasync_flag_enabled();
bool old_sync_enabled = orig_info.datasync_flag_enabled();
}
for (int i = 0; i < shards_num; ++i) {
- ret = svc.datalog_rados->add_entry(dpp, info, bilog, i);
+ ret = svc.datalog_rados->add_entry(dpp, info, bilog, i, y);
if (ret < 0) {
ldpp_dout(dpp, -1) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << i << ")" << dendl;
} // datalog error is not fatal
std::list<cls_rgw_bucket_instance_entry> *status);
int handle_overwrite(const DoutPrefixProvider *dpp, const RGWBucketInfo& info,
- const RGWBucketInfo& orig_info) override;
+ const RGWBucketInfo& orig_info,
+ optional_yield y) override;
int open_bucket_index_shard(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
}
if (orig_info && *orig_info && !exclusive) {
- int r = svc.bi->handle_overwrite(dpp, info, *(orig_info.value()));
+ int r = svc.bi->handle_overwrite(dpp, info, *(orig_info.value()), y);
if (r < 0) {
ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): svc.bi->handle_overwrite() of key=" << key << " returned r=" << r << dendl;
return r;