# -*- coding: utf-8 -*-
"""This module contains all Zone related API objects."""
import os
from time import sleep
from datetime import datetime
from dyn.tm.utils import unix_date
from dyn.compat import force_unicode
from dyn.tm.errors import (DynectCreateError, DynectGetError,
DynectInvalidArgumentError)
from dyn.tm.records import (ARecord, AAAARecord, ALIASRecord, CDSRecord,
CAARecord, CDNSKEYRecord, CSYNCRecord, CERTRecord,
CNAMERecord, DHCIDRecord, DNAMERecord,
DNSKEYRecord, DSRecord, KEYRecord, KXRecord,
LOCRecord, IPSECKEYRecord, MXRecord, NAPTRRecord,
PTRRecord, PXRecord, NSAPRecord, RPRecord,
NSRecord, SOARecord, SPFRecord, SRVRecord,
TLSARecord, TXTRecord, SSHFPRecord, UNKNOWNRecord)
from dyn.tm.session import DynectSession
from dyn.tm.services import (ActiveFailover, DynamicDNS, DNSSEC,
TrafficDirector, GSLB, ReverseDNS, RTTM,
HTTPRedirect, AdvancedRedirect)
from dyn.tm.task import Task
__author__ = 'jnappi'

# Public names exported by a star-import of this module.
__all__ = [
    'get_all_zones',
    'Zone',
    'SecondaryZone',
    'Node',
    'ExternalNameserver',
    'ExternalNameserverEntry',
]

# Maps a DNS record type label to the record class used to build records of
# that type. 'UNKNOWN' is the fallback used for labels with no entry here.
RECS = {
    'A': ARecord,
    'AAAA': AAAARecord,
    'ALIAS': ALIASRecord,
    'CAA': CAARecord,
    'CDS': CDSRecord,
    'CDNSKEY': CDNSKEYRecord,
    'CSYNC': CSYNCRecord,
    'CERT': CERTRecord,
    'CNAME': CNAMERecord,
    'DHCID': DHCIDRecord,
    'DNAME': DNAMERecord,
    'DNSKEY': DNSKEYRecord,
    'DS': DSRecord,
    'KEY': KEYRecord,
    'KX': KXRecord,
    'LOC': LOCRecord,
    'IPSECKEY': IPSECKEYRecord,
    'MX': MXRecord,
    'NAPTR': NAPTRRecord,
    'PTR': PTRRecord,
    'PX': PXRecord,
    'NSAP': NSAPRecord,
    'RP': RPRecord,
    'NS': NSRecord,
    'SOA': SOARecord,
    'SPF': SPFRecord,
    'SRV': SRVRecord,
    'TLSA': TLSARecord,
    'TXT': TXTRecord,
    'SSHFP': SSHFPRecord,
    'UNKNOWN': UNKNOWNRecord,
}
def get_all_zones():
    """Accessor function to retrieve a *list* of all
    :class:`~dyn.tm.zones.Zone`'s accessible to a user

    :return: a *list* of :class:`~dyn.tm.zones.Zone`'s
    """
    session = DynectSession.get_session()
    response = session.execute('/Zone/', 'GET', {'detail': 'Y'})
    # Passing ``api`` makes the Zone constructor populate itself from the
    # supplied data instead of issuing another API call per zone.
    return [Zone(entry['zone'], api=False, **entry)
            for entry in response['data']]
def get_all_secondary_zones():
    """Accessor function to retrieve a *list* of all :class:`SecondaryZone`'s
    accessible to a user

    :return: a *list* of :class:`~dyn.tm.zones.SecondaryZone`'s
    """
    session = DynectSession.get_session()
    response = session.execute('/Secondary/', 'GET', {'detail': 'Y'})
    # 'zone' is popped so it is passed positionally only, not repeated in the
    # keyword data that the constructor copies onto the object.
    return [SecondaryZone(entry.pop('zone'), api=False, **entry)
            for entry in response['data']]
def get_apex(node_name, full_details=False):
    """Accessor function to retrieve the apex zone name of a given node
    available to the logged in user.

    :param node_name: name of the node to find the apex zone of.
    :param full_details: if true, the whole ``data`` section of the API
        response is returned rather than only the apex zone name
    :return: a *string* containing the apex zone name if ``full_details`` is
        :const:`False`, otherwise the :const:`dict` of response data that
        contains the apex zone name
    """
    response = DynectSession.get_session().execute(
        '/Apex/{}'.format(node_name), 'GET', {})
    apex_data = response['data']
    return apex_data if full_details else apex_data['zone']
