uint64_t used_bytes = 0;
bool init_complete = false;
+ /**
+ * projected_used_bytes
+ *
+ * Sum of projected bytes used by each transaction between throttle
+ * acquisition and commit completion. See reserve_projected_usage()
+ */
+ uint64_t projected_used_bytes = 0;
+
struct {
uint64_t segments_released = 0;
} stats;
get_bytes_available_current_segment() +
get_bytes_scanned_current_segment();
}
+ /// Bytes still available once space already promised to in-flight
+ /// (reserved but uncommitted) transactions is subtracted; clamps at 0.
+ size_t get_projected_available_bytes() const {
+   const auto avail = get_available_bytes();
+   if (avail > projected_used_bytes) {
+     return avail - projected_used_bytes;
+   }
+   return 0;
+ }
/// Returns total space available
size_t get_total_bytes() const {
size_t get_unavailable_bytes() const {
return get_total_bytes() - get_available_bytes();
}
+ /// Counterpart of get_unavailable_bytes() computed against the
+ /// projected availability; clamps at 0 defensively.
+ size_t get_projected_unavailable_bytes() const {
+   const auto total = get_total_bytes();
+   const auto avail = get_projected_available_bytes();
+   return (total > avail) ? (total - avail) : 0;
+ }
/// Returns bytes currently occupied by live extents (not journal)
size_t get_used_bytes() const {
return used_bytes;
}
+ /// Live bytes plus bytes reserved by transactions that have not yet
+ /// committed (see projected_used_bytes).
+ size_t get_projected_used_bytes() const {
+   return projected_used_bytes + used_bytes;
+ }
/// Return bytes contained in segments in journal
size_t get_journal_segment_bytes() const {
else
return 0;
}
+ /// Projected analogue of the reclaimable-bytes computation: space GC
+ /// could reclaim after excluding journal segments, clamped at 0.
+ size_t get_projected_reclaimable_bytes() const {
+   const auto reclaimable =
+     get_projected_unavailable_bytes() - get_projected_used_bytes();
+   const auto journal = get_journal_segment_bytes();
+   if (reclaimable <= journal) {
+     return 0;
+   }
+   return reclaimable - journal;
+ }
/**
* get_reclaim_ratio
if (get_unavailable_bytes() == 0) return 0;
return (double)get_reclaimable_bytes() / (double)get_unavailable_bytes();
}
+ /**
+  * get_projected_reclaim_ratio
+  *
+  * Projected analogue of get_reclaim_ratio().  Uses the projected
+  * reclaimable bytes rather than the current ones so the numerator and
+  * denominator agree; the two expressions only differ where the
+  * projected helpers clamp at 0.  Returns 0 when nothing is
+  * unavailable.
+  */
+ double get_projected_reclaim_ratio() const {
+   if (get_projected_unavailable_bytes() == 0) return 0;
+   return (double)get_projected_reclaimable_bytes() /
+     (double)get_projected_unavailable_bytes();
+ }
/**
* get_available_ratio
double get_available_ratio() const {
return (double)get_available_bytes() / (double)get_total_bytes();
}
+ /// Fraction of total space still available once projected (reserved)
+ /// usage is accounted for.
+ double get_projected_available_ratio() const {
+   const auto avail = static_cast<double>(get_projected_available_bytes());
+   const auto total = static_cast<double>(get_total_bytes());
+   return avail / total;
+ }
/**
* should_block_on_gc
* Encapsulates whether block pending gc.
*/
bool should_block_on_gc() const {
- auto aratio = get_available_ratio();
+ // TODO: probably worth projecting journal usage as well
+ // Use the projected ratio so reserved-but-uncommitted transactions
+ // already count against the throttle limits.
+ auto aratio = get_projected_available_ratio();
+ // Block when availability is below the gc_max threshold AND either the
+ // projected reclaim ratio or availability crosses its hard limit, OR
+ // when the journal tail has fallen behind its target.
return (
((aratio < config.available_ratio_gc_max) &&
- (get_reclaim_ratio() > config.reclaim_ratio_hard_limit ||
- aratio < config.available_ratio_hard_limit)) ||
+ ((get_projected_reclaim_ratio() >
+ config.reclaim_ratio_hard_limit) ||
+ (aratio < config.available_ratio_hard_limit))) ||
(get_dirty_tail_limit() > journal_tail_target)
);
}
}
public:
- seastar::future<> await_hard_limits() {
+ /**
+  * reserve_projected_usage
+  *
+  * Waits until the throttle admits the caller (the wait-loop condition
+  * is elided from this hunk — presumably tied to should_block_on_gc();
+  * confirm against the full file), then reserves projected_usage bytes
+  * by bumping projected_used_bytes.  Must be paired with
+  * release_projected_usage() when the transaction completes.
+  */
+ seastar::future<> reserve_projected_usage(size_t projected_usage) {
// The pipeline configuration prevents another IO from entering
// prepare until the prior one exits and clears this.
ceph_assert(!blocked_io_wake);
[this] {
blocked_io_wake = seastar::promise<>();
return blocked_io_wake->get_future();
- });
+ }
+ ).then([this, projected_usage] {
+ // By the time the wait resolves there must be no pending waker.
+ ceph_assert(!blocked_io_wake);
+ projected_used_bytes += projected_usage;
+ });
+ }
+
+ /// Release a reservation made by reserve_projected_usage() and wake
+ /// any IO currently blocked on the projected limits.
+ void release_projected_usage(size_t projected_usage) {
+   ceph_assert(projected_used_bytes >= projected_usage);
+   projected_used_bytes -= projected_usage;
+   maybe_wake_gc_blocked_io();
+ }
private:
void maybe_wake_gc_blocked_io() {
Transaction &t)
{
LOG_PREFIX(TransactionManager::submit_transaction);
- DEBUGT("about to await throttle", t);
- return trans_intr::make_interruptible(segment_cleaner->await_hard_limits()
- ).then_interruptible([this, &t]() {
+ size_t projected_usage = t.get_allocation_size();
+ DEBUGT("waiting for projected_usage: {}", t, projected_usage);
+ return trans_intr::make_interruptible(
+ segment_cleaner->reserve_projected_usage(projected_usage)
+ ).then_interruptible([this, &t] {
return submit_transaction_direct(t);
+ }).finally([this, FNAME, projected_usage, &t] {
+ DEBUGT("releasing projected_usage: {}", t, projected_usage);
+ segment_cleaner->release_projected_usage(projected_usage);
});
}