}
void decode(bufferlist::const_iterator& bl) {
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
decode(event, bl);
decode(push_endpoint, bl);
decode(push_endpoint_args, bl);
const uint32_t reservations_cleanup_period_s;
queues_persistency_tracker topics_persistency_tracker;
public:
- librados::IoCtx& rados_ioctx;
+ rgw::sal::RadosStore& rados_store;
private:
librados::ObjectReadOperation op;
queues_t queues_chunk;
op.omap_get_keys2(start_after, max_chunk, &queues_chunk, &more, &rval);
- const auto ret = rgw_rados_operate(this, rados_ioctx, Q_LIST_OBJECT_NAME, &op, nullptr, y);
+ const auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), Q_LIST_OBJECT_NAME, &op, nullptr, y);
if (ret == -ENOENT) {
// queue list object was not created - nothing to do
return 0;
};
enum class EntryProcessingResult {
- Failure, Successful, Sleeping, Expired
+ Failure, Successful, Sleeping, Expired, Migrating
};
+ std::vector<std::string> entryProcessingResultString = {"Failure", "Successful", "Sleeping", "Expired", "Migrating"};
// processing of a specific entry
- // return whether processing was successfull (true) or not (false)
+ // return the processing result: Successful, Failure, Sleeping, Expired or Migrating
return EntryProcessingResult::Failure;
}
+ if (event_entry.creation_time == ceph::coarse_real_clock::zero()) {
+ return EntryProcessingResult::Migrating;
+ }
+
const auto topic_persistency_ttl = event_entry.time_to_live != DEFAULT_GLOBAL_VALUE ?
event_entry.time_to_live : conf->rgw_topic_persistency_time_to_live;
const auto topic_persistency_max_retries = event_entry.max_retries != DEFAULT_GLOBAL_VALUE ?
if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
return EntryProcessingResult::Sleeping;
}
- // TODO: write back the entry with creation time as if now
- // if event_entry.creation_time == zero
++entry_persistency_tracker.retires_num;
entry_persistency_tracker.last_retry_time = time_now;
"" /*no tag*/);
cls_2pc_queue_expire_reservations(op, stale_time);
// check ownership and do reservation cleanup in one batch
- auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
ldpp_dout(this, 5) << "INFO: queue: "
}
// get list of entries in the queue
+ auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
is_idle = true;
bool truncated = false;
std::string end_marker;
auto remove_entries = false;
auto entry_idx = 1U;
tokens_waiter waiter(io_context);
+ std::vector<bool> needs_migration_vector(entries.size(), false);
for (auto& entry : entries) {
if (has_error) {
// bail out on first error
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
spawn::spawn(yield, [this, ¬ifs_persistency_tracker, &queue_name, entry_idx, total_entries, &end_marker,
- &remove_entries, &has_error, &waiter, &entry](yield_context yield) {
+ &remove_entries, &has_error, &waiter, &entry, &needs_migration_vector](yield_context yield) {
const auto token = waiter.make_token();
auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
auto result = process_entry(this->get_cct()->_conf, persistency_tracker, entry, yield);
- if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired) {
- ldpp_dout(this, 20) << "INFO: processing of entry: " <<
- entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
- << (result == EntryProcessingResult::Successful? " ok": " expired") << dendl;
+ if (result == EntryProcessingResult::Successful || result == EntryProcessingResult::Expired
+ || result == EntryProcessingResult::Migrating) {
+ ldpp_dout(this, 20) << "INFO: processing of entry: " << entry.marker
+ << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name
+ // NOTE(review): add a separator (e.g. << " result: ") before the string below —
+ // as written the log reads "... from: <queue_name>Successful" with no space
+ << entryProcessingResultString[static_cast<unsigned int>(result)] << dendl;
remove_entries = true;
+ needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
notifs_persistency_tracker.erase(entry.marker);
} else {
if (set_min_marker(end_marker, entry.marker) < 0) {
- ldpp_dout(this, 1) << "ERROR: cannot determin minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
+ ldpp_dout(this, 1) << "ERROR: cannot determine minimum between malformed markers: " << end_marker << ", " << entry.marker << dendl;
} else {
ldpp_dout(this, 20) << "INFO: new end marker for removal: " << end_marker << " from: " << queue_name << dendl;
}
- has_error = true;
+ has_error = (result == EntryProcessingResult::Failure);
ldpp_dout(this, 20) << "INFO: processing of entry: " <<
entry.marker << " (" << entry_idx << "/" << total_entries << ") from: " << queue_name << " failed" << dendl;
}
// delete all published entries from queue
if (remove_entries) {
- uint64_t entries_to_remove = 0;
+ std::vector<cls_queue_entry> entries_to_migrate;
+ uint64_t index = 0;
+
for (const auto& entry: entries) {
if (end_marker == entry.marker) {
break;
}
- entries_to_remove++;
+ if (needs_migration_vector[index]) {
+ ldpp_dout(this, 20) << "INFO: migrating entry " << entry.marker << " from: " << queue_name << dendl;
+ entries_to_migrate.push_back(entry);
+ }
+ index++;
}
+ uint64_t entries_to_remove = index;
librados::ObjectWriteOperation op;
op.assert_exists();
rados::cls::lock::assert_locked(&op, queue_name+"_lock",
"" /*no tag*/);
cls_2pc_queue_remove_entries(op, end_marker, entries_to_remove);
// check ownership and deleted entries in one batch
- const auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
- ldpp_dout(this, 5) << "INFO: queue: "
- << queue_name << ". was removed. processing will stop" << dendl;
+ ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
if (ret < 0) {
ldpp_dout(this, 1) << "ERROR: failed to remove entries and/or lock queue up to: " << end_marker << " from queue: "
<< queue_name << ". error: " << ret << dendl;
+ return;
} else {
- ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: "
- << queue_name << dendl;
+ ldpp_dout(this, 20) << "INFO: removed entries up to: " << end_marker << " from queue: " << queue_name << dendl;
+ }
+
+ // reserving and committing the migrating entries
+ if (!entries_to_migrate.empty()) {
+ std::vector<bufferlist> migration_vector;
+ std::string tenant_name;
+ // TODO: extract the tenant name from queue_name once queue names encode the tenant
+ uint64_t size_to_migrate = 0;
+ RGWPubSub ps(&rados_store, tenant_name);
+
+ rgw_pubsub_topic topic;
+ auto ret_of_get_topic = ps.get_topic(this, queue_name, topic, optional_yield(io_context, yield));
+ if (ret_of_get_topic < 0) {
+ // we can't migrate entries without topic info
+ ldpp_dout(this, 1) << "ERROR: failed to fetch topic: " << queue_name << " error: "
+ << ret_of_get_topic << ". Aborting migration!" << dendl;
+ return;
+ }
+
+ for (auto entry: entries_to_migrate) {
+ event_entry_t event_entry;
+ auto iter = entry.data.cbegin();
+ try {
+ decode(event_entry, iter);
+ } catch (buffer::error& err) {
+ ldpp_dout(this, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
+ continue;
+ }
+ size_to_migrate += entry.data.length();
+ event_entry.creation_time = ceph::coarse_real_clock::now();
+ event_entry.time_to_live = topic.dest.time_to_live;
+ event_entry.max_retries = topic.dest.max_retries;
+ event_entry.retry_sleep_duration = topic.dest.retry_sleep_duration;
+
+ bufferlist bl;
+ encode(event_entry, bl);
+ migration_vector.push_back(bl);
+ }
+
+ cls_2pc_reservation::id_t reservation_id;
+ buffer::list obl;
+ int rval;
+ // NOTE(review): this reuses the ObjectWriteOperation 'op' already submitted for the
+ // remove above — confirm rgw_rados_operate leaves it empty, otherwise the earlier
+ // assert_exists/lock/remove ops are queued again; a fresh ObjectWriteOperation is safer
+ cls_2pc_queue_reserve(op, size_to_migrate, migration_vector.size(), &obl, &rval);
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield), librados::OPERATION_RETURNVEC);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to reserve migration space on queue: " << queue_name << ". error: " << ret << dendl;
+ return;
+ }
+ ret = cls_2pc_queue_reserve_result(obl, reservation_id);
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to parse reservation id for migration. error: " << ret << dendl;
+ return;
+ }
+
+ cls_2pc_queue_commit(op, migration_vector, reservation_id);
+ ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ reservation_id = cls_2pc_reservation::NO_ID;
+ if (ret < 0) {
+ ldpp_dout(this, 1) << "ERROR: failed to commit reservation to queue: " << queue_name << ". error: " << ret << dendl;
+ }
}
}
}
failover_time,
LOCK_FLAG_MAY_RENEW);
- ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
+ ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
if (ret == -EBUSY) {
// lock is already taken by another RGW
ldpp_dout(this, 20) << "INFO: queue: " << queue_name << " owned (locked) by another daemon" << dendl;
worker_count(_worker_count),
stale_reservations_period_s(_stale_reservations_period_s),
reservations_cleanup_period_s(_reservations_cleanup_period_s),
- rados_ioctx(store->getRados()->get_notif_pool_ctx())
+ rados_store(*store)
{
spawn::spawn(io_context, [this] (yield_context yield) {
process_queues(yield);
librados::ObjectWriteOperation op;
op.create(true);
cls_2pc_queue_init(op, topic_name, max_queue_size);
+ auto& rados_ioctx = rados_store.getRados()->get_notif_pool_ctx();
auto ret = rgw_rados_operate(this, rados_ioctx, topic_name, &op, y);
if (ret == -EEXIST) {
// queue already exists - nothing to do
if (!s_manager) {
return -EAGAIN;
}
- return remove_persistent_topic(s_manager, s_manager->rados_ioctx, topic_name, y);
+ return remove_persistent_topic(s_manager, s_manager->rados_store.getRados()->get_notif_pool_ctx(), topic_name, y);
}
rgw::sal::Object* get_object_with_atttributes(