From eba9b2d2ff625ff0f04ba2b1434d874016c0bc19 Mon Sep 17 00:00:00 2001 From: sage Date: Mon, 2 Jan 2006 19:20:09 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@565 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/TODO | 34 +++++++--------- ceph/common/ThreadPool.h | 88 ++++++++++++++++++++-------------------- ceph/config.cc | 8 ++-- ceph/osd/OSD.cc | 62 +++++++++++++++++++--------- ceph/osd/OSD.h | 6 ++- 5 files changed, 111 insertions(+), 87 deletions(-) diff --git a/ceph/TODO b/ceph/TODO index e9945d1a69945..f9f931f5cfebe 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -5,13 +5,14 @@ client ebofs -- full fs +- combine inodes into same blocks - zero regions? - journaling -- combine inodes into same blocks +- clone() + osd -- unordered ops +- thread queue + object locks... need to be ordered! - rep ops trickyness? - ardos validation @@ -26,6 +27,11 @@ osd - fast recovery from degraded mode +ardos +- watch osd utilization; adjust cluster map + + + cluster issues - general problem: how to do posix ordering on object boundaries using an object store @@ -51,13 +57,6 @@ cluster issues -fakestore -- sync notification is totally broken, use '--fake_osd_sync 1' for now - - -osd foo - - REPLICATION @@ -67,7 +66,6 @@ requirements - - interactive hash/unhash interface - test hashed readdir - make logstream.flush align itself to stipes @@ -79,16 +77,12 @@ requirements KNOWN MDS BUGS to fix after fast -- redo mds capabilities code ... distribute!! -- fix softlock, stat +- posix extensions + - softstat - implement truncate() for real -- hard links! - - - -finish HARD LINKS -- reclaim danglers from inode file on discover... -- fix rename +- finish hard links! + - reclaim danglers from inode file on discover... + - fix rename wrt hard links diff --git a/ceph/common/ThreadPool.h b/ceph/common/ThreadPool.h index beeefa4a53927..f82f2fd711654 100644 --- a/ceph/common/ThreadPool.h +++ b/ceph/common/ThreadPool.h @@ -10,8 +10,8 @@ // debug output #include "config.h" -#define tpdout(x) if (x <= g_conf.debug) cout << myname << " " -#define DBLVL 10 +#define tpdout(x) if (x <= g_conf.debug) cout << myname +#define DBLVL 15 using namespace std; @@ -25,11 +25,14 @@ class ThreadPool { queue q; Mutex q_lock; Semaphore q_sem; - pthread_t *thread; + int num_ops; int num_threads; - void (*func)(U*,T*); + vector thread; + U *u; + void (*func)(U*,T*); + void (*prefunc)(U*,T*); string myname; static void *foo(void *arg) @@ -39,21 +42,17 @@ class ThreadPool { return 0; } - void * do_ops(void *nothing) + void *do_ops(void *nothing) { - T* op; - - tpdout(DBLVL) << "Thread "<< pthread_self() << " ready for action\n"; - while(1) { + tpdout(DBLVL) << ".do_ops starting " << pthread_self() << endl; + while (1) { q_sem.Get(); - op = get_op(); - - if(op == NULL) { - tpdout(DBLVL) << "Thread exiting\n"; - //pthread_exit(0); + if (q.empty()) { + tpdout(DBLVL) << ".do_ops thread exiting" << pthread_self() << endl; return 0; // like this, i think! } - tpdout(DBLVL) << "Thread "<< pthread_self() << " calling the function on " << op << endl; + T *op = get_op(); + tpdout(DBLVL) << ".func thread "<< pthread_self() << " on " << op << endl; func(u, op); } return 0; @@ -63,11 +62,17 @@ class ThreadPool { T* get_op() { T* op; - q_lock.Lock(); - op = q.front(); - q.pop(); - num_ops--; + { + op = q.front(); + q.pop(); + num_ops--; + + if (prefunc && op) { + tpdout(DBLVL) << ".prefunc thread "<< pthread_self() << " on " << op << endl; + prefunc(u, op); + } + } q_lock.Unlock(); return op; @@ -75,40 +80,35 @@ class ThreadPool { public: - ThreadPool(char *myname, int howmany, void (*f)(U*,T*), U *obj) - { - int status; - - this->myname = myname; - u = obj; - num_ops = 0; - func = f; - num_threads = howmany; - thread = new pthread_t[num_threads]; - - for(int i = 0; i < howmany; i++) { + ThreadPool(char *myname, int howmany, void (*f)(U*,T*), U *obj, void (*pf)(U*,T*) = 0) : + num_ops(0), num_threads(howmany), + thread(num_threads), + u(obj), + func(f), prefunc(pf), + myname(myname) { + // start threads + int status; + for(int i = 0; i < howmany; i++) { status = pthread_create(&thread[i], NULL, (void*(*)(void *))&ThreadPool::foo, this); + assert(status == 0); } } - - ~ThreadPool() - { - // put null ops to make threads exit cleanly - for(int i = 0; i < num_threads; i++) - put_op(0); - + + ~ThreadPool() { + // bump sem to make threads exit cleanly + for(int i = 0; i < num_threads; i++) + q_sem.Put(); + // wait for them to die for(int i = 0; i < num_threads; i++) { - tpdout(DBLVL) << "Joining thread " << i << "\n"; + tpdout(DBLVL) << ".des joining thread " << thread[i] << endl; void *rval = 0; // we don't actually care pthread_join(thread[i], &rval); } - delete[] thread; } - - void put_op(T* op) - { - tpdout(DBLVL) << "put_op " << op << endl; + + void put_op(T* op) { + tpdout(DBLVL) << ".put_op " << op << endl; q_lock.Lock(); q.push(op); num_ops++; diff --git a/ceph/config.cc b/ceph/config.cc index b987c1e93bfed..66bddc4055b68 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -112,7 +112,7 @@ md_config_t g_conf = { osd_max_rep: 4, osd_fsync: true, osd_writesync: false, - osd_maxthreads: 0, // 0 == no threading! + osd_maxthreads: 1, // 0 == no threading! osd_mkfs: false, osd_fakestore_syncthreads: 4, @@ -122,8 +122,8 @@ md_config_t g_conf = { ebofs_commit_interval: 2, // seconds. 0 = no timeout (for debugging/tracing) ebofs_oc_size: 1000, ebofs_cc_size: 1000, - ebofs_bc_size: (150 *256), // measured in 4k blocks, or *256 for MB - ebofs_bc_max_dirty: (110 *256), // before write() will wait for data to flush + ebofs_bc_size: (15 *256), // 4k blocks, or *256 for MB + ebofs_bc_max_dirty: (10 *256), // before write() will block ebofs_abp_zero: false, ebofs_abp_max_alloc: 4096*32, @@ -289,6 +289,8 @@ void parse_config_options(vector& args) else if (strcmp(args[i], "--ebofs") == 0) g_conf.ebofs = 1; + else if (strcmp(args[i], "--fakestore") == 0) + g_conf.ebofs = 0; else if (strcmp(args[i], "--osd_mkfs") == 0) g_conf.osd_mkfs = atoi(args[++i]); diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index fbf7946d39627..072f00f94a136 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -139,7 +139,10 @@ OSD::OSD(int id, Messenger *m) { char name[80]; sprintf(name,"osd%d.threadpool", whoami); - threadpool = new ThreadPool(name, g_conf.osd_maxthreads, (void (*)(OSD*, MOSDOp*))doop, this); + threadpool = new ThreadPool(name, g_conf.osd_maxthreads, + static_doop, + this, + static_dequeueop); } } @@ -1460,7 +1463,7 @@ void OSD::pull_replica(PG *pg, object_t oid) void OSD::op_rep_pull(MOSDOp *op) { long got = 0; - lock_object(op->get_oid()); + //lock_object(op->get_oid()); { dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; @@ -1490,7 +1493,7 @@ void OSD::op_rep_pull(MOSDOp *op) messenger->send_message(reply, op->get_asker()); } - unlock_object(op->get_oid()); + //unlock_object(op->get_oid()); delete op; logger->inc("r_pull"); @@ -1618,7 +1621,7 @@ void OSD::push_replica(PG *pg, object_t oid) void OSD::op_rep_push(MOSDOp *op) { - lock_object(op->get_oid()); + //lock_object(op->get_oid()); { dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; @@ -1654,7 +1657,7 @@ void OSD::op_rep_push(MOSDOp *op) messenger->send_message(reply, op->get_asker()); } - unlock_object(op->get_oid()); + //unlock_object(op->get_oid()); delete op; } @@ -1748,7 +1751,7 @@ void OSD::remove_replica(PG *pg, object_t oid) void OSD::op_rep_remove(MOSDOp *op) { - lock_object(op->get_oid()); + //lock_object(op->get_oid()); { dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; @@ -1768,7 +1771,7 @@ void OSD::op_rep_remove(MOSDOp *op) messenger->send_message(new MOSDOpReply(op, r, osdmap, true), op->get_asker()); } - unlock_object(op->get_oid()); + //unlock_object(op->get_oid()); delete op; } @@ -1829,7 +1832,7 @@ void OSD::op_rep_modify(MOSDOp *op) // when we introduce unordered messaging.. FIXME object_t oid = op->get_oid(); - lock_object(oid); + //lock_object(oid); { version_t ov = 0; if (store->exists(oid)) @@ -1874,7 +1877,7 @@ void OSD::op_rep_modify(MOSDOp *op) delete op; } } - unlock_object(oid); + //unlock_object(oid); } @@ -2003,23 +2006,42 @@ void OSD::handle_op(MOSDOp *op) pending_ops++; if (g_conf.osd_maxthreads < 1) { osd_lock.Unlock(); - do_op(op); // or, just do it now + { + dequeue_op(op); + do_op(op); + } osd_lock.Lock(); } else { - threadpool->put_op(op); + osd_lock.Unlock(); // because put_op might block, bc threadpool may be calling dequeue_op w/ q_lock + { + threadpool->put_op(op); + } + osd_lock.Lock(); } } -void doop(OSD *u, MOSDOp *p) { - u->do_op(p); + +/* + * called serially (but in worker thread) as items are dequeued from the threadpool + */ +void OSD::dequeue_op(MOSDOp *op) +{ + dout(12) << "dequeue_op " << op << endl; + lock_object(op->get_oid()); } +/* + * called asynchronously by worker thread after items are dequeued + */ void OSD::do_op(MOSDOp *op) { dout(12) << "do_op " << op << endl; logger->inc("op"); + object_t oid = op->get_oid(); + //lock_object(oid); // dequeue_op does this now + // replication ops? if (OSD_OP_IS_REP(op->get_op())) { // replication/recovery @@ -2063,6 +2085,8 @@ void OSD::do_op(MOSDOp *op) } } + unlock_object(oid); + //dout(12) << "finish op " << op << endl; // finish @@ -2100,7 +2124,7 @@ void OSD::wait_for_no_ops() void OSD::op_read(MOSDOp *op) { object_t oid = op->get_oid(); - lock_object(oid); + //lock_object(oid); { // read into a buffer bufferlist bl; @@ -2130,14 +2154,14 @@ void OSD::op_read(MOSDOp *op) // send it messenger->send_message(reply, op->get_asker()); } - unlock_object(oid); + //unlock_object(oid); delete op; } void OSD::op_stat(MOSDOp *op) { object_t oid = op->get_oid(); - lock_object(oid); + //lock_object(oid); { struct stat st; memset(&st, sizeof(st), 0); @@ -2151,7 +2175,7 @@ void OSD::op_stat(MOSDOp *op) logger->inc("stat"); } - unlock_object(oid); + //unlock_object(oid); delete op; } @@ -2283,7 +2307,7 @@ void OSD::op_modify(MOSDOp *op) if (op->get_op() == OSD_OP_DELETE) opname = "op_delete"; if (op->get_op() == OSD_OP_TRUNCATE) opname = "op_truncate"; - lock_object(oid); + //lock_object(oid); { // version? clean? version_t ov = 0; // 0 == dne (yet) @@ -2348,7 +2372,7 @@ void OSD::op_modify(MOSDOp *op) messenger->send_message(reply, op->get_asker()); } } - unlock_object(oid); + //unlock_object(oid); } diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 3969e183b15b1..ddb188b981caf 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -93,9 +93,13 @@ class OSD : public Dispatcher { public: void do_op(class MOSDOp *m); - static void doop(OSD *o, MOSDOp *op) { + static void static_doop(OSD *o, MOSDOp *op) { o->do_op(op); }; + void dequeue_op(class MOSDOp *m); + static void static_dequeueop(OSD *o, MOSDOp *op) { + o->dequeue_op(op); + }; protected: -- 2.39.5