git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
os/: Add WBThrottle
authorSamuel Just <sam.just@inktank.com>
Tue, 14 May 2013 21:44:06 +0000 (14:44 -0700)
committerSamuel Just <sam.just@inktank.com>
Tue, 21 May 2013 23:37:41 +0000 (16:37 -0700)
Signed-off-by: Samuel Just <sam.just@inktank.com>
src/Makefile.am
src/common/config_opts.h
src/os/WBThrottle.cc [new file with mode: 0644]
src/os/WBThrottle.h [new file with mode: 0644]
src/os/hobject.h

index 9525fe63f4f840eae1d7d5f371171963939ba2f1..5e10c9eed25fe8490da24ea9f13ddded56372b05 100644 (file)
@@ -1490,7 +1490,8 @@ libos_a_SOURCES = \
        os/IndexManager.cc \
        os/FlatIndex.cc \
        os/DBObjectMap.cc \
-       os/LevelDBStore.cc
+       os/LevelDBStore.cc \
+       os/WBThrottle.cc
 libos_a_CXXFLAGS= ${AM_CXXFLAGS}
 noinst_LIBRARIES += libos.a
 
@@ -1974,6 +1975,7 @@ noinst_HEADERS = \
        os/FlatIndex.h\
        os/HashIndex.h\
        os/FDCache.h\
+       os/WBThrottle.h\
        os/IndexManager.h\
         os/Journal.h\
         os/JournalingObjectStore.h\
index 68c7bef08b35c29f4d4007a0f295f7e992bea8bf..27593cb9f9df7730933de18f475d34bc9b14205d 100644 (file)
@@ -474,6 +474,20 @@ OPTION(osd_mon_shutdown_timeout, OPT_DOUBLE, 5)
 
 OPTION(filestore, OPT_BOOL, false)
 
+/// filestore writeback (wb) throttle limits: the flusher starts at *_start_flusher; throttle() blocks at *_hard_limit
+OPTION(filestore_wbthrottle_btrfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_btrfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_btrfs_ios_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_ios_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_btrfs_inodes_start_flusher, OPT_U64, 100)
+OPTION(filestore_wbthrottle_btrfs_inodes_hard_limit, OPT_U64, 1000)
+OPTION(filestore_wbthrottle_xfs_bytes_start_flusher, OPT_U64, 10<<20)
+OPTION(filestore_wbthrottle_xfs_bytes_hard_limit, OPT_U64, 100<<20)
+OPTION(filestore_wbthrottle_xfs_ios_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_ios_hard_limit, OPT_U64, 100)
+OPTION(filestore_wbthrottle_xfs_inodes_start_flusher, OPT_U64, 10)
+OPTION(filestore_wbthrottle_xfs_inodes_hard_limit, OPT_U64, 100)
+
 // Tests index failure paths
 OPTION(filestore_index_retry_probability, OPT_DOUBLE, 0)
 
