From 742c3bf618315fbf0cf3e02a7cdf9d929e4b2803 Mon Sep 17 00:00:00 2001 From: sage Date: Mon, 30 Jan 2006 20:16:27 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@580 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/TODO | 7 +- ceph/config.cc | 4 ++ ceph/ebofs/BlockDevice.cc | 139 +++++++++++++++++++++++++----------- ceph/ebofs/BlockDevice.h | 6 +- ceph/ebofs/types.h | 9 ++- ceph/include/interval_set.h | 15 ++++ ceph/osd/FakeStore.h | 59 +++++++-------- ceph/osd/OBFSStore.cc | 134 ++++++++++++++++++++++++++++++++++ ceph/osd/OBFSStore.h | 100 ++++++++++++++++++++------ 9 files changed, 376 insertions(+), 97 deletions(-) diff --git a/ceph/TODO b/ceph/TODO index 5c9732614980b..ce6ae71574572 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -7,6 +7,7 @@ client bdev - multiple io_threads.. need block-range locks + - threads may need to sleep.. need to be woken up, etc.. ebofs - combine inodes into same blocks @@ -27,11 +28,13 @@ osd - 'dirty' log on primary? - fast recovery from degraded mode - -ardos - watch osd utilization; adjust cluster map +lazy posix +- softstat, etc. + + cluster issues - general problem: how to do posix ordering on object boundaries using an object store diff --git a/ceph/config.cc b/ceph/config.cc index cb227854e83c5..3ac22d654513f 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -323,6 +323,10 @@ void parse_config_options(vector& args) else if (strcmp(args[i], "--osd_maxthreads") == 0) g_conf.osd_maxthreads = atoi(args[++i]); + else if (strcmp(args[i], "--bdev_iothreads") == 0) + g_conf.bdev_iothreads = atoi(args[++i]); + + else { nargs.push_back(args[i]); } diff --git a/ceph/ebofs/BlockDevice.cc b/ceph/ebofs/BlockDevice.cc index 249b3d54f3840..4d3c49ab0ce2d 100644 --- a/ceph/ebofs/BlockDevice.cc +++ b/ceph/ebofs/BlockDevice.cc @@ -19,7 +19,7 @@ #include #undef dout -#define dout(x) if (x <= g_conf.debug_bdev) cout << "dev." +#define dout(x) if (x <= g_conf.debug_bdev) cout << "bdev(" << dev << ")." inline ostream& operator<<(ostream& out, BlockDevice::biovec &bio) @@ -53,7 +53,9 @@ int BlockDevice::io_thread_entry() lock.Lock(); int whoami = io_threads_started++; - dout(10) << "io_thread" << whoami << " start" << endl; + io_threads_running++; + assert(io_threads_running <= g_conf.bdev_iothreads); + dout(10) << "io_thread" << whoami << " start, " << io_threads_running << " now running" << endl; if (whoami == 0) { // i'm the first one starting... el_dir_forward = true; @@ -67,9 +69,15 @@ int BlockDevice::io_thread_entry() assert(fd > 0); while (!io_stop) { - // queue? - if (!io_queue.empty()) { - + + bool at_end = false; + bool do_sleep = false; + + // queue empty? + if (io_queue.empty()) { + // sleep + do_sleep = true; + } else { // go until (we) reverse dout(20) << "io_thread" << whoami << " going" << endl; @@ -78,26 +86,45 @@ int BlockDevice::io_thread_entry() multimap::iterator i; if (el_dir_forward) { i = io_queue.lower_bound(el_pos); - if (i == io_queue.end()) break; + if (i == io_queue.end()) { + at_end = true; + break; + } } else { i = io_queue.upper_bound(el_pos); - if (i == io_queue.begin()) break; + if (i == io_queue.begin()) { + at_end = true; + break; + } i--; // and back down one (to get i <= pos) } // merge contiguous ops + block_t start, length; list biols; char type = i->second->type; int n = 0; // count eventual iov's for readv/writev - el_pos = i->first; + start = el_pos = i->first; + length = 0; while (el_pos == i->first && type == i->second->type) { // while (contiguous) biovec *bio = i->second; + // ok? + if (io_block_lock.intersects(bio->start, bio->length)) { + dout(20) << "io_thread" << whoami << " dequeue " << bio->start << "~" << bio->length + << " intersects block_lock " << io_block_lock << endl; + break; // go to sleep, or go with what we've got so far + } + + // add to biols int nv = bio->bl.buffers().size(); // how many iov's in this bio's bufferlist? if (n + nv >= g_conf.bdev_iov_max) break; n += nv; + start = MIN(start, bio->start); + length += bio->length; + if (el_dir_forward) { dout(20) << "io_thread" << whoami << " fw dequeue io at " << el_pos << " " << *i->second << endl; biols.push_back(bio); // at back @@ -106,69 +133,98 @@ int BlockDevice::io_thread_entry() biols.push_front(bio); // at front } + // next bio? multimap::iterator prev; - bool stop = false; if (el_dir_forward) { el_pos += bio->length; // cont. next would start right after us prev = i; i++; - if (i == io_queue.end()) stop = true; + if (i == io_queue.end()) { + at_end = true; + } } else { prev = i; if (i == io_queue.begin()) { - stop = true; + at_end = true; } else { i--; // cont. would start before us... if (i->first + i->second->length == el_pos) el_pos = i->first; // yep, it does! } } - + // dequeue io_queue_map.erase(bio); io_queue.erase(prev); - if (stop) break; + if (at_end) break; } - // drop lock to do the io - lock.Unlock(); - do_io(fd, biols); - lock.Lock(); + if (biols.empty()) { + // failed to dequeue a do-able op, sleep for now + assert(io_threads_running > 1); + do_sleep = true; + break; + } + + { // lock blocks + assert(start == biols.front()->start); + io_block_lock.insert(start, length); + + // drop lock to do the io + lock.Unlock(); + do_io(fd, biols); + lock.Lock(); + + // unlock blocks + io_block_lock.erase(start, length); + } + if ((int)io_queue.size() > io_threads_running) // someone might have blocked on our block_lock + io_wakeup.SignalAll(); + utime_t now = g_clock.now(); if (now > el_stop) break; } - // reverse? - if (g_conf.bdev_el_bidir) { - dout(20) << "io_thread" << whoami << " reversing" << endl; - el_dir_forward = !el_dir_forward; - } + if (at_end) { + at_end = false; + // reverse? + if (g_conf.bdev_el_bidir) { + dout(20) << "io_thread" << whoami << " reversing" << endl; + el_dir_forward = !el_dir_forward; + } - // reset disk pointers, timers - el_stop = g_clock.now(); - if (el_dir_forward) { - el_pos = 0; - utime_t max(0, 1000*g_conf.bdev_el_fw_max_ms); // (s,us), convert ms -> us! - el_stop += max; - dout(20) << "io_thread" << whoami << " forward sweep for " << max << endl; - } else { - el_pos = num_blocks; - utime_t max(0, 1000*g_conf.bdev_el_bw_max_ms); // (s,us), convert ms -> us! - el_stop += max; - dout(20) << "io_thread" << whoami << " reverse sweep for " << max << endl; + // reset disk pointers, timers + el_stop = g_clock.now(); + if (el_dir_forward) { + el_pos = 0; + utime_t max(0, 1000*g_conf.bdev_el_fw_max_ms); // (s,us), convert ms -> us! + el_stop += max; + dout(20) << "io_thread" << whoami << " forward sweep for " << max << endl; + } else { + el_pos = num_blocks; + utime_t max(0, 1000*g_conf.bdev_el_bw_max_ms); // (s,us), convert ms -> us! + el_stop += max; + dout(20) << "io_thread" << whoami << " reverse sweep for " << max << endl; + } } - - } else { + } + + if (do_sleep) { + do_sleep = false; // sleep - dout(20) << "io_thread" << whoami << " sleeping" << endl; + io_threads_running--; + dout(20) << "io_thread" << whoami << " sleeping, " << io_threads_running << " threads now running" << endl; io_wakeup.Wait(lock); - dout(20) << "io_thread" << whoami << " woke up" << endl; + io_threads_running++; + assert(io_threads_running <= g_conf.bdev_iothreads); + dout(20) << "io_thread" << whoami << " woke up, " << io_threads_running << " threads now running" << endl; } } ::close(fd); + io_threads_running--; lock.Unlock(); @@ -284,12 +340,12 @@ void BlockDevice::_submit_io(biovec *b) dout(15) << "_submit_io " << *b << endl; // wake up io_thread(s)? - if (io_queue.empty()) + if ((int)io_queue.size() == io_threads_running) io_wakeup.SignalOne(); - else + else if ((int)io_queue.size() > io_threads_running) io_wakeup.SignalAll(); - // check for overlapping ios + // [DEBUG] check for overlapping ios { // BUG: this doesn't catch everything! eg 1~10000000 will be missed.... multimap::iterator p = io_queue.lower_bound(b->start); @@ -307,6 +363,7 @@ void BlockDevice::_submit_io(biovec *b) // queue anew io_queue.insert(pair(b->start, b)); io_queue_map[b] = b->start; + } int BlockDevice::_cancel_io(biovec *bio) diff --git a/ceph/ebofs/BlockDevice.h b/ceph/ebofs/BlockDevice.h index 17b7134fea055..0823f9510ee3b 100644 --- a/ceph/ebofs/BlockDevice.h +++ b/ceph/ebofs/BlockDevice.h @@ -2,6 +2,7 @@ #define __EBOFS_BLOCKDEVICE_H #include "include/bufferlist.h" +#include "include/interval_set.h" #include "include/Context.h" #include "common/Mutex.h" #include "common/Cond.h" @@ -50,11 +51,12 @@ class BlockDevice { friend ostream& operator<<(ostream& out, biovec &bio); + interval_set io_block_lock; // blocks currently dispatched to kernel multimap io_queue; map io_queue_map; Cond io_wakeup; bool io_stop; - int io_threads_started; + int io_threads_started, io_threads_running; void _submit_io(biovec *b); int _cancel_io(biovec *bio); @@ -103,7 +105,7 @@ class BlockDevice { public: BlockDevice(char *d) : dev(d), fd(0), num_blocks(0), - io_stop(false), io_threads_started(0), + io_stop(false), io_threads_started(0), io_threads_running(0), el_dir_forward(true), el_pos(0), complete_thread(this) { }; diff --git a/ceph/ebofs/types.h b/ceph/ebofs/types.h index ed1f213241638..b7ccd8e55dc0c 100644 --- a/ceph/ebofs/types.h +++ b/ceph/ebofs/types.h @@ -14,9 +14,12 @@ using namespace std; using namespace __gnu_cxx; -#define MIN(a,b) ((a)<=(b) ? (a):(b)) -#define MAX(a,b) ((a)>=(b) ? (a):(b)) - +#ifndef MIN +# define MIN(a,b) ((a)<=(b) ? (a):(b)) +#endif +#ifndef MAX +# define MAX(a,b) ((a)>=(b) ? (a):(b)) +#endif class C_Cond : public Context { diff --git a/ceph/include/interval_set.h b/ceph/include/interval_set.h index f837f52a1d80a..3c269e40743f8 100644 --- a/ceph/include/interval_set.h +++ b/ceph/include/interval_set.h @@ -6,6 +6,13 @@ #include using namespace std; +#ifndef MIN +# define MIN(a,b) ((a)<=(b) ? (a):(b)) +#endif +#ifndef MAX +# define MAX(a,b) ((a)>=(b) ? (a):(b)) +#endif + template class interval_set { @@ -84,6 +91,14 @@ class interval_set { if (p->first+p->second < start+len) return false; return true; } + bool intersects(T start, T len) const { + interval_set a; + a.insert(start, len); + interval_set i; + i.intersection_of( *this, a ); + if (i.empty()) return false; + return true; + } // outer range of set bool empty() const { diff --git a/ceph/osd/FakeStore.h b/ceph/osd/FakeStore.h index 21afd80519bcf..418e40a7b378e 100644 --- a/ceph/osd/FakeStore.h +++ b/ceph/osd/FakeStore.h @@ -14,40 +14,41 @@ using namespace __gnu_cxx; // fake attributes in memory, if we need to. -class FakeAttrSet { - public: - map attrs; - - int getattr(const char *name, void *value, size_t size) { - if (attrs.count(name)) { - size_t l = attrs[name].length(); - if (l > size) l = size; - bufferlist bl; - bl.append(attrs[name]); - bl.copy(0, l, (char*)value); - return l; - } - return -1; - } - - int setattr(const char *name, void *value, size_t size) { - bufferptr bp(new buffer((char*)value,size)); - attrs[name] = bp; - return 0; - } - - int listattr(char *attrs, size_t size) { - assert(0); - } - - bool empty() { return attrs.empty(); } -}; - class FakeStore : public ObjectStore { string basedir; int whoami; + + class FakeAttrSet { + public: + map attrs; + + int getattr(const char *name, void *value, size_t size) { + if (attrs.count(name)) { + size_t l = attrs[name].length(); + if (l > size) l = size; + bufferlist bl; + bl.append(attrs[name]); + bl.copy(0, l, (char*)value); + return l; + } + return -1; + } + + int setattr(const char *name, void *value, size_t size) { + bufferptr bp(new buffer((char*)value,size)); + attrs[name] = bp; + return 0; + } + + int listattr(char *attrs, size_t size) { + assert(0); + } + + bool empty() { return attrs.empty(); } + }; + Mutex lock; hash_map fakeoattrs; hash_map fakecattrs; diff --git a/ceph/osd/OBFSStore.cc b/ceph/osd/OBFSStore.cc index 7e6cad2db9658..3262ad4c99964 100644 --- a/ceph/osd/OBFSStore.cc +++ b/ceph/osd/OBFSStore.cc @@ -182,3 +182,137 @@ int OBFSStore::write(object_t oid, size_t len, } + +// ------------------ +// attributes + +int OBFSStore::setattr(object_t oid, const char *name, + void *value, size_t size) +{ + lock.Lock(); + int r = fakeoattrs[oid].setattr(name, value, size); + lock.Unlock(); + return r; +} + + +int OBFSStore::getattr(object_t oid, const char *name, + void *value, size_t size) +{ + lock.Lock(); + int r = fakeoattrs[oid].getattr(name, value, size); + lock.Unlock(); + return r; +} + +int OBFSStore::listattr(object_t oid, char *attrs, size_t size) +{ + lock.Lock(); + int r = fakeoattrs[oid].listattr(attrs,size); + lock.Unlock(); + return r; +} + +int OBFSStore::list_collections(list& ls) +{ + lock.Lock(); + int r = 0; + for (hash_map< coll_t, set >::iterator p = fakecollections.begin(); + p != fakecollections.end(); + p++) { + r++; + ls.push_back(p->first); + } + lock.Unlock(); + return r; +} + +int OBFSStore::create_collection(coll_t c) +{ + lock.Lock(); + fakecollections[c].size(); + lock.Unlock(); + return 0; +} + +int OBFSStore::destroy_collection(coll_t c) +{ + int r = 0; + lock.Lock(); + if (fakecollections.count(c)) { + fakecollections.erase(c); + fakecattr.erase(c); + } else + r = -1; + lock.Unlock(); + return r; +} + +int OBFSStore::collection_stat(coll_t c, struct stat *st) +{ + +} + +bool OBFSStore::collection_exists(coll_t c) +{ + lock.Lock(); + int r = fakecollections.count(c); + lock.Unlock(); + return r; +} + +int OBFSStore::collection_add(coll_t c, object_t o) +{ + lock.Lock(); + fakecollections[c].insert(o); + lock.Unlock(); + return 0; +} + +int OBFSStore::collection_remove(coll_t c, object_t o) +{ + lock.Lock(); + fakecollections[c].erase(o); + lock.Unlock(); + return 0; +} + +int OBFSStore::collection_list(coll_t c, list& o) +{ + lock.Lock(); + int r = 0; + for (set::iterator p = fakecollections[c].begin(); + p != collectsion[c].end(); + p++) { + o.push_back(*p); + r++; + } + lock.Unlock(); + return r; +} + +int OBFSStore::collection_setattr(coll_t c, const char *name, + void *value, size_t size) +{ + lock.Lock(); + int r = fakecattr[c].setattr(name, value, size); + lock.Unlock(); + return r; +} + +int OBFSStore::collection_getattr(coll_t c, const char *name, + void *value, size_t size) +{ + lock.Lock(); + int r = fakecattr[c].getattr(name,value,size); + lock.Unlock(); + return r; +} + +int OBFSStore::collection_listattr(coll_t c, char *attrs, size_t size) +{ + lock.Lock(); + int r = fakecattr[c].listattr(attrs,size); + lock.Unlock(); + return r; +} diff --git a/ceph/osd/OBFSStore.h b/ceph/osd/OBFSStore.h index 4466d4a593d14..f588c807039c9 100644 --- a/ceph/osd/OBFSStore.h +++ b/ceph/osd/OBFSStore.h @@ -4,32 +4,92 @@ #include "ObjectStore.h" -class OBFSStore: public ObjectStore { - int whoami; - int bdev_id; - int mounted; - char dev[128]; - char param[128]; +class OBFSStore : public ObjectStore { - public: - OBFSStore(int whoami, char *param, char *dev); + class FakeAttrSet { + public: + map attrs; + + int getattr(const char *name, void *value, size_t size) { + if (attrs.count(name)) { + size_t l = attrs[name].length(); + if (l > size) l = size; + bufferlist bl; + bl.append(attrs[name]); + bl.copy(0, l, (char*)value); + return l; + } + return -1; + } + + int setattr(const char *name, void *value, size_t size) { + bufferptr bp(new buffer((char*)value,size)); + attrs[name] = bp; + return 0; + } + + int listattr(char *attrs, size_t size) { + assert(0); + } + + bool empty() { return attrs.empty(); } + }; - int mount(void); - int umount(void); - int mkfs(void); + Mutex lock; + hash_map fakeoattrs; + hash_map fakecattrs; + hash_map > fakecollections; - bool exists(object_t oid); - int stat(object_t oid, struct stat *st); + int whoami; + int bdev_id; + int mounted; + char dev[128]; + char param[128]; + + public: + OBFSStore(int whoami, char *param, char *dev); + + int mount(void); + int umount(void); + int mkfs(void); + + bool exists(object_t oid); + int stat(object_t oid, struct stat *st); + + int remove(object_t oid); + int truncate(object_t oid, off_t size); + + int read(object_t oid, size_t len, + off_t offset, char *buffer); + int write(object_t oid, size_t len, + off_t offset,char *buffer, + bool fsync); + + // faked attrs + int setattr(object_t oid, const char *name, + void *value, size_t size); + int getattr(object_t oid, const char *name, + void *value, size_t size); + int listattr(object_t oid, char *attrs, size_t size); - int remove(object_t oid); - int truncate(object_t oid, off_t size); - int read(object_t oid, size_t len, - off_t offset, char *buffer); - int write(object_t oid, size_t len, - off_t offset,char *buffer, - bool fsync); + // faked collections + int list_collections(list& ls); + int create_collection(coll_t c); + int destroy_collection(coll_t c); + int collection_stat(coll_t c, struct stat *st); + bool collection_exists(coll_t c); + int collection_add(coll_t c, object_t o); + int collection_remove(coll_t c, object_t o); + int collection_list(coll_t c, list& o); + int collection_setattr(coll_t c, const char *name, + void *value, size_t size); + int collection_getattr(coll_t c, const char *name, + void *value, size_t size); + int collection_listattr(coll_t c, char *attrs, size_t size); + + }; #endif -- 2.39.5