# -*- coding: utf-8 -*-
"""dyn.core is a utilities module for use internally within the dyn library
itself. Although it's possible to use this functionality outside of the dyn
library, it is not recommened and could possible result in some strange
behavior.
"""
import base64
import copy
import locale
import logging
import re
import threading
import time
from datetime import datetime
from . import __version__
from .compat import (HTTPConnection, HTTPSConnection, HTTPException, json,
prepare_to_send, force_unicode)
def cleared_class_dict(dict_obj):
"""Return a cleared dict of class attributes. The items cleared are any
fields which evaluate to None, and any methods
"""
return {x: dict_obj[x] for x in dict_obj if dict_obj[x] is not None and
not hasattr(dict_obj[x], '__call__')}
def clean_args(dict_obj):
"""Clean a dictionary of API arguments to prevent the display of plain text
passwords to users
:param dict_obj: The dictionary of arguments to be cleaned
"""
cleaned_args = copy.deepcopy(dict_obj)
if 'password' in cleaned_args:
cleaned_args['password'] = '*****'
return cleaned_args
class _Singleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
cur_thread = threading.current_thread()
key = getattr(cls, '__metakey__')
if key not in cls._instances:
cls._instances[key] = {
# super(Singleton, cls) evaluates to type; *args/**kwargs get
# passed to class __init__ method via type.__call__
cur_thread: super(_Singleton, cls).__call__(*args, **kwargs)
}
elif key in cls._instances and cur_thread not in cls._instances[key]:
cls._instances[key][cur_thread] = \
super(_Singleton, cls).__call__(*args, **kwargs)
return cls._instances[key][cur_thread]
# This class is a workaround for supporting metaclasses in both Python2 and 3
[docs]class Singleton(_Singleton('SingletonMeta', (object,), {})):
"""A :class:`~dyn.core.Singleton` type for implementing a true Singleton
design pattern, cleanly, using metaclasses
"""
pass
class _History(list):
"""A *list* subclass specifically targeted at being able to store the
history of calls made via a SessionEngine
"""
def append(self, p_object):
"""Override builtin list append operators to allow for the automatic
appendation of a timestamp for cleaner record keeping
"""
now_ts = datetime.now().isoformat()
super(_History, self).append(tuple([now_ts] + list(p_object)))
[docs]class SessionEngine(Singleton):
"""Base object representing a DynectSession Session"""
_valid_methods = tuple()
uri_root = '/'
[docs] def __init__(self, host=None, port=443, ssl=True, history=False,
proxy_host=None, proxy_port=None, proxy_user=None,
proxy_pass=None):
"""Initialize a Dynect Rest Session object and store the provided
credentials
:param host: DynECT API server address
:param port: Port to connect to DynECT API server
:param ssl: Enable SSL
:param history: A boolean flag determining whether or not you would
like to store a record of all API calls made to review later
:param proxy_host: A proxy host to utilize
:param proxy_port: The port that the proxy is served on
:param proxy_user: A username to connect to the proxy with if required
:param proxy_pass: A password to connect to the proxy with if required
:return: SessionEngine object
"""
super(SessionEngine, self).__init__()
self.__call_cache = _History() if history else None
self.extra_headers = dict()
self.logger = logging.getLogger(self.name)
self.host = host
self.port = port
self.ssl = ssl
self.proxy_host = proxy_host
self.proxy_port = proxy_port
self.proxy_user = proxy_user
self.proxy_pass = proxy_pass
self.poll_incomplete = True
self.content_type = 'application/json'
self._encoding = locale.getdefaultlocale()[-1] or 'UTF-8'
self._token = self._conn = self._last_response = None
self._permissions = None
self._tasks = {}
[docs] @classmethod
def new_session(cls, *args, **kwargs):
"""Return a new session instance, regardless of whether or not there is
already an existing session.
:param args: Arguments to be passed to the Singleton __call__ method
:param kwargs: keyword arguments to be passed to the Singleton __call__
method
"""
cur_thread = threading.current_thread()
key = getattr(cls, '__metakey__')
instance = cls._instances.get(key, {}).get(cur_thread, None)
if instance:
instance.close_session()
return cls.__call__(*args, **kwargs)
[docs] @classmethod
def get_session(cls):
"""Return the current session for this Session type or None if there is
not an active session
"""
cur_thread = threading.current_thread()
key = getattr(cls, '__metakey__')
return cls._instances.get(key, {}).get(cur_thread, None)
[docs] @classmethod
def close_session(cls):
"""Remove the current session from the dict of instances and return it.
If there was not currently a session being stored, return None. If,
after removing this session, there is nothing under the current key,
delete that key's entry in the _instances dict.
"""
cur_thread = threading.current_thread()
key = getattr(cls, '__metakey__')
closed = cls._instances.get(key, {}).pop(cur_thread, None)
if len(cls._instances.get(key, {})) == 0:
cls._instances.pop(key, None)
return closed
@property
def name(self):
"""A human readable version of the name of this object"""
return str(self.__class__).split('.')[-1][:-2]
[docs] def connect(self):
"""Establishes a connection to the REST API server as defined by the
host, port and ssl instance variables. If a proxy is specified, it
is used.
"""
if self._token:
self.logger.debug('Forcing logout from old session')
orig_value = self.poll_incomplete
self.poll_incomplete = False
self.execute('/REST/Session', 'DELETE')
self.poll_incomplete = orig_value
self._token = None
self._conn = None
use_proxy = False
headers = {}
if self.proxy_host and not self.proxy_port:
msg = 'Proxy missing port, please specify a port'
raise ValueError(msg)
if self.proxy_host and self.proxy_port:
use_proxy = True
if self.proxy_user and self.proxy_pass:
auth = '{}:{}'.format(self.proxy_user, self.proxy_pass)
headers['Proxy-Authorization'] = 'Basic ' + base64.b64encode(
auth)
if use_proxy:
if self.ssl:
s = 'Establishing SSL connection to {}:{} with proxy {}:{}'
msg = s.format(
self.host,
self.port,
self.proxy_host,
self.proxy_port)
self.logger.info(msg)
self._conn = HTTPSConnection(self.proxy_host, self.proxy_port,
timeout=300)
self._conn.set_tunnel(self.host, self.port, headers)
else:
s = ('Establishing unencrypted connection to {}:{} with proxy '
'{}:{}')
msg = s.format(
self.host,
self.port,
self.proxy_host,
self.proxy_port)
self.logger.info(msg)
self._conn = HTTPConnection(self.proxy_host, self.proxy_port,
timeout=300)
self._conn.set_tunnel(self.host, self.port, headers)
else:
if self.ssl:
msg = 'Establishing SSL connection to {}:{}'.format(self.host,
self.port)
self.logger.info(msg)
self._conn = HTTPSConnection(self.host, self.port,
timeout=300)
else:
msg = 'Establishing unencrypted connection to {}:{}'.format(
self.host,
self.port)
self.logger.info(msg)
self._conn = HTTPConnection(self.host, self.port,
timeout=300)
def _process_response(self, response, method, final=False):
"""API Method. Process an API response for failure, incomplete, or
success and throw any appropriate errors
:param response: the JSON response from the request being processed
:param method: the HTTP method
:param final: boolean flag representing whether or not to continue
polling
"""
return response
def _handle_error(self, uri, method, raw_args):
"""Handle the processing of a connection error with the api. Note, to be
implemented as needed in subclasses.
"""
return None
def _retry(self, msgs, final=False):
"""Retry logic around throttled or blocked tasks"""
throttle_err = 'RATE_LIMIT_EXCEEDED'
throttled = any(throttle_err == err['ERR_CD'] for err in msgs)
if throttled:
# We're rate limited, so wait 5 seconds and try again
return dict(retry=True, wait=5, final=final)
blocked_err = 'Operation blocked by current task'
blocked = any(blocked_err in err['INFO'] for err in msgs)
pat = re.compile(r'^task_id:\s+(\d+)$')
if blocked:
try:
# Get the task id
task = next(pat.match(i['INFO']).group(1) for i in msgs
if pat.match(i.get('INFO', '')))
except:
# Task id could not be recovered
wait = 1
else:
# Exponential backoff for individual blocked tasks
wait = self._tasks.get(task, 1)
self._tasks[task] = wait * 2 + 1
# Give up if final or wait > 30 seconds
return dict(retry=True, wait=wait, final=wait > 30 or final)
# Neither blocked nor throttled?
return dict(retry=False, wait=0, final=True)
def _handle_response(self, response, uri, method, raw_args, final):
"""Handle the processing of the API's response"""
body = response.read()
self.logger.debug('RESPONSE: {0}'.format(body))
self._last_response = response
if self.poll_incomplete:
response, body = self.poll_response(response, body)
self._last_response = response
if not body:
err_msg_fmt = "Received Empty Response: {!r} status: {!r} {!r}"
error_message = err_msg_fmt.format(body, response.status, uri)
self.logger.error(error_message)
raise ValueError(error_message)
json_err_fmt = "Decode Error on Response Body: {!r} status: {!r} {!r}"
try:
ret_val = json.loads(body.decode('UTF-8'))
except ValueError:
self.logger.error(json_err_fmt.format(body, response.status, uri))
raise
if self.__call_cache is not None:
self.__call_cache.append((uri, method, clean_args(raw_args),
ret_val['status']))
self._meta_update(uri, method, ret_val)
retry = {}
# Try to retry?
if ret_val['status'] == 'failure' and not final:
retry = self._retry(ret_val['msgs'], final)
if retry.get('retry', False):
time.sleep(retry['wait'])
return self.execute(uri, method, raw_args, final=retry['final'])
else:
return self._process_response(ret_val, method)
def _validate_uri(self, uri):
"""Validate and return a cleaned up uri. Make sure the command is
prefixed by '/REST/'
"""
if not uri.startswith('/'):
uri = '/' + uri
if not uri.startswith(self.uri_root):
uri = self.uri_root + uri
return uri
def _validate_method(self, method):
"""Validate the provided HTTP method type"""
if method.upper() not in self._valid_methods:
msg = '{} is not a valid HTTP method. Please use one of {}'
msg = msg.format(method, ', '.join(self._valid_methods))
raise ValueError(msg)
def _prepare_arguments(self, args, method, uri):
"""Prepare the arguments to be sent off to the API"""
if args is None:
args = {}
if not isinstance(args, dict):
# If args is an object type, parse it's dict for valid args
# If an item in args.__dict__ has a _json attribute, use that in
# place of the actual object
d = args.__dict__
args = {(x if not x.startswith('_') else x[1:]):
(d[x] if not hasattr(d[x], '_json') else getattr(d[x],
'_json'))
for x in d if d[x] is not None and
not hasattr(d[x], '__call__') and x.startswith('_')}
return args, json.dumps(args), uri
[docs] def execute(self, uri, method, args=None, final=False):
"""Execute a commands against the rest server
:param uri: The uri of the resource to access. /REST/ will be prepended
if it is not at the beginning of the uri
:param method: One of 'DELETE', 'GET', 'POST', or 'PUT'
:param args: Any arguments to be sent as a part of the request
:param final: boolean flag representing whether or not we have already
failed executing once or not
"""
if self._conn is None:
self.connect()
uri = self._validate_uri(uri)
# Make sure the method is valid
self._validate_method(method)
# Prepare arguments to send to API
raw_args, args, uri = self._prepare_arguments(args, method, uri)
msg = 'uri: {}, method: {}, args: {}'
self.logger.debug(
msg.format(uri, method, clean_args(json.loads(args))))
# Send the command and deal with results
self.send_command(uri, method, args)
# Deal with the results
try:
response = self._conn.getresponse()
except (IOError, HTTPException) as e:
if final:
raise e
else:
# Handle processing a connection error
resp = self._handle_error(uri, method, raw_args)
# If we got a valid response back from our _handle_error call
# Then return it, otherwise raise the original exception
if resp is not None:
return resp
raise e
return self._handle_response(response, uri, method, raw_args, final)
def _meta_update(self, uri, method, results):
"""Update the HTTP session token if the uri is a login or logout
:param uri: the uri from the call being updated
:param method: the api method
:param results: the JSON results
"""
# If we had a successful log in, update the token
if uri.startswith('/REST/Session') and method == 'POST':
if results['status'] == 'success':
self._token = results['data']['token']
# Otherwise, if it's a successful logout, blank the token
if uri.startswith('/REST/Session') and method == 'DELETE':
if results['status'] == 'success':
self._token = None
[docs] def poll_response(self, response, body):
"""Looks at a response from a REST command, and while indicates that
the job is incomplete, poll for response
:param response: the JSON response containing return codes
:param body: the body of the HTTP response
"""
while response.status == 307:
time.sleep(1)
uri = response.getheader('Location')
self.logger.info('Polling {}'.format(uri))
self.send_command(uri, 'GET', '')
response = self._conn.getresponse()
body = response.read()
return response, body
[docs] def send_command(self, uri, method, args):
"""Responsible for packaging up the API request and sending it to the
server over the established connection
:param uri: The uri of the resource to interact with
:param method: The HTTP method to use
:param args: Encoded arguments to send to the server
"""
self._conn.putrequest(method, uri)
# Build headers
user_agent = 'dyn-py v{}'.format(__version__)
headers = {'Content-Type': self.content_type, 'User-Agent': user_agent}
for key, val in self.extra_headers.items():
headers[key] = val
if self._token is not None:
headers['Auth-Token'] = self._token
for key, val in headers.items():
self._conn.putheader(key, val)
# Now the arguments
self._conn.putheader('Content-length', '%d' % len(args))
self._conn.endheaders()
self._conn.send(prepare_to_send(args))
[docs] def wait_for_job_to_complete(self, job_id, timeout=120):
"""When a response comes back with a status of "incomplete" we need to
wait and poll for the status of that job until it comes back with
success or failure
:param job_id: the id of the job to poll for a response from
:param timeout: how long (in seconds) we should wait for a valid
response before giving up on this request
"""
self.logger.debug('Polling for job_id: {}'.format(job_id))
start = datetime.now()
uri = '/Job/{}/'.format(job_id)
api_args = {}
# response = self.execute(uri, 'GET', api_args)
response = {'status': 'incomplete'}
now = datetime.now()
self.logger.warn('Waiting for job {}'.format(job_id))
too_long = (now - start).seconds < timeout
while response['status'] is 'incomplete' and too_long:
time.sleep(10)
response = self.execute(uri, 'GET', api_args)
return response
def __getstate__(cls):
"""Because HTTP/HTTPS connections are not serializeable, we need to
strip the connection instance out before we ship the pickled data
"""
d = cls.__dict__.copy()
d.pop('_conn')
return d
def __setstate__(cls, state):
"""Because the HTTP/HTTPS connection was stripped out in __getstate__ we
must manually re-enter it as None and let the sessions execute method
handle rebuilding it later
"""
cls.__dict__ = state
cls.__dict__['_conn'] = None
def __str__(self):
"""str override"""
return force_unicode('<{}>').format(self.name)
__repr__ = __unicode__ = __str__
def __bytes__(self):
"""bytes override"""
return bytes(self.__str__())
@property
def history(self):
"""A history of all API calls that have been made during the duration
of this Session's existence. These API call details are returned as a
*list* of 5-tuples of the form: (timestamp, uri, method, args, status)
where status will be one of 'success' or 'failure'
"""
return self.__call_cache