#include "common/Formatter.h"
-#define dout_context g_ceph_context
-#define dout_subsys ceph_subsys_mds
-#undef dout_prefix
-#define dout_prefix *_dout << "Capability "
-
/*
* Capability::Export
return session ? session->get_client() : client_t(-1);
}
-int Capability::confirm_receipt(ceph_seq_t seq, unsigned caps) {
- int was_revoking = (_issued & ~_pending);
- if (seq == last_sent) {
- _revokes.clear();
- _issued = caps;
- // don't add bits
- _pending &= caps;
-
- // if the revoking is not totally finished just add the
- // new revoking caps back.
- if (was_revoking && revoking()) {
- CInode *in = get_inode();
- dout(10) << "revocation is not totally finished yet on " << *in
- << ", the session " << *session << dendl;
- _revokes.emplace_back(_pending, last_sent, last_issue);
- calc_issued();
- }
- } else {
- // can i forget any revocations?
- while (!_revokes.empty() && _revokes.front().seq < seq)
- _revokes.pop_front();
- if (!_revokes.empty()) {
- if (_revokes.front().seq == seq)
- _revokes.begin()->before = caps;
- calc_issued();
- } else {
- // seq < last_sent
- _issued = caps | _pending;
- }
- }
-
- if (was_revoking && _issued == _pending) {
- item_revoking_caps.remove_myself();
- item_client_revoking_caps.remove_myself();
- maybe_clear_notable();
- }
- return was_revoking & ~_issued; // return revoked
-}
-
bool Capability::is_stale() const
{
return session ? session->is_stale() : false;
inc_last_seq();
return last_sent;
}
- int confirm_receipt(ceph_seq_t seq, unsigned caps);
+ int confirm_receipt(ceph_seq_t seq, unsigned caps) {
+ int was_revoking = (_issued & ~_pending);
+ if (seq == last_sent) {
+ _revokes.clear();
+ _issued = caps;
+ // don't add bits
+ _pending &= caps;
+ } else {
+ // can i forget any revocations?
+ while (!_revokes.empty() && _revokes.front().seq < seq)
+ _revokes.pop_front();
+ if (!_revokes.empty()) {
+ if (_revokes.front().seq == seq)
+ _revokes.begin()->before = caps;
+ calc_issued();
+ } else {
+ // seq < last_sent
+ _issued = caps | _pending;
+ }
+ }
+
+ if (was_revoking && _issued == _pending) {
+ item_revoking_caps.remove_myself();
+ item_client_revoking_caps.remove_myself();
+ maybe_clear_notable();
+ }
+ return was_revoking & ~_issued; // return revoked
+ }
+ // we may get a release racing with revocations, which means our revokes will be ignored
+ // by the client. clean them out of our _revokes history so we don't wait on them.
+ void clean_revoke_from(ceph_seq_t li) {
+ bool changed = false;
+ while (!_revokes.empty() && _revokes.front().last_issue <= li) {
+ _revokes.pop_front();
+ changed = true;
+ }
+ if (changed) {
+ bool was_revoking = (_issued & ~_pending);
+ calc_issued();
+ if (was_revoking && _issued == _pending) {
+ item_revoking_caps.remove_myself();
+ item_client_revoking_caps.remove_myself();
+ maybe_clear_notable();
+ }
+ }
+ }
ceph_seq_t get_mseq() const { return mseq; }
void inc_mseq() { mseq++; }
new C_Locker_RetryCapRelease(this, client, ino, cap_id, mseq, seq));
return;
}
+ if (seq != cap->get_last_issue()) {
+ dout(7) << " issue_seq " << seq << " != " << cap->get_last_issue() << dendl;
+ // clean out any old revoke history
+ cap->clean_revoke_from(seq);
+ eval_cap_gather(in);
+ return;
+ }
remove_client_cap(in, cap);
}
#include "gtest/gtest.h"
#include "include/compat.h"
#include "include/cephfs/libcephfs.h"
-#include <dirent.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
-#include <thread>
+#include <dirent.h>
#ifdef __linux__
#include <sys/xattr.h>
#endif
ceph_shutdown(ca);
ceph_shutdown(cb);
}
-
-static void write_func(bool *stop)
-{
- struct ceph_mount_info *cmount;
- ASSERT_EQ(ceph_create(&cmount, NULL), 0);
- ASSERT_EQ(ceph_conf_read_file(cmount, NULL), 0);
- ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
- ASSERT_EQ(ceph_mount(cmount, "/"), 0);
-
- char name[20];
- snprintf(name, sizeof(name), "foo.%d", getpid());
- int fd = ceph_open(cmount, name, O_CREAT|O_RDWR, 0644);
- ASSERT_LE(0, fd);
-
- int buf_size = 4096;
- char *buf = (char *)malloc(buf_size);
- if (!buf) {
- *stop = true;
- printf("write_func failed to allocate buffer!");
- return;
- }
- memset(buf, 1, buf_size);
-
- while (!(*stop)) {
- int i;
-
- // truncate the file size to 4096 will set the max_size to 4MB.
- ASSERT_EQ(0, ceph_ftruncate(cmount, fd, 4096));
-
- // write 4MB + extra 64KB data will make client to trigger to
- // call check_cap() to report new size. And if MDS is revoking
- // the Fsxrw caps and we are still holding the Fw caps and will
- // trigger tracker#57244.
- for (i = 0; i < 1040; i++) {
- ASSERT_EQ(ceph_write(cmount, fd, buf, buf_size, 0), buf_size);
- }
- }
-
- ceph_shutdown(cmount);
-}
-
-static void setattr_func(bool *stop)
-{
- struct ceph_mount_info *cmount;
- ASSERT_EQ(ceph_create(&cmount, NULL), 0);
- ASSERT_EQ(ceph_conf_read_file(cmount, NULL), 0);
- ASSERT_EQ(0, ceph_conf_parse_env(cmount, NULL));
- ASSERT_EQ(ceph_mount(cmount, "/"), 0);
-
- char name[20];
- snprintf(name, sizeof(name), "foo.%d", getpid());
- int fd = ceph_open(cmount, name, O_CREAT|O_RDWR, 0644);
- ASSERT_LE(0, fd);
-
- while (!(*stop)) {
- // setattr will make the MDS to acquire xlock for the filelock and
- // force to revoke caps from clients
- struct ceph_statx stx = {.stx_size = 0};
- ASSERT_EQ(ceph_fsetattrx(cmount, fd, &stx, CEPH_SETATTR_SIZE), 0);
- }
-
- ceph_shutdown(cmount);
-}
-
-TEST(LibCephFS, MulticlientRevokeCaps) {
- std::thread thread1, thread2;
- bool stop = false;
- int wait = 60; // in second
-
- thread1 = std::thread(write_func, &stop);
- thread2 = std::thread(setattr_func, &stop);
-
- printf(" Will run test for %d seconds!\n", wait);
- sleep(wait);
- stop = true;
-
- thread1.join();
- thread2.join();
-}