# -*- coding: utf-8 -*-
from datetime import datetime
from dyn.compat import force_unicode
from dyn.tm.errors import DynectInvalidArgumentError
from dyn.tm.session import DynectSession
from dyn.tm.utils import APIList, Active, unix_date
__author__ = 'jnappi'
__all__ = ['get_all_dnssec', 'DNSSECKey', 'DNSSEC']
[docs]def get_all_dnssec():
""":return: A ``list`` of :class:`DNSSEC` Services"""
uri = '/DNSSEC/'
api_args = {'detail': 'Y'}
response = DynectSession.get_session().execute(uri, 'GET', api_args)
dnssecs = []
for dnssec in response['data']:
zone = dnssec['zone']
del dnssec['zone']
dnssecs.append(DNSSEC(zone, api=False, **dnssec))
return dnssecs
[docs]class DNSSECKey(object):
"""A Key used by the DNSSEC service"""
[docs] def __init__(self, key_type, algorithm, bits, start_ts=None, lifetime=None,
overlap=None, expire_ts=None, **kwargs):
"""Create a :class:`DNSSECKey` object
:param key_type: The type of this key. (KSK or ZSK)
:param algorithm: One of (RSA/SHA-1, RSA/SHA-256, RSA/SHA-512, DSA,
ECDSAP256SHA256, ECDSAP384SHA384)
:param bits: length of the key. Valid values: 256, 384, 1024, 2048,
or 4096
:param start_ts: An epoch time when key is to be valid
:param lifetime: Lifetime of the key expressed in seconds
:param overlap: Time before key expiration when a replacement key is
prepared, expressed in seconds. Default = 7 days.
:param expire_ts: An epoch time when this key is to expire
:param dnskey: The KSK or ZSK record data
:param ds: One of the DS records for the KSK. ZSKs will have this
value intialized, but with null values.
:param all_ds: All the DS records associated with this KSK. Applies
only to KSK, ZSK will have a zero-length list.
"""
super(DNSSECKey, self).__init__()
self.key_type = key_type
self.algorithm = algorithm
if not isinstance(bits, int):
bits = int(bits)
self.bits = bits
self.start_ts = start_ts
self.lifetime = lifetime
self.overlap = overlap
self.expire_ts = expire_ts
self.dnssec_key_id = self.dnskey = self.ds = self.all_ds = None
for key, val in kwargs.items():
setattr(self, key, val)
@property
def _json(self):
"""The JSON representation of this :class:`DNSSECKey` object"""
json_blob = {'type': self.key_type,
'algorithm': self.algorithm,
'bits': self.bits}
if self.start_ts:
json_blob['start_ts'] = self.start_ts
if self.lifetime:
json_blob['lifetime'] = self.lifetime
if self.overlap:
json_blob['overlap'] = self.overlap
if self.expire_ts:
json_blob['expire_ts'] = self.expire_ts
return json_blob
def _update(self, data):
"""Semi-private _update method"""
for key, val in data.items():
if key == 'type':
setattr(self, 'key_type', val)
elif key == 'bits':
setattr(self, key, int(val))
else:
setattr(self, key, val)
def __str__(self):
"""str override"""
return force_unicode('<DNSSECKey>: {}').format(self.algorithm)
__repr__ = __unicode__ = __str__
def __bytes__(self):
"""bytes override"""
return bytes(self.__str__())
[docs]class DNSSEC(object):
"""A DynECT System DNSSEC Service"""
[docs] def __init__(self, zone, *args, **kwargs):
"""Create a :class:`DNSSEC` object
:param zone: the zone this service will be attached to
:param keys: a list of :class:`DNSSECKey`'s for the service
:param contact_nickname: Name of contact to receive notifications
:param notify_events: A ``list`` of events that trigger notifications.
Valid values are "create" (a new version of a key was created),
"expire" (a key was automatically expired), or "warning" (early
warnings (2 weeks, 1 week, 1 day) of events)
"""
super(DNSSEC, self).__init__()
self.valid_notify_events = ('create', 'expire', 'warning')
self._zone = zone
self._contact_nickname = self._notify_events = None
self._keys = APIList(DynectSession.get_session, 'keys')
self._active = None
self.uri = '/DNSSEC/{}/'.format(self._zone)
if 'api' in kwargs:
del kwargs['api']
self._build(kwargs)
elif len(args) == 0 and len(kwargs) == 0:
self._get()
else:
self._post(*args, **kwargs)
self._keys.uri = self.uri
def _post(self, keys, contact_nickname, notify_events=None):
"""Create a new :class:`DNSSEC` Service on the Dynect System"""
self._keys += keys
self._contact_nickname = contact_nickname
self._notify_events = notify_events
api_args = {'keys': [key._json for key in self._keys],
'contact_nickname': self._contact_nickname}
for key, val in self.__dict__.items():
if val is not None and not hasattr(val, '__call__') and \
key.startswith('_'):
if key == '_user_name' or key == '_keys':
pass
else:
api_args[key[1:]] = val
# Need to cast to CSV for API
if self._notify_events is not None:
api_args['notify_events'] = ','.join(self._notify_events)
response = DynectSession.get_session().execute(self.uri, 'POST',
api_args)
self._build(response['data'])
def _get(self):
"""Update this object from an existing :class:`DNSSEC` service from the
Dynect System.
"""
api_args = {}
response = DynectSession.get_session().execute(self.uri, 'GET',
api_args)
self._build(response['data'])
def _build(self, data):
"""Iterate over API data responses and update this object according to
the data returned
"""
for key, val in data.items():
if key == 'keys':
self._keys = APIList(DynectSession.get_session, 'keys')
for key_data in val:
key_data['key_type'] = key_data['type']
del key_data['type']
self._keys.append(DNSSECKey(**key_data))
elif key == 'active':
self._active = Active(val)
else:
setattr(self, '_' + key, val)
self.uri = '/DNSSEC/{}/'.format(self._zone)
self._keys.uri = self.uri
@property
def zone(self):
"""The name of the zone where this service exists. This is a read-only
property
"""
return self._zone
@zone.setter
def zone(self, value):
pass
@property
def active(self):
"""The current status of this :class:`DNSSEC` service. When setting
directly, rather than using activate/deactivate valid arguments are 'Y'
or True to activate, or 'N' or False to deactivate. Note: If your
service is already active and you try to activate it, nothing will
happen. And vice versa for deactivation.
:returns: An :class:`Active` object representing the current state of
this :class:`DNSSEC` Service
"""
self._get() # Do a get to ensure an up-to-date status is returned
return self._active
@active.setter
def active(self, value):
deactivate = ('N', False)
activate = ('Y', True)
if value in deactivate and self.active:
self.deactivate()
elif value in activate and not self.active:
self.activate()
@property
def contact_nickname(self):
"""Name of contact to receive notifications"""
return self._contact_nickname
@contact_nickname.setter
def contact_nickname(self, value):
self._contact_nickname = value
api_args = {'contact_nickname': self._contact_nickname}
response = DynectSession.get_session().execute(self.uri, 'PUT',
api_args)
self._build(response['data'])
@property
def notify_events(self):
"""A list of events that trigger notifications. Valid values are:
create (a new version of a key was created), expire (a key was
automatically expired), warning (early warnings (2 weeks, 1 week, 1
day) of events)
"""
return self._notify_events
@notify_events.setter
def notify_events(self, value):
for val in value:
if val not in self.valid_notify_events:
raise DynectInvalidArgumentError('notify_events', val,
self.valid_notify_events)
value = ','.join(value)
api_args = {'notify_events': value}
response = DynectSession.get_session().execute(self.uri, 'PUT',
api_args)
self._build(response['data'])
@property
def keys(self):
"""A List of :class:`DNSSECKey`'s associated with this :class:`DNSSEC`
service
"""
# Need this check for get_all_dnssec calls which do not return key info
if self._keys is None or self._keys == []:
self._get()
return self._keys
@keys.setter
def keys(self, value):
if isinstance(value, list) and not isinstance(value, APIList):
self._keys = APIList(DynectSession.get_session, 'keys', None,
value)
elif isinstance(value, APIList):
self._keys = value
self._keys.uri = self.uri
[docs] def activate(self):
"""Activate this :class:`DNSSEC` service"""
api_args = {'activate': 'Y'}
response = DynectSession.get_session().execute(self.uri, 'PUT',
api_args)
self._build(response['data'])
[docs] def deactivate(self):
"""Deactivate this :class:`DNSSEC` service"""
api_args = {'deactivate': 'Y'}
response = DynectSession.get_session().execute(self.uri, 'PUT',
api_args)
self._build(response['data'])
[docs] def timeline_report(self, start_ts=None, end_ts=None):
"""Generates a report of events this :class:`DNSSEC` service has
performed and has scheduled to perform
:param start_ts: datetime.datetime instance identifying point in time
for the start of the timeline report
:param end_ts: datetime.datetime instance identifying point in time
for the end of the timeline report. Defaults to
datetime.datetime.now()
"""
api_args = {'zone': self._zone}
if start_ts is not None:
api_args['start_ts'] = unix_date(start_ts)
if end_ts is not None:
api_args['end_ts'] = unix_date(end_ts)
elif end_ts is None and start_ts is not None:
api_args['end_ts'] = unix_date(datetime.now())
uri = '/DNSSECTimelineReport/'
response = DynectSession.get_session().execute(uri, 'POST', api_args)
return response['data']
[docs] def delete(self):
"""Delete this :class:`DNSSEC` Service from the DynECT System"""
api_args = {}
DynectSession.get_session().execute(self.uri, 'DELETE', api_args)
def __str__(self):
"""str override"""
return force_unicode('<DNSSEC>: {}').format(self._zone)
__repr__ = __unicode__ = __str__
def __bytes__(self):
"""bytes override"""
return bytes(self.__str__())