#!/usr/libexec/platform-python
#
# Sample test script demonstrating the use of the susetest
# framework.
#
# To run this script, you first have to provision two nodes
# (one client, one server) and put all relevant information
# on these nodes into a config file that should look like this:
#
# backend       "vagrant";
# testcase      "rpc";
# node "client" {
#     ipv4_address  "192.168.121.196";
#     target        "tcp:192.168.121.196:4000";
# }
# node "server" {
#     ipv4_address  "192.168.121.231";
#     target        "tcp:192.168.121.231:4000";
# }
#
# Then, invoke this script like this:
#  ./run --config $path_to_config_file
#
#
# One way to provision the nodes (and create the config
# file above) is to use twopence-provision.
#
# Copyright (C) 2021 Olaf Kirch <okir@suse.de>

import susetest

susetest.requireAddressResource("ipv4_address")
susetest.optionalAddressResource("ipv6_address")

# Quite a few RPC applications (eg rpcinfo) will attempt a call to bindresvport.
# This is expected behavior, and we should not fail a test case because of this.
#
# The following function looks at a parsed AVC audit message. If it is
# a name_bind operation on a udp or tcp socket in the reserved range,
# it downgrades the severity to "info" which means it gets reported in the
# log, but is not considered a reason to fail the test case.
def bindresvportConsideredHarmless(violation):
	if violation.op == "name_bind" and \
	   violation.tclass in ('udp_socket', 'tcp_socket') and \
	   violation.src and violation.src.isdecimal() and \
	   int(violation.src) < 1024:
		return "info"

	return None

@susetest.setup
def setup(driver):
	'''Ensure we have all the resources this test suite requires'''
	global rpcbind, rpcinfo, square_daemon, square_client

	# Enable rpcbind on the server
	rpcbind = driver.requireService("rpcbind", "server")

	# Locate rpcinfo on the client
	rpcinfo = driver.client.requireExecutable("rpcinfo")

	square_daemon = driver.server.requireExecutable("rpc.squared")
	square_client = driver.client.requireExecutable("square")

	driver.client.defaultUser = driver.client.test_user

	selinux = driver.getFeature("selinux")
	if selinux:
		selinux.addMessageFilter(bindresvportConsideredHarmless)

@susetest.test
def rpc_run_rpcinfo_root(driver):
	'''rpcinfo-root: check that root can run rpcinfo'''

	rpcinfo.runOrFail("-p %s" % driver.server.ipv4_address, user = "root")

@susetest.test
def rpc_run_rpcinfo_user(driver):
	'''rpcinfo-user: check that test user can run rpcinfo'''

	rpcinfo.runOrFail("-p %s" % driver.server.ipv4_address)

@susetest.test
def rpc_check_rpcbind_user(driver):
	'''daemonuser: check rpcbind user'''

	user = rpcbind.user
	if user == "rpc":
		driver.server.logInfo("OK: rpcbind runs as user rpc")
	else:
		driver.server.logFailure("rpcbind runs as user %s (expected rpc)" % user)

##################################################################
# Parameterized test functions
#
# The way this works is you define a function that takes
# a driver and a list (args). The function's __doc__ string
# should include the string "@ARGS"
#
# def myfunc(driver, args):
#	'''testid: do something with @ARGS'''
#
#	fnord(args)
#
# Subsequently, you can instantiate test cases that invoke
# this function with a given set of arguments, like this:
#
# susetest.define_parameterized(myfunc, 42, "blah", "@server:ipv4_address")
#
# This will register a test case that performs a bit of magic string
# substitution on the argument list, and then invokes myfunc as
#
#	myfunc(driver, 42, "blah", "192.168.7.42")
#
# string substitution looks for occurences of @nodename:varname, and will
# consult the indicated node ("server" in this case), and look for the
# named string resource ("ipv4_address"). If set, the reference will
# be replaced with the value of this resource. If not, expansion will
# fail, and the test will be skipped.
#
# The doc string for the test case contains the unexpanded list
# of arguments, ie
# "do something with 42 blah @server:ipv4_address"
##################################################################

##################################################################
# Local rpcinfo:
#  On the server, exercise rpcinfo against the server's rpcbind.
##################################################################
def __rpcinfo_local_test(driver, args):
	'''local: verify that rpcinfo @ARGS works'''

	# Locate rpcinfo on the server
	rpcinfo = driver.server.requireExecutable("rpcinfo")

	rpcinfo.runOrFail(" ".join(args))

susetest.define_parameterized(__rpcinfo_local_test)
susetest.define_parameterized(__rpcinfo_local_test, "@server:ipv4_loopback")
susetest.define_parameterized(__rpcinfo_local_test, "@server:ipv6_loopback")
susetest.define_parameterized(__rpcinfo_local_test, "-p")
susetest.define_parameterized(__rpcinfo_local_test, "-p", "@server:ipv4_loopback")
susetest.define_parameterized(__rpcinfo_local_test, "-p", "@server:ipv6_loopback")
susetest.define_parameterized(__rpcinfo_local_test, "-m")
susetest.define_parameterized(__rpcinfo_local_test, "-m", "@server:ipv4_loopback")
susetest.define_parameterized(__rpcinfo_local_test, "-m", "@server:ipv6_loopback")
susetest.define_parameterized(__rpcinfo_local_test, "-T", "local", "ignoreme", "portmapper")
susetest.define_parameterized(__rpcinfo_local_test, "-s")

##################################################################
# Remote rpcinfo:
#  On the client, exercise rpcinfo against the server's rpcbind.
##################################################################
def __rpcinfo_remote_test(driver, args):
	'''remote: verify that remote rpcinfo @ARGS works'''

	# Locate rpcinfo on the client
	rpcinfo = driver.client.requireExecutable("rpcinfo")

	rpcinfo.runOrFail(" ".join(args))

susetest.define_parameterized(__rpcinfo_remote_test, "@server:ipv4_address")
susetest.define_parameterized(__rpcinfo_remote_test, "@server:ipv6_address")
susetest.define_parameterized(__rpcinfo_remote_test, "-T", "udp", "@server:ipv4_address", "portmapper")
susetest.define_parameterized(__rpcinfo_remote_test, "-T", "tcp", "@server:ipv4_address", "portmapper")
susetest.define_parameterized(__rpcinfo_remote_test, "-T", "udp6", "@server:ipv6_address", "portmapper")
susetest.define_parameterized(__rpcinfo_remote_test, "-T", "tcp6", "@server:ipv6_address", "portmapper")

##################################################################
# Test simple RPC server
##################################################################
@susetest.test
def rpc_square_server(driver):
	'''square.start: ensure that we can start rpc.squared'''

	daemon = driver.server.requireExecutable("rpc.squared")
	daemon.runOrFail()

@susetest.test
def rpc_square_call(driver):
	'''square.call: ensure that we can talk to the RPC square service'''

	client = driver.client
	server = driver.server

	st = client.runOrFail("square -h %s 11" % server.ipv4_address)
	if not st:
		return
	
	words = st.stdoutString.split()
	s = " ".join(words)
	if s == "11^2 = 121":
		client.logInfo("Good, got the expected result \"%s\"" % s)
	else:
		client.logFailure("Unexpected result \"%s\"" % s)

@susetest.test
def rpc_square_server(driver):
	'''square.stop: ensure that we can stop rpc.squared'''

	daemon = driver.server.requireExecutable("rpc.squared")
	daemon.runOrFail("-K")

if __name__ == '__main__':
	susetest.perform()