class Zone(object):
    """A class representing a DynECT Zone"""

    def __init__(self, name, *args, **kwargs):
        """Create a :class:`Zone` object. Note: When creating a new
        :class:`Zone` if no contact is specified the path to a local zone
        file must be passed to the ``file_name`` param.

        :param name: the name of the zone to create
        :param contact: Administrative contact for this zone
        :param ttl: TTL (in seconds) for records in the zone
        :param serial_style: The style of the zone's serial. Valid values:
            increment, epoch, day, minute
        :param file_name: The path to a valid RFC1035, BIND, or tinydns style
            Master file. Note: this file must be under 1mb in size.
        :param master_ip: The IP of the master server from which to fetch zone
            data for Transferring this :class:`Zone`. Note: This argument is
            required for performing a valid ZoneTransfer operation.
        :param timeout: The time, in minutes, to wait for a zone xfer to
            complete
        """
        super(Zone, self).__init__()
        self.valid_serials = ('increment', 'epoch', 'day', 'minute')
        self._name = name
        self._fqdn = self._name
        if self._fqdn and not self._fqdn.endswith('.'):
            self._fqdn += '.'
        self._contact = self._ttl = self._serial_style = self._serial = None
        self._zone = self._status = None
        self.records = {}
        self._task_id = None
        self.services = {}
        self.uri = '/Zone/{}/'.format(self._name)
        if 'api' in kwargs:
            # Data was already fetched by the caller: copy it onto this
            # object without talking to the API.
            del kwargs['api']
            for key, val in kwargs.items():
                setattr(self, '_' + key, val)
            self._name = self._zone
            self.uri = '/Zone/{}/'.format(self._name)
        elif len(args) == 0 and len(kwargs) == 0:
            # Only a name was given: load the existing zone.
            self._get()
        else:
            # Creation arguments were given: create the zone.
            self._post(*args, **kwargs)
        self._status = 'active'

    def _post(self, contact=None, ttl=60, serial_style='increment',
              file_name=None, master_ip=None, timeout=None):
        """Create a new :class:`Zone` object on the DynECT System

        The zone is created from ``file_name`` when given, else by zone
        transfer from ``master_ip`` when given, else from ``contact``,
        ``ttl`` and ``serial_style``.

        :raises DynectInvalidArgumentError: if none of ``contact``,
            ``file_name`` and ``master_ip`` is given, or if ``serial_style``
            is not one of ``valid_serials``
        """
        if contact is None and file_name is None and master_ip is None:
            raise DynectInvalidArgumentError('contact', None)
        if file_name is not None:
            self._post_with_file(file_name)
        elif master_ip is not None:
            self._xfer(master_ip, timeout)
        else:
            self._contact = contact
            self._ttl = ttl
            if serial_style not in self.valid_serials:
                # Same (argument name, value, valid values) order as the
                # other DynectInvalidArgumentError call sites in this class.
                raise DynectInvalidArgumentError('serial_style', serial_style,
                                                 self.valid_serials)
            self._serial_style = serial_style
            api_args = {'zone': self._name,
                        'rname': self._contact,
                        'ttl': self._ttl,
                        'serial_style': self._serial_style}
            response = DynectSession.get_session().execute(self.uri, 'POST',
                                                           api_args)
            self._build(response['data'])

    def _post_with_file(self, file_name):
        """Create a :class:`Zone` from a RFC1035 style Master file. A ZoneFile
        for BIND or tinydns will also be accepted

        :param file_name: The path to a valid ZoneFile
        :raises DynectInvalidArgumentError: if the file is larger than
            1048576 bytes
        """
        full_path = os.path.abspath(file_name)
        file_size = os.path.getsize(full_path)
        if file_size > 1048576:
            raise DynectInvalidArgumentError('Zone File Size', file_size,
                                             'Under 1MB')
        uri = '/ZoneFile/{}/'.format(self.name)
        # The context manager closes the file even if reading it fails.
        with open(full_path, 'r') as zone_file:
            content = zone_file.read()
        api_args = {'file': content}
        response = DynectSession.get_session().execute(uri, 'POST', api_args)
        self.__poll_for_get()
        self._build(response['data'])

    def _xfer(self, master_ip, timeout=None):
        """Create a :class:`Zone` by ZoneTransfer by providing an optional
        master_ip argument.

        :param master_ip: The IP of the master server to transfer from
        :param timeout: Maximum number of 60 second waits while the transfer
            is reported as running; defaults to 10 when falsy
        """
        uri = '/ZoneTransfer/{}/'.format(self.name)
        api_args = {'master_ip': master_ip}
        response = DynectSession.get_session().execute(uri, 'POST', api_args)
        self._build(response['data'])
        time_out = timeout or 10
        count = 0
        while count < time_out:
            response = DynectSession.get_session().execute(uri, 'GET', {})
            # NOTE(review): this reads the top-level 'status'/'message' keys
            # while __poll_for_get reads response['data']['status'] -- confirm
            # which one carries the transfer state.
            if response['status'] == 'running' and response['message'] == '':
                sleep(60)
                count += 1
            else:
                break
        self._get()

    def __poll_for_get(self, n_loops=10, xfer=False, xfer_master_ip=None):
        """For use ONLY by _post_with_file and _xfer. Will wait at MOST
        ``n_loops * 2`` seconds for a successful GET API response. If no
        successful get is received no error will be raised, unless ``xfer``
        is set and the zone transfer reports an unfinished or failed status.

        :param n_loops: Maximum number of GET attempts
        :param xfer: Whether to check the ZoneTransfer status when every GET
            attempt failed
        :param xfer_master_ip: Optional ``master_ip`` sent with that
            ZoneTransfer status request
        :raises DynectCreateError: if the checked transfer status is one of
            running, waiting, failed or canceled
        """
        count = 0
        got = False
        while count < n_loops:
            try:
                self._get()
                got = True
                break
            except DynectGetError:
                sleep(2)
                count += 1
        if not got and xfer:
            uri = '/ZoneTransfer/{}/'.format(self.name)
            api_args = {}
            if xfer_master_ip is not None:
                api_args['master_ip'] = xfer_master_ip
            response = DynectSession.get_session().execute(uri, 'GET',
                                                           api_args)
            error_labels = ['running', 'waiting', 'failed', 'canceled']
            ok_labels = ['ready', 'unpublished', 'ok']
            if response['data']['status'] in error_labels:
                raise DynectCreateError(response['msgs'])
            elif response['data']['status'] in ok_labels:
                self._get()
            else:
                pass  # Should never get here

    def _get(self):
        """Get an existing :class:`Zone` object from the DynECT System"""
        api_args = {}
        response = DynectSession.get_session().execute(self.uri, 'GET',
                                                       api_args)
        self._build(response['data'])

    def _build(self, data):
        """Build the variables in this object by pulling out the data from
        data. Every key is stored as the attribute ``_<key>``, except
        ``task_id`` which is stored as a :class:`Task` (or `None` when empty)
        """
        for key, val in data.items():
            if key == "task_id" and not val:
                self._task_id = None
            elif key == "task_id":
                self._task_id = Task(val)
            else:
                setattr(self, '_' + key, val)

    def _update(self, api_args):
        """Update this :class:`Zone`, via the API, with the args in
        api_args
        """
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    def _build_records(self, data, fqdn=None):
        """Convert the per-type record lists of an API response into record
        objects.

        :param data: a :class:`dict` mapping an API record-type label to a
            list of record data :class:`dict`
        :param fqdn: the fqdn to build every record with, or `None` to use
            the fqdn found in each record's own data
        :return: a :class:`dict` mapping every label that has a non-empty
            list to its list of record objects
        """
        records = {}
        for key, record_list in data.items():
            # Strip out empty record_type lists
            if record_list == []:
                continue
            # The text before the first '_' of the label picks the class
            search = key.split('_')[0].upper()
            constructor = RECS.get(search, RECS['UNKNOWN'])
            list_records = []
            for record in record_list:
                del record['zone']
                record_fqdn = record.pop('fqdn')
                if fqdn is not None:
                    record_fqdn = fqdn
                # Unpack rdata
                record.update(record['rdata'])
                record['create'] = False
                list_records.append(
                    constructor(self._name, record_fqdn, **record))
            records[key] = list_records
        return records

    def _list_services(self, endpoint, constructor, api_args):
        """Fetch every service of one kind in this :class:`Zone`.

        :param endpoint: The API resource name used in the request uri
        :param constructor: The service class to instantiate per result
        :param api_args: The arguments to send with the GET request
        :return: A :class:`list` of ``constructor`` instances
        """
        uri = '/{}/{}/'.format(endpoint, self._name)
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        services = []
        for svc in response['data']:
            del svc['zone']
            # Build each service with the fqdn it was reported under rather
            # than the zone's own fqdn, so services attached to nodes below
            # the zone root keep pointing at their own node.
            fqdn = svc.pop('fqdn')
            services.append(constructor(self._name, fqdn, api=False, **svc))
        return services

    @property
    def __root_soa(self):
        """Return the SOA record associated with this Zone"""
        return self.get_all_records_by_type('SOA')[0]

    @property
    def name(self):
        """The name of this :class:`Zone`"""
        return self._name

    @name.setter
    def name(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def fqdn(self):
        """The fully qualified domain name of this :class:`Zone`, i.e. its
        name with a trailing ``.``
        """
        return self._fqdn

    @fqdn.setter
    def fqdn(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def contact(self):
        """The email address of the primary :class:`Contact` associated with
        this :class:`Zone`
        """
        self._contact = self.__root_soa.rname
        return self._contact

    @contact.setter
    def contact(self, value):
        self.__root_soa.rname = value

    @property
    def ttl(self):
        """This :class:`Zone`'s default TTL"""
        self._ttl = self.__root_soa.ttl
        return self._ttl

    @ttl.setter
    def ttl(self, value):
        self.__root_soa.ttl = value

    @property
    def serial(self):
        """The current serial of this :class:`Zone`"""
        self._get()
        return self._serial

    @serial.setter
    def serial(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def serial_style(self):
        """The current serial style of this :class:`Zone`"""
        self._get()
        return self._serial_style

    @serial_style.setter
    def serial_style(self, value):
        if value not in self.valid_serials:
            raise DynectInvalidArgumentError('serial_style', value,
                                             self.valid_serials)
        self.__root_soa.serial_style = value

    @property
    def status(self):
        """Convenience property for :class:`Zones`. If a :class:`Zones` is
        frozen the status will read as `'frozen'`, if the :class:`Zones` is not
        frozen the status will read as `'active'`. Because the API does not
        return information about whether or not a :class:`Zones` is frozen
        there will be a few cases where this status will be `None` in order to
        avoid guessing what the current status actually is.
        """
        self._get()
        return self._status

    @status.setter
    def status(self, value):
        # Read-only: assignments are silently ignored.
        pass

    def freeze(self):
        """Causes the zone to become frozen. Freezing a zone prevents changes
        to the zone until it is thawed.
        """
        api_args = {'freeze': True}
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])
        if response['status'] == 'success':
            self._status = 'frozen'

    def thaw(self):
        """Causes the zone to become thawed. Thawing a frozen zone allows
        changes to again be made to the zone.
        """
        api_args = {'thaw': True}
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])
        if response['status'] == 'success':
            self._status = 'active'

    def publish(self, notes=None):
        """Causes all pending changes to become part of the zone. The serial
        number increments based on its serial style and the data is pushed out
        to the nameservers.

        :param notes: Optional notes to send along with the publish request
        """
        api_args = {'publish': True}
        if notes:
            api_args['notes'] = notes
        self._update(api_args)

    @property
    def task(self):
        """:class:`Task` for most recent system action on this :class:`Zone`.
        """
        if self._task_id:
            self._task_id.refresh()
        return self._task_id

    def get_notes(self, offset=None, limit=None):
        """Generates a report containing the Zone Notes for this :class:`Zone`

        :param offset: The starting point at which to retrieve the notes
        :param limit: The maximum number of notes to be retrieved
        :return: A :class:`list` of :class:`dict` containing :class:`Zone`
            Notes
        """
        uri = '/ZoneNoteReport/'
        api_args = {'zone': self.name}
        if offset:
            api_args['offset'] = offset
        if limit:
            api_args['limit'] = limit
        response = DynectSession.get_session().execute(uri, 'POST', api_args)
        return response['data']

    def add_record(self, name=None, record_type='A', *args, **kwargs):
        """Adds a record with the provided name and data to this
        :class:`Zone`

        :param name: The name of the node where this record will be added
        :param record_type: The type of record you would like to add.
            Valid record_type arguments are the keys of ``RECS``, e.g. 'A',
            'AAAA', 'CERT', 'CNAME', 'DHCID', 'DNAME', 'DNSKEY', 'DS', 'KEY',
            'KX', 'LOC', 'IPSECKEY', 'MX', 'NAPTR', 'PTR', 'PX', 'NSAP', 'RP',
            'NS', 'SOA', 'SPF', 'SRV', and 'TXT'.
        :param args: Non-keyword arguments to pass to the Record constructor
        :param kwargs: Keyword arguments to pass to the Record constructor
        :return: The newly constructed record
        """
        fqdn = self.name + '.'
        if name:
            fqdn = name + '.' + fqdn
        # noinspection PyCallingNonCallable
        rec = RECS[record_type](self.name, fqdn, *args, **kwargs)
        self.records.setdefault(record_type, []).append(rec)
        return rec

    def add_service(self, name=None, service_type=None, *args, **kwargs):
        """Add the specified service type to this zone, or to a node under this
        zone

        :param name: The name of the :class:`Node` where this service will be
            attached to or `None` to attach it to the root :class:`Node` of
            this :class:`Zone`
        :param service_type: The type of the service you would like to create.
            Valid service_type arguments are: 'ActiveFailover', 'DDNS',
            'DNSSEC', 'DSF', 'GSLB', 'RDNS', 'RTTM', 'HTTPRedirect'
        :param args: Non-keyword arguments to pass to the service constructor
        :param kwargs: Keyword arguments to pass to the service constructor
        :return: The newly constructed service
        """
        constructors = {'ActiveFailover': ActiveFailover,
                        'DDNS': DynamicDNS,
                        'DNSSEC': DNSSEC,
                        'DSF': TrafficDirector,
                        'GSLB': GSLB,
                        'RDNS': ReverseDNS,
                        'RTTM': RTTM,
                        'HTTPRedirect': HTTPRedirect}
        fqdn = self.name + '.'
        if name:
            fqdn = name + '.' + fqdn
        if service_type == 'DNSSEC':
            # DNSSEC is constructed with the zone name only, no fqdn
            # noinspection PyCallingNonCallable
            service = constructors[service_type](self.name, *args, **kwargs)
        else:
            # noinspection PyCallingNonCallable
            service = constructors[service_type](self.name, fqdn, *args,
                                                 **kwargs)
        self.services.setdefault(service_type, []).append(service)
        return service

    def get_all_nodes(self):
        """Returns a list of Node Objects for all subnodes in Zone (Excluding
        the Zone itself.)
        """
        api_args = {}
        uri = '/NodeList/{}/'.format(self._name)
        response = DynectSession.get_session().execute(uri, 'GET',
                                                       api_args)
        nodes = [Node(self._name, fqdn) for fqdn in response['data'] if
                 fqdn != self._name]
        return nodes

    def get_node(self, node=None):
        """Returns a :class:`Node` object for that particular node

        :param node: The name of the Node you wish to access, or `None` to get
            the root :class:`Node` of this :class:`Zone`
        """
        if node:
            fqdn = node + '.' + self.name + '.'
        else:
            fqdn = self.name + '.'
        return Node(self.name, fqdn)

    def get_all_records(self):
        """Retrieve a list of all record resources for the specified node and
        zone combination as well as all records from any Base_Record below that
        point on the zone hierarchy

        :return: A :class:`dict` mapping each record type label to the
            :class:`List` of :class:`DNSRecord`'s of that type under this
            :class:`Zone`
        """
        # Note: this resets the locally tracked records as a side effect.
        self.records = {}
        uri = '/AllRecord/{}/'.format(self._name)
        if self.fqdn is not None:
            uri += '{}/'.format(self.fqdn)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        return self._build_records(response['data'])

    def get_all_records_by_type(self, record_type):
        """Get a list of all :class:`DNSRecord` of type ``record_type`` which
        are owned by this node.

        :param record_type: The type of :class:`DNSRecord` you wish returned.
            Valid record_type arguments are the keys of ``RECS`` other than
            'UNKNOWN', e.g. 'A', 'AAAA', 'CAA', 'CERT', 'CNAME', 'DHCID',
            'DNAME', 'DNSKEY', 'DS', 'KEY', 'KX', 'LOC', 'IPSECKEY', 'MX',
            'NAPTR', 'PTR', 'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF', 'SRV',
            and 'TXT'.
        :return: A :class:`List` of :class:`DNSRecord`'s
        :raises KeyError: if ``record_type`` is not a valid record type
        """
        # 'UNKNOWN' has a record class but no API resource of its own
        if record_type == 'UNKNOWN':
            raise KeyError(record_type)
        constructor = RECS[record_type]
        # The API resource is the record type label followed by 'Record'
        uri = '/{}Record/{}/{}/'.format(record_type, self._name, self.fqdn)
        api_args = {'detail': 'Y'}
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        records = []
        for record in response['data']:
            fqdn = record.pop('fqdn')
            del record['zone']
            # Unpack rdata
            record.update(record.pop('rdata'))
            record['create'] = False
            records.append(constructor(self._name, fqdn, **record))
        return records

    def get_any_records(self):
        """Retrieve a list of all :class:`DNSRecord`'s associated with this
        :class:`Zone`

        :return: A :class:`dict` mapping each record type label to a
            :class:`List` of :class:`DNSRecord`'s, or `None` if this
            :class:`Zone` has no fqdn
        """
        if self.fqdn is None:
            return
        api_args = {'detail': 'Y'}
        uri = '/ANYRecord/{}/{}/'.format(self._name, self.fqdn)
        response = DynectSession.get_session().execute(uri, 'GET', api_args)
        return self._build_records(response['data'], fqdn=self.fqdn)

    def get_all_active_failovers(self):
        """Retrieve a list of all :class:`ActiveFailover` services associated
        with this :class:`Zone`

        :return: A :class:`List` of :class:`ActiveFailover` Services
        """
        return self._list_services('Failover', ActiveFailover,
                                   {'detail': 'Y'})

    def get_all_ddns(self):
        """Retrieve a list of all :class:`DDNS` services associated with this
        :class:`Zone`

        :return: A :class:`List` of :class:`DDNS` Services
        """
        return self._list_services('DDNS', DynamicDNS, {'detail': 'Y'})

    def get_all_httpredirect(self):
        """Retrieve a list of all :class:`HTTPRedirect` services associated
        with this :class:`Zone`

        :return: A :class:`List` of :class:`HTTPRedirect` Services
        """
        return self._list_services('HTTPRedirect', HTTPRedirect,
                                   {'detail': 'Y'})

    def get_all_advanced_redirect(self):
        """Retrieve a list of all :class:`AdvancedRedirect` services associated
        with this :class:`Zone`

        :return: A :class:`List` of :class:`AdvancedRedirect` Services
        """
        return self._list_services('AdvRedirect', AdvancedRedirect,
                                   {'rules': 'Y'})

    def get_all_gslb(self):
        """Retrieve a list of all :class:`GSLB` services associated with this
        :class:`Zone`

        :return: A :class:`List` of :class:`GSLB` Services
        """
        return self._list_services('GSLB', GSLB, {'detail': 'Y'})

    def get_all_rdns(self):
        """Retrieve a list of all :class:`ReverseDNS` services associated with
        this :class:`Zone`

        :return: A :class:`List` of :class:`ReverseDNS` Services
        """
        return self._list_services('IPTrack', ReverseDNS, {'detail': 'Y'})

    def get_all_rttm(self):
        """Retrieve a list of all :class:`RTTM` services associated with this
        :class:`Zone`

        :return: A :class:`List` of :class:`RTTM` Services
        """
        return self._list_services('RTTM', RTTM, {'detail': 'Y'})

    def get_qps(self, start_ts, end_ts=None, breakdown=None, hosts=None,
                rrecs=None):
        """Generates a report with information about Queries Per Second (QPS)
        for this zone

        :param start_ts: datetime.datetime instance identifying point in time
            for the QPS report
        :param end_ts: datetime.datetime instance indicating the end of the
            data range for the report. Defaults to datetime.datetime.now()
        :param breakdown: By default, most data is aggregated together.
            Valid values ('hosts', 'rrecs', 'zones').
        :param hosts: List of hosts to include in the report.
        :param rrecs: List of record types to include in report.
        :return: A :class:`str` with CSV data
        """
        end_ts = end_ts or datetime.now()
        api_args = {'start_ts': unix_date(start_ts),
                    'end_ts': unix_date(end_ts),
                    'zones': [self.name]}
        if breakdown is not None:
            api_args['breakdown'] = breakdown
        if hosts is not None:
            api_args['hosts'] = hosts
        if rrecs is not None:
            api_args['rrecs'] = rrecs
        response = DynectSession.get_session().execute('/QPSReport/',
                                                       'POST', api_args)
        return response['data']

    def delete(self):
        """Delete this :class:`Zone` and perform necessary cleanups"""
        api_args = {}
        DynectSession.get_session().execute(self.uri, 'DELETE', api_args)

    def __eq__(self, other):
        """Equivalence operations for easily pulling a :class:`Zone` out of a
        list of :class:`Zone` objects. A :class:`Zone` equals another
        :class:`Zone` with the same name, or a :class:`str` of its name
        """
        if isinstance(other, str):
            return other == self._name
        elif isinstance(other, Zone):
            return other.name == self._name
        return False

    def __ne__(self, other):
        """Non-Equivalence operator"""
        return not self.__eq__(other)

    def __hash__(self):
        """Hash on the zone name so that hashing agrees with ``__eq__``.
        Without this, defining ``__eq__`` makes instances unhashable on
        Python 3.
        """
        return hash(self._name)

    def __str__(self):
        """str override"""
        return force_unicode('<Zone>: {}').format(self._name)

    __repr__ = __unicode__ = __str__

    def __bytes__(self):
        """bytes override"""
        # bytes(text) without an encoding raises TypeError on Python 3
        return self.__str__().encode('utf-8')