diff --git a/src/os/WBThrottle.cc b/src/os/WBThrottle.cc
new file mode 100644 (file)
index 0000000..24f7730
--- /dev/null
@@ -0,0 +1,209 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "os/WBThrottle.h"
+
+/**
+ * Construct the throttle.
+ *
+ * Loads the limits from cct's configuration (fs is initialised to XFS,
+ * so the xfs option set is used), registers this object as a config
+ * observer, and finally calls create() inherited from Thread.
+ *
+ * @param cct context supplying the configuration; must not be NULL
+ */
+WBThrottle::WBThrottle(CephContext *cct) :
+  cur_ios(0), cur_size(0),
+  cct(cct),
+  stopping(false),
+  lock("WBThrottle::lock", false, true, false, cct),
+  fs(XFS)
+{
+  // Validate cct before set_from_conf() dereferences it.
+  assert(cct);
+  {
+    // set_from_conf() asserts that lock is held.
+    Mutex::Locker l(lock);
+    set_from_conf();
+  }
+  cct->_conf->add_observer(this);
+  create();
+}
+
+/**
+ * Tear down: set the stopping flag and signal cond under the lock,
+ * join the thread, then unregister from config notifications.
+ */
+WBThrottle::~WBThrottle() {
+  assert(cct);
+
+  // Publish the stop request while holding the lock so waiters observe it.
+  lock.Lock();
+  stopping = true;
+  cond.Signal();
+  lock.Unlock();
+
+  join();
+  cct->_conf->remove_observer(this);
+}
+
+/**
+ * md_config_obs_t hook.
+ *
+ * @return NULL-terminated array naming every filestore_wbthrottle_*
+ *         option this object reacts to
+ */
+const char** WBThrottle::get_tracked_conf_keys() const
+{
+  // The trailing NULL is the sentinel handle_conf_change() iterates to.
+  static const char* tracked_keys[] = {
+    // btrfs limits
+    "filestore_wbthrottle_btrfs_bytes_start_flusher",
+    "filestore_wbthrottle_btrfs_bytes_hard_limit",
+    "filestore_wbthrottle_btrfs_ios_start_flusher",
+    "filestore_wbthrottle_btrfs_ios_hard_limit",
+    "filestore_wbthrottle_btrfs_inodes_start_flusher",
+    "filestore_wbthrottle_btrfs_inodes_hard_limit",
+    // xfs limits
+    "filestore_wbthrottle_xfs_bytes_start_flusher",
+    "filestore_wbthrottle_xfs_bytes_hard_limit",
+    "filestore_wbthrottle_xfs_ios_start_flusher",
+    "filestore_wbthrottle_xfs_ios_hard_limit",
+    "filestore_wbthrottle_xfs_inodes_start_flusher",
+    "filestore_wbthrottle_xfs_inodes_hard_limit",
+    NULL
+  };
+  return tracked_keys;
+}
+
+void WBThrottle::set_from_conf()
+{
+  assert(lock.is_locked());
+  if (fs == BTRFS) {
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_btrfs_inodes_hard_limit;
+  } else if (fs == XFS) {
+    size_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_start_flusher;
+    size_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_bytes_hard_limit;
+    io_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_ios_start_flusher;
+    io_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_ios_hard_limit;
+    fd_limits.first =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_start_flusher;
+    fd_limits.second =
+      cct->_conf->filestore_wbthrottle_xfs_inodes_hard_limit;
+  } else {
+    assert(0 == "invalid value for fs");
+  }
+  cond.Signal();
+}
+
+/**
+ * md_config_obs_t hook: reload the limits if any tracked option is in
+ * the changed set.
+ *
+ * @param conf    configuration object (not read here; set_from_conf()
+ *                reads cct->_conf)
+ * @param changed names of the options that changed
+ */
+void WBThrottle::handle_conf_change(const md_config_t *conf,
+                                   const std::set<std::string> &changed)
+{
+  Mutex::Locker l(lock);
+  // Advance to the first tracked key present in `changed`, or to the
+  // NULL sentinel if there is none.
+  const char** key = get_tracked_conf_keys();
+  while (*key && !changed.count(*key))
+    ++key;
+  // One reload covers every option, so a single match is enough.
+  if (*key)
+    set_from_conf();
+}
+
+bool WBThrottle::get_next_should_flush(
+  boost::tuple<hobject_t, FDRef, PendingWB> *next)
+{
+  assert(lock.is_locked());
+  assert(next);
+  while (!stopping &&
+         cur_ios < io_limits.first &&
+         pending_wbs.size() < fd_limits.first &&
+         cur_size < size_limits.first)
+         cond.Wait(lock);
+  if (stopping)
+    return false;
+  assert(!pending_wbs.empty());
+  hobject_t obj(pop_object());
+  
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(obj);
+  *next = boost::make_tuple(obj, i->second.second, i->second.first);
+  pending_wbs.erase(i);
+  return true;
+}
+
+
+void *WBThrottle::entry()
+{
+  Mutex::Locker l(lock);
+  boost::tuple<hobject_t, FDRef, PendingWB> wb;
+  while (get_next_should_flush(&wb)) {
+    clearing = wb.get<0>();
+    lock.Unlock();
+    ::fsync(**wb.get<1>());
+    if (wb.get<2>().replica)
+      posix_fadvise(**wb.get<1>(), 0, 0, POSIX_FADV_DONTNEED);
+    lock.Lock();
+    clearing = hobject_t();
+    cur_ios -= wb.get<2>().ios;
+    cur_size -= wb.get<2>().size;
+    cond.Signal();
+    wb = boost::tuple<hobject_t, FDRef, PendingWB>();
+  }
+  return 0;
+}
+
+void WBThrottle::queue_wb(
+  FDRef fd, const hobject_t &hoid, uint64_t offset, uint64_t len,
+  bool replica)
+{
+  Mutex::Locker l(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator wbiter =
+    pending_wbs.find(hoid);
+  if (wbiter == pending_wbs.end()) {
+    wbiter = pending_wbs.insert(
+      make_pair(hoid,
+       make_pair(
+         PendingWB(),
+         fd))).first;
+  } else {
+    remove_object(hoid);
+  }
+
+  cur_ios++;
+  cur_size += len;
+  wbiter->second.first.add(replica, len, 1);
+  insert_object(hoid);
+  cond.Signal();
+}
+
+/**
+ * Forget every pending writeback: subtract the pending ios/bytes from
+ * the counters, empty pending_wbs and both lru structures, and signal
+ * cond.
+ */
+void WBThrottle::clear()
+{
+  typedef map<hobject_t, pair<PendingWB, FDRef> >::iterator wb_iter_t;
+
+  Mutex::Locker l(lock);
+
+  // Total up what is being dropped, then release it in one step.
+  uint64_t dropped_ios = 0;
+  uint64_t dropped_bytes = 0;
+  for (wb_iter_t it = pending_wbs.begin(); it != pending_wbs.end(); ++it) {
+    dropped_ios += it->second.first.ios;
+    dropped_bytes += it->second.first.size;
+  }
+  cur_ios -= dropped_ios;
+  cur_size -= dropped_bytes;
+
+  pending_wbs.clear();
+  lru.clear();
+  rev_lru.clear();
+  cond.Signal();
+}
+
+/**
+ * Drop any pending writeback for hoid.
+ *
+ * Blocks while entry() has hoid marked as the object it is currently
+ * syncing.  If hoid has a pending entry, its ios/bytes are subtracted
+ * from the counters and it is removed from pending_wbs and the lru.
+ *
+ * @param hoid object to forget
+ */
+void WBThrottle::clear_object(const hobject_t &hoid)
+{
+  Mutex::Locker l(lock);
+  while (clearing == hoid)
+    cond.Wait(lock);
+  map<hobject_t, pair<PendingWB, FDRef> >::iterator i =
+    pending_wbs.find(hoid);
+  if (i == pending_wbs.end())
+    return;
+
+  cur_ios -= i->second.first.ios;
+  cur_size -= i->second.first.size;
+
+  pending_wbs.erase(i);
+  remove_object(hoid);
+
+  // The counters just dropped: wake anyone blocked in throttle() so
+  // they re-check the limits now, as clear() and entry() already do.
+  cond.Signal();
+}
+
+/**
+ * Block the caller while any hard limit (ios, inodes, bytes) is
+ * reached or exceeded.  Returns right away once stopping is set.
+ */
+void WBThrottle::throttle()
+{
+  Mutex::Locker l(lock);
+  while (!stopping &&
+         (cur_ios >= io_limits.second ||
+          pending_wbs.size() >= fd_limits.second ||
+          cur_size >= size_limits.second)) {
+    cond.Wait(lock);
+  }
+}
diff --git a/src/os/WBThrottle.h b/src/os/WBThrottle.h
new file mode 100644 (file)
index 0000000..2ac7234
--- /dev/null
@@ -0,0 +1,155 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2013 Inktank Storage, Inc.
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ *
+ */
+
+#ifndef WBTHROTTLE_H
+#define WBTHROTTLE_H
+
+#include <map>
+#include <boost/tuple/tuple.hpp>
+#include <tr1/memory>
+#include "include/buffer.h"
+#include "common/Formatter.h"
+#include "os/hobject.h"
+#include "include/interval_set.h"
+#include "FDCache.h"
+#include "common/Thread.h"
+#include "common/ceph_context.h"
+
+
+/**
+ * WBThrottle
+ *
+ * Tracks, throttles, and flushes outstanding IO
+ */
+class WBThrottle : Thread, public md_config_obs_t {
+  hobject_t clearing;
+
+  /// Limits on unflushed bytes
+  pair<uint64_t, uint64_t> size_limits;
+
+  /// Limits on unflushed ios
+  pair<uint64_t, uint64_t> io_limits;
+
+  /// Limits on unflushed objects
+  pair<uint64_t, uint64_t> fd_limits;
+
+  uint64_t cur_ios;  /// Currently unflushed IOs
+  uint64_t cur_size; /// Currently unflushed bytes
+
+  /**
+   * PendingWB tracks the ios pending on an object.
+   */
+  class PendingWB {
+  public:
+    bool replica;
+    uint64_t size;
+    uint64_t ios;
+    PendingWB() : replica(true), size(0), ios(0) {}
+    void add(bool _replica, uint64_t _size, uint64_t _ios) {
+      if (!_replica)
+       replica = false; // only replica if all writes are replica
+      size += _size;
+      ios += _ios;
+    }
+  };
+
+  CephContext *cct;
+  bool stopping;
+  Mutex lock;
+  Cond cond;
+
+
+  /**
+   * Flush objects in lru order
+   */
+  list<hobject_t> lru;
+  map<hobject_t, list<hobject_t>::iterator> rev_lru;
+  void remove_object(const hobject_t &hoid) {
+    assert(lock.is_locked());
+    map<hobject_t, list<hobject_t>::iterator>::iterator iter =
+      rev_lru.find(hoid);
+    if (iter == rev_lru.end())
+      return;
+
+    lru.erase(iter->second);
+    rev_lru.erase(iter);
+  }
+  hobject_t pop_object() {
+    assert(!lru.empty());
+    hobject_t hoid(lru.front());
+    lru.pop_front();
+    rev_lru.erase(hoid);
+    return hoid;
+  }
+  void insert_object(const hobject_t &hoid) {
+    assert(rev_lru.find(hoid) == rev_lru.end());
+    lru.push_back(hoid);
+    rev_lru.insert(make_pair(hoid, --lru.end()));
+  }
+
+  map<hobject_t, pair<PendingWB, FDRef> > pending_wbs;
+
+  /// get next flush to perform
+  bool get_next_should_flush(
+    boost::tuple<hobject_t, FDRef, PendingWB> *next ///< [out] next to flush
+    ); ///< @return false if we are shutting down
+public:
+  enum FS {
+    BTRFS,
+    XFS
+  };
+
+private:
+  FS fs;
+
+  void set_from_conf();
+public:
+  WBThrottle(CephContext *cct);
+  ~WBThrottle();
+
+  /// Set fs as XFS or BTRFS
+  void set_fs(FS new_fs) {
+    Mutex::Locker l(lock);
+    fs = new_fs;
+    set_from_conf();
+  }
+
+  /// Queue wb on hoid, fd taking throttle (does not block)
+  void queue_wb(
+    FDRef fd,              ///< [in] FDRef to hoid
+    const hobject_t &hoid, ///< [in] object
+    uint64_t offset,       ///< [in] offset written
+    uint64_t len,          ///< [in] length written
+    bool replica           ///< [in] write is for replica
+    );
+
+  /// Clear all wb (probably due to sync)
+  void clear();
+
+  /// Clear object
+  void clear_object(const hobject_t &hoid);
+
+  /// Block until there is throttle available
+  void throttle();
+
+  /// md_config_obs_t
+  const char** get_tracked_conf_keys() const;
+  void handle_conf_change(const md_config_t *conf,
+                         const std::set<std::string> &changed);
+
+  /// Thread
+  void *entry();
+};
+
+#endif
index 28e0da0d82a047bc0f126e964962a7c72eae11b5..47fcb3dda3963ba120f7ca36dc3a21c595124918 100644 (file)
@@ -15,6 +15,9 @@
 #ifndef __CEPH_OS_HOBJECT_H
 #define __CEPH_OS_HOBJECT_H
 
+#include <string.h>
+#include "include/types.h"
+#include "include/rados.h"
 #include "include/object.h"
 #include "include/cmp.h"