I made lots of changes to the script presented in my previous question. I was tempted to edit that one with the new code, but it would invalidate @200_success's helpful answer. It was also disallowed on a Meta thread. So I apologize for the new question, but it seemed like the right thing to do.
Again, any and all tips are appreciated! Also, this is my first time writing any docstrings, so if I'm breaking convention, let me know.
#!/usr/bin/env python3 import argparse import errno import functools import multiprocessing import os import platform import socket import time from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ProcessPoolExecutor DEFAULT_HOST = '127.0.0.1' DEFAULT_TIMEOUT = 1 DEFAULT_THREADS = 512 PORT_RANGE = range(1, 65536) def tcp_ping(host, port): """ Attempts to connect to host:port via TCP. Arguments: host: IP address or URL to hit port: Port to hit Returns: port number, if it's available; otherwise False """ try: with socket.socket() as sock: sock.connect((host, port)) print(str(port) + ' Open') return port except socket.timeout: return False except socket.error as socket_error: if socket_error.errno == errno.ECONNREFUSED: return False raise def perform_scan(host, use_threads = False): """ Perform a scan on all valid ports (1 - 65535), either by spawning threads or forking processes. Arguments: host: IP address or URL to scan use_threads: whether or not to use threads; default behaviour is to fork processes Returns: list of available ports """ if use_threads: executor = ThreadPoolExecutor(max_workers = DEFAULT_THREADS) else: executor = ProcessPoolExecutor() with executor: ping_partial = functools.partial(tcp_ping, host) return list(filter(bool, executor.map(ping_partial, PORT_RANGE))) def is_address_valid(host): """ Validate the host's existence by attempting to establish a connection to it on port 80 (HTTP). Arguments: host: IP address or URL to validate Returns: bool indicating whether the host is valid """ try: with socket.socket() as sock: sock.connect((host, 80)) return True except socket.gaierror: return False except (socket.timeout, socket.error): return True def scan_ports(host = DEFAULT_HOST, timeout = DEFAULT_TIMEOUT): """ Scan all possible ports on the specified host. If the operating system is detected as Windows, the ports will be scanned by spawning threads. Otherwise, new processes will be forked. Arguments: host: IP address or URL to scan timeout: connection timeout when testing a port """ # Set defaults if CLI arguments were passed in but not specified if host is None: host = DEFAULT_HOST if timeout is None: timeout = DEFAULT_TIMEOUT # Strip the protocol from the URL, if present if '://' in host: host = host[host.index('://') + 3:] # Set the timeout for all subsequent connections socket.setdefaulttimeout(timeout) # Validate the IP/host by testing a connection if not is_address_valid(host): print('DNS lookup for \'' + host + '\' failed.') return # Perform the scan. On Windows, thread. On all others, fork. print('Scanning ' + host + ' ...') start_time = time.clock() if os.name == 'nt': print('Running on Windows OS.') available_ports = perform_scan(host, use_threads = True) elif os.name == 'posix': print('Running on *Nix OS.') available_ports = perform_scan(host) else: print('Unidentified operating system: ' + os.name + ' [' + platform.system() + ' ' + platform.version() + ']') available_ports = perform_scan(host) end_time = time.clock() print('Done.') # Display results print() print('Time elapsed: ' + format(end_time - start_time, '.2f') + ' sec') available_ports.sort() print() print(str(len(available_ports)) + ' ports available.') print(available_ports) def main(): arg_parser = argparse.ArgumentParser() arg_parser.add_argument('-ip', '--host', help = 'IP address/host to scan') arg_parser.add_argument('-t', '--timeout', help = 'Connection timeout in seconds', type = int) args = arg_parser.parse_args() scan_ports(args.host, args.timeout) if __name__ == '__main__': main()