template <class T, class K>
class RGWSyncShardMarkerTrack {
struct marker_entry {
- uint64_t tracker_pos;
- T marker;
- K key;
uint64_t pos;
real_time timestamp;
marker_entry() : pos(0) {}
- marker_entry(uint64_t _tracker_pos,
- const T& _marker,
- std::optional<K> _key,
- uint64_t _p, const
- real_time& _ts) : tracker_pos(_tracker_pos),
- marker(_marker),
- key(_key.value_or(K())),
- pos(_p),
- timestamp(_ts) {}
+ marker_entry(uint64_t _p, const real_time& _ts) : pos(_p), timestamp(_ts) {}
};
+ typename std::map<T, marker_entry> pending;
- typename std::map<T, typename std::list<marker_entry>::iterator> markers;
- typename std::list<marker_entry> pending;
-
- std::map<uint64_t, marker_entry> finish_markers;
- uint64_t max_keys{0};
-
- bool high_entry_exists{false};
- marker_entry high_entry; /* highest pos flushed */
+ std::map<T, marker_entry> finish_markers;
int window_size;
int updates_since_flush;
protected:
typename std::set<K> need_retry_set;
- virtual RGWCoroutine *store_marker(const DoutPrefixProvider *dpp, const T& new_marker, const K& key, uint64_t index_pos, const real_time& timestamp) = 0;
+ virtual RGWCoroutine *store_marker(const T& new_marker, uint64_t index_pos, const real_time& timestamp) = 0;
virtual RGWOrderCallCR *allocate_order_control_cr() = 0;
virtual void handle_finish(const T& marker) { }
public:
- using marker_entry_type = marker_entry;
-
RGWSyncShardMarkerTrack(int _window_size) : window_size(_window_size), updates_since_flush(0) {}
virtual ~RGWSyncShardMarkerTrack() {
if (order_cr) {
}
}
- bool start(const T& marker, std::optional<K> key, int index_pos, const real_time& timestamp) {
- if (markers.find(marker) != markers.end()) {
+ bool start(const T& pos, int index_pos, const real_time& timestamp) {
+ if (pending.find(pos) != pending.end()) {
return false;
}
- auto i = ++max_keys;
- pending.push_back(marker_entry(i, marker, key, index_pos, timestamp));
- markers[marker] = std::prev(pending.end());
+ pending[pos] = marker_entry(index_pos, timestamp);
return true;
}
- void try_update_high_marker(const T& marker, std::optional<K> key, int index_pos, const real_time& timestamp) {
- auto i = ++max_keys;
- finish_markers[i] = marker_entry(i, marker, key, index_pos, timestamp);
+ void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
+ finish_markers[pos] = marker_entry(index_pos, timestamp);
}
- RGWCoroutine *finish(const DoutPrefixProvider *dpp, const T& marker) {
+ RGWCoroutine *finish(const T& pos) {
if (pending.empty()) {
/* can happen, due to a bug that ended up with multiple objects with the same name and version
* -- which can happen when versioning is enabled an the version is 'null'.
*/
- return nullptr;
- }
-
- auto miter = markers.find(marker);
- if (miter == markers.end()) {
- return nullptr;
+ return NULL;
}
- auto& marker_iter = miter->second;
- auto iter = pending.begin();
+ typename std::map<T, marker_entry>::iterator iter = pending.begin();
- bool is_first = (marker_iter == iter);
+ bool is_first = (pos == iter->first);
- if (marker_iter == pending.end()) {
+ typename std::map<T, marker_entry>::iterator pos_iter = pending.find(pos);
+ if (pos_iter == pending.end()) {
/* see pending.empty() comment */
- return nullptr;
+ return NULL;
}
- finish_markers[marker_iter->tracker_pos] = *marker_iter;
+ finish_markers[pos] = pos_iter->second;
- pending.erase(marker_iter);
- markers.erase(miter);
+ pending.erase(pos);
- handle_finish(marker);
+ handle_finish(pos);
updates_since_flush++;
-
if (is_first && (updates_since_flush >= window_size || pending.empty())) {
- return flush(dpp);
+ return flush();
}
- return nullptr;
+ return NULL;
}
- RGWCoroutine *flush(const DoutPrefixProvider *dpp) {
+ RGWCoroutine *flush() {
if (finish_markers.empty()) {
return NULL;
}
- typename std::map<uint64_t, marker_entry>::iterator i;
+ typename std::map<T, marker_entry>::iterator i;
if (pending.empty()) {
i = finish_markers.end();
} else {
- i = finish_markers.lower_bound(pending.begin()->tracker_pos);
+ i = finish_markers.lower_bound(pending.begin()->first);
}
if (i == finish_markers.begin()) {
return NULL;
auto last = i;
--i;
- high_entry = i->second;
- high_entry_exists = true;
- RGWCoroutine *cr = order(store_marker(dpp, high_entry.marker, high_entry.key, high_entry.pos, high_entry.timestamp));
+ const T& high_marker = i->first;
+ marker_entry& high_entry = i->second;
+ RGWCoroutine *cr = order(store_marker(high_marker, high_entry.pos, high_entry.timestamp));
finish_markers.erase(finish_markers.begin(), last);
return cr;
}
order_cr->call_cr(cr);
return nullptr; /* don't call it a second time */
}
-
- bool get_lowerbound(marker_entry_type *he) {
- if (high_entry_exists) {
- *he = high_entry;
- }
- return high_entry_exists;
- }
};
class RGWMetaSyncShardMarkerTrack;
: RGWCoroutine(_cct), max_concurrent(_max_concurrent)
{}
- virtual bool spawn_next(const DoutPrefixProvider *dpp) = 0;
+ virtual bool spawn_next() = 0;
int operate(const DoutPrefixProvider *dpp) override;
};