From 958e3ece4b3db7f88fbc8c5c13702306027d94f4 Mon Sep 17 00:00:00 2001 From: sageweil Date: Mon, 12 Mar 2007 02:52:30 +0000 Subject: [PATCH] fixed lots of memory leaks! cleaned up C_Gather. Thread has stricter interface (join() dies if thread didn't start). lots of objectcacher cleanup (including memory leakage). some fakemessenger cleanup. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1207 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/client/Client.cc | 16 +++-- trunk/ceph/client/FileCache.cc | 9 +++ trunk/ceph/client/FileCache.h | 5 ++ trunk/ceph/client/fuse.cc | 2 - trunk/ceph/common/Logger.cc | 4 +- trunk/ceph/common/Thread.h | 4 +- trunk/ceph/common/Timer.cc | 6 +- trunk/ceph/ebofs/BlockDevice.h | 7 ++ trunk/ceph/ebofs/Table.h | 5 +- trunk/ceph/fakesyn.cc | 36 +++++----- trunk/ceph/include/Context.h | 45 ++++++++---- trunk/ceph/mds/MDS.cc | 4 ++ trunk/ceph/mon/MonitorStore.cc | 4 +- trunk/ceph/mon/OSDMonitor.cc | 2 + trunk/ceph/msg/FakeMessenger.cc | 68 +++++++----------- trunk/ceph/osd/OSD.cc | 10 +-- trunk/ceph/osdc/ObjectCacher.cc | 112 +++++++++++++++++------------ trunk/ceph/osdc/ObjectCacher.h | 40 ++++++----- trunk/ceph/osdc/Objecter.cc | 123 +++++++++++++++++--------------- 19 files changed, 294 insertions(+), 208 deletions(-) diff --git a/trunk/ceph/client/Client.cc b/trunk/ceph/client/Client.cc index 72de4379443e1..b01b13b7cf291 100644 --- a/trunk/ceph/client/Client.cc +++ b/trunk/ceph/client/Client.cc @@ -127,13 +127,19 @@ Client::Client(Messenger *m, MonMap *mm) Client::~Client() { - if (messenger) { delete messenger; messenger = 0; } + tear_down_cache(); + + if (objectcacher) { + delete objectcacher; + objectcacher = 0; + } + if (filer) { delete filer; filer = 0; } - if (objectcacher) { delete objectcacher; objectcacher = 0; } if (objecter) { delete objecter; objecter = 0; } if (osdmap) { delete osdmap; osdmap = 0; } + if (mdsmap) { delete mdsmap; mdsmap = 0; } - tear_down_cache(); + if (messenger) { delete messenger; messenger = 0; } } @@ -792,6 +798,7 @@ void Client::handle_file_caps(MClientFileCaps *m) if (cap_reap_queue[in->ino()].empty()) cap_reap_queue.erase(in->ino()); } + delete m; return; } @@ -819,7 +826,7 @@ void Client::handle_file_caps(MClientFileCaps *m) } else { //dout(0) << "didn't put_inode" << endl; } - + delete m; return; } @@ -879,7 +886,6 @@ void Client::handle_file_caps(MClientFileCaps *m) } } in->fc.set_caps(new_caps, onimplement); - } else { // caching off. diff --git a/trunk/ceph/client/FileCache.cc b/trunk/ceph/client/FileCache.cc index 5d572ab7b670c..2a1dd1576ae59 100644 --- a/trunk/ceph/client/FileCache.cc +++ b/trunk/ceph/client/FileCache.cc @@ -50,6 +50,15 @@ void FileCache::empty(Context *onempty) } +void FileCache::tear_down() +{ + off_t unclean = release_clean(); + if (unclean) { + dout(0) << "tear_down " << unclean << " unclean bytes, purging" << endl; + oc->purge_set(inode.ino); + } +} + // caps void FileCache::set_caps(int caps, Context *onimplement) diff --git a/trunk/ceph/client/FileCache.h b/trunk/ceph/client/FileCache.h index 742ec98733d9b..6bef22f4e0c6a 100644 --- a/trunk/ceph/client/FileCache.h +++ b/trunk/ceph/client/FileCache.h @@ -34,6 +34,9 @@ class FileCache { latest_caps(0), num_reading(0), num_writing(0),// num_unsafe(0), waitfor_release(false) {} + ~FileCache() { + tear_down(); + } // waiters/waiting bool can_read() { return latest_caps & CAP_FILE_RD; } @@ -52,6 +55,8 @@ class FileCache { bool is_cached(); bool is_dirty(); + void tear_down(); + int get_caps() { return latest_caps; } void set_caps(int caps, Context *onimplement=0); void check_caps(); diff --git a/trunk/ceph/client/fuse.cc b/trunk/ceph/client/fuse.cc index 86a2ce3602315..f4a1c2d3f7797 100644 --- a/trunk/ceph/client/fuse.cc +++ b/trunk/ceph/client/fuse.cc @@ -276,8 +276,6 @@ int ceph_fuse_main(Client *c, int argc, char *argv[]) // go fuse go cout << "ok, calling fuse_main" << endl; - cout << "cwd was " << get_current_dir_name() << endl; int r = fuse_main(newargc, newargv, &ceph_oper); - cout << "cwd now " << get_current_dir_name() << endl; return r; } diff --git a/trunk/ceph/common/Logger.cc b/trunk/ceph/common/Logger.cc index 2fb722a5b0f7c..91164658a80e5 100644 --- a/trunk/ceph/common/Logger.cc +++ b/trunk/ceph/common/Logger.cc @@ -36,7 +36,9 @@ Logger::Logger(string fn, LogType *type) { filename = ""; if (g_conf.use_abspaths) { - filename = get_current_dir_name(); + char *cwd = get_current_dir_name(); + filename = cwd; + delete cwd; filename += "/"; } diff --git a/trunk/ceph/common/Thread.h b/trunk/ceph/common/Thread.h index 43e2942e84c5f..8565ce9effd92 100644 --- a/trunk/ceph/common/Thread.h +++ b/trunk/ceph/common/Thread.h @@ -45,7 +45,9 @@ class Thread { } int join(void **prval = 0) { - if (thread_id == 0) return -1; // never started. + assert(thread_id); + //if (thread_id == 0) return -1; // never started. + int status = pthread_join(thread_id, prval); if (status == 0) thread_id = 0; diff --git a/trunk/ceph/common/Timer.cc b/trunk/ceph/common/Timer.cc index adacf0c5eb6c6..522a623d5ebac 100644 --- a/trunk/ceph/common/Timer.cc +++ b/trunk/ceph/common/Timer.cc @@ -234,6 +234,10 @@ bool Timer::cancel_event(Context *callback) scheduled.erase(tp); lock.Unlock(); + + // delete the canceled event. + delete callback; + return true; } @@ -290,7 +294,7 @@ void SafeTimer::cancel_event(Context *c) if (g_timer.cancel_event(scheduled[c])) { // hosed wrapper. hose original event too. - delete scheduled[c]; + delete c; } else { // clean up later. canceled[c] = scheduled[c]; diff --git a/trunk/ceph/ebofs/BlockDevice.h b/trunk/ceph/ebofs/BlockDevice.h index 25adf62606947..18f639f7176b6 100644 --- a/trunk/ceph/ebofs/BlockDevice.h +++ b/trunk/ceph/ebofs/BlockDevice.h @@ -143,6 +143,13 @@ class BlockDevice { BarrierQueue(BlockDevice *bd, const char *d) : bdev(bd), dev(d) { barrier(); } + ~BarrierQueue() { + for (list::iterator p = qls.begin(); + p != qls.end(); + ++p) + delete *p; + qls.clear(); + } int size() { // this isn't perfectly accurate. if (!qls.empty()) diff --git a/trunk/ceph/ebofs/Table.h b/trunk/ceph/ebofs/Table.h index e6b3fb39660e4..f16e506a9dd63 100644 --- a/trunk/ceph/ebofs/Table.h +++ b/trunk/ceph/ebofs/Table.h @@ -666,8 +666,9 @@ class Table { assert(cursor.open[cursor.level].size() == 0); assert(depth == 1); root = -1; - depth = 0; - pool.release(cursor.open[0].node); + depth = 0; + if (cursor.open[0].node) + pool.release(cursor.open[0].node); } verify("remove 1"); return 0; diff --git a/trunk/ceph/fakesyn.cc b/trunk/ceph/fakesyn.cc index dca78939b9282..d4fc63a4cbba8 100644 --- a/trunk/ceph/fakesyn.cc +++ b/trunk/ceph/fakesyn.cc @@ -31,9 +31,6 @@ using namespace std; #include "common/Timer.h" -#define NUMMDS g_conf.num_mds -#define NUMOSD g_conf.num_osd -#define NUMCLIENT g_conf.num_client class C_Test : public Context { public: @@ -97,9 +94,9 @@ int main(int argc, char **argv) } // create mds - MDS *mds[NUMMDS]; - OSD *mdsosd[NUMMDS]; - for (int i=0; iinit(); } - for (int i=0; iinit(); if (g_conf.mds_local_osd) mdsosd[i]->init(); } - for (int i=0; iinit(); } // create client(s) - for (int i=0; iinit(); // use my argc, argv (make sure you pass a mount point!) @@ -158,7 +155,7 @@ int main(int argc, char **argv) } - for (int i=0; ijoin_thread(); @@ -174,13 +171,16 @@ int main(int argc, char **argv) fakemessenger_wait(); // cleanup - for (int i=0; ifinish(0); + delete onfinish; + onfinish = 0; + return true; + } + class C_GatherSub : public Context { C_Gather *gather; int num; public: C_GatherSub(C_Gather *g, int n) : gather(g), num(n) {} void finish(int r) { - gather->finish(num); + if (gather->sub_finish(num)) + delete gather; // last one! } }; + Context *new_sub() { + num++; + waitfor.insert(num); + return new C_GatherSub(this, num); + } + private: Context *onfinish; std::set waitfor; int num; public: - C_Gather(Context *f) : onfinish(f), num(0) {} - + C_Gather(Context *f) : onfinish(f), num(0) { + //cout << "C_Gather new " << this << endl; + } + ~C_Gather() { + //cout << "C_Gather delete " << this << endl; + assert(!onfinish); + } void finish(int r) { - assert(waitfor.count(r)); - waitfor.erase(r); - if (waitfor.empty()) { - onfinish->finish(0); - delete onfinish; - } + // nobody should ever call me. + assert(0); } - Context *new_sub() { - num++; - waitfor.insert(num); - return new C_GatherSub(this, num); - } }; #endif diff --git a/trunk/ceph/mds/MDS.cc b/trunk/ceph/mds/MDS.cc index 298c50bf5ac16..2c0cf8973f06d 100644 --- a/trunk/ceph/mds/MDS.cc +++ b/trunk/ceph/mds/MDS.cc @@ -120,6 +120,10 @@ MDS::~MDS() { if (anchormgr) { delete anchormgr; anchormgr = NULL; } if (anchorclient) { delete anchorclient; anchorclient = NULL; } if (osdmap) { delete osdmap; osdmap = 0; } + if (mdsmap) { delete mdsmap; mdsmap = 0; } + + if (server) { delete server; server = 0; } + if (locker) { delete locker; locker = 0; } if (filer) { delete filer; filer = 0; } if (objecter) { delete objecter; objecter = 0; } diff --git a/trunk/ceph/mon/MonitorStore.cc b/trunk/ceph/mon/MonitorStore.cc index 8ac98631a0c54..55389973c8c6a 100644 --- a/trunk/ceph/mon/MonitorStore.cc +++ b/trunk/ceph/mon/MonitorStore.cc @@ -39,7 +39,9 @@ void MonitorStore::mount() if (g_conf.use_abspaths) { // combine it with the cwd, in case fuse screws things up (i.e. fakefuse) string old = dir; - dir = get_current_dir_name(); + char *cwd = get_current_dir_name(); + dir = cwd; + delete cwd; dir += "/"; dir += old; } diff --git a/trunk/ceph/mon/OSDMonitor.cc b/trunk/ceph/mon/OSDMonitor.cc index 43ec4eddf2eca..fe9d54b189de6 100644 --- a/trunk/ceph/mon/OSDMonitor.cc +++ b/trunk/ceph/mon/OSDMonitor.cc @@ -399,6 +399,8 @@ void OSDMonitor::handle_osd_boot(MOSDBoot *m) << (osdmap.osds.size() - osdmap.osd_inst.size()) << " osds to boot" << endl; } + + delete m; return; } diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index d2db8c8f7e11c..2aa6c6b06b75b 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -143,7 +143,6 @@ int fakemessenger_do_loop_2() dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has " << mgr->num_incoming() << " queued" << endl; - if (!mgr->is_ready()) { dout(18) << "messenger " << mgr << " at " << mgr->get_myname() << " has no dispatcher, skipping" << endl; it++; @@ -247,7 +246,11 @@ FakeMessenger::FakeMessenger(entity_name_t me) : Messenger(me) FakeMessenger::~FakeMessenger() { - + // hose any undelivered messages + for (list::iterator p = incoming.begin(); + p != incoming.end(); + ++p) + delete *p; } @@ -258,15 +261,6 @@ int FakeMessenger::shutdown() assert(directory.count(_myinst.addr) == 1); shutdown_set.insert(_myinst.addr); - /* - directory.erase(myaddr); - if (directory.empty()) { - dout(1) << "fakemessenger: last shutdown" << endl; - ::fm_shutdown = true; - cond.Signal(); // why not - } - */ - /* if (loggers[myaddr]) { delete loggers[myaddr]; @@ -302,42 +296,34 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr lock.Lock(); - // deliver - try { #ifdef LOG_MESSAGES - // stats - loggers[get_myaddr()]->inc("+send",1); - loggers[dest]->inc("-recv",1); - - char s[20]; - sprintf(s,"+%s", m->get_type_name()); - loggers[get_myaddr()]->inc(s); - sprintf(s,"-%s", m->get_type_name()); - loggers[dest]->inc(s); + // stats + loggers[get_myaddr()]->inc("+send",1); + loggers[dest]->inc("-recv",1); + + char s[20]; + sprintf(s,"+%s", m->get_type_name()); + loggers[get_myaddr()]->inc(s); + sprintf(s,"-%s", m->get_type_name()); + loggers[dest]->inc(s); #endif - // queue - FakeMessenger *dm = directory[inst.addr]; - if (!dm) { - dout(1) << "** destination " << inst << " dne" << endl; - for (map::iterator p = directory.begin(); - p != directory.end(); - ++p) { - dout(1) << "** have " << p->first << " to " << p->second << endl; - } - //assert(dm); - } - dm->queue_incoming(m); - + // queue + if (directory.count(inst.addr)) { dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl; - - } - catch (...) { - cout << "no destination " << dest << endl; - assert(0); + directory[inst.addr]->queue_incoming(m); + } else { + dout(0) << "--> " << get_myname() << " -> " << inst.name << " " << *m + << " *** destination DNE ***" << endl; + for (map::iterator p = directory.begin(); + p != directory.end(); + ++p) { + dout(0) << "** have " << p->first << " to " << p->second << endl; + } + //assert(dm); + delete m; } - // wake up loop? if (!awake) { dout(10) << "waking up fakemessenger thread" << endl; diff --git a/trunk/ceph/osd/OSD.cc b/trunk/ceph/osd/OSD.cc index 0aa0e707fbd0c..058692fab3fc0 100644 --- a/trunk/ceph/osd/OSD.cc +++ b/trunk/ceph/osd/OSD.cc @@ -739,8 +739,9 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) if (dest.is_osd()) { // failed osd. drop message, report to mon. int mon = monmap->pick_mon(); - dout(0) << "ms_handle_failure " << dest << " inst " << inst + dout(0) << "ms_handle_failure " << inst << ", dropping and reporting to mon" << mon + << " " << *m << endl; messenger->send_message(new MOSDFailure(inst, osdmap->get_epoch()), monmap->get_inst(mon)); @@ -748,15 +749,16 @@ void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst) } else if (dest.is_mon()) { // resend to a different monitor. int mon = monmap->pick_mon(true); - dout(0) << "ms_handle_failure " << dest << " inst " << inst + dout(0) << "ms_handle_failure " << inst << ", resending to mon" << mon + << " " << *m << endl; messenger->send_message(m, monmap->get_inst(mon)); } else { // client? - dout(0) << "ms_handle_failure " << dest << " inst " << inst - << ", dropping" << endl; + dout(0) << "ms_handle_failure " << inst + << ", dropping " << *m << endl; delete m; } } diff --git a/trunk/ceph/osdc/ObjectCacher.cc b/trunk/ceph/osdc/ObjectCacher.cc index cb8db645f49e7..0933675ae2880 100644 --- a/trunk/ceph/osdc/ObjectCacher.cc +++ b/trunk/ceph/osdc/ObjectCacher.cc @@ -96,50 +96,7 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right) dout(10) << "merge_left result " << *left << endl; } -/* buggy possibly, but more importnatly, unnecessary. -void ObjectCacher::Object::merge_right(BufferHead *left, BufferHead *right) -{ - assert(left->end() == right->start()); - assert(left->get_state() == right->get_state()); - - dout(10) << "merge_right " << *left << " + " << *right << endl; - oc->bh_remove(this, left); - oc->bh_stat_sub(right); - data.erase(right->start()); - right->set_start( left->start() ); - data[right->start()] = right; - right->set_length( left->length() + right->length()); - oc->bh_stat_add(right); - - // data - bufferlist nbl; - nbl.claim(left->bl); - nbl.claim_append(right->bl); - right->bl.claim(nbl); - - // version - // note: this is sorta busted, but should only be used for dirty buffers - right->last_write_tid = MAX( left->last_write_tid, right->last_write_tid ); - - // waiters - map > old; - old.swap(right->waitfor_read); - // take left's waiters - right->waitfor_read.swap(left->waitfor_read); - - // shift old waiters - for (map >::iterator p = old.begin(); - p != old.end(); - p++) - right->waitfor_read[p->first + left->length()].swap( p->second ); - - // hose left - delete left; - - dout(10) << "merge_right result " << *right << endl; -} -*/ /* * map a range of bytes into buffer_heads. @@ -374,8 +331,25 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr #define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << g_clock.now() << " " << objecter->messenger->get_myname() << ".objectcacher " + /* private */ +void ObjectCacher::close_object(Object *ob) +{ + dout(10) << "close_object " << *ob << endl; + assert(ob->can_close()); + + // ok! + objects.erase(ob->get_oid()); + objects_by_ino[ob->get_ino()].erase(ob); + if (objects_by_ino[ob->get_ino()].empty()) + objects_by_ino.erase(ob->get_ino()); + delete ob; +} + + + + void ObjectCacher::bh_read(BufferHead *bh) { dout(7) << "bh_read on " << *bh << endl; @@ -817,6 +791,9 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) } dout(10) << "readx result is " << rd->bl->length() << endl; + // done with read. + delete rd; + trim(); return pos; @@ -1258,6 +1235,26 @@ bool ObjectCacher::set_is_dirty_or_committing(inodeno_t ino) } +// purge. non-blocking. violently removes dirty buffers from cache. +void ObjectCacher::purge(Object *ob) +{ + dout(10) << "purge " << *ob << endl; + + for (map::iterator p = ob->data.begin(); + p != ob->data.end(); + p++) { + BufferHead *bh = p->second; + dout(0) << "purge forcibly removing " << *bh << endl; + bh_remove(ob, bh); + delete bh; + } + + if (ob->can_close()) { + dout(10) << "trim trimming " << *ob << endl; + close_object(ob); + } +} + // flush. non-blocking. no callback. // true if clean, already flushed. // false if we wrote something. @@ -1366,6 +1363,24 @@ bool ObjectCacher::commit_set(inodeno_t ino, Context *onfinish) return false; } +void ObjectCacher::purge_set(inodeno_t ino) +{ + if (objects_by_ino.count(ino) == 0) { + dout(10) << "purge_set on " << ino << " dne" << endl; + return; + } + + dout(10) << "purge_set " << ino << endl; + + set& s = objects_by_ino[ino]; + for (set::iterator i = s.begin(); + i != s.end(); + i++) { + Object *ob = *i; + purge(ob); + } +} + off_t ObjectCacher::release(Object *ob) { @@ -1384,8 +1399,15 @@ off_t ObjectCacher::release(Object *ob) for (list::iterator p = clean.begin(); p != clean.end(); - p++) + p++) { bh_remove(ob, *p); + delete *p; + } + + if (ob->can_close()) { + dout(10) << "trim trimming " << *ob << endl; + close_object(ob); + } return o_unclean; } @@ -1402,7 +1424,7 @@ off_t ObjectCacher::release_set(inodeno_t ino) dout(10) << "release_set " << ino << endl; - set& s = objects_by_ino[ino]; + set s = objects_by_ino[ino]; for (set::iterator i = s.begin(); i != s.end(); i++) { diff --git a/trunk/ceph/osdc/ObjectCacher.h b/trunk/ceph/osdc/ObjectCacher.h index 27b154023209d..e9a4041008666 100644 --- a/trunk/ceph/osdc/ObjectCacher.h +++ b/trunk/ceph/osdc/ObjectCacher.h @@ -133,6 +133,9 @@ class ObjectCacher { last_write_tid(0), last_ack_tid(0), last_commit_tid(0), lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {} + ~Object() { + assert(data.empty()); + } object_t get_oid() { return oid; } inodeno_t get_ino() { return ino; } @@ -227,16 +230,7 @@ class ObjectCacher { objects_by_ino[ino].insert(o); return o; } - void close_object(Object *ob) { - assert(ob->can_close()); - - // ok! - objects.erase(ob->get_oid()); - objects_by_ino[ob->get_ino()].erase(ob); - if (objects_by_ino[ob->get_ino()].empty()) - objects_by_ino.erase(ob->get_ino()); - delete ob; - } + void close_object(Object *ob); // bh stats Cond stat_cond; @@ -315,18 +309,22 @@ class ObjectCacher { void bh_add(Object *ob, BufferHead *bh) { ob->add_bh(bh); - if (bh->is_dirty()) + if (bh->is_dirty()) { lru_dirty.lru_insert_top(bh); - else + dirty_bh.insert(bh); + } else { lru_rest.lru_insert_top(bh); + } bh_stat_add(bh); } void bh_remove(Object *ob, BufferHead *bh) { ob->remove_bh(bh); - if (bh->is_dirty()) + if (bh->is_dirty()) { lru_dirty.lru_remove(bh); - else + dirty_bh.erase(bh); + } else { lru_rest.lru_remove(bh); + } bh_stat_sub(bh); } @@ -339,6 +337,7 @@ class ObjectCacher { bool flush(Object *o); off_t release(Object *o); + void purge(Object *o); void rdlock(Object *o); void rdunlock(Object *o); @@ -413,10 +412,17 @@ class ObjectCacher { flusher_thread.create(); } ~ObjectCacher() { - //lock.Lock(); // hmm.. watch out for deadlock! + // we should be empty. + assert(objects.empty()); + assert(lru_rest.lru_get_size() == 0); + assert(lru_dirty.lru_get_size() == 0); + assert(dirty_bh.empty()); + + assert(flusher_thread.is_started()); + lock.Lock(); // hmm.. watch out for deadlock! flusher_stop = true; flusher_cond.Signal(); - //lock.Unlock(); + lock.Unlock(); flusher_thread.join(); } @@ -457,6 +463,8 @@ class ObjectCacher { bool commit_set(inodeno_t ino, Context *oncommit); void commit_all(Context *oncommit=0); + void purge_set(inodeno_t ino); + off_t release_set(inodeno_t ino); // returns # of bytes not released (ie non-clean) void kick_sync_writers(inodeno_t ino); diff --git a/trunk/ceph/osdc/Objecter.cc b/trunk/ceph/osdc/Objecter.cc index ddd22325747db..9e49a43ace89b 100644 --- a/trunk/ceph/osdc/Objecter.cc +++ b/trunk/ceph/osdc/Objecter.cc @@ -269,27 +269,31 @@ tid_t Objecter::stat_submit(OSDStat *st) ObjectExtent &ex = st->extents.front(); PG &pg = get_pg( ex.pgid ); - // send + // pick tid last_tid++; assert(client_inc >= 0); - MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, - ex.oid, ex.pgid, osdmap->get_epoch(), - OSD_OP_STAT); + + // add to gather set + st->tid = last_tid; + op_stat[last_tid] = st; + + pg.active_tids.insert(last_tid); + + // send? dout(10) << "stat_submit " << st << " tid " << last_tid << " oid " << ex.oid << " pg " << ex.pgid << " osd" << pg.acker() << endl; - if (pg.acker() >= 0) + if (pg.acker() >= 0) { + MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, + ex.oid, ex.pgid, osdmap->get_epoch(), + OSD_OP_STAT); + messenger->send_message(m, osdmap->get_inst(pg.acker())); + } - // add to gather set - st->tid = last_tid; - op_stat[last_tid] = st; - - pg.active_tids.insert(last_tid); - return last_tid; } @@ -381,14 +385,17 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex) // find OSD PG &pg = get_pg( ex.pgid ); - // send + // pick tid last_tid++; assert(client_inc >= 0); - MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, - ex.oid, ex.pgid, osdmap->get_epoch(), - OSD_OP_READ); - m->set_length(ex.length); - m->set_offset(ex.start); + + // add to gather set + rd->ops[last_tid] = ex; + op_read[last_tid] = rd; + + pg.active_tids.insert(last_tid); + + // send? dout(10) << "readx_submit " << rd << " tid " << last_tid << " oid " << ex.oid << " " << ex.start << "~" << ex.length << " (" << ex.buffer_extents.size() << " buffer fragments)" @@ -396,15 +403,16 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex) << " osd" << pg.acker() << endl; - if (pg.acker() >= 0) + if (pg.acker() >= 0) { + MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, + ex.oid, ex.pgid, osdmap->get_epoch(), + OSD_OP_READ); + m->set_length(ex.length); + m->set_offset(ex.start); + messenger->send_message(m, osdmap->get_inst(pg.acker())); + } - // add to gather set - rd->ops[last_tid] = ex; - op_read[last_tid] = rd; - - pg.active_tids.insert(last_tid); - return last_tid; } @@ -641,43 +649,13 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) // find PG &pg = get_pg( ex.pgid ); - // send + // pick tid tid_t tid; if (usetid > 0) tid = usetid; else tid = ++last_tid; - MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid, - ex.oid, ex.pgid, osdmap->get_epoch(), - wr->op); - m->set_length(ex.length); - m->set_offset(ex.start); - m->set_rev(ex.rev); - - if (wr->tid_version.count(tid)) - m->set_version(wr->tid_version[tid]); // we're replaying this op! - - // what type of op? - switch (wr->op) { - case OSD_OP_WRITE: - { - // map buffer segments into this extent - // (may be fragmented bc of striping) - bufferlist cur; - for (map::iterator bit = ex.buffer_extents.begin(); - bit != ex.buffer_extents.end(); - bit++) { - bufferlist thisbit; - thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second); - cur.claim_append(thisbit); - } - assert(cur.length() == ex.length); - m->set_data(cur);//.claim(cur); - } - break; - } - // add to gather set wr->waitfor_ack[tid] = ex; wr->waitfor_commit[tid] = ex; @@ -687,15 +665,46 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) ++num_unacked; ++num_uncommitted; - // send + // send? dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid << " oid " << ex.oid << " " << ex.start << "~" << ex.length << " pg " << ex.pgid << " osd" << pg.primary() << endl; - if (pg.primary() >= 0) + if (pg.primary() >= 0) { + MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid, + ex.oid, ex.pgid, osdmap->get_epoch(), + wr->op); + m->set_length(ex.length); + m->set_offset(ex.start); + m->set_rev(ex.rev); + + if (wr->tid_version.count(tid)) + m->set_version(wr->tid_version[tid]); // we're replaying this op! + + // what type of op? + switch (wr->op) { + case OSD_OP_WRITE: + { + // map buffer segments into this extent + // (may be fragmented bc of striping) + bufferlist cur; + for (map::iterator bit = ex.buffer_extents.begin(); + bit != ex.buffer_extents.end(); + bit++) { + bufferlist thisbit; + thisbit.substr_of(((OSDWrite*)wr)->bl, bit->first, bit->second); + cur.claim_append(thisbit); + } + assert(cur.length() == ex.length); + m->set_data(cur);//.claim(cur); + } + break; + } + messenger->send_message(m, osdmap->get_inst(pg.primary())); + } dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << endl; -- 2.39.5