From 1d0de0d749c1e619fe49f6f6b019179d921aba53 Mon Sep 17 00:00:00 2001 From: sage Date: Sat, 29 Apr 2006 16:00:18 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@751 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 1 + ceph/common/Timer.cc | 52 +++------- ceph/msg/TCPMessenger.cc | 4 +- ceph/osdc/Filer.h | 36 ++++--- ceph/osdc/ObjectCacher.cc | 31 ++++++ ceph/osdc/ObjectCacher.h | 196 ++++++++++++++++++++++++++++++-------- 6 files changed, 226 insertions(+), 94 deletions(-) create mode 100644 ceph/osdc/ObjectCacher.cc diff --git a/ceph/Makefile b/ceph/Makefile index b79a9af125077..17f036519e0ca 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -53,6 +53,7 @@ COMMON_OBJS= \ msg/Dispatcher.o\ msg/HostMonitor.o\ osd/Filer.o\ + osd/ObjectCacher.o\ osd/Objecter.o\ osd/OSDMap.o\ mds/MDCluster.o\ diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index 99cda93fcb196..317ae5adbd044 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -20,8 +20,6 @@ #include "config.h" #include "include/Context.h" -//#include "msg/Messenger.h" - #undef dout #define dout(x) if (x <= g_conf.debug) cout << "Timer: " @@ -34,10 +32,6 @@ // single global instance Timer g_timer; -//Context *messenger_kicker = 0; -//Messenger *messenger = 0; - - /**** thread solution *****/ @@ -65,18 +59,14 @@ void Timer::timer_entry() utime_t t = it->first; dout(DBL) << "queueing event(s) scheduled at " << t << endl; - /*if (messenger) { - for (multiset::iterator cit = it->second.begin(); - cit != it->second.end(); - cit++) { - pending.push_back(*cit); - event_times.erase(*cit); - num_event--; - } + for (multiset::iterator cit = it->second.begin(); + cit != it->second.end(); + cit++) { + pending.push_back(*cit); + event_times.erase(*cit); + num_event--; } - */ - //pending[t] = it->second; map< utime_t, multiset >::iterator previt = it; it++; scheduled.erase(previt); @@ -86,17 +76,14 @@ void Timer::timer_entry() sleeping = false; lock.Unlock(); { // make sure we're not holding any locks while we do callbacks (or talk to the messenger) - if (1) { - // make the callbacks myself. - for (list::iterator cit = pending.begin(); - cit != pending.end(); - cit++) - (*cit)->finish(0); - pending.clear(); - } else { - // give them to the messenger - //messenger->queue_callbacks(pending); + // make the callbacks myself. + for (list::iterator cit = pending.begin(); + cit != pending.end(); + cit++) { + dout(DBL) << "doing callback " << *cit << endl; + (*cit)->finish(0); } + pending.clear(); assert(pending.empty()); } lock.Lock(); @@ -133,19 +120,6 @@ void Timer::timer_entry() * Timer bits */ -/* -void Timer::set_messenger(Messenger *m) -{ - dout(10) << "set messenger " << m << endl; - messenger = m; -} -void Timer::unset_messenger() -{ - dout(10) << "unset messenger" << endl; - messenger = 0; -} -*/ - void Timer::register_timer() { if (timer_thread.is_started()) { diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index f91f7072427b0..67df8b8622729 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -1268,7 +1268,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : // register to execute timer events //g_timer.set_messenger_kicker(new C_TCPKicker()); - g_timer.set_messenger(this); + // g_timer.set_messenger(this); } @@ -1365,7 +1365,7 @@ int TCPMessenger::shutdown() //pthread_t whoami = pthread_self(); // no more timer events - g_timer.unset_messenger(); + //g_timer.unset_messenger(); // close incoming sockets //void *r; diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index d94fdbac17dbf..7bc7698259afe 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -40,6 +40,7 @@ using namespace __gnu_cxx; #include "OSDMap.h" #include "Objecter.h" +#include "ObjectCacher.h" class Context; class Messenger; @@ -71,11 +72,11 @@ class Filer { file_to_extents(inode, len, offset, rd->extents); // cacheless async? - if (oc == 0) + if (1 || oc == 0) return objecter->readx(rd, onfinish); - // write me - + // use cache + oc->readx(rd, inode.ino, onfinish); return 0; } @@ -90,11 +91,11 @@ class Filer { file_to_extents(inode, len, offset, wr->extents); // cacheles async? - if (oc == 0) + if (1 || oc == 0) return objecter->writex(wr, onack, oncommit); - // write me - + // use cache + oc->writex(wr, inode.ino, onack, oncommit); return 0; } @@ -121,23 +122,28 @@ class Filer { int atomic_sync_read(inode_t& inode, size_t len, off_t offset, - bufferlist& bl) { - assert(oc); - - // write me + bufferlist *bl) { + Objecter::OSDRead *rd = new Objecter::OSDRead(bl); + file_to_extents(inode, len, offset, rd->extents); - return 0; + assert(oc); + int r = oc->atomic_sync_readx(rd, inode.ino, + 0); // block. + return r; } int atomic_sync_write(inode_t& inode, size_t len, off_t offset, bufferlist& bl, Context *oncommit) { - assert(oc); - - // write me + Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); + file_to_extents(inode, len, offset, wr->extents); - return 0; + assert(oc); + int r = oc->atomic_sync_writex(wr, inode.ino, + 0, // block + oncommit); + return r; } diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc new file mode 100644 index 0000000000000..74546a9cd11ae --- /dev/null +++ b/ceph/osdc/ObjectCacher.cc @@ -0,0 +1,31 @@ + +#include "ObjectCacher.h" + + + +int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) +{ + assert(0); + return 0; +} + +int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit) +{ + assert(0); + return 0; +} + + +// blocking. atomic+sync. +int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) +{ + assert(0); + return 0; +} + +int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit) +{ + assert(0); + return 0; +} + diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h index 6d6918f6f8de5..99339d85cb66d 100644 --- a/ceph/osdc/ObjectCacher.h +++ b/ceph/osdc/ObjectCacher.h @@ -1,65 +1,185 @@ +#ifndef __OBJECTCACHER_H_ +#define __OBJECTCACHER_H_ +#include "include/types.h" +#include "include/lru.h" +#include "common/Cond.h" -class ObjectCacher { - Objecter *objecter; +#include "Objecter.h" +class Objecter; +class Objecter::OSDRead; +class Objecter::OSDWrite; +class ObjectCacher { + private: + Objecter *objecter; + + // ******* Object ********* class Object { + public: - class BufferHead { + // ******* Object::BufferHead ********* + class BufferHead : public LRUObject { public: - const int CLEAN = 1; - const int DIRTY = 2; - const int RX = 3; - const int TX = 4; + // states + static const int STATE_MISSING = 0; + static const int STATE_CLEAN = 1; + static const int STATE_DIRTY = 2; + static const int STATE_RX = 3; + static const int STATE_TX = 4; + + // my fields int state; + int ref; - }; + version_t version; // version of object (if non-zero) + bufferlist bl; - map bh_map; + map > waitfor_read; - class Lock { - public: - const int NONE = 0; - const int WRLOCK = 1; - //const int RDLOCK = 2; - - int state; + size_t length() { return bl.length(); } - Lock() : state(NONE) {} + // states + void set_state(int s) { + if (s == STATE_RX || s == STATE_TX) get(); + if (state == STATE_RX || state == STATE_TX) put(); + state = s; + } + int get_state() { return state; } + + bool is_missing() { return state == STATE_MISSING; } + bool is_dirty() { return state == STATE_DIRTY; } + bool is_clean() { return state == STATE_CLEAN; } + bool is_tx() { return state == STATE_TX; } + bool is_rx() { return state == STATE_RX; } + + // reference counting + int get() { + assert(ref >= 0); + if (ref == 0) lru_pin(); + return ++ref; + } + int put() { + assert(ref > 0); + if (ref == 1) lru_unpin(); + --ref; + return ref; + } + + BufferHead() : + state(STATE_MISSING), + ref(0), + version(0) {} }; + // ObjectCacher::Object fields + object_t oid; + inodeno_t ino; + + map bh_map; + + Object(object_t o, inodeno_t i) : oid(o), ino(i) {} }; + // ObjectCacher fields + hash_map objects; + + set dirty_bh; + LRU lru_dirty, lru_rest; + + + // bh stats + Cond stat_cond; + int stat_waiter; + + off_t stat_clean; + off_t stat_dirty; + off_t stat_rx; + off_t stat_tx; + off_t stat_partial; + off_t stat_missing; + + void bh_stat_add(Object::BufferHead *bh) { + switch (bh->get_state()) { + case Object::BufferHead::STATE_MISSING: stat_missing += bh->length(); break; + case Object::BufferHead::STATE_CLEAN: stat_clean += bh->length(); break; + case Object::BufferHead::STATE_DIRTY: stat_dirty += bh->length(); break; + case Object::BufferHead::STATE_TX: stat_tx += bh->length(); break; + case Object::BufferHead::STATE_RX: stat_rx += bh->length(); break; + } + if (stat_waiter) stat_cond.Signal(); + } + void bh_stat_sub(Object::BufferHead *bh) { + switch (bh->get_state()) { + case Object::BufferHead::STATE_MISSING: stat_missing -= bh->length(); break; + case Object::BufferHead::STATE_CLEAN: stat_clean -= bh->length(); break; + case Object::BufferHead::STATE_DIRTY: stat_dirty -= bh->length(); break; + case Object::BufferHead::STATE_TX: stat_tx -= bh->length(); break; + case Object::BufferHead::STATE_RX: stat_rx -= bh->length(); break; + } + } + off_t get_stat_tx() { return stat_tx; } + off_t get_stat_rx() { return stat_rx; } + off_t get_stat_dirty() { return stat_dirty; } + off_t get_stat_clean() { return stat_clean; } + off_t get_stat_partial() { return stat_partial; } + + // bh states + void bh_set_state(Object::BufferHead *bh, int s) { + // move between lru lists? + if (s == Object::BufferHead::STATE_DIRTY && bh->get_state() != Object::BufferHead::STATE_DIRTY) { + lru_rest.lru_remove(bh); + lru_dirty.lru_insert_top(bh); + dirty_bh.insert(bh); + } + if (s != Object::BufferHead::STATE_DIRTY && bh->get_state() == Object::BufferHead::STATE_DIRTY) { + lru_dirty.lru_remove(bh); + lru_rest.lru_insert_mid(bh); + dirty_bh.erase(bh); + } + + // set state + bh_stat_sub(bh); + bh->set_state(s); + bh_stat_add(bh); + } + + void copy_bh_state(Object::BufferHead *bh1, Object::BufferHead *bh2) { + bh_set_state(bh2, bh1->get_state()); + } - - int map_read(OSDRead *rd); - int map_write(OSDWrite *wr); + void mark_missing(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_MISSING); }; + void mark_clean(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_CLEAN); }; + void mark_rx(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_RX); }; + void mark_tx(Object::BufferHead *bh) { bh_set_state(bh, Object::BufferHead::STATE_TX); }; + void mark_dirty(Object::BufferHead *bh) { + bh_set_state(bh, Object::BufferHead::STATE_DIRTY); + //bh->set_dirty_stamp(g_clock.now()); + }; - void flush(set& objects); - void flush_all(); - void commit(set& objects); - void commit_all(); + int map_read(Objecter::OSDRead *rd); + int map_write(Objecter::OSDWrite *wr); + public: + // blocking. async. + int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish); + int writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit); + // blocking. atomic+sync. + int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish); + int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit); + + void flush_set(inodeno_t ino, Context *onfinish=0); + void flush_all(Context *onfinish=0); + + void commit_set(inodeno_t ino, Context *oncommit=0); + void commit_all(Context *oncommit=0); }; -/* -// sync write (correct) -Filer->atomic_sync_write(); - map - ObjectCache->atomic_sync_writex(...); // blocks until sync write happens, or i get write locks - -// async write -Filer->write(); - map - ObjectCache->writex(...); // non-blocking. update cache. - or - map - Objecter->writex(...); // non-blocking. no cache. (MDS) -*/ +#endif -- 2.39.5