import json
import inspect
import pickle
-import bunch
+import munch
import yaml
import configparser
from boto.s3.key import Key
r = _make_admin_request(self.conn, "GET", path, params)
if r.status != 200:
raise boto.exception.S3ResponseError(r.status, r.reason)
- return bunch.bunchify(json.loads(r.read()))
+ return munch.munchify(json.loads(r.read()))
def read_meta_key(self, key):
class RTestJSONSerialize(json.JSONEncoder):
def default(self, obj):
- if isinstance(obj, (list, dict, str, unicode, int, float, bool, type(None))):
+ if isinstance(obj, (list, dict, str, int, float, bool, type(None))):
return JSONEncoder.default(self, obj)
return {'__pickle': pickle.dumps(obj)}
self.storage_classes = config.storage_classes
else:
try:
- self.storage_classes = bunch.bunchify({ 'STANDARD': { 'data_pool': config.data_pool }})
+ self.storage_classes = munch.munchify({ 'STANDARD': { 'data_pool': config.data_pool }})
except:
self.storage_classes = None
pass
return sc
def get_all(self):
- for (name, _) in self.storage_classes.iteritems():
+ for (name, _) in self.storage_classes.items():
yield name
class RPlacementTarget:
self.check()
def read_config(fp):
- config = bunch.Bunch()
+ config = munch.Munch()
g = yaml.safe_load_all(fp)
for new in g:
- print(bunch.bunchify(new))
- config.update(bunch.bunchify(new))
+ print(munch.munchify(new))
+ config.update(munch.munchify(new))
return config
str_config_opts = [
]
def dict_find(d, k):
- if d.has_key(k):
+ if k in d:
return d[k]
return None
class RagweedEnv:
def __init__(self):
- self.config = bunch.Bunch()
+ self.config = munch.Munch()
cfg = configparser.RawConfigParser()
try:
for section in cfg.sections():
try:
(section_type, name) = section.split(None, 1)
- if not self.config.has_key(section_type):
- self.config[section_type] = bunch.Bunch()
- self.config[section_type][name] = bunch.Bunch()
+ if not section_type in self.config:
+ self.config[section_type] = munch.Munch()
+ self.config[section_type][name] = munch.Munch()
cur = self.config[section_type]
except ValueError:
section_type = ''
name = section
- self.config[name] = bunch.Bunch()
+ self.config[name] = munch.Munch()
cur = self.config
- cur[name] = bunch.Bunch()
+ cur[name] = munch.Munch()
for var in str_config_opts:
try:
except:
self.bucket_prefix = 'ragweed'
- conn = bunch.Bunch()
- for (k, u) in self.config.user.iteritems():
+ conn = munch.Munch()
+ for (k, u) in self.config.user.items():
conn[k] = RGWConnection(u.access_key, u.secret_key, rgw_conf.host, dict_find(rgw_conf, 'port'), dict_find(rgw_conf, 'is_secure'))
self.zone = RZone(conn)
import boto.s3.connection
from http.client import HTTPConnection, HTTPSConnection
-from urllib.parse import urlparse
-import urllib
+from urllib.parse import urlparse, urlencode
def _make_admin_request(conn, method, path, query_dict=None, body=None, response_headers=None, request_headers=None, expires_in=100000, path_style=True, timeout=None):
"""
query = ''
if query_dict is not None:
- query = urllib.urlencode(query_dict)
+ query = urlencode(query_dict)
(bucket_str, key_str) = path.split('/', 2)[1:]
bucket = conn.get_bucket(bucket_str, validate=False)
if request_headers is None:
request_headers = {}
- c = class_(host, port, strict=True, timeout=timeout)
+ c = class_(host, port, timeout=timeout)
# TODO: We might have to modify this in future if we need to interact with
# how http.client.request handles Accept-Encoding and Host.