tests: enable additional ruff Python lint options

These all seem reasonable to enable for this code.
Dan Fandrich 2024-09-26 14:31:39 -07:00
parent 223fb00a78
commit 57cc523378
15 changed files with 142 additions and 177 deletions

View File

@ -73,7 +73,7 @@ jobs:
run: find . -name '*.py' -exec pytype -j auto -k {} +
- name: ruff
run: ruff check
run: ruff check --extend-select=B007,B016,C405,C416,COM818,D200,D213,D204,D401,D415,FURB129,N818,PERF401,PERF403,PIE790,PIE808,PLW0127,Q004,RUF010,SIM101,SIM117,SIM118,TRY400,TRY401
reuse:
runs-on: ubuntu-latest
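
For reference, most of the newly selected rule codes map directly onto the mechanical rewrites in the rest of this commit. Below is a minimal standalone sketch (illustrative values only, not code taken from this commit) of three of the patterns ruff now flags:

# B007: name a loop variable "_" when the loop body never uses it
total = 0
for _ in range(3):          # was: for i in range(3), with "i" unused
    total += 1

# C405: prefer a set literal over set([...])
pos_keys = {'time_pretransfer', 'time_total'}   # was: set(['time_pretransfer', 'time_total'])

# SIM117: combine nested "with" statements into a single statement
with open('stdout.txt', 'w') as cout, open('stderr.txt', 'w') as cerr:
    cout.write('out\n')
    cerr.write('err\n')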

View File

@ -24,7 +24,7 @@
#
###########################################################################
#
""" DICT server """
"""DICT server."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
@ -50,10 +50,7 @@ VERIFIED_RSP = "WE ROOLZ: {pid}"
def dictserver(options):
"""
Starts up a TCP server with a DICT handler and serves DICT requests
forever.
"""
"""Start up a TCP server with a DICT handler and serve DICT requests forever."""
if options.pidfile:
pid = os.getpid()
# see tests/server/util.c function write_pidfile
@ -74,13 +71,10 @@ def dictserver(options):
class DictHandler(socketserver.BaseRequestHandler):
"""Handler class for DICT connections.
"""Handler class for DICT connections."""
"""
def handle(self):
"""
Simple function which responds to all queries with a 552.
"""
"""Respond to all queries with a 552."""
try:
# First, send a response to allow the server to continue.
rsp = "220 dictserver <xnooptions> <msgid@msgid>\n"
@ -133,9 +127,7 @@ def get_options():
def setup_logging(options):
"""
Set up logging from the command line options
"""
"""Set up logging from the command line options."""
root_logger = logging.getLogger()
add_stdout = False
@ -166,16 +158,13 @@ def setup_logging(options):
class ScriptRC(object):
"""Enum for script return codes"""
"""Enum for script return codes."""
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
class ScriptException(Exception):
pass
if __name__ == '__main__':
# Get the options from the user.
options = get_options()
@ -186,8 +175,8 @@ if __name__ == '__main__':
# Run main script.
try:
rc = dictserver(options)
except Exception as e:
log.exception(e)
except Exception:
log.exception('Error running server')
rc = ScriptRC.EXCEPTION
if options.pidfile and os.path.isfile(options.pidfile):

View File

@ -38,7 +38,7 @@ from testenv import Env, Httpd, Nghttpx, CurlClient, Caddy, ExecResult, NghttpxQ
log = logging.getLogger(__name__)
class ScoreCardException(Exception):
class ScoreCardError(Exception):
pass
@ -78,7 +78,7 @@ class ScoreCard:
c_samples = []
hs_samples = []
errors = []
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
args = [
'--http3-only' if proto == 'h3' else '--http2',
@ -126,7 +126,7 @@ class ScoreCard:
errors = []
profiles = []
self.info('single...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False, with_profile=True)
@ -153,7 +153,7 @@ class ScoreCard:
profiles = []
url = f'{url}?[0-{count - 1}]'
self.info('serial...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False, with_profile=True)
@ -181,7 +181,7 @@ class ScoreCard:
max_parallel = self._download_parallel if self._download_parallel > 0 else count
url = f'{url}?[0-{count - 1}]'
self.info('parallel...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False,
@ -281,7 +281,7 @@ class ScoreCard:
errors = []
profiles = []
self.info('single...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
with_headers=False, with_profile=True)
@ -308,7 +308,7 @@ class ScoreCard:
profiles = []
url = f'{url}?id=[0-{count - 1}]'
self.info('serial...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
with_headers=False, with_profile=True)
@ -336,7 +336,7 @@ class ScoreCard:
max_parallel = count
url = f'{url}?id=[0-{count - 1}]'
self.info('parallel...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_put(urls=[url], fdata=fpath, alpn_proto=proto,
with_headers=False, with_profile=True,
@ -433,7 +433,7 @@ class ScoreCard:
'--parallel', '--parallel-max', str(max_parallel)
])
self.info(f'{max_parallel}...')
for i in range(sample_size):
for _ in range(sample_size):
curl = CurlClient(env=self.env, silent=self._silent_curl)
r = curl.http_download(urls=[url], alpn_proto=proto, no_save=True,
with_headers=False, with_profile=True,
@ -515,7 +515,7 @@ class ScoreCard:
if proto == 'h3':
p['name'] = 'h3'
if not self.env.have_h3_curl():
raise ScoreCardException('curl does not support HTTP/3')
raise ScoreCardError('curl does not support HTTP/3')
for lib in ['ngtcp2', 'quiche', 'msh3', 'nghttp3']:
if self.env.curl_uses_lib(lib):
p['implementation'] = lib
@ -523,7 +523,7 @@ class ScoreCard:
elif proto == 'h2':
p['name'] = 'h2'
if not self.env.have_h2_curl():
raise ScoreCardException('curl does not support HTTP/2')
raise ScoreCardError('curl does not support HTTP/2')
for lib in ['nghttp2', 'hyper']:
if self.env.curl_uses_lib(lib):
p['implementation'] = lib
@ -534,10 +534,10 @@ class ScoreCard:
p['implementation'] = 'hyper' if self.env.curl_uses_lib('hyper')\
else 'native'
else:
raise ScoreCardException(f"unknown protocol: {proto}")
raise ScoreCardError(f"unknown protocol: {proto}")
if 'implementation' not in p:
raise ScoreCardException(f'did not recognize {p} lib')
raise ScoreCardError(f'did not recognize {p} lib')
p['version'] = Env.curl_lib_version(p['implementation'])
score = {
@ -599,7 +599,7 @@ class ScoreCard:
m_names = {}
mcol_width = 12
mcol_sw = 17
for server, server_score in score['downloads'].items():
for server_score in score['downloads'].values():
for sskey, ssval in server_score.items():
if isinstance(ssval, str):
continue
@ -621,7 +621,7 @@ class ScoreCard:
size_score = score['downloads'][server][size]
print(f' {server:<8} {size:>8}', end='')
errors = []
for key, val in size_score.items():
for val in size_score.values():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
@ -644,7 +644,7 @@ class ScoreCard:
m_names = {}
mcol_width = 12
mcol_sw = 17
for server, server_score in score['uploads'].items():
for server_score in score['uploads'].values():
for sskey, ssval in server_score.items():
if isinstance(ssval, str):
continue
@ -666,7 +666,7 @@ class ScoreCard:
size_score = score['uploads'][server][size]
print(f' {server:<8} {size:>8}', end='')
errors = []
for key, val in size_score.items():
for val in size_score.values():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
@ -694,11 +694,11 @@ class ScoreCard:
for server in score['requests']:
server_score = score['requests'][server]
for sskey, ssval in server_score.items():
if isinstance(ssval, str) or isinstance(ssval, int):
if isinstance(ssval, (str, int)):
continue
if sskey not in sizes:
sizes.append(sskey)
for mkey, mval in server_score[sskey].items():
for mkey in server_score[sskey]:
if mkey not in measures:
measures.append(mkey)
m_names[mkey] = f'{mkey}'
@ -715,7 +715,7 @@ class ScoreCard:
count = score['requests'][server]['count']
print(f' {server:<8} {size:>6} {count:>6}', end='')
errors = []
for key, val in size_score.items():
for val in size_score.values():
if 'errors' in val:
errors.extend(val['errors'])
for m in measures:
@ -864,8 +864,8 @@ def main():
else:
card.print_score(score)
except ScoreCardException as ex:
sys.stderr.write(f"ERROR: {str(ex)}\n")
except ScoreCardError as ex:
sys.stderr.write(f"ERROR: {ex}\n")
rv = 1
except KeyboardInterrupt:
log.warning("aborted")

View File

@ -183,7 +183,7 @@ class TestCaddy:
r = curl.http_upload(urls=[url], data=data, alpn_proto=proto,
extra_args=['--parallel'])
r.check_stats(count=count, http_status=200, exitcode=0)
for i in range(0,count):
for i in range(count):
respdata = open(curl.response_file(i)).readlines()
assert respdata == [data]

View File

@ -139,12 +139,12 @@ class TestInfo:
# check timings reported on a transfer for consistency
url = s['url_effective']
# all stat keys which report timings
all_keys = set([
all_keys = {
'time_appconnect', 'time_connect', 'time_redirect',
'time_pretransfer', 'time_starttransfer', 'time_total'
])
}
# stat keys where we expect a positive value
pos_keys = set(['time_pretransfer', 'time_starttransfer', 'time_total'])
pos_keys = {'time_pretransfer', 'time_starttransfer', 'time_total'}
if s['num_connects'] > 0:
pos_keys.add('time_connect')
if url.startswith('https:'):

View File

@ -266,12 +266,10 @@ class TestSSLUse:
@staticmethod
def gen_test_17_09_list():
ret = []
for tls_proto in ['TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']:
for max_ver in range(0, 5):
for min_ver in range(-2, 4):
ret.append([tls_proto, max_ver, min_ver])
return ret
return [[tls_proto, max_ver, min_ver]
for tls_proto in ['TLSv1', 'TLSv1.1', 'TLSv1.2', 'TLSv1.3']
for max_ver in range(5)
for min_ver in range(-2, 4)]
@pytest.mark.parametrize("tls_proto, max_ver, min_ver", gen_test_17_09_list())
def test_17_09_ssl_min_max(self, env: Env, httpd, tls_proto, max_ver, min_ver):

View File

@ -353,7 +353,9 @@ class TestCA:
valid_from: timedelta = timedelta(days=-1),
valid_to: timedelta = timedelta(days=89),
) -> Credentials:
"""Create a certificate signed by this CA for the given domains.
"""
Create a certificate signed by this CA for the given domains.
:returns: the certificate and private key PEM file paths
"""
if spec.domains and len(spec.domains):
@ -381,7 +383,7 @@ class TestCA:
elif common_name:
name_pieces.append(x509.NameAttribute(NameOID.COMMON_NAME, common_name))
if parent:
name_pieces.extend([rdn for rdn in parent])
name_pieces.extend(list(parent))
return x509.Name(name_pieces)
@staticmethod
@ -522,7 +524,6 @@ class TestCA:
valid_from: timedelta = timedelta(days=-1),
valid_to: timedelta = timedelta(days=89),
) -> Credentials:
name = name
pkey = _private_key(key_type=key_type)
subject = TestCA._make_x509_name(common_name=name, parent=issuer.subject)
csr = TestCA._make_csr(subject=subject,

View File

@ -95,13 +95,12 @@ class LocalClient:
if key in os.environ and key not in run_env:
run_env[key] = os.environ[key]
try:
with open(self._stdoutfile, 'w') as cout:
with open(self._stderrfile, 'w') as cerr:
p = subprocess.run(myargs, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
input=None, env=run_env,
timeout=self._timeout)
exitcode = p.returncode
with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
p = subprocess.run(myargs, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
input=None, env=run_env,
timeout=self._timeout)
exitcode = p.returncode
except subprocess.TimeoutExpired:
log.warning(f'Timeout after {self._timeout}s: {args}')
exitcode = -1

View File

@ -120,20 +120,16 @@ class RunTcpDump:
def stats(self) -> Optional[List[str]]:
if self._proc:
raise Exception('tcpdump still running')
lines = []
for line in open(self._stdoutfile).readlines():
if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line):
lines.append(line)
return lines
return [line
for line in open(self._stdoutfile)
if re.match(r'.* IP 127\.0\.0\.1\.\d+ [<>] 127\.0\.0\.1\.\d+:.*', line)]
def stats_excluding(self, src_port) -> Optional[List[str]]:
if self._proc:
raise Exception('tcpdump still running')
lines = []
for line in self.stats:
if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line):
lines.append(line)
return lines
return [line
for line in self.stats
if not re.match(r'.* IP 127\.0\.0\.1\.' + str(src_port) + ' >.*', line)]
@property
def stderr(self) -> List[str]:
@ -157,20 +153,19 @@ class RunTcpDump:
args.extend([
tcpdump, '-i', local_if, '-n', 'tcp[tcpflags] & (tcp-rst)!=0'
])
with open(self._stdoutfile, 'w') as cout:
with open(self._stderrfile, 'w') as cerr:
self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
text=True, cwd=self._run_dir,
shell=False)
assert self._proc
assert self._proc.returncode is None
while self._proc:
try:
self._proc.wait(timeout=1)
except subprocess.TimeoutExpired:
pass
except Exception as e:
log.error(f'Tcpdump: {e}')
with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
self._proc = subprocess.Popen(args, stdout=cout, stderr=cerr,
text=True, cwd=self._run_dir,
shell=False)
assert self._proc
assert self._proc.returncode is None
while self._proc:
try:
self._proc.wait(timeout=1)
except subprocess.TimeoutExpired:
pass
except Exception:
log.exception('Tcpdump')
def start(self):
def do_sample():
@ -230,7 +225,7 @@ class ExecResult:
self._stats.append(json.loads(line))
# TODO: specify specific exceptions here
except: # noqa: E722
log.error(f'not a JSON stat: {line}')
log.exception(f'not a JSON stat: {line}')
break
@property
@ -771,39 +766,38 @@ class CurlClient:
tcpdump = RunTcpDump(self.env, self._run_dir)
tcpdump.start()
try:
with open(self._stdoutfile, 'w') as cout:
with open(self._stderrfile, 'w') as cerr:
if with_profile:
end_at = started_at + timedelta(seconds=self._timeout) \
if self._timeout else None
log.info(f'starting: {args}')
p = subprocess.Popen(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
env=self._run_env)
profile = RunProfile(p.pid, started_at, self._run_dir)
if intext is not None and False:
p.communicate(input=intext.encode(), timeout=1)
ptimeout = 0.0
while True:
try:
p.wait(timeout=ptimeout)
break
except subprocess.TimeoutExpired:
if end_at and datetime.now() >= end_at:
p.kill()
raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
profile.sample()
ptimeout = 0.01
exitcode = p.returncode
profile.finish()
log.info(f'done: exit={exitcode}, profile={profile}')
else:
p = subprocess.run(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
input=intext.encode() if intext else None,
timeout=self._timeout,
env=self._run_env)
exitcode = p.returncode
with open(self._stdoutfile, 'w') as cout, open(self._stderrfile, 'w') as cerr:
if with_profile:
end_at = started_at + timedelta(seconds=self._timeout) \
if self._timeout else None
log.info(f'starting: {args}')
p = subprocess.Popen(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
env=self._run_env)
profile = RunProfile(p.pid, started_at, self._run_dir)
if intext is not None and False:
p.communicate(input=intext.encode(), timeout=1)
ptimeout = 0.0
while True:
try:
p.wait(timeout=ptimeout)
break
except subprocess.TimeoutExpired:
if end_at and datetime.now() >= end_at:
p.kill()
raise subprocess.TimeoutExpired(cmd=args, timeout=self._timeout)
profile.sample()
ptimeout = 0.01
exitcode = p.returncode
profile.finish()
log.info(f'done: exit={exitcode}, profile={profile}')
else:
p = subprocess.run(args, stderr=cerr, stdout=cout,
cwd=self._run_dir, shell=False,
input=intext.encode() if intext else None,
timeout=self._timeout,
env=self._run_env)
exitcode = p.returncode
except subprocess.TimeoutExpired:
now = datetime.now()
duration = now - started_at
@ -857,7 +851,6 @@ class CurlClient:
args.extend(['-v', '--trace-ids', '--trace-time'])
if self.env.verbose > 1:
args.extend(['--trace-config', 'http/2,http/3,h2-proxy,h1-proxy'])
pass
active_options = options
if options is not None and '--next' in options:

View File

@ -213,8 +213,8 @@ class EnvConfig:
log.error(f'{self.apxs} failed to query HTTPD_VERSION: {p}')
else:
self._httpd_version = p.stdout.strip()
except Exception as e:
log.error(f'{self.apxs} failed to run: {e}')
except Exception:
log.exception(f'{self.apxs} failed to run')
return self._httpd_version
def versiontuple(self, v):
@ -563,7 +563,7 @@ class Env:
def make_data_file(self, indir: str, fname: str, fsize: int,
line_length: int = 1024) -> str:
if line_length < 11:
raise 'line_length less than 11 not supported'
raise RuntimeError('line_length less than 11 not supported')
fpath = os.path.join(indir, fname)
s10 = "0123456789"
s = round((line_length / 10) + 1) * s10

View File

@ -117,9 +117,7 @@ class Httpd:
self._proxy_auth_basic = active
def _run(self, args, intext=''):
env = {}
for key, val in os.environ.items():
env[key] = val
env = os.environ.copy()
env['APACHE_RUN_DIR'] = self._run_dir
env['APACHE_RUN_USER'] = os.environ['USER']
env['APACHE_LOCK_DIR'] = self._lock_dir
@ -252,7 +250,7 @@ class Httpd:
if os.path.exists(os.path.join(self._mods_dir, f'mod_{m}.so')):
fd.write(f'LoadModule {m}_module "{self._mods_dir}/mod_{m}.so"\n')
if Httpd.MOD_CURLTEST is not None:
fd.write(f'LoadModule curltest_module \"{Httpd.MOD_CURLTEST}\"\n')
fd.write(f'LoadModule curltest_module "{Httpd.MOD_CURLTEST}"\n')
conf = [ # base server config
f'ServerRoot "{self._apache_dir}"',
'DefaultRuntimeDir logs',

View File

@ -164,7 +164,7 @@ class Nghttpx:
def _write_config(self):
with open(self._conf_file, 'w') as fd:
fd.write('# nghttpx test config'),
fd.write('# nghttpx test config')
fd.write("\n".join([
'# do we need something here?'
]))

View File

@ -21,7 +21,7 @@
#
# SPDX-License-Identifier: curl
#
""" A telnet server which negotiates"""
"""A telnet server which negotiates."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
@ -50,10 +50,7 @@ VERIFIED_RSP = "WE ROOLZ: {pid}"
def telnetserver(options):
"""
Starts up a TCP server with a telnet handler and serves DICT requests
forever.
"""
"""Start up a TCP server with a telnet handler and serve DICT requests forever."""
if options.pidfile:
pid = os.getpid()
# see tests/server/util.c function write_pidfile
@ -74,13 +71,10 @@ def telnetserver(options):
class NegotiatingTelnetHandler(socketserver.BaseRequestHandler):
"""Handler class for Telnet connections.
"""Handler class for Telnet connections."""
"""
def handle(self):
"""
Negotiates options before reading data.
"""
"""Negotiates options before reading data."""
neg = Negotiator(self.request)
try:
@ -134,7 +128,7 @@ class Negotiator(object):
def recv(self, bytes):
"""
Read bytes from TCP, handling negotiation sequences
Read bytes from TCP, handling negotiation sequences.
:param bytes: Number of bytes to read
:return: a buffer of bytes
@ -256,7 +250,7 @@ class NegBase(object):
@classmethod
def from_val(cls, val):
for k in cls.__dict__.keys():
for k in cls.__dict__:
if getattr(cls, k) == val:
return k
@ -314,9 +308,7 @@ def get_options():
def setup_logging(options):
"""
Set up logging from the command line options
"""
"""Set up logging from the command line options."""
root_logger = logging.getLogger()
add_stdout = False
@ -349,16 +341,13 @@ def setup_logging(options):
class ScriptRC(object):
"""Enum for script return codes"""
"""Enum for script return codes."""
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
class ScriptException(Exception):
pass
if __name__ == '__main__':
# Get the options from the user.
options = get_options()
@ -369,8 +358,8 @@ if __name__ == '__main__':
# Run main script.
try:
rc = telnetserver(options)
except Exception as e:
log.exception(e)
except Exception:
log.exception('Error in telnet server')
rc = ScriptRC.EXCEPTION
if options.pidfile and os.path.isfile(options.pidfile):

View File

@ -21,7 +21,7 @@
#
# SPDX-License-Identifier: curl
#
"""Server for testing SMB"""
"""Server for testing SMB."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
@ -63,7 +63,8 @@ VERIFIED_RSP = "WE ROOLZ: {pid}\n"
class ShutdownHandler(threading.Thread):
"""Cleanly shut down the SMB server
"""
Cleanly shut down the SMB server.
This can only be done from another thread while the server is in
serve_forever(), so a thread is spawned here that waits for a shutdown
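
The docstring above describes the standard socketserver shutdown pattern: serve_forever() blocks the thread that called it, so shutdown() has to be invoked from a different thread. A minimal sketch of that pattern (hypothetical names, not the server's actual implementation):

import socketserver
import threading


class ShutdownWaiter(threading.Thread):
    """Call server.shutdown() from a helper thread once an event fires."""

    def __init__(self, server: socketserver.TCPServer, stop_event: threading.Event):
        super().__init__(daemon=True)
        self.server = server
        self.stop_event = stop_event

    def run(self):
        # serve_forever() blocks its caller, so the only safe place to call
        # shutdown() is another thread such as this one.
        self.stop_event.wait()
        self.server.shutdown()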
@ -106,9 +107,7 @@ class ShutdownHandler(threading.Thread):
def smbserver(options):
"""Start up a TCP SMB server that serves forever
"""
"""Start up a TCP SMB server that serves forever."""
if options.pidfile:
pid = os.getpid()
# see tests/server/util.c function write_pidfile
@ -142,7 +141,7 @@ def smbserver(options):
smb_config.set("TESTS", "path", TESTS_MAGIC)
if not options.srcdir or not os.path.isdir(options.srcdir):
raise ScriptException("--srcdir is mandatory")
raise ScriptError("--srcdir is mandatory")
test_data_dir = os.path.join(options.srcdir, "data")
@ -191,7 +190,7 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
"""
conn_data = smb_server.getConnectionData(conn_id)
# Wrap processing in a try block which allows us to throw SmbException
# Wrap processing in a try block which allows us to throw SmbError
# to control the flow.
try:
ncax_parms = imp_smb.SMBNtCreateAndX_Parameters(
@ -207,7 +206,7 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
# Currently we only support reading files.
if disposition != imp_smb.FILE_OPEN:
raise SmbException(STATUS_ACCESS_DENIED,
raise SmbError(STATUS_ACCESS_DENIED,
"Only support reading files")
# Check to see if the path we were given is actually a
@ -261,7 +260,7 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
level=imp_smb.SMB_QUERY_FILE_ALL_INFO)
if error_code != STATUS_SUCCESS:
raise SmbException(error_code, "Failed to query path info")
raise SmbError(error_code, "Failed to query path info")
resp_parms["CreateTime"] = resp_info["CreationTime"]
resp_parms["LastAccessTime"] = resp_info[
@ -282,8 +281,8 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
conn_data["OpenedFiles"][fakefid]["FileName"] = path
conn_data["OpenedFiles"][fakefid]["DeleteOnClose"] = False
except SmbException as s:
log.debug("[SMB] SmbException hit: %s", s)
except SmbError as s:
log.debug("[SMB] SmbError hit: %s", s)
error_code = s.error_code
resp_parms = ""
resp_data = ""
@ -307,10 +306,10 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
if "path" in conn_shares[tid]:
path = conn_shares[tid]["path"]
else:
raise SmbException(STATUS_ACCESS_DENIED,
raise SmbError(STATUS_ACCESS_DENIED,
"Connection share had no path")
else:
raise SmbException(imp_smbserver.STATUS_SMB_BAD_TID,
raise SmbError(imp_smbserver.STATUS_SMB_BAD_TID,
"TID was invalid")
return path
@ -319,7 +318,7 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
log.debug("[SMB] Get server path '%s'", requested_filename)
if requested_filename not in [VERIFIED_REQ]:
raise SmbException(STATUS_NO_SUCH_FILE, "Couldn't find the file")
raise SmbError(STATUS_NO_SUCH_FILE, "Couldn't find the file")
fid, filename = tempfile.mkstemp()
log.debug("[SMB] Created %s (%d) for storing '%s'",
@ -360,23 +359,24 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
except Exception:
log.exception("Failed to make test file")
raise SmbException(STATUS_NO_SUCH_FILE, "Failed to make test file")
raise SmbError(STATUS_NO_SUCH_FILE, "Failed to make test file")
class SmbException(Exception):
class SmbError(Exception):
def __init__(self, error_code, error_message):
super(SmbException, self).__init__(error_message)
super(SmbError, self).__init__(error_message)
self.error_code = error_code
class ScriptRC(object):
"""Enum for script return codes"""
"""Enum for script return codes."""
SUCCESS = 0
FAILURE = 1
EXCEPTION = 2
class ScriptException(Exception):
class ScriptError(Exception):
pass
@ -402,9 +402,7 @@ def get_options():
def setup_logging(options):
"""
Set up logging from the command line options
"""
"""Set up logging from the command line options."""
root_logger = logging.getLogger()
add_stdout = False
@ -444,8 +442,8 @@ if __name__ == '__main__':
# Run main script.
try:
rc = smbserver(options)
except Exception as e:
log.exception(e)
except Exception:
log.exception('Error in SMB server')
rc = ScriptRC.EXCEPTION
if options.pidfile and os.path.isfile(options.pidfile):

View File

@ -21,7 +21,7 @@
#
# SPDX-License-Identifier: curl
#
"""Module for extracting test data from the test data folder and other utils"""
"""Module for extracting test data from the test data folder and other utils."""
from __future__ import (absolute_import, division, print_function,
unicode_literals)