tests: fix some Python typing issues

These otherwise raise errors in pytype. A few problematic methods
weren't being used and are deleted.
This commit is contained in:
Dan Fandrich 2024-09-26 11:58:57 -07:00
parent 0b864bde08
commit 2f3b7f20fb
13 changed files with 48 additions and 49 deletions

View File

@ -49,7 +49,7 @@ jobs:
- name: install
run: |
sudo apt-get install codespell python3-pip
sudo apt-get install codespell python3-pip python3-pytest
python3 -m pip install cmakelint==1.4.3
- name: spellcheck

View File

@ -37,7 +37,7 @@ import sys
from util import ClosingFileHandler
try: # Python 2
import SocketServer as socketserver
import SocketServer as socketserver # type: ignore
except ImportError: # Python 3
import socketserver

View File

@ -25,7 +25,7 @@
import logging
import os
import sys
from typing import Optional
from typing import Generator
import pytest
@ -81,7 +81,7 @@ def log_global_env_facts(record_testsuite_property, env):
@pytest.fixture(scope='package')
def httpd(env) -> Httpd:
def httpd(env) -> Generator[Httpd, None, None]:
httpd = Httpd(env=env)
if not httpd.exists():
pytest.skip(f'httpd not found: {env.httpd}')
@ -93,7 +93,7 @@ def httpd(env) -> Httpd:
@pytest.fixture(scope='package')
def nghttpx(env, httpd) -> Optional[Nghttpx]:
def nghttpx(env, httpd) -> Generator[Nghttpx, None, None]:
nghttpx = NghttpxQuic(env=env)
if env.have_h3():
nghttpx.clear_logs()
@ -102,7 +102,7 @@ def nghttpx(env, httpd) -> Optional[Nghttpx]:
nghttpx.stop()
@pytest.fixture(scope='package')
def nghttpx_fwd(env, httpd) -> Optional[Nghttpx]:
def nghttpx_fwd(env, httpd) -> Generator[Nghttpx, None, None]:
nghttpx = NghttpxFwd(env=env)
if env.have_h3():
nghttpx.clear_logs()

View File

@ -30,6 +30,7 @@ import logging
import os
import time
import pytest
from typing import List
from testenv import Env, CurlClient, LocalClient
@ -669,7 +670,7 @@ class TestUpload:
up_speed = r.stats[0]['speed_upload']
assert (speed_limit * 0.5) <= up_speed <= (speed_limit * 1.5), f'{r.stats[0]}'
def check_downloads(self, client, source: str, count: int,
def check_downloads(self, client, source: List[str], count: int,
complete: bool = True):
for i in range(count):
dfile = client.download_file(i)

View File

@ -28,6 +28,8 @@ import logging
import os
import socket
from threading import Thread
from typing import Generator
import pytest
from testenv import Env, CurlClient
@ -40,6 +42,7 @@ class UDSFaker:
def __init__(self, path):
self._uds_path = path
self._done = False
self._socket = None
@property
def path(self):
@ -88,7 +91,7 @@ Content-Length: 19
class TestUnix:
@pytest.fixture(scope="class")
def uds_faker(self, env: Env) -> UDSFaker:
def uds_faker(self, env: Env) -> Generator[UDSFaker, None, None]:
uds_path = os.path.join(env.gen_dir, 'uds_11.sock')
faker = UDSFaker(path=uds_path)
faker.start()

View File

@ -56,7 +56,7 @@ class Caddy:
return self._docs_dir
@property
def port(self) -> str:
def port(self) -> int:
return self.env.caddy_https_port
def clear_logs(self):
@ -141,8 +141,10 @@ class Caddy:
def _write_config(self):
domain1 = self.env.domain1
creds1 = self.env.get_credentials(domain1)
assert creds1 # convince pytype this isn't None
domain2 = self.env.domain2
creds2 = self.env.get_credentials(domain2)
assert creds2 # convince pytype this isn't None
self._mkpath(self._docs_dir)
self._mkpath(self._tmp_dir)
with open(os.path.join(self._docs_dir, 'data.json'), 'w') as fd:

View File

@ -126,6 +126,7 @@ class Credentials:
self._cert_file = None
self._pkey_file = None
self._store = None
self._combined_file = None
@property
def name(self) -> str:
@ -372,7 +373,7 @@ class TestCA:
return creds
@staticmethod
def _make_x509_name(org_name: str = None, common_name: str = None, parent: x509.Name = None) -> x509.Name:
def _make_x509_name(org_name: Optional[str] = None, common_name: Optional[str] = None, parent: x509.Name = None) -> x509.Name:
name_pieces = []
if org_name:
oid = NameOID.ORGANIZATIONAL_UNIT_NAME if parent else NameOID.ORGANIZATION_NAME
@ -388,8 +389,8 @@ class TestCA:
subject: x509.Name,
pkey: Any,
issuer_subject: Optional[Credentials],
valid_from_delta: timedelta = None,
valid_until_delta: timedelta = None
valid_from_delta: Optional[timedelta] = None,
valid_until_delta: Optional[timedelta] = None
):
pubkey = pkey.public_key()
issuer_subject = issuer_subject if issuer_subject is not None else subject
@ -468,7 +469,7 @@ class TestCA:
)
@staticmethod
def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: str = None) -> Any:
def _add_client_usages(csr: Any, issuer: Credentials, rfc82name: Optional[str] = None) -> Any:
cert = csr.add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
@ -493,7 +494,7 @@ class TestCA:
@staticmethod
def _make_ca_credentials(name, key_type: Any,
issuer: Credentials = None,
issuer: Optional[Credentials] = None,
valid_from: timedelta = timedelta(days=-1),
valid_to: timedelta = timedelta(days=89),
) -> Credentials:

