ObjectStore::Transaction& t)
{
LOG_PREFIX(PG::update_snap_map);
- for (auto i = log_entries.cbegin(); i != log_entries.cend(); ++i) {
- OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
- if (i->soid.snap < CEPH_MAXSNAP) {
- if (i->is_delete()) {
- co_await OpsExecuter::snap_map_remove(
- i->soid,
- snap_mapper,
- osdriver,
- t);
- } else if (i->is_update()) {
- ceph_assert(i->snaps.length() > 0);
- vector<snapid_t> snaps;
- bufferlist snapbl = i->snaps;
- auto p = snapbl.cbegin();
- try {
- decode(snaps, p);
- } catch (...) {
- ERRORDPP("Failed to decode snaps on {}", *this, *i);
- snaps.clear();
- }
- set<snapid_t> _snaps(snaps.begin(), snaps.end());
-
- if (i->is_clone() || i->is_promote()) {
- co_await OpsExecuter::snap_map_clone(
- i->soid,
- _snaps,
- snap_mapper,
- osdriver,
- t);
- } else if (i->is_modify()) {
- co_await OpsExecuter::snap_map_modify(
- i->soid,
- _snaps,
- snap_mapper,
- osdriver,
- t);
- } else {
- ceph_assert(i->is_clean());
- }
- }
+ DEBUGDPP("", *this);
+ return interruptor::do_for_each(
+ log_entries,
+ [this, &t](const auto& entry) mutable {
+ if (entry.soid.snap < CEPH_MAXSNAP) {
+ return interruptor::async(
+ [this, entry, _t=osdriver.get_transaction(&t)]() mutable {
+ snap_mapper.update_snap_map(entry, &_t);
+ });
}
- }
+ return interruptor::now();
+ });
}
void PG::log_operation(
* handles these cases.
*/
#if 0
- if (transaction_applied) {
- //TODO:
- //update_snap_map(logv, t);
- }
auto last = logv.rbegin();
if (is_primary() && last != logv.rend()) {
projected_log.skip_can_rollback_to_to_head();