dout(20) << __func__
<< " last_{nid,blobid} exceeds max, submit via kv thread"
<< dendl;
+ } else if (txc->osr->kv_committing_serially) {
+ dout(20) << __func__ << " prior txc submitted via kv thread, us too"
+ << dendl;
+ // note: this is starvation-prone. once we have a txc in a busy
+ // sequencer that is committing serially it is possible to keep
+ // submitting new transactions fast enough that we get stuck doing
+ // so. the alternative is to block here... fixme?
} else {
_txc_finalize_kv(txc, txc->t);
txc->kv_submitted = true;
std::lock_guard<std::mutex> l(kv_lock);
kv_queue.push_back(txc);
kv_cond.notify_one();
+ if (!txc->kv_submitted) {
+ kv_queue_unsubmitted.push_back(txc);
+ ++txc->osr->kv_committing_serially;
+ }
}
return;
case TransContext::STATE_KV_QUEUED:
std::unique_lock<std::mutex> l(kv_lock);
while (true) {
assert(kv_committing.empty());
- assert(wal_cleaning.empty());
if (kv_queue.empty() && wal_cleanup_queue.empty()) {
if (kv_stop)
break;
kv_cond.wait(l);
dout(20) << __func__ << " wake" << dendl;
} else {
+ deque<TransContext*> kv_submitting;
+ deque<TransContext*> wal_cleaning;
dout(20) << __func__ << " committing " << kv_queue.size()
+ << " submitting " << kv_queue_unsubmitted.size()
<< " cleaning " << wal_cleanup_queue.size() << dendl;
kv_committing.swap(kv_queue);
+ kv_submitting.swap(kv_queue_unsubmitted);
wal_cleaning.swap(wal_cleanup_queue);
utime_t start = ceph_clock_now(NULL);
l.unlock();
dout(30) << __func__ << " committing txc " << kv_committing << dendl;
+ dout(30) << __func__ << " submitting txc " << kv_submitting << dendl;
dout(30) << __func__ << " wal_cleaning txc " << wal_cleaning << dendl;
alloc->commit_start();
// flush/barrier on block device
bdev->flush();
- uint64_t high_nid = 0, high_blobid = 0;
- bool any_to_submit = false;
- for (auto txc : kv_committing) {
- if (!txc->kv_submitted) {
- _txc_finalize_kv(txc, txc->t);
- if (txc->last_nid > high_nid) {
- high_nid = txc->last_nid;
- }
- if (txc->last_blobid > high_blobid) {
- high_blobid = txc->last_blobid;
- }
- txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
- any_to_submit = true;
- }
- }
- if (any_to_submit) {
- if (high_nid || high_blobid) {
- TransContext *first_txc = kv_committing.front();
- std::lock_guard<std::mutex> l(id_lock);
- if (high_nid + g_conf->bluestore_nid_prealloc/2 > nid_max) {
- nid_max = high_nid + g_conf->bluestore_nid_prealloc;
- bufferlist bl;
- ::encode(nid_max, bl);
- first_txc->t->set(PREFIX_SUPER, "nid_max", bl);
- dout(10) << __func__ << " nid_max now " << nid_max << dendl;
- }
- if (high_blobid + g_conf->bluestore_blobid_prealloc/2 > blobid_max) {
- blobid_max = high_blobid + g_conf->bluestore_blobid_prealloc;
- bufferlist bl;
- ::encode(blobid_max, bl);
- first_txc->t->set(PREFIX_SUPER, "blobid_max", bl);
- dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
- }
- }
- for (auto txc : kv_committing) {
- if (!txc->kv_submitted) {
- txc->kv_submitted = true;
- int r = db->submit_transaction(txc->t);
- assert(r == 0);
- }
- }
+ // we will use one final transaction to force a sync
+ KeyValueDB::Transaction synct = db->get_transaction();
+
+      // increase {nid,blobid}_max?  note that this covers both the
+      // case where we are approaching the max and the case where we
+      // passed it.  in either case, we increase the max in the earlier
+      // txn we submit.
+ uint64_t new_nid_max = 0, new_blobid_max = 0;
+ if (nid_last + g_conf->bluestore_nid_prealloc/2 > nid_max) {
+ KeyValueDB::Transaction t =
+ kv_submitting.empty() ? synct : kv_submitting.front()->t;
+ new_nid_max = nid_last + g_conf->bluestore_nid_prealloc;
+ bufferlist bl;
+ ::encode(new_nid_max, bl);
+ t->set(PREFIX_SUPER, "nid_max", bl);
+ dout(10) << __func__ << " new_nid_max " << new_nid_max << dendl;
+ }
+ if (blobid_last + g_conf->bluestore_blobid_prealloc/2 > blobid_max) {
+ KeyValueDB::Transaction t =
+ kv_submitting.empty() ? synct : kv_submitting.front()->t;
+ new_blobid_max = blobid_last + g_conf->bluestore_blobid_prealloc;
+ bufferlist bl;
+ ::encode(new_blobid_max, bl);
+ t->set(PREFIX_SUPER, "blobid_max", bl);
+ dout(10) << __func__ << " new_blobid_max " << new_blobid_max << dendl;
+ }
+ for (auto txc : kv_submitting) {
+ assert(!txc->kv_submitted);
+ _txc_finalize_kv(txc, txc->t);
+ txc->log_state_latency(logger, l_bluestore_state_kv_queued_lat);
+ int r = db->submit_transaction(txc->t);
+ assert(r == 0);
+ --txc->osr->kv_committing_serially;
+ txc->kv_submitted = true;
}
- // one final transaction to force a sync
- KeyValueDB::Transaction t = db->get_transaction();
-
vector<bluestore_pextent_t> bluefs_gift_extents;
if (bluefs) {
int r = _balance_bluefs_freespace(&bluefs_gift_extents);
::encode(bluefs_extents, bl);
dout(10) << __func__ << " bluefs_extents now 0x" << std::hex
<< bluefs_extents << std::dec << dendl;
- t->set(PREFIX_SUPER, "bluefs_extents", bl);
+ synct->set(PREFIX_SUPER, "bluefs_extents", bl);
}
}
++it) {
bluestore_wal_transaction_t& wt =*(*it)->wal_txn;
// kv metadata updates
- _txc_finalize_kv(*it, t);
+ _txc_finalize_kv(*it, synct);
// cleanup the wal
string key;
get_wal_key(wt.seq, &key);
- t->rm_single_key(PREFIX_WAL, key);
+ synct->rm_single_key(PREFIX_WAL, key);
}
- int r = db->submit_transaction_sync(t);
+
+ // submit synct synchronously (block and wait for it to commit)
+ int r = db->submit_transaction_sync(synct);
assert(r == 0);
+ if (new_nid_max) {
+ nid_max = new_nid_max;
+ dout(10) << __func__ << " nid_max now " << nid_max << dendl;
+ }
+ if (new_blobid_max) {
+ blobid_max = new_blobid_max;
+ dout(10) << __func__ << " blobid_max now " << blobid_max << dendl;
+ }
+
utime_t finish = ceph_clock_now(NULL);
utime_t dur = finish - start;
dout(20) << __func__ << " committed " << kv_committing.size()
<< " in " << dur << dendl;
while (!kv_committing.empty()) {
TransContext *txc = kv_committing.front();
+ assert(txc->kv_submitted);
_txc_state_proc(txc);
kv_committing.pop_front();
}