From 549c27f80b9d510f17df120974c8e93dc261d537 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 10 Feb 2006 19:23:53 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@607 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/client/Client.cc | 28 ++++---- ceph/common/Timer.cc | 8 +-- ceph/msg/CheesySerializer.cc | 127 ----------------------------------- ceph/msg/CheesySerializer.h | 52 -------------- ceph/msg/MPIMessenger.cc | 1 - ceph/msg/Messenger.cc | 15 ++--- ceph/msg/Semaphore.h | 60 ----------------- ceph/msg/TCPMessenger.cc | 9 ++- ceph/osd/OSD.cc | 39 +++++++---- ceph/osd/OSD.h | 2 +- 10 files changed, 56 insertions(+), 285 deletions(-) delete mode 100644 ceph/msg/CheesySerializer.cc delete mode 100644 ceph/msg/CheesySerializer.h delete mode 100755 ceph/msg/Semaphore.h diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 007122ab57dad..3eeac9604a973 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -501,8 +501,8 @@ void Client::flush_inode_buffers(Inode *in) dout(7) << "inflight buffers of sync write, waiting" << endl; Cond cond; in->waitfor_flushed.push_back(&cond); - cond.Wait(client_lock); - assert(in->inflight_buffers.empty()); + while (!in->inflight_buffers.empty()) + cond.Wait(client_lock); dout(7) << "inflight buffers flushed" << endl; } else if (g_conf.client_bcache && @@ -953,7 +953,7 @@ int Client::unmount() lru.lru_set_max(0); trim_cache(); - if (lru.lru_get_size() > 0 || !inode_map.empty()) { + while (lru.lru_get_size() > 0 || !inode_map.empty()) { dout(3) << "cache still has " << lru.lru_get_size() << "+" << inode_map.size() << " items, waiting (presumably for caps to be released?)" << endl; unmount_cond.Wait(client_lock); } @@ -1641,20 +1641,21 @@ int Client::close(fh_t fh) class C_Client_Cond : public Context { public: + bool *done; Cond *cond; Mutex *mutex; int *rvalue; - bool finished; - C_Client_Cond(Cond *cond, Mutex *mutex, int *rvalue) { + C_Client_Cond(bool *d, Cond *cond, Mutex *mutex, int *rvalue) { + this->done = d; this->cond = cond; this->mutex = mutex; this->rvalue = rvalue; - this->finished = false; + *done = false; } void finish(int r) { //mutex->Lock(); *rvalue = r; - finished = true; + *done = true; cond->Signal(); //mutex->Unlock(); } @@ -1735,9 +1736,11 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset) Cond cond; bufferlist blist; // data will go here - C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue); + bool done = false; + C_Client_Cond *onfinish = new C_Client_Cond(&done, &cond, &client_lock, &rvalue); filer->read(in->inode, size, offset, &blist, onfinish); - cond.Wait(client_lock); + while (!done) + cond.Wait(client_lock); // copy data into caller's buf blist.copy(0, blist.length(), buf); @@ -1941,13 +1944,14 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset) Cond cond; int rvalue = 0; - C_Client_Cond *onfinish = new C_Client_Cond(&cond, &client_lock, &rvalue); + bool done = false; + C_Client_Cond *onfinish = new C_Client_Cond(&done, &cond, &client_lock, &rvalue); filer->write(in->inode, size, offset, blist, 0, //NULL,NULL); // no wait hack onfinish, NULL); // applied //NULL, onfinish); // safe on disk - - cond.Wait(client_lock); + while (!done) + cond.Wait(client_lock); } } diff --git a/ceph/common/Timer.cc b/ceph/common/Timer.cc index 53ca621fcf85a..c997f112f9947 100644 --- a/ceph/common/Timer.cc +++ b/ceph/common/Timer.cc @@ -138,9 +138,9 @@ void Timer::register_timer() if (thread_id) { dout(DBL) << "register_timer kicking thread" << endl; if (timed_sleep) - timeout_cond.Signal(); + timeout_cond.SignalAll(); else - sleep_cond.Signal(); + sleep_cond.SignalAll(); } else { dout(DBL) << "register_timer starting thread" << endl; pthread_create(&thread_id, NULL, timer_thread_entrypoint, (void*)this); @@ -155,9 +155,9 @@ void Timer::cancel_timer() lock.Lock(); thread_stop = true; if (timed_sleep) - timeout_cond.Signal(); + timeout_cond.SignalAll(); else - sleep_cond.Signal(); + sleep_cond.SignalAll(); lock.Unlock(); dout(10) << "waiting for thread to finish" << endl; diff --git a/ceph/msg/CheesySerializer.cc b/ceph/msg/CheesySerializer.cc deleted file mode 100644 index cb3c179c1c1fc..0000000000000 --- a/ceph/msg/CheesySerializer.cc +++ /dev/null @@ -1,127 +0,0 @@ - -#include "CheesySerializer.h" -#include "Message.h" -#include "Messenger.h" - -#include -using namespace std; - -#include "config.h" -#undef dout -#define dout(l) if (l<=g_conf.debug) cout << "serializer: " - -#define DEBUGLVL 10 // debug level of output - -// --------- -// incoming messages - -void CheesySerializer::dispatch(Message *m) -{ - long pcid = m->get_pcid(); - Dispatcher *dispatcher = get_dispatcher(); - - lock.Lock(); - - // was i expecting it? - if (call_cond.count(pcid)) { - // yes, this is a reply to a pending call. - dout(DEBUGLVL) << "dispatch got reply for " << pcid << " " << m << endl; - call_reply[pcid] = m; // set reply - int r = call_cond[pcid]->Signal(); - //cout << "post = " << r << endl; - lock.Unlock(); - } else { - // no, this is an unsolicited message. - lock.Unlock(); - dout(DEBUGLVL) << "dispatch got unsolicited message pcid " << pcid << " m " << m << endl; - dispatcher->dispatch(m); - } -} - - -// --------- -// outgoing messages - -int CheesySerializer::send_message(Message *m, msg_addr_t dest, int port, int fromport) -{ - // just pass it on to the messenger - dout(DEBUGLVL) << "send " << m << endl; - //m->set_pcid(0); - messenger->send_message(m, dest, port, fromport); -} - -Message *CheesySerializer::sendrecv(Message *m, msg_addr_t dest, int port) -{ - int fromport = 0; - - Cond cond; - - // make up a pcid that is unique (to me!) - /* NOTE: since request+replies are matched up on pcid's alone, it means that - two nodes using this mechanism can't do calls of each other or else their - pcid's might overlap. - This should be fine. only the Client uses this so far (not MDS). - If OSDs want to use this, though, this must be made smarter!!! - */ - long pcid = ++last_pcid; - m->set_pcid(pcid); - - lock.Lock(); - - dout(DEBUGLVL) << "sendrecv sending " << m << " on pcid " << pcid << endl; - - // add call records - assert(call_cond.count(pcid) == 0); // pcid should be UNIQUE - call_cond[pcid] = &cond; - call_reply[pcid] = 0; // no reply yet - - // send. drop locks in case send_message is bad and blocks - lock.Unlock(); - messenger->send_message(m, dest, port, fromport); - lock.Lock(); - - // wait? - if (call_reply[pcid] == 0) { - dout(DEBUGLVL) << "sendrecv waiting for reply on pcid " << pcid << endl; - //cout << "wait start, value = " << sem->Value() << endl; - - cond.Wait(lock); - } else { - dout(DEBUGLVL) << "sendrecv reply is already here on pcid " << pcid << endl; - } - - // pick up reply - Message *reply = call_reply[pcid]; - //assert(reply); - call_reply.erase(pcid); // remove from call map - call_cond.erase(pcid); - - dout(DEBUGLVL) << "sendrecv got reply " << reply << " on pcid " << pcid << endl; - //delete sem; - - lock.Unlock(); - - return reply; -} - - -// ------------- - -int CheesySerializer::shutdown() -{ - dout(1) << "shutdown" << endl; - - // abort any pending sendrecv's - lock.Lock(); - for (map::iterator it = call_cond.begin(); - it != call_cond.end(); - it++) { - dout(1) << "shutdown waking up (hung) pcid " << it->first << endl; - it->second->Signal(); // wake up! - } - lock.Unlock(); - - // shutdown and delete underlying messenger. - messenger->shutdown(); - delete messenger; -} diff --git a/ceph/msg/CheesySerializer.h b/ceph/msg/CheesySerializer.h deleted file mode 100644 index 99698795d6e5e..0000000000000 --- a/ceph/msg/CheesySerializer.h +++ /dev/null @@ -1,52 +0,0 @@ -#ifndef __CHEESY_MESSENGER_H -#define __CHEESY_MESSENGER_H - -#include "Dispatcher.h" -#include "Message.h" - -#include "Messenger.h" - -#include "common/Cond.h" -#include "common/Mutex.h" - -#include - -#include -using namespace std; - -class CheesySerializer : public Messenger, - public Dispatcher { - protected: - long last_pcid; - - Messenger *messenger; // this is how i communicate - - Mutex lock; // protect call_sem, call_reply - map call_cond; - map call_reply; - - public: - CheesySerializer(Messenger *msg) : Messenger(msg->get_myaddr()) { - messenger = msg; - messenger->set_dispatcher(this); - last_pcid = 1; - } - virtual ~CheesySerializer() { - if (messenger) - delete messenger; - } - - int shutdown(); - - // incoming messages - void dispatch(Message *m); - - // outgoing messages - int send_message(Message *m, msg_addr_t dest, - int port=0, int fromport=0); // doesn't block - Message *sendrecv(Message *m, msg_addr_t dest, - int port=0); // blocks for matching reply - -}; - -#endif diff --git a/ceph/msg/MPIMessenger.cc b/ceph/msg/MPIMessenger.cc index 379ce1869c5c1..0ce050d063532 100644 --- a/ceph/msg/MPIMessenger.cc +++ b/ceph/msg/MPIMessenger.cc @@ -6,7 +6,6 @@ #include "common/Mutex.h" #include "MPIMessenger.h" -#include "CheesySerializer.h" #include "Message.h" #include diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index d63cf15101976..54214bebd510f 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -165,8 +165,9 @@ void Messenger::dispatch(Message *m) call_reply[pcid] = m; // set reply call_cond[pcid]->Signal(); - // wait for delivery - call_reply_finish_cond.Wait(_lock); + // wait for sendrecv to wake up + while (call_cond.count(pcid)) + call_reply_finish_cond.Wait(_lock); _lock.Unlock(); } else { @@ -213,18 +214,14 @@ Message *Messenger::sendrecv(Message *m, msg_addr_t dest, int port) _lock.Lock(); // wait? - if (call_reply[pcid] == 0) { + while (call_reply[pcid] == 0) { dout(DEBUGLVL) << "sendrecv waiting for reply on pcid " << pcid << endl; - //cout << "wait start, value = " << sem->Value() << endl; - cond.Wait(_lock); - } else { - dout(DEBUGLVL) << "sendrecv reply is already here on pcid " << pcid << endl; - } + } // pick up reply Message *reply = call_reply[pcid]; - //assert(reply); + assert(reply); call_reply.erase(pcid); // remove from call map call_cond.erase(pcid); diff --git a/ceph/msg/Semaphore.h b/ceph/msg/Semaphore.h deleted file mode 100755 index 3f8bb03f0d7f6..0000000000000 --- a/ceph/msg/Semaphore.h +++ /dev/null @@ -1,60 +0,0 @@ -/* - -don't use this.. you probably want a pthread Cond instead! - -*/ - -///////////////////////////////////////////////////////////////////// -// Written by Phillip Sitbon -// Copyright 2003 -// -// Posix/Semaphore.h -// - Resource counting mechanism -// -///////////////////////////////////////////////////////////////////// - -#ifndef _Semaphore_Posix_ -#define _Semaphore_Posix_ - -#include -#include -#include -using namespace std; - -class Semaphore -{ - sem_t S; - - public: - Semaphore( int init = 0 ) { - int r = sem_init(&S,0,init); - //cout << "sem_init = " << r << endl; - } - - virtual ~Semaphore() { - int r = sem_destroy(&S); - //cout << "sem_destroy = " << r << endl; - } - - void Wait() const { - while (1) { - int r = sem_wait((sem_t *)&S); - if (r == 0) break; - cout << "sem_wait returned " << r << ", trying again" << endl; - } - } - - int Wait_Try() const - { return (sem_trywait((sem_t *)&S)?errno:0); } - - int Post() const - { return (sem_post((sem_t *)&S)?errno:0); } - - int Value() const - { int V = -1; sem_getvalue((sem_t *)&S,&V); return V; } - - void Reset( int init = 0 ) - { sem_destroy(&S); sem_init(&S,0,init); } -}; - -#endif // !_Semaphore_Posix_ diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index 3889bd2a68038..49e1e14679fea 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -148,7 +148,7 @@ public: entity_rank[MSG_ADDR_RANK(my_rank)] = my_rank; rank_addr[my_rank] = listen_addr; - waiting_for_rank.Signal(); + waiting_for_rank.SignalAll(); delete m; } @@ -869,7 +869,8 @@ msg_addr_t register_entity(msg_addr_t addr) tcp_send(m); // wait for reply - waiting_for_rank.Wait(lookup_lock); + while (my_rank < 0) + waiting_for_rank.Wait(lookup_lock); assert(my_rank > 0); } @@ -881,9 +882,7 @@ msg_addr_t register_entity(msg_addr_t addr) tcp_send(m); // wait? - if (waiting_for_register_result.count(id)) { - // already here? - } else + while (!waiting_for_register_result.count(id)) cond.Wait(lookup_lock); // get result, clean up diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index ae8d999746a45..796b55c005443 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -226,33 +226,43 @@ void OSD::_lock_object(object_t oid) if (object_lock.count(oid)) { Cond c; dout(0) << "lock_object " << hex << oid << dec << " waiting as " << &c << endl; - object_lock_waiters[oid].push_back(&c); - c.Wait(osd_lock); - assert(object_lock.count(oid)); - } else { - dout(15) << "lock_object " << hex << oid << dec << endl; - object_lock.insert(oid); + + list& ls = object_lock_waiters[oid]; // this is safe, right? + ls.push_back(&c); + + while (object_lock.count(oid) || + ls.front() != &c) + c.Wait(osd_lock); + + assert(ls.front() == &c); + ls.pop_front(); + if (ls.empty()) + object_lock_waiters.erase(oid); } + + dout(15) << "lock_object " << hex << oid << dec << endl; + object_lock.insert(oid); } void OSD::unlock_object(object_t oid) { osd_lock.Lock(); + + // unlock assert(object_lock.count(oid)); + object_lock.erase(oid); + if (object_lock_waiters.count(oid)) { // someone is in line - list& ls = object_lock_waiters[oid]; - Cond *c = ls.front(); - ls.pop_front(); - dout(15) << "unlock_object " << hex << oid << dec << " waking up next guy " << c << endl; - if (ls.empty()) - object_lock_waiters.erase(oid); + Cond *c = object_lock_waiters[oid].front(); + assert(c); + dout(0) << "unlock_object " << hex << oid << dec << " waking up next guy " << c << endl; c->Signal(); } else { // nobody waiting dout(15) << "unlock_object " << hex << oid << dec << endl; - object_lock.erase(oid); } + osd_lock.Unlock(); } @@ -2122,7 +2132,8 @@ void OSD::wait_for_no_ops() if (pending_ops > 0) { dout(7) << "wait_for_no_ops - waiting for " << pending_ops << endl; waiting_for_no_ops = true; - no_pending_ops.Wait(osd_lock); + while (pending_ops > 0) + no_pending_ops.Wait(osd_lock); waiting_for_no_ops = false; assert(pending_ops == 0); } diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index d5811a4f139d3..f073339c79540 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -43,7 +43,7 @@ class OSDReplicaOp { new_version(nv), old_version(ov) { } bool can_send_ack() { return !sent_ack && !cancel && local_ack && waitfor_ack.empty(); } - bool can_send_safe() { return !sent_safe && !cancel && local_safe && waitfor_safe.empty(); } + bool can_send_safe() { return !sent_safe && !cancel && local_ack && local_safe && waitfor_safe.empty(); } bool can_delete() { return local_safe && (cancel || waitfor_safe.empty()); } }; -- 2.39.5