View File

@ -541,7 +541,7 @@ class CurlClient:
with_profile: bool = False,
with_tcpdump: bool = False,
no_save: bool = False,
extra_args: List[str] = None):
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if no_save:
@ -656,7 +656,7 @@ class CurlClient:
with_profile: bool = False,
with_tcpdump: bool = False,
no_save: bool = False,
extra_args: List[str] = None):
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if no_save:
@ -685,7 +685,7 @@ class CurlClient:
with_profile: bool = False,
with_tcpdump: bool = False,
no_save: bool = False,
extra_args: List[str] = None):
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
extra_args.extend([
@ -702,7 +702,7 @@ class CurlClient:
with_stats: bool = True,
with_profile: bool = False,
with_tcpdump: bool = False,
extra_args: List[str] = None):
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
if fupload is not None:
@ -732,7 +732,7 @@ class CurlClient:
with_stats: bool = True,
with_profile: bool = False,
with_tcpdump: bool = False,
extra_args: List[str] = None):
extra_args: Optional[List[str]] = None):
if extra_args is None:
extra_args = []
extra_args.extend([
@ -837,8 +837,6 @@ class CurlClient:
with_profile=with_profile, with_tcpdump=with_tcpdump)
if r.exit_code == 0 and with_headers:
self._parse_headerfile(self._headerfile, r=r)
if r.json:
r.response["json"] = r.json
return r
def _complete_args(self, urls, timeout=None, options=None,
@ -892,7 +890,7 @@ class CurlClient:
args.append(url)
return args
def _parse_headerfile(self, headerfile: str, r: ExecResult = None) -> ExecResult:
def _parse_headerfile(self, headerfile: str, r: Optional[ExecResult] = None) -> ExecResult:
lines = open(headerfile).readlines()
if r is None:
r = ExecResult(args=[], exit_code=0, stdout=[], stderr=[])

View File

@ -71,9 +71,9 @@ class EnvConfig:
if 'CURL' in os.environ:
self.curl = os.environ['CURL']
self.curl_props = {
'version': None,
'os': None,
'fullname': None,
'version': '',
'os': '',
'fullname': '',
'features': [],
'protocols': [],
'libs': [],
@ -331,7 +331,7 @@ class Env:
return 'unknown'
@staticmethod
def curl_lib_version_at_least(libname: str, min_version) -> str:
def curl_lib_version_at_least(libname: str, min_version) -> bool:
lversion = Env.curl_lib_version(libname)
if lversion != 'unknown':
return Env.CONFIG.versiontuple(min_version) <= \

View File

@ -217,10 +217,13 @@ class Httpd:
domain1 = self.env.domain1
domain1brotli = self.env.domain1brotli
creds1 = self.env.get_credentials(domain1)
assert creds1 # convince pytype this isn't None
domain2 = self.env.domain2
creds2 = self.env.get_credentials(domain2)
assert creds2 # convince pytype this isn't None
proxy_domain = self.env.proxy_domain
proxy_creds = self.env.get_credentials(proxy_domain)
assert proxy_creds # convince pytype this isn't None
self._mkpath(self._conf_dir)
self._mkpath(self._logs_dir)
self._mkpath(self._tmp_dir)

View File

@ -52,7 +52,6 @@ class Nghttpx:
self._error_log = os.path.join(self._run_dir, 'nghttpx.log')
self._stderr = os.path.join(self._run_dir, 'nghttpx.stderr')
self._tmp_dir = os.path.join(self._run_dir, 'tmp')
self._process = None
self._process: Optional[subprocess.Popen] = None
self._rmf(self._pid_file)
self._rmf(self._error_log)
@ -180,6 +179,8 @@ class NghttpxQuic(Nghttpx):
self._mkpath(self._tmp_dir)
if self._process:
self.stop()
creds = self.env.get_credentials(self.env.domain1)
assert creds # convince pytype this isn't None
args = [
self._cmd,
f'--frontend=*,{self.env.h3_port};quic',
@ -190,8 +191,8 @@ class NghttpxQuic(Nghttpx):
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
f'--cacert={self.env.ca.cert_file}',
self.env.get_credentials(self.env.domain1).pkey_file,
self.env.get_credentials(self.env.domain1).cert_file,
creds.pkey_file,
creds.cert_file,
f'--frontend-http3-window-size=1M',
f'--frontend-http3-max-window-size=10M',
f'--frontend-http3-connection-window-size=10M',
@ -214,6 +215,8 @@ class NghttpxFwd(Nghttpx):
self._mkpath(self._tmp_dir)
if self._process:
self.stop()
creds = self.env.get_credentials(self.env.proxy_domain)
assert creds # convince pytype this isn't None
args = [
self._cmd,
f'--http2-proxy',
@ -224,8 +227,8 @@ class NghttpxFwd(Nghttpx):
f'--errorlog-file={self._error_log}',
f'--conf={self._conf_file}',
f'--cacert={self.env.ca.cert_file}',
self.env.get_credentials(self.env.proxy_domain).pkey_file,
self.env.get_credentials(self.env.proxy_domain).cert_file,
creds.pkey_file,
creds.cert_file,
]
ngerr = open(self._stderr, 'a')
self._process = subprocess.Popen(args=args, stderr=ngerr)

View File

@ -28,12 +28,12 @@ import inspect
import logging
import os
import subprocess
from datetime import timedelta, datetime
from json import JSONEncoder
import time
from typing import List, Union, Optional
from .curl import CurlClient, ExecResult
from datetime import datetime, timedelta
from .curl import CurlClient
from .env import Env
@ -73,7 +73,7 @@ class VsFTPD:
return self._docs_dir
@property
def port(self) -> str:
def port(self) -> int:
return self._port
def clear_logs(self):
@ -157,19 +157,6 @@ class VsFTPD:
log.error(f"Server still not responding after {timeout}")
return False
def _run(self, args, intext=''):
env = {}
for key, val in os.environ.items():
env[key] = val
with open(self._error_log, 'w') as cerr:
self._process = subprocess.run(args, stderr=cerr, stdout=cerr,
cwd=self._vsftpd_dir,
input=intext.encode() if intext else None,
env=env)
start = datetime.now()
return ExecResult(args=args, exit_code=self._process.returncode,
duration=datetime.now() - start)
def _rmf(self, path):
if os.path.exists(path):
return os.remove(path)
@ -200,6 +187,7 @@ class VsFTPD:
]
if self._with_ssl:
creds = self.env.get_credentials(self.domain)
assert creds # convince pytype this isn't None
conf.extend([
f'ssl_enable=YES',
f'debug_ssl=YES',

View File

@ -231,7 +231,7 @@ class TestSmbServer(imp_smbserver.SMBSERVER):
if path == SERVER_MAGIC:
fid, full_path = self.get_server_path(requested_file)
else:
assert (path == TESTS_MAGIC)
assert path == TESTS_MAGIC
fid, full_path = self.get_test_path(requested_file)
self.tmpfiles.append(full_path)