From: Yehuda Sadeh Date: Mon, 11 Jun 2012 18:01:56 +0000 (-0700) Subject: rgw: usage logger X-Git-Tag: v0.48argonaut~80^2~6 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=744a1b31d82981df493f5b078afcb3eaeb0a04e7;p=ceph.git rgw: usage logger Accumulate usage info and flush it periodically. Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index 852e72e34836..4b93ff3e361c 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -250,9 +250,10 @@ protected: void init(CephContext *cct, RGWEnv * env); public: RGWConf() : - enable_ops_log(1) {} + enable_ops_log(1), enable_usage_log(1) {} int enable_ops_log; + int enable_usage_log; }; enum http_op { @@ -544,6 +545,7 @@ struct req_state { uint64_t bytes_received; // data received uint64_t obj_size; bool enable_ops_log; + bool enable_usage_log; uint32_t perm_mask; utime_t header_time; diff --git a/src/rgw/rgw_env.cc b/src/rgw/rgw_env.cc index d7569b0a5faf..b8f6b0440cc7 100644 --- a/src/rgw/rgw_env.cc +++ b/src/rgw/rgw_env.cc @@ -77,4 +77,5 @@ size_t RGWEnv::get_size(const char *name, size_t def_val) void RGWConf::init(CephContext *cct, RGWEnv *env) { enable_ops_log = cct->_conf->rgw_enable_ops_log; + enable_usage_log = cct->_conf->rgw_enable_usage_log; } diff --git a/src/rgw/rgw_log.cc b/src/rgw/rgw_log.cc index fd6f53c3e57a..77a7befcf0dd 100644 --- a/src/rgw/rgw_log.cc +++ b/src/rgw/rgw_log.cc @@ -1,4 +1,5 @@ #include "common/Clock.h" +#include "common/Timer.h" #include "common/utf8.h" #include "rgw_log.h" @@ -75,11 +76,122 @@ string render_log_object_name(const string& format, return o; } +/* usage logger */ +class UsageLogger { + CephContext *cct; + map usage_map; + Mutex lock; + int32_t num_entries; + Mutex timer_lock; + SafeTimer timer; + utime_t round_timestamp; + + class C_UsageLogTimeout : public Context { + CephContext *cct; + UsageLogger *logger; + public: + C_UsageLogTimeout(CephContext *_cct, UsageLogger *_l) : cct(_cct), logger(_l) {} + void finish(int r) { + logger->flush(); + logger->set_timer(); + } + }; + + void set_timer() { + timer.add_event_after(cct->_conf->rgw_usage_log_tick_interval, new C_UsageLogTimeout(cct, this)); + } +public: + + UsageLogger(CephContext *_cct) : cct(_cct), lock("UsageLogger"), num_entries(0), timer_lock("UsageLogger::timer_lock"), timer(cct, timer_lock) { + timer.init(); + Mutex::Locker l(timer_lock); + set_timer(); + utime_t ts = ceph_clock_now(cct); + recalc_round_timestamp(ts); + } + + ~UsageLogger() { + Mutex::Locker l(timer_lock); + flush(); + timer.cancel_all_events(); + timer.shutdown(); + } + + void recalc_round_timestamp(utime_t& ts) { + struct tm bdt; + time_t tt = ts.sec(); + gmtime_r(&tt, &bdt); + bdt.tm_sec = 0; + bdt.tm_min = 0; + tt = mktime(&bdt); + round_timestamp = utime_t(tt, 0); + } + + void insert(utime_t& timestamp, rgw_usage_log_entry& entry) { + if (timestamp.sec() > round_timestamp + 3600) + recalc_round_timestamp(timestamp); + entry.epoch = round_timestamp.sec(); + lock.Lock(); + bool account; + rgw_user_bucket ub(entry.owner, entry.bucket); + usage_map[ub].insert(round_timestamp, entry, &account); + if (account) + num_entries++; + bool need_flush = (num_entries > cct->_conf->rgw_usage_log_flush_threshold); + lock.Unlock(); + if (need_flush) { + Mutex::Locker l(timer_lock); + flush(); + } + } + + void flush() { + map old_map; + lock.Lock(); + old_map.swap(usage_map); + num_entries = 0; + lock.Unlock(); + + rgwstore->log_usage(old_map); + } +}; + +static UsageLogger *usage_logger = NULL; + +void rgw_log_usage_init(CephContext *cct) +{ + usage_logger = new UsageLogger(cct); +} + +void rgw_log_usage_finalize() +{ + delete usage_logger; + usage_logger = NULL; +} + +static void log_usage(struct req_state *s) +{ + if (!usage_logger) + return; + + if (s->bucket_owner.empty()) + return; + + rgw_usage_log_entry entry(s->bucket_owner, s->bucket.name, s->bytes_sent, s->bytes_received); + + utime_t ts = ceph_clock_now(s->cct); + + usage_logger->insert(ts, entry); +} + int rgw_log_op(struct req_state *s) { struct rgw_log_entry entry; string bucket_id; + if (s->enable_usage_log) + log_usage(s); + if (!s->enable_ops_log) return 0; diff --git a/src/rgw/rgw_log.h b/src/rgw/rgw_log.h index 2f02da75f30f..6152708f77a8 100644 --- a/src/rgw/rgw_log.h +++ b/src/rgw/rgw_log.h @@ -6,6 +6,7 @@ #define RGW_LOG_POOL_NAME ".log" #define RGW_INTENT_LOG_POOL_NAME ".intent-log" +#define RGW_USAGE_LOG_POOL_NAME ".usage" struct rgw_log_entry { string object_owner; @@ -118,6 +119,8 @@ WRITE_CLASS_ENCODER(rgw_intent_log_entry) int rgw_log_op(struct req_state *s); int rgw_log_intent(struct req_state *s, rgw_obj& obj, RGWIntentEvent intent); +void rgw_log_usage_init(CephContext *cct); +void rgw_log_usage_finalize(); #endif diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index b6699e778db2..2a81c3709126 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -428,9 +428,13 @@ int main(int argc, const char **argv) init_timer.shutdown(); mutex.Unlock(); + rgw_log_usage_init(g_ceph_context); + RGWProcess process(g_ceph_context, g_conf->rgw_thread_pool_size); process.run(); + rgw_log_usage_finalize(); + rgw_perf_stop(g_ceph_context); unregister_async_signal_handler(SIGHUP, sighup_handler);