class SecondaryZone(object):
    """A class representing DynECT Secondary zones"""

    def __init__(self, zone, *args, **kwargs):
        """Create a :class:`SecondaryZone` object

        :param zone: The name of this secondary zone
        :param masters: A list of IPv4 or IPv6 addresses of the master
            nameserver(s) for this zone.
        :param contact_nickname: Name of the :class:`Contact` that will receive
            notifications for this zone
        :param tsig_key_name: Name of the TSIG key that will be used to sign
            transfer requests to this zone's master
        """
        super(SecondaryZone, self).__init__()
        self._zone = self._name = zone
        self.uri = '/Secondary/{}/'.format(self._zone)
        self._masters = self._contact_nickname = self._tsig_key_name = None
        self._task_id = None
        if 'api' in kwargs:
            # Data was already fetched by the caller: copy it onto this
            # object without talking to the API.
            del kwargs['api']
            for key, val in kwargs.items():
                setattr(self, '_' + key, val)
        elif len(args) == 0 and len(kwargs) == 0:
            # Only a name was given: load the existing secondary zone.
            self._get()
        else:
            # Creation arguments were given: create the secondary zone.
            self._post(*args, **kwargs)

    def _get(self):
        """Get a :class:`SecondaryZone` object from the DynECT System"""
        api_args = {}
        response = DynectSession.get_session().execute(self.uri, 'GET',
                                                       api_args)
        self._build(response['data'])

    def _post(self, masters, contact_nickname=None, tsig_key_name=None):
        """Create a new :class:`SecondaryZone` object on the DynECT System

        ``contact_nickname`` and ``tsig_key_name`` are only sent to the API
        when they are truthy.
        """
        self._masters = masters
        self._contact_nickname = contact_nickname
        self._tsig_key_name = tsig_key_name
        api_args = {'masters': self._masters}
        if contact_nickname:
            api_args['contact_nickname'] = self._contact_nickname
        if tsig_key_name:
            api_args['tsig_key_name'] = self._tsig_key_name
        response = DynectSession.get_session().execute(self.uri, 'POST',
                                                       api_args)
        self._build(response['data'])

    def _update(self, api_args, uri=None):
        """Update this :class:`SecondaryZone`, via the API, with the args in
        api_args

        :param api_args: The arguments to send with the PUT request
        :param uri: The uri to send the request to; defaults to this
            object's own uri
        """
        if not uri:
            uri = self.uri
        response = DynectSession.get_session().execute(uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    def _build(self, data):
        """Build the variables in this object by pulling out the data from
        data. Every key is stored as the attribute ``_<key>``, except
        ``task_id`` which is stored as a :class:`Task` (or `None` when empty)
        """
        for key, val in data.items():
            if key == "task_id" and not val:
                self._task_id = None
            elif key == "task_id":
                self._task_id = Task(val)
            else:
                setattr(self, '_' + key, val)

    @property
    def task(self):
        """:class:`Task` for most recent system action
        on this :class:`SecondaryZone`.
        """
        if self._task_id:
            self._task_id.refresh()
        return self._task_id

    @property
    def zone(self):
        """The name of this :class:`SecondaryZone`"""
        return self._zone

    @zone.setter
    def zone(self, value):
        # Read-only: assignments are silently ignored.
        pass

    @property
    def masters(self):
        """A list of IPv4 or IPv6 addresses of the master nameserver(s) for
        this zone.
        """
        self._get()
        return self._masters

    @masters.setter
    def masters(self, value):
        self._masters = value
        api_args = {'masters': self._masters}
        self._update(api_args)

    @property
    def contact_nickname(self):
        """Name of the :class:`Contact` that will receive notifications for
        this zone
        """
        self._get()
        return self._contact_nickname

    @contact_nickname.setter
    def contact_nickname(self, value):
        self._contact_nickname = value
        api_args = {'contact_nickname': self._contact_nickname}
        self._update(api_args)

    @property
    def tsig_key_name(self):
        """Name of the TSIG key that will be used to sign transfer requests to
        this zone's master
        """
        self._get()
        return self._tsig_key_name

    @tsig_key_name.setter
    def tsig_key_name(self, value):
        self._tsig_key_name = value
        api_args = {'tsig_key_name': self._tsig_key_name}
        self._update(api_args)

    def activate(self):
        """Activates this secondary zone"""
        api_args = {'activate': True}
        self._update(api_args)

    def deactivate(self):
        """Deactivates this secondary zone"""
        api_args = {'deactivate': True}
        self._update(api_args)

    def retransfer(self):
        """Retransfers this secondary zone from its original provider into
        Dyn's Managed DNS
        """
        api_args = {'retransfer': True}
        self._update(api_args)

    def delete(self):
        """Delete this :class:`SecondaryZone`"""
        api_args = {}
        # The delete is sent to the /Zone/ resource, not /Secondary/
        uri = '/Zone/{}/'.format(self._zone)
        DynectSession.get_session().execute(uri, 'DELETE', api_args)

    @property
    def active(self):
        """Reports the status of :class:`SecondaryZone` Y, L or N"""
        self._get()
        return self._active

    @property
    def serial(self):
        """Reports the serial of :class:`SecondaryZone`"""
        api_args = {}
        # The serial is read from the response to an empty PUT on /Zone/
        uri = '/Zone/{}/'.format(self._zone)
        self._update(api_args, uri)
        return self._serial

    def __str__(self):
        """str override"""
        return force_unicode('<SecondaryZone>: {}').format(self._zone)

    __repr__ = __unicode__ = __str__

    def __bytes__(self):
        """bytes override"""
        # bytes(text) without an encoding raises TypeError on Python 3
        return self.__str__().encode('utf-8')
[docs]class Node(object):
"""Node object. Represents a valid fqdn node within a zone. It should be
noted that simply creating a :class:`Node` object does not actually create
anything on the DynECT System. The only way to actively create a
:class:`Node` on the DynECT System is by attaching either a record or a
service to it.
"""
[docs] def __init__(self, zone, fqdn=None):
"""Create a :class:`Node` object
:param zone: name of the zone that this Node belongs to
:param fqdn: the fully qualified domain name of this zone
"""
super(Node, self).__init__()
self.zone = zone
self.fqdn = fqdn or self.zone + '.'
self.records = self.my_records = {}
self.services = []
[docs] def add_record(self, record_type='A', *args, **kwargs):
"""Adds an a record with the provided data to this :class:`Node`
:param record_type: The type of record you would like to add.
Valid record_type arguments are: 'A', 'AAAA', 'CERT', 'CNAME',
'DHCID', 'DNAME', 'DNSKEY', 'DS', 'KEY', 'KX', 'LOC', 'IPSECKEY',
'MX', 'NAPTR', 'PTR', 'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF',
'SRV', and 'TXT'.
:param args: Non-keyword arguments to pass to the Record constructor
:param kwargs: Keyword arguments to pass to the Record constructor
"""
# noinspection PyCallingNonCallable
rec = RECS[record_type](self.zone, self.fqdn, *args, **kwargs)
if record_type in self.records:
self.records[record_type].append(rec)
else:
self.records[record_type] = [rec]
return rec
[docs] def add_service(self, service_type=None, *args, **kwargs):
"""Add the specified service type to this :class:`Node`
:param service_type: The type of the service you would like to create.
Valid service_type arguments are: 'ActiveFailover', 'DDNS',
'DNSSEC', 'DSF', 'GSLB', 'RDNS', 'RTTM', 'HTTPRedirect'
:param args: Non-keyword arguments to pass to the Record constructor
:param kwargs: Keyword arguments to pass to the Record constructor
"""
constructors = {'ActiveFailover': ActiveFailover,
'DDNS': DynamicDNS,
'DNSSEC': DNSSEC,
'DSF': TrafficDirector,
'GSLB': GSLB,
'RDNS': ReverseDNS,
'RTTM': RTTM,
'HTTPRedirect': HTTPRedirect}
# noinspection PyCallingNonCallable
service = constructors[service_type](self.zone, self.fqdn, *args,
**kwargs)
self.services.append(service)
return service
[docs] def get_all_records(self):
"""Retrieve a list of all record resources for the specified node and
zone combination as well as all records from any Base_Record below that
point on the zone hierarchy
"""
self.records = {}
uri = '/AllRecord/{}/'.format(self.zone)
if self.fqdn is not None:
uri += '{}/'.format(self.fqdn)
api_args = {'detail': 'Y'}
response = DynectSession.get_session().execute(uri, 'GET', api_args)
# Strip out empty record_type lists
record_lists = {label: rec_list for label, rec_list in
response['data'].items() if rec_list != []}
records = {}
for key, record_list in record_lists.items():
search = key.split('_')[0].upper()
try:
constructor = RECS[search]
except KeyError:
constructor = RECS['UNKNOWN']
list_records = []
for record in record_list:
del record['zone']
fqdn = record['fqdn']
del record['fqdn']
# Unpack rdata
for r_key, r_val in record['rdata'].items():
record[r_key] = r_val
record['create'] = False
list_records.append(constructor(self.zone, fqdn, **record))
records[key] = list_records
return records
[docs] def get_all_records_by_type(self, record_type):
"""Get a list of all :class:`DNSRecord` of type ``record_type`` which
are owned by this node.
:param record_type: The type of :class:`DNSRecord` you wish returned.
Valid record_type arguments are: 'A', 'AAAA', 'CERT', 'CNAME',
'DHCID', 'DNAME', 'DNSKEY', 'DS', 'KEY', 'KX', 'LOC', 'IPSECKEY',
'MX', 'NAPTR', 'PTR', 'PX', 'NSAP', 'RP', 'NS', 'SOA', 'SPF',
'SRV', and 'TXT'.
:return: A list of :class:`DNSRecord`'s
"""
names = {'A': 'ARecord', 'AAAA': 'AAAARecord', 'CAA': 'CAARecord',
'CERT': 'CERTRecord', 'CNAME': 'CNAMERecord',
'DHCID': 'DHCIDRecord', 'DNAME': 'DNAMERecord',
'DNSKEY': 'DNSKEYRecord', 'DS': 'DSRecord',
'KEY': 'KEYRecord', 'KX': 'KXRecord', 'LOC': 'LOCRecord',
'IPSECKEY': 'IPSECKEYRecord', 'MX': 'MXRecord',
'NAPTR': 'NAPTRRecord', 'PTR': 'PTRRecord',
'PX': 'PXRecord', 'NSAP': 'NSAPRecord', 'RP': 'RPRecord',
'NS': 'NSRecord', 'SOA': 'SOARecord', 'SPF': 'SPFRecord',
'SRV': 'SRVRecord', 'TLSA': 'TLSARecord',
'TXT': 'TXTRecord', 'SSHFP': 'SSHFPRecord',
'ALIAS': 'ALIASRecord'}
constructor = RECS[record_type]
uri = '/{}/{}/{}/'.format(names[record_type], self.zone,
self.fqdn)
api_args = {'detail': 'Y'}
response = DynectSession.get_session().execute(uri, 'GET', api_args)
records = []
for record in response['data']:
fqdn = record['fqdn']
del record['fqdn']
del record['zone']
# Unpack rdata
for key, val in record['rdata'].items():
record[key] = val
del record['rdata']
record['create'] = False
records.append(constructor(self.zone, fqdn, **record))
return records
[docs] def get_any_records(self):
"""Retrieve a list of all recs"""
if self.fqdn is None:
return
api_args = {'detail': 'Y'}
uri = '/ANYRecord/{}/{}/'.format(self.zone, self.fqdn)
response = DynectSession.get_session().execute(uri, 'GET', api_args)
# Strip out empty record_type lists
record_lists = {label: rec_list for label, rec_list in
response['data'].items() if rec_list != []}
records = {}
for key, record_list in record_lists.items():
search = key.split('_')[0].upper()
try:
constructor = RECS[search]
except KeyError:
constructor = RECS['UNKNOWN']
list_records = []
for record in record_list:
del record['zone']
del record['fqdn']
# Unpack rdata
for r_key, r_val in record['rdata'].items():
record[r_key] = r_val
record['create'] = False
list_records.append(
constructor(self.zone, self.fqdn, **record))
records[key] = list_records
return records
[docs] def delete(self):
"""Delete this node, any records within this node, and any nodes
underneath this node
"""
uri = '/Node/{}/{}'.format(self.zone, self.fqdn)
DynectSession.get_session().execute(uri, 'DELETE', {})
def __str__(self):
    """Return the text form of this node: ``<Node>: `` followed by its
    fqdn
    """
    template = force_unicode('<Node>: {}')
    return template.format(self.fqdn)


# repr() and the Python 2 unicode() hook share the str() implementation
__unicode__ = __str__
__repr__ = __str__
def __bytes__(self):
    """bytes override

    :return: the :meth:`__str__` text of this object encoded as UTF-8
    """
    # bytes(text) without an encoding raises TypeError on Python 3, so the
    # text is encoded explicitly; on Python 2 this yields a byte string too.
    return self.__str__().encode('utf-8')
class TSIG(object):
    """A class representing DynECT TSIG Records"""

    def __init__(self, name, *args, **kwargs):
        """Create a :class:`TSIG` object

        When only ``name`` is supplied the key is fetched from the DynECT
        System; when any further argument is supplied a new key is created.

        :param name: The name of the TSIG key for :class:`TSIG` object
        :param algorithm: Algorithm used for :class:`TSIG` object. Valid
            options: hmac-sha1, hmac-md5, hmac-sha224, hmac-sha256,
            hmac-sha384, hmac-sha512
        :param secret: Secret key used by :class:`TSIG` object
        :raises KeyError: if a key is being created and ``algorithm`` or
            ``secret`` is missing
        """
        self._name = name
        self.uri = '/TSIGKey/{}/'.format(self._name)
        self._secret = None
        self._algorithm = None
        if not args and not kwargs:
            self._get()
        else:
            self._post(*args, **kwargs)

    def _build(self, data):
        """Copy every field of an API response onto this object

        :param data: the ``data`` *dict* of an API response; each key is
            stored as the attribute of the same name prefixed with ``_``
        """
        for key, val in data.items():
            setattr(self, '_' + key, val)

    def _get(self):
        """Get a :class:`TSIG` object from the DynECT System"""
        api_args = {'name': self._name}
        response = DynectSession.get_session().execute(self.uri, 'GET',
                                                       api_args)
        self._build(response['data'])

    def _post(self, *args, **kwargs):
        """Create a new :class:`TSIG` object on the DynECT System

        ``algorithm`` and ``secret`` may be given positionally, in that
        order, or by keyword. A keyword value takes precedence over a
        positional one.

        :raises KeyError: if ``algorithm`` or ``secret`` is missing
        """
        # Map positional values onto the order documented by __init__ so
        # that TSIG(name, algorithm, secret) does not fail with a KeyError.
        params = dict(zip(('algorithm', 'secret'), args))
        params.update(kwargs)
        api_args = {'name': self._name, 'secret': params['secret'],
                    'algorithm': params['algorithm']}
        response = DynectSession.get_session().execute(self.uri, 'POST',
                                                       api_args)
        self._build(response['data'])

    @property
    def secret(self):
        """Gets Secret key of :class:`TSIG` object. The key is re-fetched
        from the DynECT System on every access.
        """
        self._get()
        return self._secret

    @secret.setter
    def secret(self, secret):
        """
        Sets secret key of :class:`TSIG` object

        :param secret: key
        """
        api_args = {'name': self._name, 'secret': secret}
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    @property
    def algorithm(self):
        """Gets Algorithm of :class:`TSIG` object. The key is re-fetched
        from the DynECT System on every access.
        """
        self._get()
        return self._algorithm

    @algorithm.setter
    def algorithm(self, algorithm):
        """
        Sets Algorithm of :class:`TSIG` object.

        :param algorithm: hmac-sha1, hmac-md5, hmac-sha224,
            hmac-sha256, hmac-sha384, hmac-sha512
        """
        api_args = {'name': self._name, 'algorithm': algorithm}
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    @property
    def name(self):
        """Gets name of TSIG Key in :class:`TSIG`"""
        return self._name

    def delete(self):
        """Delete this :class:`TSIG` key from the DynECT System"""
        api_args = {}
        DynectSession.get_session().execute(self.uri, 'DELETE',
                                            api_args)
class ExternalNameserver(object):
    """A class representing DynECT External Nameserver """

    def __init__(self, zone, *args, **kwargs):
        """Create a :class:`ExternalNameserver` object

        When only ``zone`` is supplied the existing configuration is fetched
        from the DynECT System; when any further argument is supplied a new
        one is created.

        :param zone: The name of the zone for this :class:`ExternalNameserver`
        :param deny: does this block requests or add them
        :param hosts: list of :class:`ExternalNameserverEntry`
        :param active: active? Y/N
        :param tsig_key_name: Name of TSIG to associate with this
            :class:`ExternalNameserver`
        """
        self._zone = zone
        self.uri = '/ExtNameserver/{}/'.format(self._zone)
        self._deny = None
        self._hosts = None
        self._active = None
        self._tsig_key_name = None
        if not args and not kwargs:
            self._get()
        else:
            self._post(*args, **kwargs)

    @staticmethod
    def _serialize_hosts(hosts):
        """Convert a list of hosts into the form sent to the API

        :param hosts: iterable of :class:`ExternalNameserverEntry` objects
            and/or already-serialised host values
        :return: a *list* in which every :class:`ExternalNameserverEntry`
            is replaced by its JSON representation; other values are passed
            through unchanged
        """
        return [host._json if isinstance(host, ExternalNameserverEntry)
                else host for host in hosts]

    def _get(self):
        """Get a :class:`ExternalNameserver` object from the DynECT System"""
        api_args = {'zone': self._zone}
        response = DynectSession.get_session().execute(self.uri, 'GET',
                                                       api_args)
        self._build(response['data'])

    def _post(self, *args, **kwargs):
        """Create a new :class:`ExternalNameserver`
        object on the DynECT System

        Only the keyword arguments ``deny``, ``tsig_key_name``, ``active``
        and ``hosts`` are read, and only those with a truthy value are sent.
        """
        api_args = {'zone': self._zone}
        self._deny = kwargs.get('deny', None)
        if self._deny:
            api_args['deny'] = self._deny
        self._tsig_key_name = kwargs.get('tsig_key_name', None)
        if self._tsig_key_name:
            api_args['tsig_key_name'] = self._tsig_key_name
        self._active = kwargs.get('active', None)
        if self._active:
            api_args['active'] = self._active
        self._hosts = kwargs.get('hosts', None)
        if self._hosts:
            api_args['hosts'] = self._serialize_hosts(self._hosts)
        response = DynectSession.get_session().execute(self.uri, 'POST',
                                                       api_args)
        self._build(response['data'])

    def _update(self, api_args=None):
        """Update an existing :class:`ExternalNameserver`
        on the DynECT System

        :param api_args: the *dict* of fields to send with the PUT request
        """
        response = DynectSession.get_session().execute(self.uri, 'PUT',
                                                       api_args)
        self._build(response['data'])

    def _build(self, data):
        """Reset the host list and copy an API response onto this object

        :param data: the ``data`` *dict* of an API response. The ``hosts``
            entry is turned into :class:`ExternalNameserverEntry` objects;
            every other key is stored as the attribute of the same name
            prefixed with ``_``
        """
        self._hosts = []
        for key, val in data.items():
            if key == 'hosts':
                for host in val:
                    # A host without a 'notifies' field yields an entry
                    # whose notifies value is None rather than a KeyError.
                    self._hosts.append(ExternalNameserverEntry(
                        host['address'], notifies=host.get('notifies')))
                continue
            setattr(self, '_' + key, val)

    @property
    def deny(self):
        """Gets deny value :class:`ExternalNameserver` object"""
        self._get()
        return self._deny

    @deny.setter
    def deny(self, deny):
        """
        Sets deny value of :class:`ExternalNameserver` object

        :param deny: Y/N
        """
        api_args = {'zone': self._zone, 'deny': deny}
        self._update(api_args=api_args)

    @property
    def tsig_key_name(self):
        """Gets tsig_key_name value :class:`ExternalNameserver` object"""
        self._get()
        return self._tsig_key_name

    @tsig_key_name.setter
    def tsig_key_name(self, tsig_key_name):
        """
        Sets tsig_key_name value of :class:`ExternalNameserver` object

        :param tsig_key_name: Name of the TSIG key to associate with this
            :class:`ExternalNameserver`
        """
        api_args = {'zone': self._zone, 'tsig_key_name': tsig_key_name}
        self._update(api_args=api_args)

    @property
    def hosts(self):
        """
        :class:`ExternalNameserver` hosts. list of ExternalNameserverEntries
        """
        self._get()
        return self._hosts

    @hosts.setter
    def hosts(self, value):
        """
        Sets hosts of :class:`ExternalNameserver` object

        :param value: list of :class:`ExternalNameserverEntry` objects
            and/or already-serialised host values
        """
        api_args = {'hosts': self._serialize_hosts(value)}
        self._update(api_args)

    @property
    def active(self):
        """Gets active status of :class:`ExternalNameserver` object. """
        self._get()
        return self._active

    @active.setter
    def active(self, active):
        """
        Sets active status of :class:`ExternalNameserver` object.

        :param active: Y/N
        """
        api_args = {'zone': self._zone, 'active': active}
        self._update(api_args=api_args)

    @property
    def zone(self):
        """Gets name of zone in :class:`ExternalNameserver`"""
        return self._zone

    def delete(self):
        """Delete this :class:`ExternalNameserver` from the DynECT System"""
        api_args = {}
        DynectSession.get_session().execute(self.uri, 'DELETE',
                                            api_args)
class ExternalNameserverEntry(object):
    """A class representing DynECT :class:`ExternalNameserverEntry`"""

    def __init__(self, address, *args, **kwargs):
        """Create a :class:`ExternalNameserverEntry` object

        :param address: address or CIDR of this nameserver Entry
        :param notifies: Y/N Do we send notifies to this host? Read from
            the keyword arguments only.
        """
        self._address = address
        self._notifies = kwargs.get('notifies')

    @property
    def _json(self):
        """Get the JSON representation of this :class:`ExternalNameserverEntry`
        object. Fields whose value is ``None`` are omitted.
        """
        json_blob = {'address': self._address, 'notifies': self._notifies}
        return {k: v for k, v in json_blob.items() if v is not None}

    @property
    def address(self):
        """Gets address value :class:`ExternalNameserverEntry` object"""
        return self._address

    @address.setter
    def address(self, address):
        """
        Sets address of :class:`ExternalNameserverEntry` object

        :param address: address or CIDR
        """
        self._address = address

    @property
    def notifies(self):
        """Gets notifies value :class:`ExternalNameserverEntry` object"""
        return self._notifies

    @notifies.setter
    def notifies(self, notifies):
        """
        Sets notifies of :class:`ExternalNameserverEntry` object

        :param notifies: send notifies to this server. Y/N
        """
        self._notifies = notifies

    def __str__(self):
        """str override"""
        return force_unicode(
            '<ExternalNameserverEntry>: {}, Notifies: {}').format(
            self._address,
            self._notifies)
    __repr__ = __unicode__ = __str__

    def __bytes__(self):
        """bytes override

        :return: the :meth:`__str__` text of this object encoded as UTF-8
        """
        # bytes(text) without an encoding raises TypeError on Python 3, so
        # the text is encoded explicitly; on Python 2 this yields a byte
        # string too.
        return self.__str__().encode('utf-8')