From ebeb7c7616e42a71a353f71b7c3f786546337759 Mon Sep 17 00:00:00 2001 From: sage Date: Thu, 3 Aug 2006 19:59:26 +0000 Subject: [PATCH] objectcacher fixes mostly git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@786 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/TODO | 1 + ceph/client/Client.cc | 1 + ceph/client/SyntheticClient.cc | 13 ++++ ceph/client/SyntheticClient.h | 1 + ceph/common/Cond.h | 2 +- ceph/config.cc | 7 ++ ceph/config.h | 3 + ceph/mds/Capability.h | 2 +- ceph/messages/MOSDOp.h | 2 + ceph/osd/OSD.cc | 12 +++ ceph/osdc/ObjectCacher.cc | 135 +++++++++++++++++++++++---------- ceph/osdc/ObjectCacher.h | 3 +- ceph/osdc/Objecter.cc | 5 +- 13 files changed, 142 insertions(+), 45 deletions(-) diff --git a/ceph/TODO b/ceph/TODO index 238912544e39e..9747f883145b5 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -90,6 +90,7 @@ remaining hard problems osd/rados /- add caller to log /- make modify ops idempotent +- rdlocks - pg_bit changes - use pg->info.same_role_since wrt replication ops. - report crashed pgs? 
diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 3f9606116aeab..64b3e4b698cfb 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -1545,6 +1545,7 @@ int Client::open(const char *relpath, int mode) f->inode->get(); int new_caps = reply->get_file_caps(); + new_caps &= CAP_FILE_WR|CAP_FILE_RD; assert(reply->get_file_caps_seq() >= f->inode->caps[mds].seq); if (reply->get_file_caps_seq() > f->inode->caps[mds].seq) { diff --git a/ceph/client/SyntheticClient.cc b/ceph/client/SyntheticClient.cc index b64f48cc41c71..84a0dccb5264d 100644 --- a/ceph/client/SyntheticClient.cc +++ b/ceph/client/SyntheticClient.cc @@ -51,6 +51,10 @@ void parse_syn_options(vector& args) syn_modes.push_back( SYNCLIENT_MODE_WRITEFILE ); syn_iargs.push_back( atoi(args[++i]) ); syn_iargs.push_back( atoi(args[++i]) ); + } else if (strcmp(args[i],"wrshared") == 0) { + syn_modes.push_back( SYNCLIENT_MODE_WRSHARED ); + syn_iargs.push_back( atoi(args[++i]) ); + syn_iargs.push_back( atoi(args[++i]) ); } else if (strcmp(args[i],"writebatch") == 0) { syn_modes.push_back( SYNCLIENT_MODE_WRITEBATCH ); syn_iargs.push_back( atoi(args[++i]) ); @@ -383,6 +387,15 @@ int SyntheticClient::run() write_file(sarg1, iarg1, iarg2); } break; + case SYNCLIENT_MODE_WRSHARED: + { + string sarg1 = "shared"; + int iarg1 = iargs.front(); iargs.pop_front(); + int iarg2 = iargs.front(); iargs.pop_front(); + if (run_me()) + write_file(sarg1, iarg1, iarg2); + } + break; case SYNCLIENT_MODE_WRITEBATCH: { int iarg1 = iargs.front(); iargs.pop_front(); diff --git a/ceph/client/SyntheticClient.h b/ceph/client/SyntheticClient.h index b5b22e4faed4b..9606bd3202030 100644 --- a/ceph/client/SyntheticClient.h +++ b/ceph/client/SyntheticClient.h @@ -38,6 +38,7 @@ #define SYNCLIENT_MODE_WRITEFILE 20 #define SYNCLIENT_MODE_READFILE 21 #define SYNCLIENT_MODE_WRITEBATCH 22 +#define SYNCLIENT_MODE_WRSHARED 23 #define SYNCLIENT_MODE_TRACE 30 diff --git a/ceph/common/Cond.h b/ceph/common/Cond.h index 
91021b5cc0ded..9f0500106395a 100644 --- a/ceph/common/Cond.h +++ b/ceph/common/Cond.h @@ -119,7 +119,7 @@ public: void finish(int r) { lock->Lock(); if (rval) *rval = r; - *done = false; + *done = true; cond->Signal(); lock->Unlock(); } diff --git a/ceph/config.cc b/ceph/config.cc index daf8909b96a98..0464d26670536 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -71,6 +71,7 @@ md_config_t g_conf = { debug_mds_log: 1, debug_buffer: 0, debug_filer: 0, + debug_objectcacher: 0, debug_client: 0, debug_osd: 0, debug_ebofs: 1, @@ -107,6 +108,7 @@ md_config_t g_conf = { client_oc: false, client_oc_max_dirty: 1024*1024* 100, + client_oc_max_sync_write: 1024*1024, // writes >= this use wrlock /* client_bcache: 0, @@ -380,6 +382,11 @@ void parse_config_options(vector& args) g_conf.debug_filer = atoi(args[++i]); else g_debug_after_conf.debug_filer = atoi(args[++i]); + else if (strcmp(args[i], "--debug_objectcacher") == 0) + if (!g_conf.debug_after) + g_conf.debug_objectcacher = atoi(args[++i]); + else + g_debug_after_conf.debug_objectcacher = atoi(args[++i]); else if (strcmp(args[i], "--debug_client") == 0) if (!g_conf.debug_after) g_conf.debug_client = atoi(args[++i]); diff --git a/ceph/config.h b/ceph/config.h index e770429d00417..620053eebb9ea 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -45,6 +45,7 @@ struct md_config_t { int debug_mds_log; int debug_buffer; int debug_filer; + int debug_objectcacher; int debug_client; int debug_osd; int debug_ebofs; @@ -81,6 +82,8 @@ struct md_config_t { bool client_oc; int client_oc_max_dirty; + size_t client_oc_max_sync_write; + /* bool client_bcache; diff --git a/ceph/mds/Capability.h b/ceph/mds/Capability.h index 0701a282137ba..067e6a3ce07b4 100644 --- a/ceph/mds/Capability.h +++ b/ceph/mds/Capability.h @@ -111,7 +111,7 @@ public: // conflicts static int conflicts(int from) { int c = 0; - if (from & CAP_FILE_WRBUFFER) c |= CAP_FILE_RDCACHE|CAP_FILE_RD; + if (from & CAP_FILE_WRBUFFER) c |= 
CAP_FILE_RDCACHE|CAP_FILE_RD;//|CAP_FILE_WRBUFFER|CAP_FILE_WR; if (from & CAP_FILE_WR) c |= CAP_FILE_RDCACHE; if (from & CAP_FILE_RD) c |= CAP_FILE_WRBUFFER; if (from & CAP_FILE_RDCACHE) c |= CAP_FILE_WRBUFFER|CAP_FILE_WR; diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index 35a2e6c8c5cdf..fd9ae0669d7bf 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -56,6 +56,8 @@ #define OSD_OP_REP_DELETE (100+OSD_OP_DELETE) #define OSD_OP_REP_WRLOCK (100+OSD_OP_WRLOCK) #define OSD_OP_REP_WRUNLOCK (100+OSD_OP_WRUNLOCK) +#define OSD_OP_REP_RDLOCK (100+OSD_OP_RDLOCK) +#define OSD_OP_REP_RDUNLOCK (100+OSD_OP_RDUNLOCK) #define OSD_OP_REP_PULL 30 // whole object read //#define OSD_OP_REP_PUSH 31 // whole object write diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 700143ce441ae..55ebd25e61a5b 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -560,6 +560,10 @@ void OSD::handle_op_reply(MOSDOpReply *m) case OSD_OP_REP_WRITE: case OSD_OP_REP_TRUNCATE: case OSD_OP_REP_DELETE: + case OSD_OP_REP_WRLOCK: + case OSD_OP_REP_WRUNLOCK: + case OSD_OP_REP_RDLOCK: + case OSD_OP_REP_RDUNLOCK: { const pg_t pgid = m->get_pg(); if (pg_map.count(pgid)) { @@ -2292,6 +2296,10 @@ void OSD::do_op(MOSDOp *op, PG *pg) case OSD_OP_REP_WRITE: case OSD_OP_REP_TRUNCATE: case OSD_OP_REP_DELETE: + case OSD_OP_REP_WRLOCK: + case OSD_OP_REP_WRUNLOCK: + case OSD_OP_REP_RDLOCK: + case OSD_OP_REP_RDUNLOCK: op_rep_modify(op, pg); break; @@ -2311,6 +2319,10 @@ void OSD::do_op(MOSDOp *op, PG *pg) case OSD_OP_ZERO: case OSD_OP_DELETE: case OSD_OP_TRUNCATE: + case OSD_OP_WRLOCK: + case OSD_OP_WRUNLOCK: + case OSD_OP_RDLOCK: + case OSD_OP_RDUNLOCK: op_modify(op, pg); break; default: diff --git a/ceph/osdc/ObjectCacher.cc b/ceph/osdc/ObjectCacher.cc index f36b4e22b1490..9d498804a07be 100644 --- a/ceph/osdc/ObjectCacher.cc +++ b/ceph/osdc/ObjectCacher.cc @@ -1,9 +1,8 @@ +#include "msg/Messenger.h" #include "ObjectCacher.h" #include "Objecter.h" -#undef dout -#define dout(l) if 
(l<=g_conf.debug || l<=g_conf.debug_client) cout << "ocacher." << pthread_self() << " " /*** ObjectCacher::BufferHead ***/ @@ -11,6 +10,10 @@ /*** ObjectCacher::Object ***/ +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_objectcacher) cout << oc->objecter->messenger->get_myaddr() << ".objectcacher.object(" << hex << oid << dec << ") " + + ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off) { dout(20) << "split " << *bh << " at " << off << endl; @@ -223,6 +226,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr final->set_start( cur ); final->set_length( max ); oc->bh_add(this, final); + dout(10) << "map_write adding trailing bh " << *final << endl; } else { final->set_length( final->length() + max ); } @@ -298,8 +302,14 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr return final; } + + /*** ObjectCacher ***/ +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_client) cout << objecter->messenger->get_myaddr() << ".objectcacher " + + /* private */ void ObjectCacher::bh_read(Object *ob, BufferHead *bh) @@ -390,6 +400,8 @@ void ObjectCacher::bh_write(Object *ob, BufferHead *bh) oncommit->tid = tid; ob->last_write_tid = tid; bh->last_write_tid = tid; + + mark_tx(bh); } void ObjectCacher::lock_ack(list& oids, tid_t tid) @@ -461,10 +473,10 @@ void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t // apply to bh's! off_t opos = start; - map::iterator p = ob->data.lower_bound(opos); - while (p != ob->data.end() && - opos < start+length) { + for (map::iterator p = ob->data.lower_bound(opos); + p != ob->data.end() && opos < start+length; + p++) { BufferHead *bh = p->second; if (bh->start() < start && @@ -488,6 +500,7 @@ void ObjectCacher::bh_write_ack(object_t oid, off_t start, size_t length, tid_t // ok! mark bh clean. mark_clean(bh); + dout(10) << "bh_write_ack clean " << *bh << endl; } // update object last_ack. 
@@ -722,36 +735,49 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& << " in " << hex << ino << dec << endl; - // sort by object... - map by_oid; - for (list::iterator ex_it = rd->extents.begin(); - ex_it != rd->extents.end(); - ex_it++) - by_oid[ex_it->oid] = *ex_it; - - // lock - for (map::iterator i = by_oid.begin(); - i != by_oid.end(); - i++) { - Object *o = get_object(i->first, ino); - rdlock(o); - } - - // do the read, into our cache - Cond cond; - bool done = false; - readx(rd, ino, new C_SafeCond(&lock, &cond, &done)); + if (rd->extents.size() == 1) { + // single object. + // just read synchronously. + Cond cond; + bool done = false; + objecter->readx(rd, new C_SafeCond(&lock, &cond, &done)); - // block - while (!done) cond.Wait(lock); - - // release the locks - for (list::iterator ex_it = rd->extents.begin(); - ex_it != rd->extents.end(); - ex_it++) { - assert(objects.count(ex_it->oid)); - Object *o = objects[ex_it->oid]; - rdunlock(o); + // block + while (!done) cond.Wait(lock); + } else { + // spans multiple objects. + + // sort by object... 
+ map by_oid; + for (list::iterator ex_it = rd->extents.begin(); + ex_it != rd->extents.end(); + ex_it++) + by_oid[ex_it->oid] = *ex_it; + + // lock + for (map::iterator i = by_oid.begin(); + i != by_oid.end(); + i++) { + Object *o = get_object(i->first, ino); + rdlock(o); + } + + // do the read, into our cache + Cond cond; + bool done = false; + readx(rd, ino, new C_SafeCond(&lock, &cond, &done)); + + // block + while (!done) cond.Wait(lock); + + // release the locks + for (list::iterator ex_it = rd->extents.begin(); + ex_it != rd->extents.end(); + ex_it++) { + assert(objects.count(ex_it->oid)); + Object *o = objects[ex_it->oid]; + rdunlock(o); + } } return 0; @@ -763,13 +789,42 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute << " in " << hex << ino << dec << endl; + if (wr->extents.size() == 1 && + wr->extents.front().length <= g_conf.client_oc_max_sync_write) { + // single object. + + // make sure we aren't already locking/locked... + object_t oid = wr->extents.front().oid; + Object *o = 0; + if (objects.count(oid)) o = get_object(oid, ino); + if (!o || + (o->lock_state != Object::LOCK_WRLOCK && + o->lock_state != Object::LOCK_WRLOCKING && + o->lock_state != Object::LOCK_UPGRADING)) { + // just write synchronously. + dout(10) << "atomic_sync_writex " << wr + << " in " << hex << ino << dec + << " doing sync write" + << endl; + + Cond cond; + bool done = false; + objecter->modifyx(wr, new C_SafeCond(&lock, &cond, &done), 0); + + // block + while (!done) cond.Wait(lock); + return 0; + } + } + + // spans multiple objects. // sort by object... 
map by_oid; for (list::iterator ex_it = wr->extents.begin(); ex_it != wr->extents.end(); ex_it++) by_oid[ex_it->oid] = *ex_it; - + // wrlock for (map::iterator i = by_oid.begin(); i != by_oid.end(); @@ -791,7 +846,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute wrunlock(o); } - + return 0; } @@ -810,7 +865,7 @@ void ObjectCacher::rdlock(Object *o) o->lock_state = Object::LOCK_RDLOCKING; C_LockAck *ack = new C_LockAck(this, o->get_oid()); - C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 1); + C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0); commit->tid = ack->tid = @@ -854,7 +909,7 @@ void ObjectCacher::wrlock(Object *o) } C_LockAck *ack = new C_LockAck(this, o->get_oid()); - C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 1); + C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0); commit->tid = ack->tid = @@ -899,7 +954,7 @@ void ObjectCacher::rdunlock(Object *o) o->lock_state = Object::LOCK_RDUNLOCKING; C_LockAck *lockack = new C_LockAck(this, o->get_oid()); - C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 1); + C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0); commit->tid = lockack->tid = o->last_write_tid = @@ -932,7 +987,7 @@ void ObjectCacher::wrunlock(Object *o) } C_LockAck *lockack = new C_LockAck(this, o->get_oid()); - C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 1); + C_WriteCommit *commit = new C_WriteCommit(this, o->get_oid(), 0, 0); commit->tid = lockack->tid = o->last_write_tid = diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h index ce77ab581ac6c..fd93b6f2342cd 100644 --- a/ceph/osdc/ObjectCacher.h +++ b/ceph/osdc/ObjectCacher.h @@ -455,7 +455,8 @@ class ObjectCacher { inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh) { out << "bh[" - << bh.start() << "~" << bh.end() + << bh.start() << "~" << bh.length() + << " (" << bh.bl.length() << ")" << " v " << 
bh.last_write_tid; if (bh.is_tx()) out << " tx"; if (bh.is_rx()) out << " rx"; diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index 94ac4b4dbd7f4..b6f45dbf38c8e 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -432,11 +432,12 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) // write ------------------------------------ tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, - Context *onack, Context *oncommit) + Context *onack, Context *oncommit) { OSDWrite *wr = new OSDWrite(bl); wr->extents.push_back(ObjectExtent(oid, off, len)); wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + wr->extents.front().buffer_extents[0] = len; modifyx(wr, onack, oncommit); return last_tid; } @@ -460,7 +461,7 @@ tid_t Objecter::lock(int op, object_t oid, Context *onack, Context *oncommit) { OSDModify *l = new OSDModify(op); - l->extents.push_back(ObjectExtent(oid, 0, 1)); + l->extents.push_back(ObjectExtent(oid, 0, 0)); l->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); modifyx(l, onack, oncommit); return last_tid; -- 2.39.5