};
typename std::map<T, marker_entry> pending;
- T high_marker;
- T last_stored_marker;
- marker_entry high_entry;
+ map<T, marker_entry> finish_markers;
int window_size;
int updates_since_flush;
}
void try_update_high_marker(const T& pos, int index_pos, const real_time& timestamp) {
- if (!(pos <= high_marker)) {
- high_marker = pos;
- high_entry = marker_entry(index_pos, timestamp);
- }
+ finish_markers[pos] = marker_entry(index_pos, timestamp);
}
RGWCoroutine *finish(const T& pos) {
return NULL;
}
- if (!(pos <= high_marker)) {
- high_marker = pos;
- high_entry = pos_iter->second;
- }
+ finish_markers[pos] = pos_iter->second;
pending.erase(pos);
}
RGWCoroutine *flush() {
- if (last_stored_marker == high_marker) {
+ if (finish_markers.empty()) {
return NULL;
}
+ typename std::map<T, marker_entry>::iterator i;
+
+ if (pending.empty()) {
+ i = finish_markers.end();
+ } else {
+ i = finish_markers.lower_bound(pending.begin()->first);
+ }
+ if (i == finish_markers.begin()) {
+ return NULL;
+ }
updates_since_flush = 0;
- last_stored_marker = high_marker;
- return store_marker(high_marker, high_entry.pos, high_entry.timestamp);
+
+ auto last = i;
+ --i;
+ const T& high_marker = i->first;
+ marker_entry& high_entry = i->second;
+ RGWCoroutine *cr = store_marker(high_marker, high_entry.pos, high_entry.timestamp);
+ finish_markers.erase(finish_markers.begin(), last);
+ return cr;
}
/*