case CEPH_LOCK_IFLOCK:
::encode(inode.version, bl);
- ::encode(fcntl_locks, bl);
- ::encode(flock_locks, bl);
+ _encode_file_locks(bl);
break;
case CEPH_LOCK_IPOLICY:
case CEPH_LOCK_IFLOCK:
::decode(inode.version, p);
- ::decode(fcntl_locks, p);
- ::decode(flock_locks, p);
+ _decode_file_locks(p);
break;
case CEPH_LOCK_IPOLICY:
mdcache->num_caps--;
//clean up advisory locks
- bool fcntl_removed = fcntl_locks.remove_all_from(client);
- bool flock_removed = flock_locks.remove_all_from(client);
+ bool fcntl_removed = fcntl_locks ? fcntl_locks->remove_all_from(client) : false;
+ bool flock_removed = flock_locks ? flock_locks->remove_all_from(client) : false;
if (fcntl_removed || flock_removed) {
list<MDSInternalContextBase*> waiters;
take_waiting(CInode::WAIT_FLOCK, waiters);
_encode_locks_full(bl);
- ::encode(fcntl_locks, bl);
- ::encode(flock_locks, bl);
+ _encode_file_locks(bl);
+
ENCODE_FINISH(bl);
get(PIN_TEMPEXPORTING);
_decode_locks_full(p);
- if (struct_v >= 5) {
- ::decode(fcntl_locks, p);
- ::decode(flock_locks, p);
- }
+ _decode_file_locks(p);
DECODE_FINISH(p);
}
protected:
- ceph_lock_state_t fcntl_locks;
- ceph_lock_state_t flock_locks;
+ ceph_lock_state_t *fcntl_locks;
+ ceph_lock_state_t *flock_locks;
+ ceph_lock_state_t *get_fcntl_lock_state() {
+ if (!fcntl_locks)
+ fcntl_locks = new ceph_lock_state_t(g_ceph_context);
+ return fcntl_locks;
+ }
+ void clear_fcntl_lock_state() {
+ delete fcntl_locks;
+ fcntl_locks = NULL;
+ }
+ ceph_lock_state_t *get_flock_lock_state() {
+ if (!flock_locks)
+ flock_locks = new ceph_lock_state_t(g_ceph_context);
+ return flock_locks;
+ }
+ void clear_flock_lock_state() {
+ delete flock_locks;
+ flock_locks = NULL;
+ }
void clear_file_locks() {
- fcntl_locks.clear();
- flock_locks.clear();
+ clear_fcntl_lock_state();
+ clear_flock_lock_state();
+ }
+ void _encode_file_locks(bufferlist& bl) const {
+ bool has_fcntl_locks = fcntl_locks && !fcntl_locks->empty();
+ ::encode(has_fcntl_locks, bl);
+ if (has_fcntl_locks)
+ ::encode(*fcntl_locks, bl);
+ bool has_flock_locks = flock_locks && !flock_locks->empty();
+ ::encode(has_flock_locks, bl);
+ if (has_flock_locks)
+ ::encode(*flock_locks, bl);
+ }
+ void _decode_file_locks(bufferlist::iterator& p) {
+ bool has_fcntl_locks;
+ ::decode(has_fcntl_locks, p);
+ if (has_fcntl_locks)
+ ::decode(*get_fcntl_lock_state(), p);
+ else
+ clear_fcntl_lock_state();
+ bool has_flock_locks;
+ ::decode(has_flock_locks, p);
+ if (has_flock_locks)
+ ::decode(*get_flock_lock_state(), p);
+ else
+ clear_flock_lock_state();
}
// LogSegment lists i (may) belong to
parent(0),
inode_auth(CDIR_AUTH_DEFAULT),
replica_caps_wanted(0),
- fcntl_locks(g_ceph_context), flock_locks(g_ceph_context),
+ fcntl_locks(0), flock_locks(0),
item_dirty(this), item_caps(this), item_open_file(this), item_dirty_parent(this),
item_dirty_dirfrag_dir(this),
item_dirty_dirfrag_nest(this),
g_num_inos++;
close_dirfrags();
close_snaprealm();
+ clear_file_locks();
}
for ( int i=0; i < num_locks; ++i) {
ceph_filelock decoded_lock;
::decode(decoded_lock, bli);
- in->fcntl_locks.held_locks.
+ in->get_fcntl_lock_state()->held_locks.
insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
- ++in->fcntl_locks.client_held_lock_counts[(client_t)(decoded_lock.client)];
+ ++in->get_fcntl_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
}
::decode(num_locks, bli);
for ( int i=0; i < num_locks; ++i) {
ceph_filelock decoded_lock;
::decode(decoded_lock, bli);
- in->flock_locks.held_locks.
+ in->get_flock_lock_state()->held_locks.
insert(pair<uint64_t, ceph_filelock>(decoded_lock.start, decoded_lock));
- ++in->flock_locks.client_held_lock_counts[(client_t)(decoded_lock.client)];
+ ++in->get_flock_lock_state()->client_held_lock_counts[(client_t)(decoded_lock.client)];
}
}
#include "Beacon.h"
-#define CEPH_MDS_PROTOCOL 24 /* cluster internal */
+#define CEPH_MDS_PROTOCOL 25 /* cluster internal */
enum {
l_mds_first = 2000,
for (int i = 0; i < numlocks; ++i) {
::decode(lock, p);
lock.client = client;
- in->fcntl_locks.held_locks.insert(pair<uint64_t, ceph_filelock>
- (lock.start, lock));
- ++in->fcntl_locks.client_held_lock_counts[client];
+ in->get_fcntl_lock_state()->held_locks.insert(pair<uint64_t, ceph_filelock>(lock.start, lock));
+ ++in->get_fcntl_lock_state()->client_held_lock_counts[client];
}
::decode(numlocks, p);
for (int i = 0; i < numlocks; ++i) {
::decode(lock, p);
lock.client = client;
- in->flock_locks.held_locks.insert(pair<uint64_t, ceph_filelock>
- (lock.start, lock));
- ++in->flock_locks.client_held_lock_counts[client];
+ in->get_flock_lock_state()->held_locks.insert(pair<uint64_t, ceph_filelock> (lock.start, lock));
+ ++in->get_flock_lock_state()->client_held_lock_counts[client];
}
}
interrupt = true;
// fall-thru
case CEPH_LOCK_FLOCK:
- lock_state = &cur->flock_locks;
+ lock_state = cur->get_flock_lock_state();
break;
case CEPH_LOCK_FCNTL_INTR:
interrupt = true;
// fall-thru
case CEPH_LOCK_FCNTL:
- lock_state = &cur->fcntl_locks;
+ lock_state = cur->get_fcntl_lock_state();
break;
default:
ceph_lock_state_t *lock_state = NULL;
switch (req->head.args.filelock_change.rule) {
case CEPH_LOCK_FLOCK:
- lock_state = &cur->flock_locks;
+ lock_state = cur->get_flock_lock_state();
break;
case CEPH_LOCK_FCNTL:
- lock_state = &cur->fcntl_locks;
+ lock_state = cur->get_fcntl_lock_state();
break;
default:
client_held_lock_counts.clear();
client_waiting_lock_counts.clear();
}
+ bool empty() const {
+ return held_locks.empty() && waiting_locks.empty() &&
+ client_held_lock_counts.empty() &&
+ client_waiting_lock_counts.empty();
+ }
};
WRITE_CLASS_ENCODER(ceph_lock_state_t)