diff --git a/src/tests/intg/Makefile.am b/src/tests/intg/Makefile.am index 0cfd268dce7..a04788eadcd 100644 --- a/src/tests/intg/Makefile.am +++ b/src/tests/intg/Makefile.am @@ -39,7 +39,6 @@ dist_noinst_DATA = \ test_ssh_pubkey.py \ test_pam_responder.py \ test_sudo.py \ - test_resolver.py \ conftest.py \ sssd_hosts.py \ sssd_nets.py \ diff --git a/src/tests/intg/test_resolver.py b/src/tests/intg/test_resolver.py deleted file mode 100644 index 072d41e7bfd..00000000000 --- a/src/tests/intg/test_resolver.py +++ /dev/null @@ -1,327 +0,0 @@ -# -# Resolver integration test -# -# Authors: -# Samuel Cabrero -# -# Copyright (C) 2019 SUSE LINUX GmbH, Nuernberg, Germany. -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -import os -import stat -import signal -import subprocess -import time -import ldap -import pytest -import socket -import config -import ds_openldap -import ldap_ent -from util import unindent -from sssd_nss import NssReturnCode, HostError -from sssd_hosts import call_sssd_gethostbyname -from sssd_nets import call_sssd_getnetbyname, call_sssd_getnetbyaddr - -LDAP_BASE_DN = "dc=example,dc=com" - - -@pytest.fixture(scope="module") -def ds_inst(request): - """LDAP server instance fixture""" - ds_inst = ds_openldap.DSOpenLDAP( - config.PREFIX, 10389, LDAP_BASE_DN, - "cn=admin", "Secret123" - ) - - try: - ds_inst.setup() - except Exception: - ds_inst.teardown() - raise - request.addfinalizer(ds_inst.teardown) - return ds_inst - - -@pytest.fixture(scope="module") -def ldap_conn(request, ds_inst): - """LDAP server connection fixture""" - ldap_conn = ds_inst.bind() - ldap_conn.ds_inst = ds_inst - request.addfinalizer(ldap_conn.unbind_s) - return ldap_conn - - -def create_ldap_entries(ldap_conn, ent_list=None): - """Add LDAP entries from ent_list""" - if ent_list is not None: - for entry in ent_list: - ldap_conn.add_s(entry[0], entry[1]) - - -def cleanup_ldap_entries(ldap_conn, ent_list=None): - """Remove LDAP entries added by create_ldap_entries""" - if ent_list is None: - for ou in ("Users", "Groups", "Netgroups", "Services", "Policies", - "Hosts", "Networks"): - for entry in ldap_conn.search_s(f"ou={ou}," - f"{ldap_conn.ds_inst.base_dn}", - ldap.SCOPE_ONELEVEL, - attrlist=[]): - ldap_conn.delete_s(entry[0]) - else: - for entry in ent_list: - ldap_conn.delete_s(entry[0]) - - -def create_ldap_cleanup(request, ldap_conn, ent_list=None): - """Add teardown for removing all user/group LDAP entries""" - request.addfinalizer(lambda: cleanup_ldap_entries(ldap_conn, ent_list)) - - -def create_ldap_fixture(request, ldap_conn, ent_list=None): - """Add LDAP entries and add teardown for removing them""" - create_ldap_entries(ldap_conn, ent_list) - create_ldap_cleanup(request, ldap_conn, ent_list) - - -SCHEMA_RFC2307 = "rfc2307" -SCHEMA_RFC2307_BIS = "rfc2307bis" - - -def format_basic_conf(ldap_conn, schema): - """Format a basic SSSD configuration""" - schema_conf = "ldap_schema = " + schema + "\n" - if schema == SCHEMA_RFC2307_BIS: - schema_conf += "ldap_group_object_class = groupOfNames\n" - iphost_search_base = "ou=Hosts," + ldap_conn.ds_inst.base_dn - ipnetwork_search_base = "ou=Networks," + ldap_conn.ds_inst.base_dn - return unindent("""\ - [sssd] - debug_level = 0xffff - domains = LDAP - services = nss - - [nss] - debug_level = 0xffff - memcache_timeout = 0 - entry_negative_timeout = 1 - - [domain/LDAP] - ldap_auth_disable_tls_never_use_in_production = true - ldap_id_use_start_tls = false - debug_level = 0xffff - {schema_conf} - id_provider = ldap - auth_provider = ldap - resolver_provider = ldap - ldap_uri = {ldap_conn.ds_inst.ldap_url} - ldap_search_base = {ldap_conn.ds_inst.base_dn} - ldap_iphost_search_base = {iphost_search_base} - ldap_ipnetwork_search_base = {ipnetwork_search_base} - """).format(**locals()) - - -def create_conf_file(contents): - """Create sssd.conf with specified contents""" - conf = open(config.CONF_PATH, "w") - conf.write(contents) - conf.close() - os.chmod(config.CONF_PATH, stat.S_IRUSR | stat.S_IWUSR) - - -def cleanup_conf_file(): - """Remove sssd.conf, if it exists""" - if os.path.lexists(config.CONF_PATH): - os.unlink(config.CONF_PATH) - - -def create_conf_cleanup(request): - """Add teardown for removing sssd.conf""" - request.addfinalizer(cleanup_conf_file) - - -def create_conf_fixture(request, contents): - """ - Create sssd.conf with specified contents and add teardown for removing it - """ - create_conf_file(contents) - create_conf_cleanup(request) - - -def create_sssd_process(): - """Start the SSSD process""" - if subprocess.call(["sssd", "-D", "--logger=files"]) != 0: - raise Exception("sssd start failed") - - -def get_sssd_pid(): - pid_file = open(config.PIDFILE_PATH, "r") - pid = int(pid_file.read()) - return pid - - -def cleanup_sssd_process(): - """Stop the SSSD process and remove its state""" - try: - pid = get_sssd_pid() - os.kill(pid, signal.SIGTERM) - while True: - try: - os.kill(pid, signal.SIGCONT) - except OSError: - break - time.sleep(1) - except OSError: - pass - for path in os.listdir(config.DB_PATH): - os.unlink(config.DB_PATH + "/" + path) - for path in os.listdir(config.MCACHE_PATH): - os.unlink(config.MCACHE_PATH + "/" + path) - - -def create_sssd_fixture(request): - """Start SSSD and add teardown for stopping it and removing its state""" - create_sssd_process() - create_sssd_cleanup(request) - - -def create_sssd_cleanup(request): - """Add teardown for stopping SSSD and removing its state""" - request.addfinalizer(cleanup_sssd_process) - - -@pytest.fixture -def add_hosts(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_host("host1", - aliases=["host1_alias1", "host1_alias2"], - addresses=["192.168.1.1", "192.168.1.2", - "2001:db8:1::1", "2001:db8:1::2"]) - ent_list.add_host("host2.example.com", - aliases=["host2_alias1.example.com", - "host2_alias2.example.com"], - addresses=["192.168.2.1", "192.168.2.2", - "2001:db8:2::1", "2001:db8:2::2"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -@pytest.fixture -def add_nets(request, ldap_conn): - ent_list = ldap_ent.List(ldap_conn.ds_inst.base_dn) - - ent_list.add_ipnet("net1", "192.168.1.1", - aliases=["net1_alias1", "net1_alias2"]) - ent_list.add_ipnet("net2", "10.2.2.2", - aliases=["net2_alias1", "net2_alias2"]) - - create_ldap_fixture(request, ldap_conn, ent_list) - conf = format_basic_conf(ldap_conn, SCHEMA_RFC2307) - create_conf_fixture(request, conf) - create_sssd_fixture(request) - return None - - -def test_hostbyname(add_hosts): - (res, hres, _) = call_sssd_gethostbyname("invalid") - - assert res == NssReturnCode.NOTFOUND - assert hres == HostError.HOST_NOT_FOUND - - (res, hres, _) = call_sssd_gethostbyname("example.com") - assert res == NssReturnCode.NOTFOUND - assert hres == HostError.HOST_NOT_FOUND - - (res, hres, _) = call_sssd_gethostbyname("invalid.example.com") - assert res == NssReturnCode.NOTFOUND - assert hres == HostError.HOST_NOT_FOUND - - (res, hres, _) = call_sssd_gethostbyname("host1") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_gethostbyname("host1_alias1") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_gethostbyname("host1_alias2") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_gethostbyname("host2.example.com") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_gethostbyname("host2_alias1.example.com") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_gethostbyname("host2_alias2.example.com") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - -def test_netbyname(add_nets): - (res, hres, _) = call_sssd_getnetbyname("invalid") - assert res == NssReturnCode.NOTFOUND - assert hres == HostError.HOST_NOT_FOUND - - (res, hres, _) = call_sssd_getnetbyname("net1") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_getnetbyname("net1_alias1") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_getnetbyname("net1_alias2") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_getnetbyname("net2") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_getnetbyname("net2_alias1") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_getnetbyname("net2_alias2") - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - -def test_netbyaddr(add_nets): - (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.1", socket.AF_INET) - assert res == NssReturnCode.NOTFOUND - assert hres == HostError.HOST_NOT_FOUND - - (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.1", socket.AF_UNSPEC) - assert res == NssReturnCode.NOTFOUND - assert hres == HostError.HOST_NOT_FOUND - - (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.2", socket.AF_INET) - assert res == NssReturnCode.SUCCESS - assert hres == 0 - - (res, hres, _) = call_sssd_getnetbyaddr("10.2.2.2", socket.AF_UNSPEC) - assert res == NssReturnCode.SUCCESS - assert hres == 0