Skip to content

Commit

Permalink
Merge pull request #2 from mdavis332/feature/api-v2
Browse files Browse the repository at this point in the history
add support for v2 of fsi api
  • Loading branch information
wesyoung authored Dec 15, 2020
2 parents 01e6aab + 60884f2 commit 4989b94
Show file tree
Hide file tree
Showing 2 changed files with 218 additions and 24 deletions.
92 changes: 71 additions & 21 deletions csirtg_dnsdb/client.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#!/usr/bin/env python

import os
import string
from . import VERSION
import logging
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from argparse import ArgumentParser, Action, ArgumentError, RawDescriptionHelpFormatter
import textwrap
import requests
from .exceptions import QuotaLimit
from .exceptions import DNSDBException, QuotaLimit
try:
import ujson as json
except ImportError:
Expand All @@ -15,50 +16,91 @@

logger = logging.getLogger(__name__)

REMOTE = os.environ.get('FARSIGHT_REMOTE', 'https://api.dnsdb.info')
REMOTE = os.environ.get('FARSIGHT_REMOTE', 'https://api.dnsdb.info/dnsdb/v2')
TOKEN = os.environ.get('FARSIGHT_TOKEN')


class Client(object):

def __init__(self, token=TOKEN, remote=REMOTE, limit=None, **kwargs):
def __init__(self, token=TOKEN, remote=REMOTE, **kwargs):
self.remote = remote
self.token = token
self.limit = limit
self.last_request = ''

self.session = requests.session()
self.session.headers['User-Agent'] = "csirtg-dnsdb-py/{0}".format(VERSION)
self.session.headers['X-Api-Key'] = self.token
self.session.headers['Accept'] = 'application/json'
self.session.headers['Accept'] = 'application/jsonl'

def search(self, i, limit=None):
self.PREFIX_MAP = {
'standard': 'lookup',
'keyword': 'glob',
'glob': 'glob',
'regex': 'regex'
}

def search(self, i, search_type='standard', limit=5000):
params = {}

if limit:
params['limit'] = limit

path = '/rdata/ip'
try:
socket.inet_aton(i)
except:
if '/' in i:
i = i.replace('/', ',')
else:
if search_type == 'standard':
path = '/rdata'
try:
socket.inet_aton(i)
path += '/ip'
except:
if '/' in i:
i = i.replace('/', ',')
path = '/rrset/name'

path = '{}/lookup{}/{}'.format(self.remote, path, i)
else:
# if we strip out all punc and are only left with nums, we prob want rdata. bad assumption?
if i.translate(str.maketrans('', '', string.punctuation)).isnumeric():
path = '/rdata'
else:
path = '/rrnames'

if search_type == 'keyword':
# if the our first char of our keyword query is a not a wildcard, prepend a wildcard
if i[0] is not '*':
i = '*' + i
# everybody gets a wildcard at the end for keyword 'cuz that's how dnsdb scout seems to do it
i += '*'

# TODO: implement rtype options. atm, gets back any. pytests will need to be fixed for .endswith
# i += '/ANY'

endpoint = self.PREFIX_MAP[search_type]
path = '{}/{}{}/{}'.format(self.remote, endpoint, path, i)
self.last_request = path

r = self.session.get(path, params=params, stream=True)

if r.status_code == 200:
for line in r.iter_lines():
lines = r.iter_lines()
first = next(lines).decode('utf-8')
if first != '{"cond":"begin"}':
raise DNSDBException('Unusual response received: {}'.format(first))

for line in lines:
if not line:
continue

yield (json.loads(line.decode('utf-8')))

line_dec = json.loads(line.decode('utf-8')).get('obj')
if line_dec:
yield (line_dec)

if r.status_code == 429:
raise QuotaLimit('API quota reached..')

class check_limit(Action):
def __call__(self, parser, namespace, limit, option_string=None):
if not 1 <= limit <= 100000:
raise ArgumentError(self, 'limit must be 1 - 100,000')
setattr(namespace, self.dest, limit)


def main():

Expand All @@ -70,14 +112,22 @@ def main():
formatter_class=RawDescriptionHelpFormatter,
prog='dnsdb',
)
p.add_argument('--token', help='specify api token', default=TOKEN)
p.add_argument('--token', help='specify api token (default pulls from FARSIGHT_TOKEN envvar)', default=TOKEN)
p.add_argument('--search', '-q', help='search for something')
p.add_argument('--search-type', '-t', choices=('standard', 'keyword', 'glob', 'regex'), nargs='?', const='standard', dest='type',
help='specify a search type (default is standard)', default='standard'
)
p.add_argument('--limit', '-l', type=int, metavar=('(1-100000)'), action=check_limit, nargs='?', const=5000,
help='max number of results to return (default of 5000)', default=5000
)
args = p.parse_args()

