From: Samuel Just Date: Sun, 3 Mar 2013 23:47:48 +0000 (-0800) Subject: ObjectStore: add Context management to Transaction X-Git-Tag: v0.60~78^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0b436abe3a9c9ef7f7a4de58c4bc206c94622994;p=ceph.git ObjectStore: add Context management to Transaction ObjectStore now appends passed contexts in queue_transaction to the Transaction contexts and uses that to pass into the virtual queue_transactions. Signed-off-by: Samuel Just --- diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index cff138b97f46..bd235586f90a 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -2008,10 +2008,13 @@ struct C_JournaledAhead : public Context { }; int FileStore::queue_transactions(Sequencer *posr, list &tls, - Context *onreadable, Context *ondisk, - Context *onreadable_sync, TrackedOpRef osd_op) { + Context *onreadable; + Context *ondisk; + Context *onreadable_sync; + ObjectStore::Transaction::collect_contexts( + tls, &onreadable, &ondisk, &onreadable_sync); if (g_conf->filestore_blackhole) { dout(0) << "queue_transactions filestore_blackhole = TRUE, dropping transaction" << dendl; return 0; diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 144939a26679..f1b9c1b04c9f 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -315,8 +315,6 @@ public: unsigned _do_transaction(Transaction& t, uint64_t op_seq, int trans_num); int queue_transactions(Sequencer *osr, list& tls, - Context *onreadable, Context *ondisk=0, - Context *onreadable_sync=0, TrackedOpRef op = TrackedOpRef()); /** diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index a36a7fbb3c4b..23265041f29a 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -167,7 +167,52 @@ public: bool use_pool_override; bool replica; + list on_applied; + list on_commit; + list on_applied_sync; + public: + void register_on_applied(Context *c) { + on_applied.push_back(c); + } + void register_on_commit(Context *c) { + on_commit.push_back(c); + } + void register_on_applied_sync(Context *c) { + on_applied_sync.push_back(c); + } + + static void collect_contexts( + list &t, + Context **out_on_applied, + Context **out_on_commit, + Context **out_on_applied_sync) { + assert(out_on_applied); + assert(out_on_commit); + assert(out_on_applied_sync); + list on_applied, on_commit, on_applied_sync; + for (list::iterator i = t.begin(); + i != t.end(); + ++i) { + on_applied.splice(on_applied.end(), (*i)->on_applied); + on_commit.splice(on_commit.end(), (*i)->on_commit); + on_applied_sync.splice(on_applied_sync.end(), (*i)->on_applied_sync); + } + *out_on_applied = C_Contexts::list_to_context(on_applied); + *out_on_commit = C_Contexts::list_to_context(on_commit); + *out_on_applied_sync = C_Contexts::list_to_context(on_applied_sync); + } + + Context *get_on_applied() { + return C_Contexts::list_to_context(on_applied); + } + Context *get_on_commit() { + return C_Contexts::list_to_context(on_commit); + } + Context *get_on_applied_sync() { + return C_Contexts::list_to_context(on_applied_sync); + } + void set_pool_override(int64_t pool) { pool_override = pool; } @@ -181,6 +226,9 @@ public: std::swap(largest_data_len, other.largest_data_len); std::swap(largest_data_off, other.largest_data_off); std::swap(largest_data_off_in_tbl, other.largest_data_off_in_tbl); + std::swap(on_applied, other.on_applied); + std::swap(on_commit, other.on_commit); + std::swap(on_applied_sync, other.on_applied_sync); tbl.swap(other.tbl); } @@ -194,6 +242,9 @@ public: largest_data_off_in_tbl = tbl.length() + other.largest_data_off_in_tbl; } tbl.append(other.tbl); + on_applied.splice(on_applied.end(), other.on_applied); + on_commit.splice(on_commit.end(), other.on_commit); + on_applied_sync.splice(on_applied_sync.end(), other.on_applied_sync); } uint64_t get_encoded_bytes() { @@ -664,6 +715,7 @@ public: tls.push_back(t); return queue_transactions(osr, tls, new C_DeleteTransaction(t)); } + int queue_transaction(Sequencer *osr, Transaction *t, Context *onreadable, Context *ondisk=0, Context *onreadable_sync=0, TrackedOpRef op = TrackedOpRef()) { @@ -671,9 +723,22 @@ public: tls.push_back(t); return queue_transactions(osr, tls, onreadable, ondisk, onreadable_sync, op); } - virtual int queue_transactions(Sequencer *osr, list& tls, Context *onreadable, Context *ondisk=0, - Context *onreadable_sync=0, - TrackedOpRef op = TrackedOpRef()) = 0; + + int queue_transactions(Sequencer *osr, list& tls, + Context *onreadable, Context *ondisk=0, + Context *onreadable_sync=0, + TrackedOpRef op = TrackedOpRef()) { + assert(!tls.empty()); + tls.back()->register_on_applied(onreadable); + tls.back()->register_on_commit(ondisk); + tls.back()->register_on_applied_sync(onreadable_sync); + return queue_transactions(osr, tls, op); + } + + virtual int queue_transactions( + Sequencer *osr, list& tls, + TrackedOpRef op = TrackedOpRef()) = 0; + int queue_transactions( Sequencer *osr,