bluefs->get_block_device_size(BlueFS::BDEV_DB) - BLUEFS_START);
}
bluefs_shared_bdev = BlueFS::BDEV_SLOW;
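+ // a dedicated DB device means bluefs no longer shares a single device
+ // with object data, so its commits cannot be relied on to flush the
+ // data device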
+ bluefs_single_shared_device = false;
} else {
bluefs_shared_bdev = BlueFS::BDEV_DB;
}
BDEV_LABEL_BLOCK_SIZE);
}
cct->_conf->set_val("rocksdb_separate_wal_dir", "true");
+ bluefs_single_shared_device = false;
} else {
cct->_conf->set_val("rocksdb_separate_wal_dir", "false");
}
std::unique_lock<std::mutex> l(kv_lock);
while (true) {
assert(kv_committing.empty());
- if (kv_queue.empty() && deferred_cleanup_queue.empty()) {
+ if (kv_queue.empty() &&
+ deferred_done_queue.empty() &&
+ deferred_stable_queue.empty()) {
if (kv_stop)
break;
dout(20) << __func__ << " sleep" << dendl;
dout(20) << __func__ << " wake" << dendl;
} else {
deque<TransContext*> kv_submitting;
- deque<TransContext*> deferred_cleaning;
+ deque<TransContext*> deferred_done, deferred_stable;
dout(20) << __func__ << " committing " << kv_queue.size()
<< " submitting " << kv_queue_unsubmitted.size()
- << " cleaning " << deferred_cleanup_queue.size() << dendl;
+ << " deferred done " << deferred_done_queue.size()
+ << " stable " << deferred_stable_queue.size()
+ << dendl;
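+ // "done" deferred txcs have completed their aio but are not yet
+ // flushed to stable media; "stable" ones have been flushed and can
+ // have their deferred keys cleaned up below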
kv_committing.swap(kv_queue);
kv_submitting.swap(kv_queue_unsubmitted);
- deferred_cleaning.swap(deferred_cleanup_queue);
+ deferred_done.swap(deferred_done_queue);
+ deferred_stable.swap(deferred_stable_queue);
utime_t start = ceph_clock_now();
l.unlock();
- dout(30) << __func__ << " committing txc " << kv_committing << dendl;
- dout(30) << __func__ << " submitting txc " << kv_submitting << dendl;
- dout(30) << __func__ << " deferred_cleaning txc " << deferred_cleaning << dendl;
+ dout(30) << __func__ << " committing " << kv_committing << dendl;
+ dout(30) << __func__ << " submitting " << kv_submitting << dendl;
+ dout(30) << __func__ << " deferred_done " << deferred_done << dendl;
+ dout(30) << __func__ << " deferred_stable " << deferred_stable << dendl;
- // flush/barrier on block device
- bdev->flush();
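+ // count txcs in this batch that issued aios; if there are any, the
+ // device must be flushed below before their writes can be trusted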
+ int num_aios = 0;
+ for (auto txc : kv_committing) {
+ if (txc->had_ios) {
+ ++num_aios;
+ }
+ }
+
+ bool force_flush = false;
+ // if bluefs is sharing the same device as data (only), then we
+ // can rely on the bluefs commit to flush the device and make
+ // deferred aios stable. that means that if we have "done" deferred
+ // txcs AND we are not on a single shared device, we need to force a
+ // flush.
+ if (!bluefs || (!bluefs_single_shared_device && !deferred_done.empty())) {
+ force_flush = true;
+ }
+ if (kv_committing.empty() && kv_submitting.empty() &&
+ deferred_stable.empty()) {
+ force_flush = true; // there's nothing else to commit!
+ }
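+ // deferred_aggressive is set while a caller is actively draining
+ // deferred io and waiting on it; flush every cycle so it can make
+ // progress promptly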
+ if (deferred_aggressive) {
+ force_flush = true;
+ }
+
+ if (num_aios || force_flush) {
+ dout(20) << __func__ << " num_aios=" << num_aios
+ << " force_flush=" << (int)force_flush
+ << ", flushing, deferred done->stable" << dendl;
+ // flush/barrier on block device
+ bdev->flush();
+
+ // if we flush then deferred done are now deferred stable
+ deferred_stable.insert(deferred_stable.end(), deferred_done.begin(),
+ deferred_done.end());
+ deferred_done.clear();
+ }
// we will use one final transaction to force a sync
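+ // on a single shared device this commit goes through bluefs and
+ // flushes the device, which is what lets deferred "done" txcs become
+ // "stable" at the end of the cycle (see the swap below)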
KeyValueDB::Transaction synct = db->get_transaction();
txc->osr->qcond.notify_all();
}
}
- for (auto txc : kv_committing) {
- if (txc->had_ios) {
- --txc->osr->txc_with_unstable_io;
+ if (num_aios) {
+ for (auto txc : kv_committing) {
+ if (txc->had_ios) {
+ --txc->osr->txc_with_unstable_io;
+ }
}
}
}
// cleanup sync deferred keys
- for (auto txc : deferred_cleaning) {
+ for (auto txc : deferred_stable) {
bluestore_deferred_transaction_t& wt = *txc->deferred_txn;
if (!wt.released.empty()) {
// kraken replay compat only
utime_t finish = ceph_clock_now();
utime_t dur = finish - start;
dout(20) << __func__ << " committed " << kv_committing.size()
- << " cleaned " << deferred_cleaning.size()
+ << " cleaned " << deferred_stable.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();
_txc_state_proc(txc);
kv_committing.pop_front();
}
- while (!deferred_cleaning.empty()) {
- TransContext *txc = deferred_cleaning.front();
+ while (!deferred_stable.empty()) {
+ TransContext *txc = deferred_stable.front();
_txc_state_proc(txc);
- deferred_cleaning.pop_front();
+ deferred_stable.pop_front();
}
if (!deferred_aggressive) {
}
l.lock();
+ // previously deferred "done" are now "stable" by virtue of this
+ // commit cycle.
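+ // (any txcs flushed explicitly above were already moved to
+ // deferred_stable; the rest were made stable by the bluefs-backed
+ // kv commit on the shared device)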
+ deferred_stable_queue.swap(deferred_done);
}
}
dout(10) << __func__ << " finish" << dendl;
txc->osr->qcond.notify_all();
throttle_deferred_ops.put(txc->ops);
throttle_deferred_bytes.put(txc->bytes);
- deferred_cleanup_queue.push_back(txc);
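+ // the deferred aio is "done" (completed) but not yet "stable"; the
+ // kv sync thread will promote it once the device has been flushed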
+ deferred_done_queue.push_back(txc);
}
finished.clear();