c = Client(token=args.token)

for r in c.search(args.search):
print(json.dumps(r))
for r in c.search(i=args.search, search_type=args.type, limit=args.limit):
ignore_resps = [ '{"cond":"succeeded"', '{"cond":"limited","msg":"Result limit reached"}' ]
if r not in ignore_resps:
print(json.dumps(r))


if __name__ == "__main__":
Expand Down
150 changes: 147 additions & 3 deletions test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,166 @@

def test_client():
c = Client(token='1234')
assert c


@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_client_live():
def test_standard_rrname_search_live():
c = Client()

try:
r = c.search('172.217.6.206')
r = c.search('google.com', limit=5)
assert len(list(r)) > 0
except QuotaLimit:
pass

def test_standard_rrname_search_mocked():
c = Client()

try:
list(c.search('google.com', limit=5))
assert c.__dict__['last_request'].endswith('/lookup/rrset/name/google.com')
except QuotaLimit:
pass


@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_standard_rdata_search_live():
c = Client()

try:
r = c.search('8.8.8.8', limit=5)
assert len(list(r)) > 0
except QuotaLimit:
pass


def test_standard_rdata_search_mocked():
c = Client()

try:
list(c.search('8.8.8.8', limit=5))
assert c.__dict__['last_request'].endswith('/lookup/rdata/ip/8.8.8.8')
except QuotaLimit:
pass

@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_keyword_rrname_search_live():
c = Client()

try:
r = c.search('google.com', search_type='keyword', limit=5)
assert len(list(r)) > 0
except QuotaLimit:
pass


def test_keyword_rrname_search_mocked():
c = Client()

try:
r = c.search('google.com')
list(c.search('google.com', search_type='keyword', limit=5))
assert c.__dict__['last_request'].endswith('/glob/rrnames/*google.com*')
except QuotaLimit:
pass

@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_keyword_rdata_search_live():
c = Client()

try:
r = c.search('9.9.9.', search_type='keyword', limit=5)
assert len(list(r)) > 0
except QuotaLimit:
pass

def test_keyword_rdata_search_mocked():
c = Client()

try:
list(c.search('9.9.9.', search_type='keyword', limit=5))
assert c.__dict__['last_request'].endswith('/glob/rdata/*9.9.9.*')
except QuotaLimit:
pass


@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_globbing_rdata_search_live():
c = Client()

try:
r = c.search('9.*.9.*', limit=5, search_type='glob')
assert len(list(r)) > 0
except QuotaLimit:
pass


def test_globbing_rdata_search_mocked():
c = Client()

try:
list(c.search('9.*.9.*', limit=5, search_type='glob'))
assert c.__dict__['last_request'].endswith('/glob/rdata/9.*.9.*')
except QuotaLimit:
pass

@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_globbing_rrname_search_live():
c = Client()

try:
r = c.search('*.google.com*', limit=5, search_type='glob')
assert len(list(r)) > 0
except QuotaLimit:
pass


def test_globbing_rrname_search_mocked():
c = Client()

try:
list(c.search('.google.com', limit=5, search_type='glob'))
assert c.__dict__['last_request'].endswith('/glob/rrnames/.google.com')
except QuotaLimit:
pass


@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_regex_rrname_search_live():
c = Client()

try:
r = c.search(r'.*\.google\.com\.', limit=5, search_type='regex')
assert len(list(r)) > 0
except QuotaLimit:
pass


def test_regex_rrname_search_mocked():
c = Client()

try:
list(c.search(r'.*\.google\.com\.', limit=5, search_type='regex'))
assert c.__dict__['last_request'].endswith(r'/regex/rrnames/.*\.google\.com\.')
except QuotaLimit:
pass

@pytest.mark.skipif(DISABLE_TESTS, reason='need to set FARSIGHT_TOKEN to run')
def test_regex_rdata_search_live():
c = Client()

try:
r = c.search(r'1\.1\.1\.1', limit=5, search_type='regex')
assert len(list(r)) > 0
except QuotaLimit:
pass


def test_regex_rdata_search_mocked():
c = Client()

try:
list(c.search(r'1\.1\.1\.1', limit=5, search_type='regex'))
assert c.__dict__['last_request'].endswith(r'/regex/rdata/1\.1\.1\.1')
except QuotaLimit:
pass

0 comments on commit 4989b94

Please sign in to comment.