}
// kick waiting threads
- for (list<Cond*>::iterator p = waiting_for_session[from].begin();
- p != waiting_for_session[from].end();
- ++p)
- (*p)->Signal();
+ signal_cond_list(waiting_for_session[from]);
waiting_for_session.erase(from);
delete m;
// kick any waiting threads
list<Cond*> ls;
ls.swap(waiting_for_mdsmap);
- for (list<Cond*>::iterator p = ls.begin(); p != ls.end(); ++p)
- (*p)->Signal();
+ signal_cond_list(ls);
delete m;
}
}
-class C_Client_ImplementedCaps : public Context {
- Client *client;
- MClientFileCaps *msg;
- Inode *in;
-public:
- C_Client_ImplementedCaps(Client *c, MClientFileCaps *m, Inode *i) : client(c), msg(m), in(i) {}
- void finish(int r) {
- client->implemented_caps(msg,in);
- }
-};
-
void Client::wait_on_list(list<Cond*>& ls)
{
}
+
+// flush dirty data (from objectcache)
+
+struct C_Flush : public Context {
+ Client *client;
+ Inode *in;
+ C_Flush(Client *c, Inode *i) : client(c), in(i) {}
+ void finish(int r) {
+ client->_flushed(in);
+ }
+};
+
+void Client::_flush(Inode *in)
+{
+ dout(10) << "_flush " << *in << dendl;
+ if (in->cap_refs[CEPH_CAP_WRBUFFER] == 1) {
+ in->get_cap_ref(CEPH_CAP_WRBUFFER); // for the (one!) waiter
+ in->fc.flush_dirty(0);
+ in->fc.add_safe_waiter(new C_Flush(this, in));
+ }
+}
+
+void Client::_flushed(Inode *in)
+{
+ dout(10) << "_flushed " << *in << dendl;
+ assert(in->cap_refs[CEPH_CAP_WRBUFFER] == 2);
+ put_cap_ref(in, CEPH_CAP_WRBUFFER);
+ put_cap_ref(in, CEPH_CAP_WRBUFFER);
+}
+
+
+
/** handle_file_caps
* handle caps update from mds. including mds to mds caps transitions.
* do not block.
cap.seq = m->get_seq();
bool ack = false;
- bool invalidate = false;
- bool writeback = false;
-
+
if (old_caps & ~new_caps) {
dout(10) << " revocation of " << cap_string(~new_caps & old_caps) << dendl;
cap.issued = new_caps;
if ((cap.issued & ~new_caps) & CEPH_CAP_RDCACHE)
- invalidate = true;
+ in->fc.release_clean();
+
if ((used & ~new_caps) & CEPH_CAP_WRBUFFER)
- writeback = true;
+ _flush(in);
else {
ack = true;
cap.implemented = new_caps;
if (ack)
messenger->send_message(m, m->get_source_inst());
-
- if (invalidate)
- in->fc.release_clean();
-
- if (writeback)
- in->fc.flush_dirty();
-
-}
-
-void Client::implemented_caps(MClientFileCaps *m, Inode *in)
-{
- dout(5) << "implemented_caps " << cap_string(m->get_caps())
- << ", acking to " << m->get_source() << dendl;
-
- messenger->send_message(m, m->get_source_inst());
+ else
+ delete m;
}
-
-
// -------------------
// MOUNT
// do sync read
Cond cond;
bool done = false;
- C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
+ Context *onfinish = new C_SafeCond(&client_lock, &cond, &done, &rvalue);
Objecter::OSDRead *rd = filer->prepare_read(in->inode, offset, size, bl, 0);
if (in->hack_balance_reads || g_conf.client_hack_balance_reads)
if (g_conf.client_oc) {
// buffer cache
+
+ if (in->cap_refs[CEPH_CAP_WRBUFFER] == 0)
+ in->get_cap_ref(CEPH_CAP_WRBUFFER);
+
in->fc.write(offset, size, blist, client_lock);
} else {
// simple, non-atomic sync write
Cond cond;
bool done = false;
- Context *onfinish = new C_Cond(&cond, &done);
+ Context *onfinish = new C_SafeCond(&client_lock, &cond, &done);
Context *onsafe = new C_Client_SyncCommit(this, in);
unsafe_sync_write++;
Inode *in = f->inode;
+ dout(3) << "_fsync(" << f << ", " << (syndataonly ? "dataonly)":"data+metadata)") << dendl;
+
// metadata?
- if (!syncdataonly) {
+ if (!syncdataonly)
dout(0) << "fsync - not syncing metadata yet.. implement me" << dendl;
- }
- if (g_conf.client_oc) {
- // data?
- Cond cond;
- bool done = false;
- if (!objectcacher->commit_set(in->ino(),
- new C_Cond(&cond, &done))) {
- // wait for callback
- while (!done) cond.Wait(client_lock);
- }
- } else {
- while (in->cap_refs[CEPH_CAP_WRBUFFER] > 0) {
- dout(10) << "ino " << in->inode.ino << " has " << in->cap_refs[CEPH_CAP_WRBUFFER] << " uncommitted, waiting" << dendl;
- wait_on_list(in->waitfor_commit);
- }
- dout(10) << "ino " << in->inode.ino << " has no uncommitted writes" << dendl;
- }
+ if (g_conf.client_oc)
+ _flush(in);
+
+ while (in->cap_refs[CEPH_CAP_WRBUFFER] > 0) {
+ dout(10) << "ino " << in->inode.ino << " has " << in->cap_refs[CEPH_CAP_WRBUFFER] << " uncommitted, waiting" << dendl;
+ wait_on_list(in->waitfor_commit);
+ }
+ dout(10) << "ino " << in->inode.ino << " has no uncommitted writes" << dendl;
+
return r;
}