--- /dev/null
+# -*- coding: utf-8 -*-
+
+import bcrypt
+import cherrypy
+import time
+from cherrypy import tools
+
+class Auth(object):
+ """
+ Provide login and logout actions.
+
+ Supported config-keys:
+
+ | KEY | DEFAULT | DESCR |
+ --------------------------------------------------------------------------------------------
+ | username | None | Username |
+ | password | None | Password encrypted using bcrypt |
+ | session-expire | 1200 | Session will expire after <expires> seconds without activity |
+ """
+
+ SESSION_KEY = '_username'
+ SESSION_KEY_TS = '_username_ts'
+
+ DEFAULT_SESSION_EXPIRE = 1200
+
+ def __init__(self, module):
+ self.module = module
+ self.log = self.module.log
+
+ @cherrypy.expose
+ @cherrypy.tools.allow(methods=['POST'])
+ @tools.json_out()
+ def login(self, username=None, password=None):
+ now = int(time.time())
+ config_username = self.module.get_localized_config('username', None)
+ config_password = self.module.get_localized_config('password', None)
+ hash_password = bcrypt.hashpw(password.encode('utf8'), config_password)
+ if username == config_username and hash_password == config_password:
+ cherrypy.session.regenerate()
+ cherrypy.session[Auth.SESSION_KEY] = username
+ cherrypy.session[Auth.SESSION_KEY_TS] = now
+ self.log.debug("Login successful")
+ return {'username': username}
+ else:
+ cherrypy.response.status = 403
+ self.log.debug("Login fail")
+ return {'detail': 'Invalid credentials'}
+
+ @cherrypy.expose
+ @cherrypy.tools.allow(methods=['POST'])
+ def logout(self):
+ self.log.debug("Logout successful")
+ cherrypy.session[Auth.SESSION_KEY] = None
+ cherrypy.session[Auth.SESSION_KEY_TS] = None
+
+ def check_auth(self):
+ username = cherrypy.session.get(Auth.SESSION_KEY)
+ if not username:
+ self.log.debug("Unauthorized")
+ raise cherrypy.HTTPError(401,
+ 'You are not authorized to access that resource')
+ now = int(time.time())
+ expires = int(self.module.get_localized_config('session-expire', Auth.DEFAULT_SESSION_EXPIRE))
+ if expires > 0:
+ username_ts = cherrypy.session.get(Auth.SESSION_KEY_TS, None)
+ if username_ts and username_ts < now - expires:
+ cherrypy.session[Auth.SESSION_KEY] = None
+ cherrypy.session[Auth.SESSION_KEY_TS] = None
+ self.log.debug("Session expired.")
+ raise cherrypy.HTTPError(401,
+ 'Session expired. You are not authorized to access that resource')
+ cherrypy.session[Auth.SESSION_KEY_TS] = now
class BaseMgrModule(object):
def __init__(self, py_modules_ptr, this_ptr):
- pass
+ self.config_key_map = {}
def _ceph_get_version(self):
return "ceph-13.0.0"
+ def _ceph_get_mgr_id(self):
+ return "x"
+
+ def _ceph_set_config(self, key, value):
+ self.config_key_map[key] = value
+
+ def _ceph_get_config(self, key):
+ return self.config_key_map.get(key, None)
+
def _ceph_log(self, *args):
pass
import cherrypy
from cherrypy import tools
+from auth import Auth
from mgr_module import MgrModule
# cherrypy likes to sys.exit on error. don't let it take us down too!
cherrypy.config.update({'server.socket_host': server_addr,
'server.socket_port': int(server_port),
})
- cherrypy.tree.mount(Module.HelloWorld(self), "/")
+ auth = Auth(self)
+ cherrypy.tools.autenticate = cherrypy.Tool('before_handler', auth.check_auth)
+ noauth_required_config = {
+ '/': {
+ 'tools.autenticate.on': False,
+ 'tools.sessions.on': True
+ }
+ }
+ auth_required_config = {
+ '/': {
+ 'tools.autenticate.on': True,
+ 'tools.sessions.on': True
+ }
+ }
+ cherrypy.tree.mount(auth, "/api/auth", config=noauth_required_config)
+ cherrypy.tree.mount(Module.HelloWorld(self), "/api/hello", config=auth_required_config)
cherrypy.engine.start()
self.log.info("Waiting for engine...")
cherrypy.engine.block()
--- /dev/null
+# -*- coding: utf-8 -*-
+
+from __future__ import absolute_import
+
+import time
+from cherrypy.lib.sessions import RamSession
+from cherrypy.test import helper
+from mock import patch
+
+from ..auth import Auth
+from ..module import Module, cherrypy
+
+class Ping(object):
+ @cherrypy.expose
+ @cherrypy.tools.allow(methods=['POST'])
+ def ping(self):
+ pass
+
+class AuthTest(helper.CPWebCase):
+ @staticmethod
+ def setup_server():
+ module = Module('dashboard', None, None)
+ auth = Auth(module)
+ cherrypy.tools.autenticate = cherrypy.Tool('before_handler', auth.check_auth)
+ cherrypy.tree.mount(auth, "/api/auth")
+ cherrypy.tree.mount(Ping(), "/api/test",
+ config={'/': {'tools.autenticate.on': True}})
+ module.set_localized_config('session-expire','2')
+ module.set_localized_config('username','admin')
+ module.set_localized_config('password',
+ '$2b$12$KunrLI/uq7pqjvwUcAhIZu.B1dAGZ3liB8KFIJUOqZC.5/bEEmBQG')
+
+ def test_login_valid(self):
+ sess_mock = RamSession()
+ with patch('cherrypy.session', sess_mock, create=True):
+ self.getPage("/api/auth/login",
+ body="username=admin&password=admin",
+ method='POST')
+ self.assertStatus('200 OK')
+ self.assertBody('{"username": "admin"}')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+
+ def test_login_invalid(self):
+ sess_mock = RamSession()
+ with patch('cherrypy.session', sess_mock, create=True):
+ self.getPage("/api/auth/login",
+ body="username=admin&password=invalid",
+ method='POST')
+ self.assertStatus('403 Forbidden')
+ self.assertBody('{"detail": "Invalid credentials"}')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+
+ def test_logout(self):
+ sess_mock = RamSession()
+ with patch('cherrypy.session', sess_mock, create=True):
+ self.getPage("/api/auth/login",
+ body="username=admin&password=admin",
+ method='POST')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ self.getPage("/api/auth/logout", method='POST')
+ self.assertStatus('200 OK')
+ self.assertBody('')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+
+ def test_session_expire(self):
+ sess_mock = RamSession()
+ with patch('cherrypy.session', sess_mock, create=True):
+ self.getPage("/api/auth/login",
+ body="username=admin&password=admin",
+ method='POST')
+ self.assertStatus('200 OK')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ self.getPage("/api/test/ping", method='POST')
+ self.assertStatus('200 OK')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), 'admin')
+ time.sleep(3)
+ self.getPage("/api/test/ping", method='POST')
+ self.assertStatus('401 Unauthorized')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
+
+ def test_unauthorized(self):
+ sess_mock = RamSession()
+ with patch('cherrypy.session', sess_mock, create=True):
+ self.getPage("/api/test/ping", method='POST')
+ self.assertStatus('401 Unauthorized')
+ self.assertEquals(sess_mock.get(Auth.SESSION_KEY), None)
@staticmethod
def setup_server():
module = Module('attic', None, None)
- cherrypy.tree.mount(Module.HelloWorld(module))
+ cherrypy.tree.mount(Module.HelloWorld(module), "/api/hello")
def test_ping(self):
- self.getPage("/ping")
+ self.getPage("/api/hello/ping")
self.assertStatus('200 OK')
self.assertBody('"pong"')