From 1418c0d89f752795cc2f1b21acddbb86284c8568 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 28 Apr 2006 22:02:36 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@748 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 1 + ceph/TODO | 315 +--------- ceph/client/Client.cc | 9 +- ceph/client/Client.h | 13 +- ceph/common/Timer.cc | 13 +- ceph/common/Timer.h | 5 +- ceph/crush/test/cluster_movement_rush.cc | 218 +++++++ ceph/crush/test/creeping_failure.cc | 276 +++++++++ ceph/crush/test/creeping_failure_variance.cc | 281 +++++++++ ceph/crush/test/speed_rush.cc | 145 +++++ ceph/ebofs/Ebofs.cc | 2 +- ceph/mds/MDCache.cc | 7 +- ceph/mds/MDS.cc | 49 +- ceph/mds/MDS.h | 9 + ceph/messages/MOSDOp.h | 10 +- ceph/messages/MOSDOpReply.h | 8 +- ceph/msg/FakeMessenger.cc | 6 +- ceph/osd/OSD.cc | 189 ++---- ceph/osd/OSD.h | 32 +- ceph/osd/PG.h | 12 + ceph/osdc/Filer.cc | 603 +------------------ ceph/osdc/Filer.h | 156 ++--- ceph/osdc/ObjectCacher.h | 66 ++ ceph/osdc/Objecter.cc | 516 ++++++++++++++++ ceph/osdc/Objecter.h | 126 ++++ 25 files changed, 1914 insertions(+), 1153 deletions(-) create mode 100644 ceph/crush/test/cluster_movement_rush.cc create mode 100644 ceph/crush/test/creeping_failure.cc create mode 100644 ceph/crush/test/creeping_failure_variance.cc create mode 100644 ceph/crush/test/speed_rush.cc create mode 100644 ceph/osdc/ObjectCacher.h create mode 100644 ceph/osdc/Objecter.cc create mode 100644 ceph/osdc/Objecter.h diff --git a/ceph/Makefile b/ceph/Makefile index e6f6e5395529c..b79a9af125077 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -53,6 +53,7 @@ COMMON_OBJS= \ msg/Dispatcher.o\ msg/HostMonitor.o\ osd/Filer.o\ + osd/Objecter.o\ osd/OSDMap.o\ mds/MDCluster.o\ common/Logger.o\ diff --git a/ceph/TODO b/ceph/TODO index 1fe8490f351fc..fa939faa9cdbf 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -1,171 +1,4 @@ - -make graphs from dat runs: - - /makedirs - /ossh.lib - /ossh.include - - makedirs tput vs lat - - 
openshared - writefiles - - - - - -llnl direct deposit fax 925 424 2663 - - -mpiexec -l -n 28 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 1048576 --nummds 1 --numclient 112 --numosd 7 --kill_after 120 --osd_object_layout hashino --osd_pg_layout hash --osd_pg_bits 12 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --log_name osd/striping.cperbig/cper=16,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576 > log/osd/striping.cperbig/cper=16,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/o && touch log/osd/striping.cperbig/cper=16,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/.done & - -mpiexec -l -n 28 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 1048576 --nummds 1 --numclient 896 --numosd 7 --kill_after 120 --osd_object_layout hashino --osd_pg_layout crush --osd_pg_bits 12 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --log_name osd/striping.cperbig/cper=128,osd_object_layout=hashino,osd_pg_layout=crush,writefile_size=1048576 > log/osd/striping.cperbig/cper=128,osd_object_layout=hashino,osd_pg_layout=crush,writefile_size=1048576/o && touch log/osd/striping.cperbig/cper=128,osd_object_layout=hashino,osd_pg_layout=crush,writefile_size=1048576/.done & - -mpiexec -l -n 28 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 1048576 --nummds 1 --numclient 448 --numosd 7 --kill_after 120 --osd_object_layout hashino --osd_pg_layout hash --osd_pg_bits 12 --file_layout_num_rep 1 --debug_after 110 --debug_osd 15 --debug_filer 15 --debug 5 --log_name osd/striping.cperbig/cper=64,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576 > log/osd/striping.cperbig/cper=64,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/o && touch log/osd/striping.cperbig/cper=64,osd_object_layout=hashino,osd_pg_layout=hash,writefile_size=1048576/.done & - - - - -541 488 0496 warren - - -TONIGHT 
TODO - -/make comb calc latency - -/finish openshared run - -vary cper on makedirs mds=96, make a nice tput/lat curve -do big makedirs run w/ smaller set of mds sizes - -fill out ossh.lib (read to go, just exclude cper 50 .. or adjust bc of above) - - -FINAL data sets... - -makedirs - n1 good. - n3 is skewed down.. not clear why? - big has low cper values! doh. **rerun if possible?** - -makedirs.tput - - - - - - -MAR 1 - -makedirs - final --> 12 - fixed lb -> m2 - - -> do it again with cper 50,100 , max 2,3 .. ready! -> all about the same. -plot "log/alcdat/makedirs.n1/c" u 1:2 t "cper=100,mds_bal_max=2", "log/alcdat/makedirs.n1/c" u 1:3 t "cper=100,mds_bal_max=3", "log/alcdat/makedirs.n1/c" u 1:4 t "cper=50,mds_bal_max=2", "log/alcdat/makedirs.n1/c" u 1:5 t "cper=50,mds_bal_max=3"; - - /**-> bal_max=1, cper=100 (n2) -plot "log/alcdat/makedirs.n2/c" u 1:2 t "cper=100,mds_bal_max=1"; - .. same same - - **-> bal_max=2 cper=15,25 (n3) ready Q/ - -makedirs.big - - **-> nbig1 .. flop! overlay no worky - **-> nbig4 .. Q/ - - -makedirs.tput - finalish -> 1 - - **-> ready.. run (last!) to fill in gaps .. (2) Q - - ** running subset of points tho, as (2)! be careful merging results, not all data points for all runs!! - - -ossh.lib - cper 50 looks good when nummds=48.. - - f14 is ok... - repeat w/ range of cper (besides just 50) - - -> run with cper range .. ready -plot "log/alcdat/ossh.lib.n1/c" u 1:2 t "cper=100", "log/alcdat/ossh.lib.n1/c" u 1:3 t "cper=25", "log/alcdat/ossh.lib.n1/c" u 1:4 t "cper=50"; - .. only 25 is good at scale! - - **-> cper=10,16 (n2) ready Q - - -ossh.lib.big - - **-> ready - - - -ossh.include - max 2 @ 80, cper 75 - - best so far is d12. - no, f13 is better. - -d9 80 was good too? cper=50 : log/alcdat/ossh.include.d9/cper=50,nummds=128/o ... 
srun --wait=120 --exclude=jobs/ltest.ignore -l -t 7 -N 385 -p ltest ./tcpsyn --mkfs --ebofs --syn until 300 --nummds 128 --numclient 6400 --numosd 128 --kill_after 400 --mds_bal_rep 1700 --mds_bal_interval 45 --mds_bal_max 2 --mds_decay_halflife 30 --mds_bal_hash_rd 100000 --tcp_skip_rank0 --mds_shutdown_check 60 --syn only 0 --syn trace traces/openssh/untar.include 1 --syn sleep 30 --syn trace traces/openssh/make.include 1000 --log_name alcdat/ossh.include.d9/cper=50,nummds=128 > log/alcdat/ossh.include.d9/cper=50,nummds=128/o && touch log/alcdat/ossh.include.d9/cper=50,nummds=128/.done & - - -> run with cper range ... ready (n1 good!) -plot "log/alcdat/ossh.include.n1/c" u 1:2 t "cper=100", "log/alcdat/ossh.include.n1/c" u 1:3 t "cper=25", "log/alcdat/ossh.include.n1/c" u 1:4 t "cper=50"; - - - **-> extend w/ cper 15,20 (n2) ..only if time.. - - -ossh.include.big - - **-> nbig2 ready - - - -makefiles - cper + nummds ... ok, 4 ... go back and fill in data points if time! - 150 > 100. check 200 if time - - -openshared - f3 .. has various cper - - - -mdtest - - - - -striping - light vs saturated vs supersaturated? - stripe size - - - -gotchas - - watch out for cper too large.. bogs down mds0, fucks load balancer - - - -opensshinclude - choose osdfac -> 1 - test overlay_clients .. scale nummds - or condense clients into fewer nodes? - fix up ossh.include - - - - -- aged object stores on googoo - -- confirm block dev versus big file - -- ldceph + mdtest - - stability - ebofs table.remove() thing - fakestore crapping out.. missing timer events? @@ -181,165 +14,63 @@ mpiexec -l -n 30 ./tcpsyn --mkfs --ebofs --syn until 100 --syn writefile 1000 65 - - tcpmessenger inq count - - -- mds - - vary log stripe size, count (on ebofs and fakestore) - ? 4k for ebofs, 64k for fakestore - ? scount=4... or 2? - -> striping mostly useless - -- makedirs vs ebo/fake. 
- - streaming small writes (mds log) - -> ebofs needs 200 clients per MDS to get close to fakestore (compensate for latency?) - -- quick retest of crush vs linear - - -- scaling (scale mds+osd+clients together) - - metadata - - makedirs -** 100 c per mds is good on googoo ... testing on alc as mds_log.1 - alc: 4 osd, 200 client per mds (osd/mds_log.1 .. makedirs) - - - creates in same dir - --syn makefiles 10000 1000 0 - - repeat creat+stat+unlink - - creates in private dir ??? easy? - - repeat creat+stat+unlink ??? easy? - --syn makefiles 10000 1000 1 - - - zillion opens of the same file - --syn only 0 --syn createshared 10 --syn openshared 10 1000 - - - local compile - openssh again? - - shared compile (/lib, /include) - need something with shared files.. so not a linux kernel :( - -* get rid of randomsleep? - - - - data scaling .. aggregate tput - - scale_wr - writes to local files - - strided write to shared file ??? meaningless? (later!) - - strided write with O_LAZY ??? later! - - - tput per client - - local file - - strided to shared file - - strided w/ O_LAZY - - - crush vs linear - - at a large scale! - - - - - - -OSDI - -- tcp recv throttling - -- fix object ov/nv thing? (tcpmessenger locking bug?) - -- tune ebofs - -- vary osd_maxthreads, [ ebofs, fakestore ], write_size - -> medium, large writes: ebofs 10% faster - -> small writes: fake 20% faster -- obfs? - -- osd write tests - - ebofs vs fakestore: - plot "log/osd/write_sizes.swap1.block/c" u 1:2 t "fs=ebofs", "log/osd/write_sizes.sdb2.ext3.fake/c" u 1:2 t "fs=fakestore.sdb2", "log/osd/write_sizes.sdb2.ext3.fake2/c" u 1:2 t "fs=fakestore.sdb2.again", "log/osd/write_sizes.sdb2.ebo/c" u 1:2 t "fs=ebofs.again" - - get obfs working? - -- client buffer cache! - -- ld_preload? - - +- rados + - redo all the recovery stuff + - fix object ov/nv thing? (tcpmessenger locking bug?) + (object version AND mtime?) 
+- client OBJECT cache + - lazyio_* - clean up tcpmessenger -- fix possible race when Cond uses relative now instead of real_now - -- figure out weird 40ms latency with double log entries - - - -need post osdi: +- mds + - figure out weird 40ms latency with double log entries mds - statlite - stat single writer +- statlite +- stat single writer +- truncate() +- chdir (directory opens!) +- delayed replica caps release... we need to set a timer event! (and cancel it when appropriate) client - flesh out posix layer - statlite - readdir + stat - readdirplus - lazy_* - -osd - new rados mech - - +- flesh out posix layer +- statlite +/- readdir + stat +- readdirplus +- lazy_* (after object cache) -client - test client caps with meta exports - some heuristic behavior to consolidate caps to inode auth - client will re-tx anything it needed to say upon rx of new mds notification (?) - readv+writev, readx+writex - serialized! -- O_LAZY - - synchronize_* -- statlite - -- LD_PRELOAD -filer -- (optional) serial behavior when read spans objects - - (altho we still can't get atomicity) - - ...do a short return in those cases? ..maybe a 'bool atomic' flag.. +filer/osd +- atomic write spanning objects... -mds -- delayed replica caps release... we need to set a timer event! (and cancel it when appropriate) -- implement/test truncate() -- chdir - - client handles for directories! ebofs -- fix NEAR_LAST_FWD +- fix NEAR_LAST_FWD (?) - combine inodes into same blocks? - delay allocation - actually, reallocate if dirty is cancelable - journaling -- clone(), and/or snapshots +- nix oc_tab, move into onode. +- clone() osd -- new ardos scheme - - directory! - osdmap history distribution - handle down osds - pg_bit changes -- 'dirty' log on primary? - - fast recovery from degraded mode - - watch osd utilization; adjust cluster map -lazy posix -- softstat, etc. 
- cluster issues diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index b509492a0d342..d34c661fa14b0 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -32,6 +32,7 @@ #include "messages/MGenericMessage.h" #include "osd/Filer.h" +#include "osd/Objecter.h" #include "common/Cond.h" #include "common/Mutex.h" @@ -76,7 +77,8 @@ Client::Client(Messenger *m) // osd interfaces osdmap = new OSDMap(); // initially blank.. see mount() - filer = new Filer(messenger, osdmap); + objecter = new Objecter(messenger, osdmap); + filer = new Filer(objecter); // Buffercache bc = new Buffercache(); @@ -87,6 +89,7 @@ Client::~Client() { if (messenger) { delete messenger; messenger = 0; } if (filer) { delete filer; filer = 0; } + if (objecter) { delete objecter; objecter = 0; } if (osdmap) { delete osdmap; osdmap = 0; } tear_down_bcache(); @@ -518,11 +521,11 @@ void Client::dispatch(Message *m) switch (m->get_type()) { // osd case MSG_OSD_OPREPLY: - filer->handle_osd_op_reply((MOSDOpReply*)m); + objecter->handle_osd_op_reply((MOSDOpReply*)m); break; case MSG_OSD_MAP: - filer->handle_osd_map((class MOSDMap*)m); + objecter->handle_osd_map((class MOSDMap*)m); break; // client diff --git a/ceph/client/Client.h b/ceph/client/Client.h index 0bfc26cc75a8c..dace7ab0acd1d 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -43,8 +43,11 @@ using namespace std; #include using namespace __gnu_cxx; +#define O_LAZY 01000000 + class Filer; +class Objecter; extern class LogType client_logtype; extern class Logger *client_logger; @@ -256,6 +259,8 @@ struct Fh { int mds; // have to talk to mds we opened with (for now) int mode; // the mode i opened the file with + bool is_lazy() { return mode & O_LAZY; } + Fh() : inode(0), pos(0), mds(0), mode(0) {} }; @@ -286,7 +291,8 @@ public: void hack_sync_write_safe(); protected: - Filer *filer; // (non-blocking) osd interface + Filer *filer; + Objecter *objecter; // (non-blocking) osd interface // cache hash_map inode_map; @@ 
-505,6 +511,11 @@ protected: //int truncate(fh_t fh, long long size); int fsync(fh_t fh, bool syncdataonly); + // hpc extensions + int lazyio_propogate(int fd, off_t offset, size_t count); + int lazyio_synchronize(int fd, off_t offset, size_t count); + //int lstatlite(const char *path, struct statlite *buf); + }; #endif diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index 06a35fdcbb2af..99cda93fcb196 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -20,7 +20,7 @@ #include "config.h" #include "include/Context.h" -#include "msg/Messenger.h" +//#include "msg/Messenger.h" #undef dout #define dout(x) if (x <= g_conf.debug) cout << "Timer: " @@ -35,7 +35,7 @@ Timer g_timer; //Context *messenger_kicker = 0; -Messenger *messenger = 0; +//Messenger *messenger = 0; @@ -65,7 +65,7 @@ void Timer::timer_entry() utime_t t = it->first; dout(DBL) << "queueing event(s) scheduled at " << t << endl; - if (messenger) { + /*if (messenger) { for (multiset::iterator cit = it->second.begin(); cit != it->second.end(); cit++) { @@ -74,6 +74,7 @@ void Timer::timer_entry() num_event--; } } + */ //pending[t] = it->second; map< utime_t, multiset >::iterator previt = it; @@ -94,7 +95,7 @@ void Timer::timer_entry() pending.clear(); } else { // give them to the messenger - messenger->queue_callbacks(pending); + //messenger->queue_callbacks(pending); } assert(pending.empty()); } @@ -132,6 +133,7 @@ void Timer::timer_entry() * Timer bits */ +/* void Timer::set_messenger(Messenger *m) { dout(10) << "set messenger " << m << endl; @@ -142,6 +144,7 @@ void Timer::unset_messenger() dout(10) << "unset messenger" << endl; messenger = 0; } +*/ void Timer::register_timer() { @@ -154,7 +157,7 @@ void Timer::register_timer() sleep_cond.SignalAll(); } else { dout(DBL) << "register_timer doing nothing; thread is alive but not sleeping" << endl; - // it's probably delivering callbacks to the messenger loop + // it's probably doing callbacks. 
} } else { dout(DBL) << "register_timer starting thread" << endl; diff --git a/ceph/common/Timer.h b/ceph/common/Timer.h index 41063448ba5c1..d5604bb301356 100644 --- a/ceph/common/Timer.h +++ b/ceph/common/Timer.h @@ -35,7 +35,7 @@ using namespace __gnu_cxx; * schedule callbacks */ -class Messenger; +//class Messenger; namespace __gnu_cxx { @@ -120,12 +120,13 @@ class Timer { cancel_timer(); } + /* void set_messenger_kicker(Context *c); void unset_messenger_kicker(); void set_messenger(Messenger *m); void unset_messenger(); - + */ // schedule events void add_event_after(float seconds, diff --git a/ceph/crush/test/cluster_movement_rush.cc b/ceph/crush/test/cluster_movement_rush.cc new file mode 100644 index 0000000000000..90cc197c24f65 --- /dev/null +++ b/ceph/crush/test/cluster_movement_rush.cc @@ -0,0 +1,218 @@ + + +#include "../crush.h" +using namespace crush; + +#include + +#include +#include +using namespace std; + +int buckettype = 0; + +Bucket *make_bucket(Crush& c, vector& wid, int h, map< int, list >& buckets, int& ndisks) +{ + if (h == 0) { + // uniform + Hash hash(123); + vector disks; + for (int i=0; imake_primes(hash); + c.add_bucket(b); + //cout << h << " uniformbucket with " << wid[h] << " disks" << endl; + buckets[h].push_back(b); + return b; + } else { + // mixed + //Bucket *b = new TreeBucket(h+1); + //Bucket *b = new ListBucket(h+1); + //Bucket *b = new StrawBucket(h+1); + Bucket *b; + if (buckettype == 0) + b = new TreeBucket(h+1); + else if (buckettype == 1 || buckettype == 2) + b = new ListBucket(h+1); + else if (buckettype == 3) + b = new StrawBucket(h+1); + + c.add_bucket(b); + for (int i=0; iadd_item(n->get_id(), n->get_weight()); + n->set_parent(b->get_id()); + } + buckets[h].push_back(b); + //cout << b->get_id() << " mixedbucket with " << wid[h] << " at " << h << endl; + return b; + } +} + +int make_hierarchy(Crush& c, vector& wid, map< int, list >& buckets, int& ndisks) +{ + Bucket *b = make_bucket(c, wid, wid.size()-1, buckets, ndisks); 
+ return b->get_id(); +} + + +void place(Crush& c, Rule& rule, int numpg, int numrep, map >& placement) +{ + vector v(numrep); + map ocount; + + for (int x=1; x<=numpg; x++) { + + //cout << H(x) << "\t" << h(x) << endl; + c.do_rule(rule, x, v); + //cout << "v = " << v << endl;// " " << v[0] << " " << v[1] << " " << v[2] << endl; + + bool bad = false; + for (int i=0; i::iterator it = ocount.begin(); + it != ocount.end(); + it++) + cout << it->first << "\t" << it->second << endl; + +} + + +float testmovement(int depth, int branching, int udisks, int add, int modifydepth) +{ + Hash h(73232313); + + // crush + Crush c; + + + // buckets + int root = -1; + int ndisks = 0; + + vector wid; + wid.push_back(udisks); + for (int d=1; d > buckets; + + root = make_hierarchy(c, wid, buckets, ndisks); + + //c.print(cout,root); + + // rule + int numrep = 2; + Rule rule; + rule.steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + rule.steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, numrep, 0)); + rule.steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + + //c.overload[10] = .1; + + + int pg_per = 100; + int numpg = pg_per*ndisks/numrep; + + vector ocount(ndisks); + + /* + cout << ndisks << " disks, " << endl; + cout << pg_per << " pgs per disk" << endl; + cout << numpg << " logical pgs" << endl; + cout << "numrep is " << numrep << endl; + */ + map > placement1, placement2; + + //c.print(cout, root); + + + // ORIGINAL + place(c, rule, numpg, numrep, placement1); + + int olddisks = ndisks; + + // add disks + //cout << " adding " << add << " disks" << endl; + vector disks; + for (int i=0; imake_primes(h); + + //Bucket *o = buckets[2].back(); + Bucket *o; + if (buckettype == 2) + o = buckets[modifydepth].front(); + else + o = buckets[modifydepth].back(); + + c.add_bucket(b); + //cout << " adding under " << o->get_id() << endl; + c.add_item(o->get_id(), b->get_id(), b->get_weight(), buckettype == 2); + //((MixedBucket*)o)->add_item(b->get_id(), b->get_weight()); + //newbucket = b; + + + // ADDED + 
//c.print(cout, root); + place(c, rule, numpg, numrep, placement2); + + int moved = 0; + for (int x=1; x<=numpg; x++) + if (placement1[x] != placement2[x]) + for (int j=0; j + +#include +#include +using namespace std; + + +Clock g_clock; + + +Bucket *make_bucket(Crush& c, vector& wid, int h, int& ndisks) +{ + if (h == 0) { + // uniform + Hash hash(123); + vector disks; + for (int i=0; imake_primes(hash); + c.add_bucket(b); + //cout << h << " uniformbucket with " << wid[h] << " disks weight " << w << endl; + return b; + } else { + // mixed + Bucket *b = new TreeBucket(h+1); + for (int i=0; iadd_item(n->get_id(), n->get_weight()); + } + c.add_bucket(b); + //cout << h << " mixedbucket with " << wid[h] << endl; + return b; + } +} + +int make_hierarchy(Crush& c, vector& wid, int& ndisks) +{ + Bucket *b = make_bucket(c, wid, wid.size()-1, ndisks); + return b->get_id(); +} + + + +float go(int dep, int failpc) +{ + Hash h(73232313); + + //int overloadcutoff = (int)((float)10000.0 / (float)utilization); + + //cout << "util " << utilization << " cutoff " << overloadcutoff << endl; + // crush + Crush c; + + + // buckets + int root = -1; + int ndisks = 0; + + vector wid; + for (int d=0; d ocount(ndisks); + //cout << ndisks << " disks, " << endl; + //cout << pg_per << " pgs per disk" << endl; + // cout << numpg << " logical pgs" << endl; + //cout << "numrep is " << numrep << endl; + + + int place = 1000000; + int times = place / numpg; + if (!times) times = 1; + + + //cout << "looping " << times << " times" << endl; + + float tavg[10]; + float tvar[10]; + for (int j=0;j<10;j++) { + tvar[j] = 0; + tavg[j] = 0; + } + int tvarnum = 0; + float trvar = 0.0; + + float overloadsum = 0.0; + float adjustsum = 0.0; + float afteroverloadsum = 0.0; + float aslowdown = 0.0; + int chooses = 0; + int xs = 1; + for (int t=0; t v(numrep); + + c.out.clear(); + + for (int z=0; z= ndisks) cout << "v[i] " << i << " is " << v[i] << " .. 
x = " << x << endl; + //assert(v[i] < ndisks); + ocount[v[i]]++; + } + } + utime_t t1b = g_clock.now(); + + // add in numf failed disks + for (int f = 0; f < numf; f++) { + int d = rand() % ndisks; + while (c.out.count(d)) d = rand() % ndisks; + c.out.insert(d); + } + + utime_t t3a = g_clock.now(); + for (int x=xs; x + +#include +#include +using namespace std; + + +Bucket *make_bucket(Crush& c, vector& wid, int h, int& ndisks) +{ + if (h == 0) { + // uniform + Hash hash(123); + vector disks; + for (int i=0; imake_primes(hash); + c.add_bucket(b); + //cout << h << " uniformbucket with " << wid[h] << " disks" << endl; + return b; + } else { + // mixed + MixedBucket *b = new MixedBucket(h+1); + for (int i=0; iadd_item(n->get_id(), n->get_weight()); + } + c.add_bucket(b); + //cout << h << " mixedbucket with " << wid[h] << endl; + return b; + } +} + +int make_hierarchy(Crush& c, vector& wid, int& ndisks) +{ + Bucket *b = make_bucket(c, wid, wid.size()-1, ndisks); + return b->get_id(); +} + + +Bucket *make_random(Crush& c, int wid, int height, int& ndisks) +{ + int w = rand() % (wid-1) + 2; + + if (height == 0) { + // uniform + Hash hash(123); + vector disks; + for (int i=0; imake_primes(hash); + c.add_bucket(b); + //cout << h << " uniformbucket with " << wid[h] << " disks" << endl; + return b; + } else { + // mixed + int h = rand() % height + 1; + MixedBucket *b = new MixedBucket(h+1); + for (int i=0; iadd_item(n->get_id(), n->get_weight()); + } + c.add_bucket(b); + //cout << h << " mixedbucket with " << wid[h] << endl; + return b; + } + +} + + +float go(int dep, int overloadcutoff) +{ + Hash h(73232313); + + // crush + Crush c; + + + // buckets + int root = -1; + int ndisks = 0; + + vector wid; + for (int d=0; dget_id(); + //c.print(cout, root); + } + if (0) { + MixedBucket *b = new MixedBucket(1); + for (int i=0; i<10000; i++) + b->add_item(ndisks++, 10); + root = c.add_bucket(b); + } + if (0) { + vector disks; + for (int i=0; i<10000; i++) + disks.push_back(ndisks++); 
+ UniformBucket *b = new UniformBucket(1, 0, 10000, disks); + Hash h(123); + b->make_primes(h); + root = c.add_bucket(b); + } + //cout << ndisks << " disks" << endl; + + + + // rule + int numrep = 1; + Rule rule; + rule.steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + rule.steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, numrep, 0)); + rule.steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + + //c.overload[10] = .1; + + + int pg_per = 100; + int numpg = pg_per*ndisks/numrep; + + vector ocount(ndisks); + //cout << ndisks << " disks, " << endl; + //cout << pg_per << " pgs per disk" << endl; + // cout << numpg << " logical pgs" << endl; + //cout << "numrep is " << numrep << endl; + + + int place = 1000000; + int times = place / numpg; + if (!times) times = 1; + + + //cout << "looping " << times << " times" << endl; + + float tvar = 0; + int tvarnum = 0; + + float overloadsum = 0.0; + float adjustsum = 0.0; + float afteroverloadsum = 0.0; + int chooses = 0; + int xs = 1; + for (int t=0; t v(numrep); + + c.overload.clear(); + + for (int z=0; z overloadcutoff) + overloaded++; + + if (ocount[i] > 100+(overloadcutoff-100)/2) { + adjusted++; + c.overload[i] = 100.0 / (float)ocount[i]; + //cout << "disk " << i << " has " << ocount[i] << endl; + } + ocount[i] = 0; + } + //cout << overloaded << " overloaded" << endl; + overloadsum += (float)overloaded / (float)ndisks; + adjustsum += (float)adjusted / (float)ndisks; + + + for (int x=xs; x overloadcutoff) { + still++; + //c.overload[ocount[i]] = 100.0 / (float)ocount[i]; + //cout << "disk " << i << " has " << ocount[i] << endl; + } + } + //if (still) cout << "overload was " << overloaded << " now " << still << endl; + afteroverloadsum += (float)still / (float)ndisks; + + //cout << "collisions: " << c.collisions << endl; + //cout << "r bumps: " << c.bumps << endl; + + float avg = 0.0; + for (int i=0; i100; d -= 5) { + float var = go(3,d); + //cout << "## depth = " << d << endl; + //cout << d << "\t" << var << endl; + } +} diff --git 
a/ceph/crush/test/speed_rush.cc b/ceph/crush/test/speed_rush.cc new file mode 100644 index 0000000000000..93a5584a2680a --- /dev/null +++ b/ceph/crush/test/speed_rush.cc @@ -0,0 +1,145 @@ + +#include "../../common/Clock.h" +#include "../crush.h" +using namespace crush; + + +Clock g_clock; + +#include + +#include +#include +using namespace std; + + +int branching = 10; +bool linear = false; +int numrep = 1; + +Bucket *make_bucket(Crush& c, vector& wid, int h, int& ndisks) +{ + if (h == 0) { + // uniform + Hash hash(123); + vector disks; + for (int i=0; imake_primes(hash); + c.add_bucket(b); + //cout << h << " uniformbucket with " << wid[h] << " disks" << endl; + return b; + } else { + // mixed + Bucket *b; + if (linear) + b = new ListBucket(h+1); + else + b = new TreeBucket(h+1); + for (int i=0; iadd_item(n->get_id(), n->get_weight()); + } + c.add_bucket(b); + //cout << h << " mixedbucket with " << wid[h] << endl; + return b; + } +} + +int make_hierarchy(Crush& c, vector& wid, int& ndisks) +{ + Bucket *b = make_bucket(c, wid, wid.size()-1, ndisks); + return b->get_id(); +} + + +double go(int s) +{ + int dep = 2; + Hash h(73232313); + + // crush + Crush c; + + + // buckets + int root = -1; + int ndisks = 0; + + vector wid; + if (1) { + //for (int d=0; d v(numrep); + + utime_t start = g_clock.now(); + + for (int x=1; x <= place; x++) + c.do_rule(rule, x, v); + + utime_t end = g_clock.now(); + + end -= start; + double el = (double)end; + + cout << "\t" << ndisks; + + return el; +} + + +int main() +{ + branching = 8; + + int d = 2; + numrep = 2; + + for (int s = 64; s <= 32768; s *= 8) { + cout << "t"; + linear = false; + double el = go(s, d); + cout << "\t" << el; + + cout << "\tp"; + linear = true; + el = go(s, d); + cout << "\t" << el; + + cout << endl; + } +} diff --git a/ceph/ebofs/Ebofs.cc b/ceph/ebofs/Ebofs.cc index a5cd806cd5a3b..97c63bb150fab 100644 --- a/ceph/ebofs/Ebofs.cc +++ b/ceph/ebofs/Ebofs.cc @@ -240,9 +240,9 @@ int Ebofs::umount() dout(5) << "umount 
cleaning up" << endl; close_tables(); dev.close(); + readonly = unmounting = mounted = false; dout(1) << "umount done on " << dev.get_device_name() << endl; - readonly = unmounting = mounted = false; ebofs_lock.Unlock(); return 0; } diff --git a/ceph/mds/MDCache.cc b/ceph/mds/MDCache.cc index 270ff9343b8ae..ee076f5b9b125 100644 --- a/ceph/mds/MDCache.cc +++ b/ceph/mds/MDCache.cc @@ -529,9 +529,10 @@ bool MDCache::trim(int max) { // last link? if (in->inode.nlink == 0) { dout(17) << "last link, removing file content " << *in << endl; // FIXME THIS IS WRONG PLACE FOR THIS! - mds->filer->remove(in->inode, - in->inode.size, - NULL, NULL); // FIXME + mds->filer->zero(in->inode, + in->inode.size, + 0, + NULL, NULL); // FIXME } // remove it diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index 48646a45b5806..5d0bea6f97347 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -19,6 +19,7 @@ #include "msg/Messenger.h" #include "osd/OSDMap.h" +#include "osd/Objecter.h" #include "osd/Filer.h" #include "MDS.h" @@ -144,7 +145,8 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) { // - filer = new Filer(messenger, osdmap); + objecter = new Objecter(messenger, osdmap); + filer = new Filer(objecter); mdlog->set_max_events(g_conf.mds_log_max_len); @@ -224,6 +226,7 @@ MDS::~MDS() { if (osdmap) { delete osdmap; osdmap = 0; } if (filer) { delete filer; filer = 0; } + if (objecter) { delete objecter; objecter = 0; } if (messenger) { delete messenger; messenger = NULL; } if (logger) { delete logger; logger = 0; } @@ -407,10 +410,10 @@ void MDS::proc_message(Message *m) switch (m->get_type()) { // OSD =============== case MSG_OSD_MKFS_ACK: - filer->handle_osd_mkfs_ack(m); + handle_osd_mkfs_ack(m); return; case MSG_OSD_OPREPLY: - filer->handle_osd_op_reply((class MOSDOpReply*)m); + objecter->handle_osd_op_reply((class MOSDOpReply*)m); return; case MSG_OSD_MAP: handle_osd_map((MOSDMap*)m); @@ -726,6 +729,44 @@ void MDS::handle_osd_map(MOSDMap *m) } +void MDS::mkfs(Context *onfinish) +{ + 
dout(7) << "mkfs, wiping all OSDs" << endl; + + // send MKFS to osds + set ls; + osdmap->get_all_osds(ls); + + for (set::iterator it = ls.begin(); + it != ls.end(); + it++) { + + // issue mkfs + messenger->send_message(new MOSDMap(osdmap, true), + MSG_ADDR_OSD(*it)); + pending_mkfs.insert(*it); + } + + waiting_for_mkfs = onfinish; +} + +void MDS::handle_osd_mkfs_ack(Message *m) +{ + int from = MSG_ADDR_NUM(m->get_source()); + + assert(pending_mkfs.count(from)); + pending_mkfs.erase(from); + + if (pending_mkfs.empty()) { + dout(2) << "done with mkfs" << endl; + waiting_for_mkfs->finish(0); + delete waiting_for_mkfs; + waiting_for_mkfs = 0; + } +} + + + void MDS::handle_client_mount(MClientMount *m) { // mkfs? (sorta hack!) @@ -751,7 +792,7 @@ void MDS::handle_client_mount(MClientMount *m) // init osds too dout(3) << "wiping osds too" << endl; - filer->mkfs(new C_MDS_Unpause(this)); + mkfs(new C_MDS_Unpause(this)); waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m)); return; diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index 88440922d81cc..066b74b852f80 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -69,6 +69,7 @@ typedef __uint64_t object_t; class filepath; class OSDMap; +class Objecter; class Filer; class AnchorTable; @@ -110,6 +111,7 @@ class MDS : public Dispatcher { MDCluster *mdcluster; public: OSDMap *osdmap; + Objecter *objecter; Filer *filer; // for reading/writing to/from osds AnchorTable *anchormgr; OSDMonitor *osdmonitor; @@ -182,6 +184,13 @@ class MDS : public Dispatcher { int shutdown_start(); int shutdown_final(); + // osd fun +private: + set pending_mkfs; + Context *waiting_for_mkfs; +public: + void mkfs(Context *onfinish); + void handle_osd_mkfs_ack(Message *m); void bcast_osd_map(); // messages diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index 6befc35fcdbf0..fcc202737c994 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -33,7 +33,7 @@ #define OSD_OP_STAT 10 #define OSD_OP_DELETE 11 #define 
OSD_OP_TRUNCATE 12 -#define OSD_OP_ZERORANGE 13 +#define OSD_OP_ZERO 13 #define OSD_OP_IS_REP(x) ((x) >= 30) @@ -64,7 +64,7 @@ typedef struct { version_t old_version; bool want_ack; - bool want_safe; + bool want_commit; size_t _data_len; } MOSDOp_st; @@ -94,7 +94,7 @@ class MOSDOp : public Message { const size_t get_offset() { return st.offset; } const bool wants_ack() { return st.want_ack; } - const bool wants_safe() { return st.want_safe; } + const bool wants_commit() { return st.want_commit; } void set_data(bufferlist &d) { data.claim(d); @@ -123,7 +123,7 @@ class MOSDOp : public Message { this->st.op = op; this->st.want_ack = true; - this->st.want_safe = true; + this->st.want_commit = true; } MOSDOp() {} @@ -136,7 +136,7 @@ class MOSDOp : public Message { void set_old_version(version_t ov) { st.old_version = ov; } void set_want_ack(bool b) { st.want_ack = b; } - void set_want_safe(bool b) { st.want_safe = b; } + void set_want_commit(bool b) { st.want_commit = b; } // marshalling virtual void decode_payload() { diff --git a/ceph/messages/MOSDOpReply.h b/ceph/messages/MOSDOpReply.h index bae5050999370..0d5b02ca19ecb 100644 --- a/ceph/messages/MOSDOpReply.h +++ b/ceph/messages/MOSDOpReply.h @@ -40,7 +40,7 @@ typedef struct { // reply int result; - bool safe; + bool commit; size_t length, offset; size_t object_size; version_t version; @@ -59,7 +59,7 @@ class MOSDOpReply : public Message { long get_tid() { return st.tid; } object_t get_oid() { return st.oid; } int get_op() { return st.op; } - bool get_safe() { return st.safe; } + bool get_commit() { return st.commit; } int get_result() { return st.result; } size_t get_length() { return st.length; } @@ -92,7 +92,7 @@ class MOSDOpReply : public Message { void set_pcid(long pcid) { this->st.pcid = pcid; } long get_pcid() { return st.pcid; } - MOSDOpReply(MOSDOp *req, int result, OSDMap *oc, bool safe) : + MOSDOpReply(MOSDOp *req, int result, OSDMap *oc, bool commit) : Message(MSG_OSD_OPREPLY) { memset(&st, 0, 
sizeof(st)); this->st.pcid = req->st.pcid; @@ -101,7 +101,7 @@ class MOSDOpReply : public Message { this->st.oid = req->st.oid; this->st.op = req->st.op; this->st.result = result; - this->st.safe = safe; + this->st.commit = commit; this->st.length = req->st.length; // speculative... OSD should ensure these are correct this->st.offset = req->st.offset; diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index efddd16cf2ced..5207c1378ba9a 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -99,9 +99,9 @@ void *fakemessenger_thread(void *ptr) } lock.Unlock(); - cout << "unsetting messenger" << endl; + //cout << "unsetting messenger" << endl; //g_timer.unset_messenger_kicker(); - g_timer.unset_messenger(); + //g_timer.unset_messenger(); //msgr_callback_kicker = 0; dout(1) << "thread finish (i woke up but no messages, bye)" << endl; @@ -246,7 +246,7 @@ FakeMessenger::FakeMessenger(msg_addr_t me) : Messenger(me) cout << "fakemessenger " << myaddr << " messenger is " << this << endl; - g_timer.set_messenger(this); + //g_timer.set_messenger(this); qlen = 0; diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 624243dcefa89..fbd2d5770640a 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -272,7 +272,7 @@ void OSD::_lock_object(object_t oid) Cond c; dout(15) << "lock_object " << hex << oid << dec << " waiting as " << &c << endl; - list& ls = object_lock_waiters[oid]; // this is safe, right? + list& ls = object_lock_waiters[oid]; // this is safe, right?
ls.push_back(&c); while (object_lock.count(oid) || @@ -443,7 +443,7 @@ void OSD::handle_op_reply(MOSDOpReply *m) case OSD_OP_REP_WRITE: case OSD_OP_REP_TRUNCATE: case OSD_OP_REP_DELETE: - handle_rep_op_ack(m->get_tid(), m->get_result(), m->get_safe(), MSG_ADDR_NUM(m->get_source())); + handle_rep_op_ack(m->get_tid(), m->get_result(), m->get_commit(), MSG_ADDR_NUM(m->get_source())); delete m; break; @@ -452,7 +452,7 @@ void OSD::handle_op_reply(MOSDOpReply *m) } } -void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool safe, int fromosd) +void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd) { if (!replica_ops.count(tid)) { dout(7) << "not waiting for tid " << tid << " replica op reply, map must have changed, dropping." << endl; @@ -463,16 +463,16 @@ void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool safe, int fromosd) MOSDOp *op = repop->op; pg_t pgid = op->get_pg(); - dout(7) << "handle_rep_op_ack " << tid << " op " << op << " result " << result << " safe " << safe << " from osd" << fromosd << endl; + dout(7) << "handle_rep_op_ack " << tid << " op " << op << " result " << result << " commit " << commit << " from osd" << fromosd << endl; if (result >= 0) { // success get_repop(repop); - if (safe) { - // safe - assert(repop->waitfor_safe.count(tid)); - repop->waitfor_safe.erase(tid); + if (commit) { + // commit + assert(repop->waitfor_commit.count(tid)); + repop->waitfor_commit.erase(tid); repop->waitfor_ack.erase(tid); replica_ops.erase(tid); @@ -495,7 +495,7 @@ void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool safe, int fromosd) // forget about this failed attempt.. 
repop->osds.erase(fromosd); repop->waitfor_ack.erase(tid); - repop->waitfor_safe.erase(tid); + repop->waitfor_commit.erase(tid); replica_ops.erase(tid); @@ -523,8 +523,8 @@ void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool safe, int fromosd) replica_pg_osd_tids[pgid][it->second].erase(it->first); if (replica_pg_osd_tids[pgid][it->second].empty()) replica_pg_osd_tids[pgid].erase(it->second); } - for (map<__uint64_t,int>::iterator it = repop->waitfor_safe.begin(); - it != repop->waitfor_safe.end(); + for (map<__uint64_t,int>::iterator it = repop->waitfor_commit.begin(); + it != repop->waitfor_commit.end(); it++) { replica_ops.erase(it->first); replica_pg_osd_tids[pgid][it->second].erase(it->first); @@ -534,13 +534,13 @@ void OSD::handle_rep_op_ack(__uint64_t tid, int result, bool safe, int fromosd) assert(0); // this is all busted /* - if (repop->local_safe) { + if (repop->local_commit) { repop->lock.Unlock(); delete repop; } else { assert(0); repop->op = 0; // we're forwarding it - repop->cancel = true; // will get deleted by local safe callback + repop->cancel = true; // will get deleted by local commit callback repop->lock.Unlock(); }*/ did = true; @@ -736,79 +736,10 @@ OSDMap* OSD::get_osd_map(version_t v) } + // ====================================================== // REPLICATION - - - -// ------------------------------------ -// placement supersets -/* -void OSD::get_ps_list(list& ls) -{ - list& cl; - store->list_collections(cl); - - for (list::iterator it = cl.begin(); - it != cl.end(); - it++) { - // is it a PS (and not a PG)? 
- if (*it & PG_PS_MASK == *it) - ls.push_back(*it); - } -} - -bool OSD::ps_exists(ps_t psid) -{ - struct stat st; - if (store->collection_stat(psid, &st) == 0) - return true; - else - return false; -} - -PS* OSD::create_ps(ps_t psid) -{ - assert(ps_map.count(psid) == 0); - assert(!ps_exists(psid)); - - PS *ps = new PS(psid); - ps->store(store); - ps_map[psid] = ps; - return ps; -} - -PS* OSD::open_ps(ps_t psid) -{ - // already open? - if (ps_map.count(psid)) - return ps_map[psid]; - - // exists? - if (!ps_exists(psid)) - return 0; - - // open, stat collection - PS *ps = new PS(whoami, psid); - ps->fetch(store); - ps_map[psid] = ps; - - return ps; -} - -void OSD::close_ps(ps_t psid) -{ - assert(0); -} - -void OSD::remove_ps(ps_t psid) -{ - assert(0); -} -*/ - - // PG void OSD::get_pg_list(list& ls) @@ -820,12 +751,6 @@ void OSD::get_pg_list(list& ls) bool OSD::pg_exists(pg_t pgid) { return store->collection_exists(pgid); - /*struct stat st; - if (store->collection_stat(pgid, &st) == 0) - return true; - else - return false; - */ } PG *OSD::create_pg(pg_t pgid) @@ -1887,26 +1812,26 @@ void OSD::op_rep_remove_reply(MOSDOpReply *op) } -class C_OSD_RepModifySafe : public Context { +class C_OSD_RepModifyCommit : public Context { public: OSD *osd; MOSDOp *op; - C_OSD_RepModifySafe(OSD *o, MOSDOp *oo) : osd(o), op(oo) { } + C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo) : osd(o), op(oo) { } void finish(int r) { - osd->op_rep_modify_safe(op); + osd->op_rep_modify_commit(op); } }; -void OSD::op_rep_modify_safe(MOSDOp *op) +void OSD::op_rep_modify_commit(MOSDOp *op) { // hack: hack_blah is true until 'ack' has been sent. 
if (op->hack_blah) { - dout(0) << "got rep_modify_safe before rep_modify applied, waiting" << endl; - g_timer.add_event_after(1, new C_OSD_RepModifySafe(this, op)); + dout(0) << "got rep_modify_commit before rep_modify applied, waiting" << endl; + g_timer.add_event_after(1, new C_OSD_RepModifyCommit(this, op)); } else { - dout(10) << "rep_modify_safe on op " << *op << endl; - MOSDOpReply *safe = new MOSDOpReply(op, 0, osdmap, true); - messenger->send_message(safe, op->get_asker()); + dout(10) << "rep_modify_commit on op " << *op << endl; + MOSDOpReply *commit = new MOSDOpReply(op, 0, osdmap, true); + messenger->send_message(commit, op->get_asker()); delete op; } } @@ -1930,15 +1855,15 @@ void OSD::op_rep_modify(MOSDOp *op) dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl; int r = 0; - Context *onsafe = 0; + Context *oncommit = 0; - op->hack_blah = true; // hack: make sure any 'safe' goes out _after_ our ack + op->hack_blah = true; // hack: make sure any 'commit' goes out _after_ our ack if (op->get_op() == OSD_OP_REP_WRITE) { // write assert(op->get_data().length() == op->get_length()); - onsafe = new C_OSD_RepModifySafe(this, op); - r = apply_write(op, op->get_version(), onsafe); + oncommit = new C_OSD_RepModifyCommit(this, op); + r = apply_write(op, op->get_version(), oncommit); if (ov == 0) pg->add_object(store, oid); logger->inc("r_wr"); @@ -1952,18 +1877,18 @@ void OSD::op_rep_modify(MOSDOp *op) r = store->truncate(oid, op->get_offset()); } else assert(0); - if (onsafe) { + if (oncommit) { // ack MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false); messenger->send_message(ack, op->get_asker()); } else { - // safe, safe + // commit, commit MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true); messenger->send_message(ack, op->get_asker()); delete op; } - op->hack_blah = false; // hack: make sure any 'safe' goes out _after_ our ack + op->hack_blah = false; // hack: make 
sure any 'commit' goes out _after_ our ack } @@ -2302,7 +2227,7 @@ void OSD::op_stat(MOSDOp *op) // WRITE OPS -int OSD::apply_write(MOSDOp *op, version_t v, Context *onsafe) +int OSD::apply_write(MOSDOp *op, version_t v, Context *oncommit) { // take buffers from the message bufferlist bl; @@ -2311,16 +2236,16 @@ int OSD::apply_write(MOSDOp *op, version_t v, Context *onsafe) // write int r = 0; - if (onsafe) { + if (oncommit) { /*if (g_conf.fake_osd_sync) { - // fake a delayed safe + // fake a delayed commit r = store->write(op->get_oid(), op->get_length(), op->get_offset(), bl, false); g_timer.add_event_after(1.0, - onsafe); + oncommit); } else { */ // for real @@ -2330,10 +2255,10 @@ int OSD::apply_write(MOSDOp *op, version_t v, Context *onsafe) op->get_length(), op->get_offset(), bl, - onsafe); + oncommit); } else { // don't actually write, but say we did, for network throughput testing... - g_timer.add_event_after(2.0, onsafe); + g_timer.add_event_after(2.0, oncommit); } } else { // normal business @@ -2384,7 +2309,7 @@ void OSD::issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd) repop->osds.insert(osd); repop->waitfor_ack[tid] = osd; - repop->waitfor_safe[tid] = osd; + repop->waitfor_commit[tid] = osd; replica_ops[tid] = repop; replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid); @@ -2401,13 +2326,13 @@ void OSD::put_repop(OSDReplicaOp *repop) { dout(10) << "put_repop " << *repop << endl; - // safe? - if (repop->can_send_safe() && - repop->op->wants_safe()) { + // commit? + if (repop->can_send_commit() && + repop->op->wants_commit()) { MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap, true); - dout(10) << "put_repop sending safe on " << *repop << " " << reply << endl; + dout(10) << "put_repop sending commit on " << *repop << " " << reply << endl; messenger->send_message(reply, repop->op->get_asker()); - repop->sent_safe = true; + repop->sent_commit = true; } // ack? 
@@ -2435,22 +2360,22 @@ void OSD::put_repop(OSDReplicaOp *repop) } } -class C_OSD_WriteSafe : public Context { +class C_OSD_WriteCommit : public Context { public: OSD *osd; OSDReplicaOp *repop; - C_OSD_WriteSafe(OSD *o, OSDReplicaOp *op) : osd(o), repop(op) {} + C_OSD_WriteCommit(OSD *o, OSDReplicaOp *op) : osd(o), repop(op) {} void finish(int r) { - osd->op_modify_safe(repop); + osd->op_modify_commit(repop); } }; -void OSD::op_modify_safe(OSDReplicaOp *repop) +void OSD::op_modify_commit(OSDReplicaOp *repop) { - dout(10) << "op_modify_safe on op " << *repop->op << endl; + dout(10) << "op_modify_commit on op " << *repop->op << endl; get_repop(repop); - assert(repop->waitfor_safe.count(0)); - repop->waitfor_safe.erase(0); + assert(repop->waitfor_commit.count(0)); + repop->waitfor_commit.erase(0); put_repop(repop); } @@ -2483,8 +2408,8 @@ void OSD::op_modify(MOSDOp *op) // issue replica writes OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov); repop->start = g_clock.now(); - repop->waitfor_ack[0] = whoami; // will need local ack, safe - repop->waitfor_safe[0] = whoami; + repop->waitfor_ack[0] = whoami; // will need local ack, commit + repop->waitfor_commit[0] = whoami; PG *pg; osd_lock.Lock(); @@ -2507,8 +2432,8 @@ void OSD::op_modify(MOSDOp *op) if (op->get_op() == OSD_OP_WRITE) { // write assert(op->get_data().length() == op->get_length()); - Context *onsafe = new C_OSD_WriteSafe(this, repop); - r = apply_write(op, nv, onsafe); + Context *oncommit = new C_OSD_WriteCommit(this, repop); + r = apply_write(op, nv, oncommit); // put new object in proper collection if (ov == 0) @@ -2527,9 +2452,9 @@ void OSD::op_modify(MOSDOp *op) r = store->truncate(oid, op->get_offset()); get_repop(repop); assert(repop->waitfor_ack.count(0)); - assert(repop->waitfor_safe.count(0)); + assert(repop->waitfor_commit.count(0)); repop->waitfor_ack.erase(0); - repop->waitfor_safe.erase(0); + repop->waitfor_commit.erase(0); put_repop(repop); } else if (op->get_op() == OSD_OP_DELETE) { @@ 
-2538,9 +2463,9 @@ void OSD::op_modify(MOSDOp *op) r = store->remove(oid); get_repop(repop); assert(repop->waitfor_ack.count(0)); - assert(repop->waitfor_safe.count(0)); + assert(repop->waitfor_commit.count(0)); repop->waitfor_ack.erase(0); - repop->waitfor_safe.erase(0); + repop->waitfor_commit.erase(0); put_repop(repop); } else assert(0); diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 3f54a5f603e48..fb27a4c6502c3 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -41,35 +41,35 @@ class OSDReplicaOp { class MOSDOp *op; Mutex lock; map<__uint64_t,int> waitfor_ack; - map<__uint64_t,int> waitfor_safe; + map<__uint64_t,int> waitfor_commit; utime_t start; bool cancel; - bool sent_ack, sent_safe; + bool sent_ack, sent_commit; set osds; version_t new_version, old_version; OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) : op(o), - //local_ack(false), local_safe(false), + //local_ack(false), local_commit(false), cancel(false), - sent_ack(false), sent_safe(false), + sent_ack(false), sent_commit(false), new_version(nv), old_version(ov) { } - bool can_send_ack() { return !sent_ack && !sent_safe && //!cancel && + bool can_send_ack() { return !sent_ack && !sent_commit && //!cancel && waitfor_ack.empty(); } - bool can_send_safe() { return !sent_safe && //!cancel && - waitfor_ack.empty() && waitfor_safe.empty(); } - bool can_delete() { return waitfor_ack.empty() && waitfor_safe.empty(); } + bool can_send_commit() { return !sent_commit && //!cancel && + waitfor_ack.empty() && waitfor_commit.empty(); } + bool can_delete() { return waitfor_ack.empty() && waitfor_commit.empty(); } }; inline ostream& operator<<(ostream& out, OSDReplicaOp& repop) { - out << "repop(wfack=" << repop.waitfor_ack << " wfsafe=" << repop.waitfor_safe; + out << "repop(wfack=" << repop.waitfor_ack << " wfcommit=" << repop.waitfor_commit; //if (repop.local_ack) out << " local_ack"; - //if (repop.local_safe) out << " local_safe"; + //if (repop.local_commit) out << " local_commit"; if 
(repop.cancel) out << " cancel"; out << " op=" << *(repop.op); out << " repop=" << &repop; @@ -122,11 +122,11 @@ class OSD : public Dispatcher { void wait_for_no_ops(); int apply_write(MOSDOp *op, version_t v, - Context *onsafe = 0); + Context *oncommit = 0); void get_repop(OSDReplicaOp*); - void put_repop(OSDReplicaOp*); // will send ack/safe msgs, and delete as necessary. + void put_repop(OSDReplicaOp*); // will send ack/commit msgs, and delete as necessary. void do_op(class MOSDOp *m); @@ -175,7 +175,7 @@ class OSD : public Dispatcher { map > > replica_pg_osd_tids; // pg -> osd -> tid void issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd); - void handle_rep_op_ack(__uint64_t tid, int result, bool safe, int fromosd); + void handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd); // recovery map<__uint64_t,PGPeer*> pull_ops; // tid -> PGPeer* @@ -213,8 +213,8 @@ class OSD : public Dispatcher { void op_rep_remove_reply(class MOSDOpReply *op); void op_rep_modify(class MOSDOp *op); // write, trucnate, delete - void op_rep_modify_safe(class MOSDOp *op); - friend class C_OSD_RepModifySafe; + void op_rep_modify_commit(class MOSDOp *op); + friend class C_OSD_RepModifyCommit; public: OSD(int id, Messenger *m); @@ -233,7 +233,7 @@ class OSD : public Dispatcher { void op_read(class MOSDOp *m); void op_stat(class MOSDOp *m); void op_modify(class MOSDOp *m); - void op_modify_safe(class OSDReplicaOp *repop); + void op_modify_commit(class OSDReplicaOp *repop); // for replication void handle_op_reply(class MOSDOpReply *m); diff --git a/ceph/osd/PG.h b/ceph/osd/PG.h index 1ceec2a8188f6..fd4f9f6429ad2 100644 --- a/ceph/osd/PG.h +++ b/ceph/osd/PG.h @@ -22,6 +22,18 @@ using namespace __gnu_cxx; +struct PGSummary { + pg_t pgid; + version_t version,mtime; + version_t last_epoch_started; +}; + +struct PGContentSummary { + map objects; +}; + + + struct PGReplicaInfo { int state; version_t last_complete; diff --git a/ceph/osdc/Filer.cc b/ceph/osdc/Filer.cc index 
0ae46db405e47..23689822bc3c3 100644 --- a/ceph/osdc/Filer.cc +++ b/ceph/osdc/Filer.cc @@ -34,612 +34,49 @@ #define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << MSG_ADDR_NICE(messenger->get_myaddr()) << ".filer " - - - -Filer::Filer(Messenger *m, OSDMap *o) -{ - last_tid = 0; - messenger = m; - osdmap = o; -} - -Filer::~Filer() -{ -} - -void Filer::dispatch(Message *m) -{ - switch (m->get_type()) { - case MSG_OSD_MKFS_ACK: - handle_osd_mkfs_ack(m); - break; - - case MSG_OSD_OPREPLY: - handle_osd_op_reply((MOSDOpReply*)m); - break; - - default: - dout(1) << "don't know message type " << m->get_type() << endl; - assert(0); - } -} - - -bool Filer::is_active() -{ - if (!op_reads.empty() || - !op_modify.empty() || - !op_probes.empty()) { - for (hash_map::iterator it = op_reads.begin(); - it != op_reads.end(); - it++) dout(10) << " pending read op " << it->first << endl; - for (hash_map::iterator it = op_modify.begin(); - it != op_modify.end(); - it++) dout(10) << " pending modify op " << it->first << endl; - return true; - } - return false; -} - -void Filer::handle_osd_map(MOSDMap *m) -{ - if (!osdmap || - m->get_version() > osdmap->get_version()) { - if (osdmap) { - dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl; - } else { - dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl; - } - - osdmap->decode(m->get_osdmap()); - - // kick requests who might be timing out on the wrong osds - // ** FIXME ** - - } else { - dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl; - } -} - - - - -/* -void Filer::queue_outgoing(Message *m, int osd) -{ - outgoing.push_back(pair(m,osd)); -} - -void Filer::send_outgoing() -{ - // send messages AFTER all my structs are ready (locking can make replies appear to arrive quickly!) 
- for (list< pair >::iterator it = outgoing.begin(); - it != outgoing.end(); - it++) { - messenger->send_message(it->first, MSG_ADDR_OSD(it->second), 0); - } -} -*/ - - -// ------------------------------------------------------------ -// read - -int -Filer::read(inode_t& inode, - size_t len, - size_t offset, - bufferlist *bl, - Context *onfinish) -{ - // pending read record - PendingOSDRead_t *p = new PendingOSDRead_t; - p->read_result = bl; - p->orig_offset = offset; - p->bytes_read = 0; - p->onfinish = onfinish; - - // map buffer into OSD extents - file_to_extents(inode, len, offset, p->extents); - - dout(7) << "read ino " << hex << inode.ino << dec << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl; - - // issue reads - for (list::iterator it = p->extents.begin(); - it != p->extents.end(); - it++) { - last_tid++; - - // issue read - MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pg, osdmap->get_version(), - OSD_OP_READ); - m->set_length(it->len); - m->set_offset(it->offset); - dout(15) << " read tid " << last_tid << " from osd" << it->osd - << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len - << " (" << it->buffer_extents.size() << " buffer fragments)" << endl; - messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); - - // add to gather set - p->outstanding_ops.insert(last_tid); - op_reads[last_tid] = p; - } - - return 0; -} - - -void -Filer::handle_osd_read_reply(MOSDOpReply *m) -{ - // get pio - tid_t tid = m->get_tid(); - dout(15) << "handle_osd_read_reply on " << tid << endl; - - assert(op_reads.count(tid)); - PendingOSDRead_t *p = op_reads[ tid ]; - op_reads.erase( tid ); - - // our op finished - p->outstanding_ops.erase(tid); - - // what buffer offset are we? 
- dout(7) << " got frag from " << hex << m->get_oid() << dec << " len " << m->get_length() << ", still have " << p->outstanding_ops.size() << " more ops" << endl; - - if (p->outstanding_ops.empty()) { - // all done - p->bytes_read = 0; - p->read_result->clear(); - if (p->read_data.size()) { - dout(15) << " assembling frags" << endl; - - /** FIXME This doesn't handle holes efficiently. - * It allocates zero buffers to fill whole buffer, and - * then discards trailing ones at the end. - * - * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over - * the heap. - */ - - // we have other fragments, assemble them all... blech! - p->read_data[m->get_oid()] = new bufferlist; - p->read_data[m->get_oid()]->claim( m->get_data() ); - - // map extents back into buffer - map by_off; // buffer offset -> bufferlist - - // for each object extent... - for (list::iterator eit = p->extents.begin(); - eit != p->extents.end(); - eit++) { - bufferlist *ox_buf = p->read_data[eit->oid]; - unsigned ox_len = ox_buf->length(); - unsigned ox_off = 0; - assert(ox_len <= eit->len); - - // for each buffer extent we're mapping into... 
- for (map::iterator bit = eit->buffer_extents.begin(); - bit != eit->buffer_extents.end(); - bit++) { - dout(21) << " object " << hex << eit->oid << dec << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl; - by_off[bit->first] = new bufferlist; - - if (ox_off + bit->second <= ox_len) { - // we got the whole bx - by_off[bit->first]->substr_of(*ox_buf, ox_off, bit->second); - if (p->bytes_read < bit->first + bit->second) - p->bytes_read = bit->first + bit->second; - } else if (ox_off + bit->second > ox_len && ox_off < ox_len) { - // we got part of this bx - by_off[bit->first]->substr_of(*ox_buf, ox_off, (ox_len-ox_off)); - if (p->bytes_read < bit->first + ox_len-ox_off) - p->bytes_read = bit->first + ox_len-ox_off; - - // zero end of bx - dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << endl; - bufferptr z = new buffer(ox_off + bit->second - ox_len); - memset(z.c_str(), 0, z.length()); - by_off[bit->first]->append( z ); - } else { - // we got none of this bx. zero whole thing. - assert(ox_off >= ox_len); - dout(21) << " adding all zeros for this bit " << bit->second << endl; - bufferptr z = new buffer(bit->second); - assert(z.length() == bit->second); - memset(z.c_str(), 0, z.length()); - by_off[bit->first]->append( z ); - } - ox_off += bit->second; - } - assert(ox_off == eit->len); - } - - // sort and string bits together - for (map::iterator it = by_off.begin(); - it != by_off.end(); - it++) { - assert(it->second->length()); - if (it->first < p->bytes_read) { - dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << endl; - p->read_result->claim_append(*(it->second)); - } else { - dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << endl; - } - delete it->second; - } - - // trim trailing zeros? 
- if (p->read_result->length() > p->bytes_read) { - dout(10) << " trimming off trailing zeros . bytes_read=" << p->bytes_read << " len=" << p->read_result->length() << endl; - p->read_result->splice(p->bytes_read, p->read_result->length() - p->bytes_read); - assert(p->bytes_read == p->read_result->length()); - } - - // hose p->read_data bufferlist*'s - for (map::iterator it = p->read_data.begin(); - it != p->read_data.end(); - it++) { - delete it->second; - } - } else { - dout(15) << " only one frag" << endl; - - // only one fragment, easy - p->read_result->claim( m->get_data() ); - p->bytes_read = p->read_result->length(); - } - - // finish, clean up - Context *onfinish = p->onfinish; - int result = p->bytes_read; - - dout(7) << " " << result << " bytes " << p->read_result->length() << endl; - - // done - delete p; // del pendingOsdRead_t - if (onfinish) { - onfinish->finish(result); - delete onfinish; - } - } else { - // store my bufferlist for later assembling - p->read_data[m->get_oid()] = new bufferlist; - p->read_data[m->get_oid()]->claim( m->get_data() ); - } - - delete m; -} - - - - -// ------------------------------------------------------------ -// modify ops - -// write - -int -Filer::write(inode_t& inode, - size_t len, - size_t offset, - bufferlist& bl, - int flags, - Context *onack, - Context *onsafe) -{ - last_tid++; - - // pending write record - PendingOSDOp_t *p = new PendingOSDOp_t; - p->onack = onack; - p->onsafe = onsafe; - - // find data - list extents; - file_to_extents(inode, len, offset, extents); - - //assert(onack || onsafe); - - dout(7) << "write ino " << hex << inode.ino << dec << " len " << len << " off " << offset - << " in " << extents.size() << " extents " - //<< onack << "/" << onsafe - << endl; - - size_t off = 0; // ptr into buffer - - for (list::iterator it = extents.begin(); - it != extents.end(); - it++) { - last_tid++; - - // issue write - MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pg, 
osdmap->get_version(), - OSD_OP_WRITE); - m->set_length(it->len); - m->set_offset(it->offset); - - // map buffer segments into this extent - // (may be fragmented bc of striping) - bufferlist cur; - for (map::iterator bit = it->buffer_extents.begin(); - bit != it->buffer_extents.end(); - bit++) { - bufferlist thisbit; - thisbit.substr_of(bl, bit->first, bit->second); - cur.claim_append(thisbit); - } - assert(cur.length() == it->len); - m->set_data(cur);//.claim(cur); - - off += it->len; - - // add to gather set - if (onack) - p->waitfor_ack.insert(last_tid); - else - m->set_want_ack(false); - - if (onsafe || !onack) // wait for safe if neither callback is provided (sloppy user) - p->waitfor_safe.insert(last_tid); - else - m->set_want_safe(false); - - op_modify[last_tid] = p; - - // send - dout(10) << " write tid " << last_tid << " osd" << it->osd - << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl; - messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); - } - - return 0; -} - - -void -Filer::handle_osd_modify_reply(MOSDOpReply *m) -{ - // get pio - tid_t tid = m->get_tid(); - dout(7) << "handle_osd_modify_reply " << tid << " safe " << m->get_safe() << endl; - assert(op_modify.count(tid)); - PendingOSDOp_t *p = op_modify[ tid ]; - - Context *onack = 0; - Context *onsafe = 0; - - // ack or safe? - if (m->get_safe()) { - // safe. - //dout(15) << " handle_osd_modify_reply safe on " << tid << endl; - op_modify.erase( tid ); - p->waitfor_ack.erase(tid); - p->waitfor_safe.erase(tid); - - if (p->waitfor_safe.empty()) { - onack = p->onack; - onsafe = p->onsafe; - delete p; - } - } else { - // ack. 
- //dout(15) << " handle_osd_modify_reply ack on " << tid << endl; - assert(p->waitfor_ack.count(tid)); - p->waitfor_ack.erase(tid); - - if (p->waitfor_safe.empty()) { - op_modify.erase( tid ); // no safe requested (FIXME or ooo delivery) - assert(p->onsafe == 0); - } - - if (p->waitfor_ack.empty()) { - onack = p->onack; - p->onack = 0; - if (p->waitfor_safe.empty()) - delete p; - } - } - - // do callbacks - if (onack) { - onack->finish(0); - delete onack; - } - if (onsafe) { - onsafe->finish(0); - delete onsafe; - } - - delete m; -} - - - -// ....... - -void -Filer::handle_osd_op_reply(MOSDOpReply *m) -{ - // updated cluster info? - if (m->get_map_version() && - m->get_map_version() > osdmap->get_version()) { - dout(3) << "op reply has newer map " << m->get_map_version() << " > " << osdmap->get_version() << endl; - osdmap->decode( m->get_osdmap() ); - } - - - // read or write? - switch (m->get_op()) { - case OSD_OP_READ: - handle_osd_read_reply(m); - return; - - case OSD_OP_WRITE: - case OSD_OP_TRUNCATE: - case OSD_OP_DELETE: - handle_osd_modify_reply(m); - return; - - default: - assert(0); - } - -} - - -int Filer::truncate(inode_t& inode, - size_t new_size, size_t old_size, - Context *onack, - Context *onsafe) -{ - // pending write record - PendingOSDOp_t *p = new PendingOSDOp_t; - p->onack = onack; - p->onsafe = onsafe; - - // find data - list extents; - file_to_extents(inode, old_size, new_size, extents); - - dout(7) << "truncate ino " << hex << inode.ino << dec << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl; - - int n = 0; - for (list::iterator it = extents.begin(); - it != extents.end(); - it++) { - last_tid++; - - MOSDOp *m; - if (it->offset == 0) { - // issue delete - m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pg, osdmap->get_version(), - OSD_OP_DELETE); - } else { - // issue a truncate - m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pg, 
osdmap->get_version(), - OSD_OP_TRUNCATE); - m->set_length( it->offset ); - } - messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); - - // add to gather set - p->waitfor_ack.insert(last_tid); - p->waitfor_safe.insert(last_tid); - op_modify[last_tid] = p; - n++; - } - - if (n == 0) { - delete p; - if (onack) { - onack->finish(0); - delete onack; - } - if (onsafe) { - onsafe->finish(0); - delete onsafe; - } - } - - return 0; -} - - - -// ------------------------------------------- -// mkfs on all osds, wipe everything. - -int Filer::mkfs(Context *onfinish) -{ - dout(7) << "mkfs, wiping all OSDs" << endl; - - // send MKFS to osds - set ls; - osdmap->get_all_osds(ls); - - for (set::iterator it = ls.begin(); - it != ls.end(); - it++) { - - ++last_tid; - - // issue mkfs - messenger->send_message(new MOSDMap(osdmap, true), - MSG_ADDR_OSD(*it)); - pending_mkfs.insert(*it); - } - - waiting_for_mkfs = onfinish; - return 0; -} - - -void Filer::handle_osd_mkfs_ack(Message *m) -{ - int from = MSG_ADDR_NUM(m->get_source()); - - assert(pending_mkfs.count(from)); - pending_mkfs.erase(from); - - if (pending_mkfs.empty()) { - dout(2) << "done with mkfs" << endl; - waiting_for_mkfs->finish(0); - delete waiting_for_mkfs; - waiting_for_mkfs = 0; - } -} - - - - void Filer::file_to_extents(inode_t inode, size_t len, - size_t offset, - list& extents) + off_t offset, + list& extents) { /* we want only one extent per object! * this means that each extent we read may map into different bits of the * final read buffer.. 
hence OSDExtent.buffer_extents */ - map< object_t, OSDExtent > object_extents; + map< object_t, ObjectExtent > object_extents; assert(inode.layout.object_size >= inode.layout.stripe_size); - size_t stripes_per_object = inode.layout.object_size / inode.layout.stripe_size; + off_t stripes_per_object = inode.layout.object_size / inode.layout.stripe_size; - size_t cur = offset; - size_t left = len; + off_t cur = offset; + off_t left = len; while (left > 0) { // layout into objects - size_t blockno = cur / inode.layout.stripe_size; - size_t stripeno = blockno / inode.layout.stripe_count; - size_t stripepos = blockno % inode.layout.stripe_count; - size_t objectsetno = stripeno / stripes_per_object; - size_t objectno = objectsetno * inode.layout.stripe_count + stripepos; + off_t blockno = cur / inode.layout.stripe_size; + off_t stripeno = blockno / inode.layout.stripe_count; + off_t stripepos = blockno % inode.layout.stripe_count; + off_t objectsetno = stripeno / stripes_per_object; + off_t objectno = objectsetno * inode.layout.stripe_count + stripepos; // find oid, extent - OSDExtent *ex = 0; + ObjectExtent *ex = 0; object_t oid = file_to_object( inode.ino, objectno ); if (object_extents.count(oid)) ex = &object_extents[oid]; else { ex = &object_extents[oid]; ex->oid = oid; - ex->pg = osdmap->object_to_pg( oid, inode.layout ); - ex->osd = osdmap->get_pg_acting_primary( ex->pg ); + ex->pgid = objecter->osdmap->object_to_pg( oid, inode.layout ); + //ex->osd = objecter->osdmap->get_pg_acting_primary( ex->pg ); } // map range into object - size_t block_start = (stripeno % stripes_per_object)*inode.layout.stripe_size; - size_t block_off = cur % inode.layout.stripe_size; - size_t max = inode.layout.stripe_size - block_off; + off_t block_start = (stripeno % stripes_per_object)*inode.layout.stripe_size; + off_t block_off = cur % inode.layout.stripe_size; + off_t max = inode.layout.stripe_size - block_off; - size_t x_offset = block_start + block_off; - size_t x_len; + off_t 
x_offset = block_start + block_off; + off_t x_len; if (left > max) x_len = max; else @@ -664,7 +101,7 @@ void Filer::file_to_extents(inode_t inode, } // make final list - for (map::iterator it = object_extents.begin(); + for (map::iterator it = object_extents.begin(); it != object_extents.end(); it++) { extents.push_back(it->second); diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index 63b45b3e490d4..00b1b72e37977 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -37,127 +37,57 @@ using namespace std; using namespace __gnu_cxx; #include "include/types.h" -#include "msg/Dispatcher.h" + #include "OSDMap.h" +#include "Objecter.h" class Context; class Messenger; class OSDMap; -/*** types ***/ -typedef __uint64_t tid_t; - -//#define FILER_FLAG_TRUNCATE_AFTER_WRITE 1 - - - -/** OSDExtent - * for mapping (ino, offset, len) to a (list of) byte extents in objects on osds - */ -class OSDExtent { - public: - int osd; // (acting) primary osd - object_t oid; // object id - pg_t pg; // placement group - size_t offset, len; // extent within the object - map buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!) - - OSDExtent() : osd(0), oid(0), pg(0), offset(0), len(0) { } -}; - - - - -/*** track pending operations ***/ -typedef struct { - set outstanding_ops; - size_t orig_offset; - list extents; - map read_data; // bits of data as they come back - - bufferlist *read_result; // eventaully condensed into here. - - size_t bytes_read; - Context *onfinish; -} PendingOSDRead_t; - -typedef struct { - set waitfor_ack; - Context *onack; - set waitfor_safe; - Context *onsafe; -} PendingOSDOp_t; - -typedef struct { - size_t *final_size; - size_t cur_offset; - Context *onfinish; -} PendingOSDProbe_t; - /**** Filer interface ***/ -class Filer : public Dispatcher { - OSDMap *osdmap; // what osds am i dealing with? 
- Messenger *messenger; +class Filer { //: public Dispatcher { + Objecter *objecter; - __uint64_t last_tid; - hash_map op_reads; - hash_map op_modify; - - hash_map op_probes; - - set pending_mkfs; - Context *waiting_for_mkfs; - public: - Filer(Messenger *m, OSDMap *o); - ~Filer(); + Filer(Objecter *o) : objecter(o) {} + ~Filer() {} - void dispatch(Message *m); - - bool is_active(); + bool is_active() { + return objecter->is_active(); + } - // osd fun + // ** async file interface ** int read(inode_t& inode, size_t len, - size_t offset, + off_t offset, bufferlist *bl, // ptr to data - Context *c); - - int probe_size(inode_t& inode, - size_t *size, Context *c); + Context *onfinish) { + Objecter::OSDRead *rd = prepare_read(inode, len, offset, bl); + return objecter->readx(rd, onfinish); + } int write(inode_t& inode, size_t len, - size_t offset, + off_t offset, bufferlist& bl, int flags, Context *onack, - Context *onsafe); - - int remove(inode_t& inode, - size_t old_size, - Context *onack, - Context *onsafe) { - return truncate(inode, 0, old_size, onack, onsafe); + Context *oncommit) { + Objecter::OSDWrite *wr = prepare_write(inode, len, offset, bl); + return objecter->writex(wr, onack, oncommit); } - int truncate(inode_t& ino, - size_t new_size, size_t old_size, - Context *onack, - Context *onsafe); - - //int zero(inodeno_t ino, size_t len, size_t offset, Context *c); - int mkfs(Context *c); - void handle_osd_mkfs_ack(Message *m); - - void handle_osd_op_reply(class MOSDOpReply *m); - void handle_osd_read_reply(class MOSDOpReply *m); - void handle_osd_modify_reply(class MOSDOpReply *m); - - void handle_osd_map(class MOSDMap *m); - + int zero(inode_t& inode, + size_t len, + off_t offset, + Context *onack, + Context *oncommit) { + Objecter::OSDZero *z = prepare_zero(inode, len, offset); + return objecter->zerox(z, onack, oncommit); + } /***** mapping *****/ @@ -177,9 +107,37 @@ class Filer : public Dispatcher { (byte ranges in objects on (primary) osds) */ void 
file_to_extents(inode_t inode, size_t len, - size_t offset, - list& extents); + off_t offset, + list& extents); + Objecter::OSDRead* + prepare_read(inode_t& inode, + size_t len, + off_t offset, + bufferlist *bl) { // result goes here + Objecter::OSDRead *rd = new Objecter::OSDRead(bl); + file_to_extents(inode, len, offset, rd->extents); + return rd; + } + + Objecter::OSDWrite* + prepare_write(inode_t& inode, + size_t len, + off_t offset, + bufferlist& bl) { + Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl); + file_to_extents(inode, len, offset, wr->extents); + return wr; + } + + Objecter::OSDZero* + prepare_zero(inode_t& inode, + size_t len, + off_t offset) { + Objecter::OSDZero *z = new Objecter::OSDZero; + file_to_extents(inode, len, offset, z->extents); + return z; + } }; diff --git a/ceph/osdc/ObjectCacher.h b/ceph/osdc/ObjectCacher.h new file mode 100644 index 0000000000000..4cdc25041757f --- /dev/null +++ b/ceph/osdc/ObjectCacher.h @@ -0,0 +1,66 @@ + + + +class ObjectCacher { + Objecter *objecter; + + + class Object { + + class BufferHead { + public: + const int CLEAN = 1; + const int DIRTY = 2; + const int RX = 3; + const int TX = 4; + int state; + + }; + + map bh_map; + + class Lock { + public: + const int NONE = 0; + const int WRLOCK = 1; + //const int RDLOCK = 2; + + int state; + + Lock() : state(NONE) {} + }; + + }; + + + + int map_read(OSDRead *rd); + int map_write(OSDWrite *wr); + + + void flush(set& objects); + void flush_all(); + + void commit(set& objects); + void commit_all(); + + + +}; + + + +// sync write (relaxed consistency, bypass cache) +Filer->prepare_write(); +Objecter->writex(...); // make async call +cond.Wait(); // wait for ack + +// sync write (correct) +Filer->prepare_write(); +ObjectCache->atomic_sync_writex(...); // blocks until sync write happens, or i get write locks + +// async write +Filer->prepare_write(); +ObjectCache->writex(...); // non-blocking. update cache. + or +Objecter->writex(...); // non-blocking. 
don't update cache. diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc new file mode 100644 index 0000000000000..ec3e7b787488b --- /dev/null +++ b/ceph/osdc/Objecter.cc @@ -0,0 +1,516 @@ + +#include "Objecter.h" +#include "OSDMap.h" + +#include "msg/Messenger.h" +#include "msg/Message.h" + +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" +#include "messages/MOSDMap.h" + + +#include "config.h" +#undef dout +#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_filer) cout << MSG_ADDR_NICE(messenger->get_myaddr()) << ".objecter " + + +// messages ------------------------------ + +void Objecter::dispatch(Message *m) +{ + switch (m->get_type()) { + case MSG_OSD_OPREPLY: + handle_osd_op_reply((MOSDOpReply*)m); + break; + + default: + dout(1) << "don't know message type " << m->get_type() << endl; + assert(0); + } +} + +void Objecter::handle_osd_map(MOSDMap *m) +{ + if (!osdmap || + m->get_version() > osdmap->get_version()) { + if (osdmap) { + dout(3) << "handle_osd_map got osd map version " << m->get_version() + << " > " << osdmap->get_version() << endl; + } else { + dout(3) << "handle_osd_map got osd map version " << m->get_version() + << endl; + } + + osdmap->decode(m->get_osdmap()); + + // kick requests who might be timing out on the wrong osds + // ** FIXME ** + + } else { + dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() + << " <= " << osdmap->get_version() << endl; + } +} + + +void Objecter::handle_osd_op_reply(MOSDOpReply *m) +{ + // updated cluster info? + if (m->get_map_version() && + m->get_map_version() > osdmap->get_version()) { + dout(3) << "op reply has newer map " << m->get_map_version() << " > " << osdmap->get_version() << endl; + osdmap->decode( m->get_osdmap() ); + } + + + // read or write? 
+ switch (m->get_op()) { + case OSD_OP_READ: + handle_osd_read_reply(m); + return; + + case OSD_OP_WRITE: + handle_osd_write_reply(m); + return; + + case OSD_OP_ZERO: + handle_osd_zero_reply(m); + return; + + default: + assert(0); + } + +} + + +// read ----------------------------------- + + +int Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, + Context *onfinish) +{ + OSDRead *rd = new OSDRead(bl); + rd->extents.push_back(ObjectExtent(oid, off, len)); + rd->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + return readx(rd, onfinish); +} + +int Objecter::readx(OSDRead *rd, Context *onfinish) +{ + rd->onfinish = onfinish; + + // issue reads + for (list::iterator it = rd->extents.begin(); + it != rd->extents.end(); + it++) { + // find OSD + int osd = osdmap->get_pg_acting_primary( it->pgid ); + + // send + last_tid++; + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->pgid, osdmap->get_version(), + OSD_OP_READ); + m->set_length(it->len); + m->set_offset(it->offset); + dout(15) << " read tid " << last_tid << " from osd" << osd + << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len + << " (" << it->buffer_extents.size() << " buffer fragments)" << endl; + messenger->send_message(m, MSG_ADDR_OSD(osd), 0); + + // add to gather set + rd->ops[last_tid] = *it; + op_read[last_tid] = rd; + } + + return 0; +} + +void Objecter::handle_osd_read_reply(MOSDOpReply *m) +{ + // get pio + tid_t tid = m->get_tid(); + dout(15) << "handle_osd_read_reply on " << tid << endl; + + assert(op_read.count(tid)); + OSDRead *rd = op_read[ tid ]; + op_read.erase( tid ); + + // our op finished + rd->ops.erase(tid); + + // what buffer offset are we? 
+ dout(7) << " got frag from " << hex << m->get_oid() << dec << " len " << m->get_length() << ", still have " << rd->ops.size() << " more ops" << endl; + + if (rd->ops.empty()) { + // all done + size_t bytes_read = 0; + rd->bl->clear(); + + if (rd->read_data.size()) { + dout(15) << " assembling frags" << endl; + + /** FIXME This doesn't handle holes efficiently. + * It allocates zero buffers to fill whole buffer, and + * then discards trailing ones at the end. + * + * Actually, this whole thing is pretty messy with temporary bufferlist*'s all over + * the heap. + */ + + // we have other fragments, assemble them all... blech! + rd->read_data[m->get_oid()] = new bufferlist; + rd->read_data[m->get_oid()]->claim( m->get_data() ); + + // map extents back into buffer + map by_off; // buffer offset -> bufferlist + + // for each object extent... + for (list::iterator eit = rd->extents.begin(); + eit != rd->extents.end(); + eit++) { + bufferlist *ox_buf = rd->read_data[eit->oid]; + unsigned ox_len = ox_buf->length(); + unsigned ox_off = 0; + assert(ox_len <= eit->len); + + // for each buffer extent we're mapping into... 
+ for (map::iterator bit = eit->buffer_extents.begin(); + bit != eit->buffer_extents.end(); + bit++) { + dout(21) << " object " << hex << eit->oid << dec << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl; + by_off[bit->first] = new bufferlist; + + if (ox_off + bit->second <= ox_len) { + // we got the whole bx + by_off[bit->first]->substr_of(*ox_buf, ox_off, bit->second); + if (bytes_read < bit->first + bit->second) + bytes_read = bit->first + bit->second; + } else if (ox_off + bit->second > ox_len && ox_off < ox_len) { + // we got part of this bx + by_off[bit->first]->substr_of(*ox_buf, ox_off, (ox_len-ox_off)); + if (bytes_read < bit->first + ox_len-ox_off) + bytes_read = bit->first + ox_len-ox_off; + + // zero end of bx + dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << endl; + bufferptr z = new buffer(ox_off + bit->second - ox_len); + memset(z.c_str(), 0, z.length()); + by_off[bit->first]->append( z ); + } else { + // we got none of this bx. zero whole thing. + assert(ox_off >= ox_len); + dout(21) << " adding all zeros for this bit " << bit->second << endl; + bufferptr z = new buffer(bit->second); + assert(z.length() == bit->second); + memset(z.c_str(), 0, z.length()); + by_off[bit->first]->append( z ); + } + ox_off += bit->second; + } + assert(ox_off == eit->len); + } + + // sort and string bits together + for (map::iterator it = by_off.begin(); + it != by_off.end(); + it++) { + assert(it->second->length()); + if (it->first < bytes_read) { + dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << endl; + rd->bl->claim_append(*(it->second)); + } else { + dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << endl; + } + delete it->second; + } + + // trim trailing zeros? + if (rd->bl->length() > bytes_read) { + dout(10) << " trimming off trailing zeros . 
bytes_read=" << bytes_read + << " len=" << rd->bl->length() << endl; + rd->bl->splice(bytes_read, rd->bl->length() - bytes_read); + assert(bytes_read == rd->bl->length()); + } + + // hose p->read_data bufferlist*'s + for (map::iterator it = rd->read_data.begin(); + it != rd->read_data.end(); + it++) { + delete it->second; + } + } else { + dout(15) << " only one frag" << endl; + + // only one fragment, easy + rd->bl->claim( m->get_data() ); + bytes_read = rd->bl->length(); + } + + // finish, clean up + Context *onfinish = rd->onfinish; + + dout(7) << " " << bytes_read << " bytes " << rd->bl->length() << endl; + + // done + delete rd; + if (onfinish) { + onfinish->finish(bytes_read); + delete onfinish; + } + } else { + // store my bufferlist for later assembling + rd->read_data[m->get_oid()] = new bufferlist; + rd->read_data[m->get_oid()]->claim( m->get_data() ); + } + + delete m; +} + + + +// write ------------------------------------ + +int Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, + Context *onack, Context *oncommit) +{ + OSDWrite *wr = new OSDWrite(bl); + wr->extents.push_back(ObjectExtent(oid, off, len)); + wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + return writex(wr, onack, oncommit); +} + +int Objecter::writex(OSDWrite *wr, Context *onack, Context *oncommit) +{ + wr->onack = onack; + wr->oncommit = oncommit; + + size_t off = 0; // ptr into buffer + + // issue writes + for (list::iterator it = wr->extents.begin(); + it != wr->extents.end(); + it++) { + // find + int osd = osdmap->get_pg_acting_primary( it->pgid ); + + // send + last_tid++; + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->pgid, osdmap->get_version(), + OSD_OP_WRITE); + m->set_length(it->len); + m->set_offset(it->offset); + + // map buffer segments into this extent + // (may be fragmented bc of striping) + bufferlist cur; + for (map::iterator bit = it->buffer_extents.begin(); + bit != it->buffer_extents.end(); + 
bit++) { + bufferlist thisbit; + thisbit.substr_of(wr->bl, bit->first, bit->second); + cur.claim_append(thisbit); + } + assert(cur.length() == it->len); + m->set_data(cur);//.claim(cur); + + off += it->len; + + // add to gather set + if (onack) + wr->waitfor_ack[last_tid] = *it; + else + m->set_want_ack(false); + + if (oncommit || !onack) // wait for commit if neither callback is provided (sloppy user) + wr->waitfor_commit[last_tid] = *it; + else + m->set_want_commit(false); + + op_write[last_tid] = wr; + + // send + dout(10) << " write tid " << last_tid << " osd" << osd + << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl; + messenger->send_message(m, MSG_ADDR_OSD(osd), 0); + } + + return 0; +} + + + +void Objecter::handle_osd_write_reply(MOSDOpReply *m) +{ + // get pio + tid_t tid = m->get_tid(); + dout(7) << "handle_osd_write_reply " << tid << " commit " << m->get_commit() << endl; + assert(op_write.count(tid)); + OSDWrite *wr = op_write[ tid ]; + + Context *onack = 0; + Context *oncommit = 0; + + // ack or commit? + if (m->get_commit()) { + // commit. + //dout(15) << " handle_osd_write_reply commit on " << tid << endl; + op_write.erase( tid ); + wr->waitfor_ack.erase(tid); + wr->waitfor_commit.erase(tid); + + if (wr->waitfor_commit.empty()) { + onack = wr->onack; + oncommit = wr->oncommit; + delete wr; + } + } else { + // ack. 
+ //dout(15) << " handle_osd_write_reply ack on " << tid << endl; + assert(wr->waitfor_ack.count(tid)); + wr->waitfor_ack.erase(tid); + + if (wr->waitfor_commit.empty()) { + op_write.erase( tid ); // no commit requested (FIXME or ooo delivery) + assert(wr->oncommit == 0); + } + + if (wr->waitfor_ack.empty()) { + onack = wr->onack; + wr->onack = 0; + if (wr->waitfor_commit.empty()) + delete wr; + } + } + + // do callbacks + if (onack) { + onack->finish(0); + delete onack; + } + if (oncommit) { + oncommit->finish(0); + delete oncommit; + } + + delete m; +} + + + +// zero --------------------------------------------------- + +int Objecter::zero(object_t oid, off_t off, size_t len, + Context *onack, Context *oncommit) +{ + OSDZero *z = new OSDZero; + z->extents.push_back(ObjectExtent(oid, off, len)); + z->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + return zerox(z, onack, oncommit); +} + +int Objecter::zerox(OSDZero *z, Context *onack, Context *oncommit) +{ + z->onack = onack; + z->oncommit = oncommit; + + size_t off = 0; // ptr into buffer + + // issue writes + for (list::iterator it = z->extents.begin(); + it != z->extents.end(); + it++) { + // find + int osd = osdmap->get_pg_acting_primary( it->pgid ); + + // send + last_tid++; + MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), + it->oid, it->pgid, osdmap->get_version(), + OSD_OP_ZERO); + m->set_length(it->len); + m->set_offset(it->offset); + + off += it->len; + + // add to gather set + if (onack) + z->waitfor_ack[last_tid] = *it; + else + m->set_want_ack(false); + + if (oncommit || !onack) // wait for commit if neither callback is provided (sloppy user) + z->waitfor_commit[last_tid] = *it; + else + m->set_want_commit(false); + + op_zero[last_tid] = z; + + // send + dout(10) << " zero tid " << last_tid << " osd" << osd + << " oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl; + messenger->send_message(m, MSG_ADDR_OSD(osd), 0); + } + + return 0; 
+}
+
+// Handle an OSD reply (ack or commit) to a pending zero op tracked in op_zero.
+void Objecter::handle_osd_zero_reply(MOSDOpReply *m)
+{
+  // look up the pending zero op this reply belongs to
+  tid_t tid = m->get_tid();
+  dout(7) << "handle_osd_zero_reply " << tid << " commit " << m->get_commit() << endl;
+  assert(op_zero.count(tid));  // zerox() registers tids in op_zero, not op_write
+  OSDZero *z = op_zero[ tid ];
+
+  Context *onack = 0;
+  Context *oncommit = 0;
+
+  // ack or commit?
+  if (m->get_commit()) {
+    // commit: this tid is finished; it also retires any ack still pending for the tid.
+    //dout(15) << " handle_osd_zero_reply commit on " << tid << endl;
+    op_zero.erase( tid );
+    z->waitfor_ack.erase(tid);
+    z->waitfor_commit.erase(tid);
+
+    if (z->waitfor_commit.empty()) {
+      onack = z->onack;  // null if the ack branch already fired and cleared it
+      oncommit = z->oncommit;
+      delete z;
+    }
+  } else {
+    // ack: only the ack gather set shrinks; the tid stays registered if a commit is still expected.
+    //dout(15) << " handle_osd_zero_reply ack on " << tid << endl;
+    assert(z->waitfor_ack.count(tid));
+    z->waitfor_ack.erase(tid);
+
+    if (z->waitfor_commit.empty()) {
+      op_zero.erase( tid );  // no commit requested (FIXME or ooo delivery)
+      assert(z->oncommit == 0);
+    }
+
+    if (z->waitfor_ack.empty()) {
+      onack = z->onack;
+      z->onack = 0;  // so a later commit does not fire onack a second time
+      if (z->waitfor_commit.empty())
+        delete z;
+    }
+  }
+
+  // do callbacks (z may already be deleted here; only the saved pointers are used)
+  if (onack) {
+    onack->finish(0);
+    delete onack;
+  }
+  if (oncommit) {
+    oncommit->finish(0);
+    delete oncommit;
+  }
+
+  delete m;
+}
diff --git a/ceph/osdc/Objecter.h b/ceph/osdc/Objecter.h
new file mode 100644
index 0000000000000..9321c8ca924c4
--- /dev/null
+++ b/ceph/osdc/Objecter.h
@@ -0,0 +1,126 @@
+#ifndef __OBJECTER_H
+#define __OBJECTER_H
+
+#include "include/types.h"
+#include "include/bufferlist.h"
+
+#include <list>
+#include <map>
+#include <ext/hash_map>
+using namespace std;
+using namespace __gnu_cxx;
+
+class Context;
+class Messenger;
+class OSDMap;
+class Message;
+
+
+// new types
+typedef __uint64_t tid_t;
+
+class ObjectExtent {
+ public:
+  object_t    oid;       // object id
+  pg_t        pgid;
+  size_t      offset, len;    // extent within the object
+  map<size_t, size_t>  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!)
+
+  ObjectExtent(object_t o=0, off_t off=0, size_t l=0) : oid(o), pgid(0), offset(off), len(l) { }  // pgid starts at 0; callers assign the real pg afterwards
+};
+
+class Objecter {
+ public:
+  Messenger *messenger;
+  OSDMap    *osdmap;
+
+ private:
+  tid_t last_tid;  // last transaction id handed out; incremented before each op is sent
+
+  /*** track pending operations ***/
+  // read
+ public:
+  class OSDRead {
+  public:
+    bufferlist *bl;                     // caller's result buffer
+    list<ObjectExtent> extents;
+    Context *onfinish;
+    map<tid_t, ObjectExtent> ops;       // tid -> extent, for reads still outstanding
+    map<object_t, bufferlist*> read_data;  // bits of data as they come back
+
+    OSDRead(bufferlist *b) : bl(b), onfinish(0) {}
+  };
+
+  // write
+  class OSDWrite {
+  public:
+    list<ObjectExtent> extents;
+    bufferlist bl;
+    Context *onack;
+    Context *oncommit;
+    map<tid_t, ObjectExtent> waitfor_ack;     // tid -> extent, ack not yet received
+    map<tid_t, ObjectExtent> waitfor_commit;  // tid -> extent, commit not yet received
+
+    OSDWrite(bufferlist &b) : bl(b), onack(0), oncommit(0) {}
+  };
+
+  // zero
+  class OSDZero {
+  public:
+    list<ObjectExtent> extents;
+    Context *onack;
+    Context *oncommit;
+    map<tid_t, ObjectExtent> waitfor_ack;     // tid -> extent, ack not yet received
+    map<tid_t, ObjectExtent> waitfor_commit;  // tid -> extent, commit not yet received
+
+    OSDZero() : onack(0), oncommit(0) {}
+  };
+
+ private:
+  // pending ops, keyed by tid
+  hash_map<tid_t, OSDRead*>  op_read;
+  hash_map<tid_t, OSDWrite*> op_write;
+  hash_map<tid_t, OSDZero*>  op_zero;
+
+
+ public:
+  Objecter(Messenger *m, OSDMap *om) :
+    messenger(m), osdmap(om),
+    last_tid(0)
+  {}
+  ~Objecter() {
+    // TODO: clean up op_* (ops still pending at destruction are not freed here)
+    // ***
+  }
+
+  // messages
+ public:
+  void dispatch(Message *m);
+  void handle_osd_op_reply(class MOSDOpReply *m);
+  void handle_osd_read_reply(class MOSDOpReply *m);
+  void handle_osd_write_reply(class MOSDOpReply *m);
+  void handle_osd_zero_reply(class MOSDOpReply *m);
+  void handle_osd_map(class MOSDMap *m);
+
+ private:
+
+
+  // public interface
+ public:
+  bool is_active() {  // true while any read, write or zero op is still pending
+    return !(op_read.empty() && op_write.empty() && op_zero.empty());
+  }
+
+  int readx(OSDRead *read, Context *onfinish);
+  int writex(OSDWrite *write, Context *onack, Context *oncommit);
+  int zerox(OSDZero *zero, Context *onack, Context *oncommit);
+
+  int read(object_t oid, off_t off, size_t len, bufferlist *bl,
+           Context *onfinish);
+  int write(object_t oid, off_t off, size_t len, bufferlist &bl,
+            Context *onack, Context *oncommit);
+  int zero(object_t oid, off_t off, size_t len,
+           Context *onack, Context *oncommit);
+};
+
+#endif
-- 
2.39.5