]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: usage logger
authorYehuda Sadeh <yehuda@inktank.com>
Mon, 11 Jun 2012 18:01:56 +0000 (11:01 -0700)
committerYehuda Sadeh <yehuda@inktank.com>
Mon, 11 Jun 2012 20:25:51 +0000 (13:25 -0700)
Accumulate usage info and flush it periodically.

Signed-off-by: Yehuda Sadeh <yehuda@inktank.com>
src/rgw/rgw_common.h
src/rgw/rgw_env.cc
src/rgw/rgw_log.cc
src/rgw/rgw_log.h
src/rgw/rgw_main.cc

index 852e72e34836d344fd0e632115fd218a14b8d727..4b93ff3e361cad17783675cc464a2887e2f88fcd 100644 (file)
@@ -250,9 +250,10 @@ protected:
   void init(CephContext *cct, RGWEnv * env);
 public:
   RGWConf() :
-    enable_ops_log(1) {}
+    enable_ops_log(1), enable_usage_log(1) {}
 
   int enable_ops_log;
+  int enable_usage_log;
 };
 
 enum http_op {
@@ -544,6 +545,7 @@ struct req_state {
    uint64_t bytes_received; // data received
    uint64_t obj_size;
    bool enable_ops_log;
+   bool enable_usage_log;
    uint32_t perm_mask;
    utime_t header_time;
 
index d7569b0a5faff921dec7cdadfe075f4273d9ae53..b8f6b0440cc7e9e34741f39dda8b4e72201d0e5b 100644 (file)
@@ -77,4 +77,5 @@ size_t RGWEnv::get_size(const char *name, size_t def_val)
 void RGWConf::init(CephContext *cct, RGWEnv *env)
 {
   enable_ops_log = cct->_conf->rgw_enable_ops_log;
+  enable_usage_log = cct->_conf->rgw_enable_usage_log;
 }
index fd6f53c3e57ae81bb88a43eda0befca8eb967475..77a7befcf0dd9b4679a9705b063f0b7fccf86bd2 100644 (file)
@@ -1,4 +1,5 @@
 #include "common/Clock.h"
+#include "common/Timer.h"
 #include "common/utf8.h"
 
 #include "rgw_log.h"
@@ -75,11 +76,122 @@ string render_log_object_name(const string& format,
   return o;
 }
 
+/* usage logger */
+class UsageLogger {
+  CephContext *cct;
+  map<rgw_user_bucket, RGWUsageInfo> usage_map;
+  Mutex lock;
+  int32_t num_entries;
+  Mutex timer_lock;
+  SafeTimer timer;
+  utime_t round_timestamp;
+
+  class C_UsageLogTimeout : public Context {
+    CephContext *cct;
+    UsageLogger *logger;
+  public:
+    C_UsageLogTimeout(CephContext *_cct, UsageLogger *_l) : cct(_cct), logger(_l) {}
+    void finish(int r) {
+      logger->flush();
+      logger->set_timer();
+    }
+  };
+
+  void set_timer() {
+    timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(cct, this));
+  }
+public:
+
+  UsageLogger(CephContext *_cct) : cct(_cct), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) {
+    timer.init();
+    Mutex::Locker l(timer_lock);
+    set_timer();
+    utime_t ts = ceph_clock_now(cct);
+    recalc_round_timestamp(ts);
+  }
+
+  ~UsageLogger() {
+    Mutex::Locker l(timer_lock);
+    flush();
+    timer.cancel_all_events();
+    timer.shutdown();
+  }
+
+  void recalc_round_timestamp(utime_t& ts) {
+    struct tm bdt;
+    time_t tt = ts.sec();
+    gmtime_r(&tt, &bdt);
+    bdt.tm_sec = 0;
+    bdt.tm_min = 0;
+    tt = mktime(&bdt);
+    round_timestamp = utime_t(tt, 0);
+  }
+
+  void insert(utime_t& timestamp, rgw_usage_log_entry& entry) {
+    if (timestamp.sec() > round_timestamp + 3600)
+      recalc_round_timestamp(timestamp);
+    entry.epoch = round_timestamp.sec();
+    lock.Lock();
+    bool account;
+    rgw_user_bucket ub(entry.owner, entry.bucket);
+    usage_map[ub].insert(round_timestamp, entry, &account);
+    if (account)
+      num_entries++;
+    bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold);
+    lock.Unlock();
+    if (need_flush) {
+      Mutex::Locker l(timer_lock);
+      flush();
+    }
+  }
+
+  void flush() {
+    map<rgw_user_bucket, RGWUsageInfo> old_map;
+    lock.Lock();
+    old_map.swap(usage_map);
+    num_entries = 0;
+    lock.Unlock();
+
+    rgwstore->log_usage(old_map);
+  }
+};
+
+static UsageLogger *usage_logger = NULL;
+
+void rgw_log_usage_init(CephContext *cct)
+{
+  usage_logger = new UsageLogger(cct);
+}
+
+void rgw_log_usage_finalize()
+{
+  delete usage_logger;
+  usage_logger = NULL;
+}
+
+static void log_usage(struct req_state *s)
+{
+  if (!usage_logger)
+    return;
+
+  if (s->bucket_owner.empty())
+    return;
+
+  rgw_usage_log_entry entry(s->bucket_owner, s->bucket.name, s->bytes_sent, s->bytes_received);
+
+  utime_t ts = ceph_clock_now(s->cct);
+
+  usage_logger->insert(ts, entry);
+}
+
 int rgw_log_op(struct req_state *s)
 {
   struct rgw_log_entry entry;
   string bucket_id;
 
+  if (s->enable_usage_log)
+    log_usage(s);
+
   if (!s->enable_ops_log)
     return 0;
 
index 2f02da75f30f46aeaa6be5d1088c3762a0bbc13a..6152708f77a8dd325e5d74c4ec63a87d7b6f7713 100644 (file)
@@ -6,6 +6,7 @@
 
 #define RGW_LOG_POOL_NAME ".log"
 #define RGW_INTENT_LOG_POOL_NAME ".intent-log"
+#define RGW_USAGE_LOG_POOL_NAME ".usage"
 
 struct rgw_log_entry {
   string object_owner;
@@ -118,6 +119,8 @@ WRITE_CLASS_ENCODER(rgw_intent_log_entry)
 
 int rgw_log_op(struct req_state *s);
 int rgw_log_intent(struct req_state *s, rgw_obj& obj, RGWIntentEvent intent);
+void rgw_log_usage_init(CephContext *cct);
+void rgw_log_usage_finalize();
 
 #endif
 
index b6699e778db272a8aaf91317e105c480927b5c05..2a81c37091266ccd9208f9c3c59cc2ba6b2a7f44 100644 (file)
@@ -428,9 +428,13 @@ int main(int argc, const char **argv)
   init_timer.shutdown();
   mutex.Unlock();
 
+  rgw_log_usage_init(g_ceph_context);
+
   RGWProcess process(g_ceph_context, g_conf->rgw_thread_pool_size);
   process.run();
 
+  rgw_log_usage_finalize();
+
   rgw_perf_stop(g_ceph_context);
 
   unregister_async_signal_handler(SIGHUP, sighup_handler);