From 0ca67f3aeb8f02f2d85de524df5dcc6f589cc833 Mon Sep 17 00:00:00 2001 From: sage Date: Wed, 7 Jul 2004 23:07:29 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@14 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/CDir.cc | 4 +- ceph/LogStream.cc | 95 +++++++++++++++++++++++++++++++++++++++ ceph/MDLog.cc | 66 +++++++++++++++++++++++++++ ceph/MDStore.cc | 63 +++++++++++++++++++++++--- ceph/Makefile | 8 ++-- ceph/OSD.cc | 6 +-- ceph/include/CDir.h | 42 ++++++++++++++++- ceph/include/LogEvent.h | 38 ++++++++++++++++ ceph/include/LogEvent.h~ | 12 +++++ ceph/include/LogStream.h | 46 +++++++++++++++++++ ceph/include/LogStream.h~ | 10 +++++ ceph/include/MDLog.h | 36 +++++++++++++-- ceph/include/MDS.h | 17 ++----- ceph/include/MDStore.h | 9 +++- ceph/include/OSD.h | 6 ++- 15 files changed, 422 insertions(+), 36 deletions(-) create mode 100644 ceph/LogStream.cc create mode 100644 ceph/MDLog.cc create mode 100644 ceph/include/LogEvent.h create mode 100644 ceph/include/LogEvent.h~ create mode 100644 ceph/include/LogStream.h create mode 100644 ceph/include/LogStream.h~ diff --git a/ceph/CDir.cc b/ceph/CDir.cc index a962a08d3ecf0..95f17ded3697c 100644 --- a/ceph/CDir.cc +++ b/ceph/CDir.cc @@ -55,8 +55,10 @@ void CDir::dump(int depth) { iter++; } - if (!complete) + if (!(state & CDIR_MASK_COMPLETE)) cout << ind << "..." << endl; + if (state & CDIR_MASK_DIRTY) + cout << ind << "[dirty]" << endl; } diff --git a/ceph/LogStream.cc b/ceph/LogStream.cc new file mode 100644 index 0000000000000..c8ad7bd3f8ade --- /dev/null +++ b/ceph/LogStream.cc @@ -0,0 +1,95 @@ + +#include "include/LogStream.h" +#include "include/osd.h" +#include "include/LogEvent.h" + +#include +using namespace std; + + +// writing + +int LogStream::append(LogEvent *e, Context *c) +{ + // serialize + char *buf; + size_t buflen; + e->serialize(&buf, &buflen); + + // advance ptr for later + append_pos += buflen; + + // submit write + osd_write(osd, oid, + buflen, append_pos, + buf, + 0, + c); + return 0; +} + + +// reading + +#define READ_INC 1024 // make this bigger than biggest event + +class C_LSReadNext : public Context { + LogStream *ls; + LogEvent **le; + Context *c; +public: + C_LSReadNext(LogStream *ls, LogEvent **le, Context *c) { + this->ls = ls; + this->le = le; + this->c = c; + } + void finish(int result) { + ls->read_next(le,c,1); + } +}; + +int LogStream::read_next(LogEvent **le, Context *c, int step) +{ + if (step == 1) { + // alloc buffer? + if (!buf) { + buf = new char[READ_INC]; + buflen = READ_INC; + buf_start = -1; + } + + // does buffer have what we want? + if (buf_start > cur_pos || + buf_start+buflen < cur_pos+4) { + // nope. re-read a chunk + buf_start = cur_pos; + osd_read(osd, oid, + READ_INC, cur_pos, + buf, new C_LSReadNext(this, le, c)); + return 0; + } + step = 1; + } + + + if (step == 1) { + // decode event + unsigned off = cur_pos-buf_start; + __uint32_t type = *((__uint32_t*)(buf+off)); + switch (type) { + case 1: // string + cout << "it's a string event" << endl; + *le = new LogEvent(buf + off + 8); + break; + + default: + cout << "uh oh, unknown event type " << type << endl; + } + + // finish + if (c) { + c->finish(0); + delete c; + } + } +} diff --git a/ceph/MDLog.cc b/ceph/MDLog.cc new file mode 100644 index 0000000000000..29d99e79da9a0 --- /dev/null +++ b/ceph/MDLog.cc @@ -0,0 +1,66 @@ + +#include "include/MDLog.h" +#include "include/LogStream.h" +#include "include/LogEvent.h" + +// cons/des + +MDLog::MDLog() +{ + num_events = 0; + reader = new LogStream(666,666); + writer = new LogStream(666,666); +} + + + +int MDLog::submit_entry( LogEvent *e, + Context *c ) +{ + // write it + writer->append(e, c); +} + + +class C_MDL_Trim : public Context { +protected: + MDLog *mdl; + Context *con; +public: + LogEvent *le; + + C_MDL_Trim(MDLog *m, Context *c) { + mdl = m; con = c; + } + void finish(int res) { + mdl->trim_2(le, + con); + } +}; + +int MDLog::trim(Context *c) +{ + while (trimming.size() + num_events > max_events) { + // pull off an event + C_MDL_Trim *n = new C_MDL_Trim(this, c); + reader->read_next(&n->le, n); + } +} + +int MDLog::trim_2(LogEvent *e, + Context *c) +{ + /* + if (e.discardable()) { + // discard + + } else { + e.retire(new C_MDL_TrimFinisher(c)); + } + */ + // add to limbo list + trimming.push_back(e); + + // + +} diff --git a/ceph/MDStore.cc b/ceph/MDStore.cc index ff1e2ff1e8b7d..615daca9a60e7 100644 --- a/ceph/MDStore.cc +++ b/ceph/MDStore.cc @@ -61,13 +61,14 @@ bool MDStore::fetch_dir( CInode *in, #endif } -bool MDStore::fetch_dir_2( int result, char *buf, size_t buflen, CInode *dir, Context *c ) +bool MDStore::fetch_dir_2( int result, char *buf, size_t buflen, CInode *dir, Context *c) { cout << "fetch_dir_2" << endl; // parse buffer contents into cache - size_t p = 0; - while (p < buflen) { + __uint32_t num = *((__uint32_t*)buf); + size_t p = 4; + while (p < buflen && num > 0) { // dentry string dname = buf+p; p += dname.length() + 1; @@ -94,6 +95,7 @@ bool MDStore::fetch_dir_2( int result, char *buf, size_t buflen, CInode *dir, Co mds->mdcache->add_inode( in ); mds->mdcache->link_inode( dir, dname, in ); } + num--; } // ok! @@ -118,6 +120,7 @@ class MDCommitDirContext : public Context { MDStore *ms; CInode *in; Context *c; + __uint64_t version; public: char *buf; @@ -129,13 +132,29 @@ class MDCommitDirContext : public Context { this->c = c; buf = 0; buflen = 0; + version = in->dir->get_version(); } MDCommitDirContext() { if (buf) { delete buf; buf = 0; } } void finish(int result) { - ms->commit_dir_2( result, in, c ); + ms->commit_dir_2( result, in, c, version ); + } +}; + + +class MDFetchForCommitContext : public Context { +protected: + MDStore *ms; + CInode *in; + Context *co; +public: + MDFetchForCommitContext(MDStore *m, CInode *i, Context *c) { + ms = m; in = i; co = c; + } + void finish(int result) { + ms->commit_dir( in, co ); } }; @@ -144,13 +163,31 @@ class MDCommitDirContext : public Context { bool MDStore::commit_dir( CInode *in, Context *c ) { + // already committing? + if (in->dir->get_state() & CDIR_MASK_MID_COMMIT) { + // already mid-commit! + cout << "dir already mid-commit" << endl; + return false; + } + + // is it complete? + if (in->dir->get_state() & CDIR_MASK_COMPLETE == 0) { + cout << "dir not complete, fetching first" << endl; + // fetch dir first + Context *fin = new MDFetchForCommitContext(this, in, c); + fetch_dir(in, fin); + return false; + } + + // get continuation ready MDCommitDirContext *fin = new MDCommitDirContext(this, in, c); // buffer fin->buflen = in->dir->serial_size(); fin->buf = new char[fin->buflen]; - size_t off = 0; - + __uint32_t num = 0; + size_t off = sizeof(num); + // fill CDir_map_t::iterator it = in->dir->begin(); while (it != in->dir->end()) { @@ -166,10 +203,13 @@ bool MDStore::commit_dir( CInode *in, off += sizeof(inode_t); it++; + num++; } + *((__uint32_t*)fin->buf) = num; // pin inode in->get(); + in->dir->state_set(CDIR_MASK_MID_COMMIT); // submit to osd int osd = in->inode.ino % 10; @@ -178,14 +218,23 @@ bool MDStore::commit_dir( CInode *in, osd_write( osd, oid, off, 0, fin->buf, + 0, fin ); + return true; } bool MDStore::commit_dir_2( int result, CInode *in, - Context *c ) + Context *c, + __uint64_t committed_version) { + // is the dir now clean? + if (committed_version == in->dir->get_version()) { + in->dir->state_clear(CDIR_MASK_DIRTY); // clear dirty bit + } + in->dir->state_clear(CDIR_MASK_MID_COMMIT); + // unpin inode in->put(); diff --git a/ceph/Makefile b/ceph/Makefile index 4db895ff60eed..baac800962e0d 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -3,10 +3,10 @@ CC=g++ CFLAGS=-D __gnu_cxx=std -g SRCS=*.cc -OBJS=CDentry.o CDir.o CInode.o MDCache.o MDStore.o clock.o mds.o osd.o Messenger.o FakeMessenger.o +OBJS=CDentry.o CDir.o CInode.o MDCache.o MDStore.o clock.o mds.o osd.o Messenger.o FakeMessenger.o LogStream.o MDLog.o +TARGETS=test import - -all: depend test import +all: depend ${TARGETS} test: test.cc pmds.o @@ -15,6 +15,8 @@ test: test.cc pmds.o import: pmds.o import.cc ${CC} ${CFLAGS} pmds.o import.cc -o import +clean: + rm *.o ${TARGETS} %.o: %.cc ${CC} ${CFLAGS} -c $< diff --git a/ceph/OSD.cc b/ceph/OSD.cc index ca06470ba9af9..5873e85419baa 100644 --- a/ceph/OSD.cc +++ b/ceph/OSD.cc @@ -76,15 +76,15 @@ int osd_read_all(int osd, object_t oid, void **bufptr, size_t *buflen, Context * // -- osd_write -int osd_write(int osd, object_t oid, size_t len, size_t offset, void *buf, Context *c) +int osd_write(int osd, object_t oid, size_t len, size_t offset, void *buf, int flags, Context *c) { // fake it char *f = get_filename(osd,oid); - int fd = open(f, O_WRONLY|O_CREAT); + int fd = open(f, O_RDWR|O_CREAT|flags); if (fd < 0 && errno == 2) { // create dir and retry mkdir(get_dir(osd), 0755); cout << "mkdir errno " << errno << " on " << get_dir(osd) << endl; - fd = open(f, O_WRONLY|O_CREAT); + fd = open(f, O_RDWR|O_CREAT|flags); } if (fd < 0) { cout << "err opening " << f << " " << errno << endl; diff --git a/ceph/include/CDir.h b/ceph/include/CDir.h index a6e7296cb85e2..e67f29e4fd230 100644 --- a/ceph/include/CDir.h +++ b/ceph/include/CDir.h @@ -17,6 +17,17 @@ class CInode; class CDentry; class MDS; + +// state bits +#define CDIR_MASK_COMPLETE 1 // the complete contents are in cache +#define CDIR_MASK_COMPLETE_LOCK 2 // complete contents are in cache, and locked that way! (not yet implemented) +#define CDIR_MASK_DIRTY 4 // has been modified since last fetch/commit +#define CDIR_MASK_MID_COMMIT 8 // mid-commit + +// common states +#define CDIR_STATE_CLEAN 0 +#define CDIR_STATE_INITIAL 0 // ? + // CDir typedef map CDir_map_t; @@ -27,15 +38,40 @@ class CDir { CDir_map_t items; // use map; ordered list __uint64_t nitems; size_t namesize; - bool complete; + unsigned state; + __uint64_t version; public: CDir(CInode *in) { inode = in; nitems = 0; - complete = false; + state = CDIR_STATE_INITIAL; + version = 0; + } + + // state + unsigned get_state() { + return state; + } + void set_state(unsigned s) { + state = s; + } + void state_clear(unsigned mask) { + state &= ~mask; + } + void state_set(unsigned mask) { + state |= mask; + } + + // version + __uint64_t get_version() { + return version; + } + void touch_version() { + version++; } + // for storing.. size_t serial_size() { return nitems * (10+sizeof(inode_t)) + namesize; } @@ -47,10 +83,12 @@ class CDir { return items.end(); } + // manipulation void add_child(CDentry *d); void remove_child(CDentry *d); CDentry* lookup(string n); + // debuggin void dump(int d = 0); void dump_to_disk(MDS *m); }; diff --git a/ceph/include/LogEvent.h b/ceph/include/LogEvent.h new file mode 100644 index 0000000000000..82633a9a1af2e --- /dev/null +++ b/ceph/include/LogEvent.h @@ -0,0 +1,38 @@ + +#ifndef __LOGEVENT_H +#define __LOGEVENT_H + +#include +#include +using namespace std; + +// generic log event +class LogEvent { + protected: + string event; + + char *serial; + + public: + LogEvent(string e) { + event = e; + } + LogEvent(char *e) { + event = e; + } + ~LogEvent() { + if (serial) { delete serial; serial = 0; } + } + + // note: LogEvent owns serialized buffer + virtual int serialize(char **buf, size_t *len) { + *len = event.length()+1+4+4; // pad with NULL term + *buf = new char[*len]; + *((__uint32_t*)*buf) = *len; + *((__uint32_t*)*buf +1) = 1; //EVENT_STRING; + memcpy(*buf+8, event.c_str(), *len-8); + return 0; + } +}; + +#endif diff --git a/ceph/include/LogEvent.h~ b/ceph/include/LogEvent.h~ new file mode 100644 index 0000000000000..dab32bf82880e --- /dev/null +++ b/ceph/include/LogEvent.h~ @@ -0,0 +1,12 @@ + +#ifndef __LOGEVENT_H +#define __LOGEVENT_H + +class LogEvent { + protected: + + public: + +}; + +#endif diff --git a/ceph/include/LogStream.h b/ceph/include/LogStream.h new file mode 100644 index 0000000000000..4af05174e85af --- /dev/null +++ b/ceph/include/LogStream.h @@ -0,0 +1,46 @@ +#ifndef __LOGSTREAM_H +#define __LOGSTREAM_H + +#include +#include "osd.h" +#include "Context.h" + +class LogEvent; + +class LogStream { + protected: + off_t cur_pos, append_pos; + int osd; + object_t oid; + + char *buf; + size_t buflen; + off_t buf_start; + + public: + LogStream(int osd, object_t oid) { + this->osd = osd; + this->oid = oid; + cur_pos = 0; + append_pos = 0; // fixme + buf = 0; + } + ~LogStream() { + if (buf) { delete buf; buf = 0; } + } + + off_t seek(off_t offset) { + cur_pos = offset; + } + + off_t get_pos() { + return cur_pos; + } + + // WARNING: non-reentrant; single reader only + int read_next(LogEvent **le, Context *c, int step=1); + + int append(LogEvent *e, Context *c); // append at cur_pos, mind you! +}; + +#endif diff --git a/ceph/include/LogStream.h~ b/ceph/include/LogStream.h~ new file mode 100644 index 0000000000000..a7b80ee36ca93 --- /dev/null +++ b/ceph/include/LogStream.h~ @@ -0,0 +1,10 @@ + + + +class LogStream { + + + off_t seek(off_t offset); + LogEvent *read_next(); + +}; diff --git a/ceph/include/MDLog.h b/ceph/include/MDLog.h index 9c90dcf06f6a0..5e81990c2c6fc 100644 --- a/ceph/include/MDLog.h +++ b/ceph/include/MDLog.h @@ -1,10 +1,9 @@ - #ifndef __MDLOG_H #define __MDLOG_H /* -Things that go in the MDS log: +hmm, some things that go in the MDS log: prepare + commit versions of many of these? @@ -22,18 +21,47 @@ prepare + commit versions of many of these? */ +#include "Context.h" + +#include + +using namespace std; + +class LogStream; +class LogEvent; class MDLog { protected: + + size_t num_events; // in events + size_t max_events; + LogStream *reader; + LogStream *writer; + list trimming; // events currently being trimmed public: + MDLog(); + ~MDLog(); - void submit_entry( MDLogEntry *e, + void set_max_events(size_t max) { + max_events = max; + } + size_t get_max_events() { + return max_events; + } + size_t get_num_events() { + return num_events; + } + + int submit_entry( LogEvent *e, Context *c ); - void trim(); + int trim(Context *c); + int trim_2(LogEvent *e, Context *c); }; + +#endif diff --git a/ceph/include/MDS.h b/ceph/include/MDS.h index 173b7a76fc5f9..91faa1f238e6c 100644 --- a/ceph/include/MDS.h +++ b/ceph/include/MDS.h @@ -10,21 +10,12 @@ class CInode; - -//#include "MDCache.h" class DentryCache; - - -//#include "MDStore.h" class MDStore; - - -//#include "Messenger.h" +class MDLog; class Messenger; class Message; -//#include "MDLog.h" -//#include "MDBalancer.h" using namespace std; @@ -46,10 +37,10 @@ class MDS { // sub systems DentryCache *mdcache; // cache MDStore *mdstore; // storage interface - Messenger *messenger; // message processing + Messenger *messenger; // message processing - //MDLog *logger; - //MDBalancer *balancer; + MDLog *logger; + //MDBalancer *balancer; diff --git a/ceph/include/MDStore.h b/ceph/include/MDStore.h index bcad53dc98caf..9a04329610404 100644 --- a/ceph/include/MDStore.h +++ b/ceph/include/MDStore.h @@ -26,7 +26,11 @@ bool fetch_inode( mdloc_t where, */ bool fetch_dir( CInode *in, Context *c ); - bool fetch_dir_2( int result, char *buf, size_t buflen, CInode *in, Context *c ); + bool fetch_dir_2( int result, + char *buf, + size_t buflen, + CInode *in, + Context *c ); @@ -34,7 +38,8 @@ bool fetch_inode( mdloc_t where, Context *c ); bool commit_dir_2( int result, CInode *in, - Context *c ); + Context *c, + __uint64_t committed_version ); // process a message diff --git a/ceph/include/OSD.h b/ceph/include/OSD.h index 8bd307eca8d3f..70b04c1259b95 100644 --- a/ceph/include/OSD.h +++ b/ceph/include/OSD.h @@ -2,12 +2,16 @@ #ifndef __OSD_H #define __OSD_H +#include "Context.h" + typedef size_t object_t; // functions int osd_read(int osd, object_t oid, size_t len, size_t offset, void *buf, Context *c); int osd_read_all(int osd, object_t oid, void **bufptr, size_t *buflen, Context *c); -int osd_write(int osd, object_t oid, size_t len, size_t offset, void *buf, Context *c); + +int osd_write(int osd, object_t oid, size_t len, size_t offset, void *buf, int flags, Context *c); + int osd_remove(int osd, object_t oid, Context *c); -- 2.39.5