"""This script is a modified version of the original script and is compatible with Python 3.11.9."""
from __future__ import division
__author__ = "GabrielePRG"
__license__ = "MIT"
__email__ = "info@gabriele.ovh"
import argparse
import errno
import os
import select
import socket
import struct
import sys
import time
# Handshake payload: the ASCII tag "SWKH" NUL-padded to 20 bytes (the same
# bytes struct.pack("20s", b"SWKH") produces).  Incoming handshake datagrams
# are compared against it and echoed back on a match.
HANDSHAKE_BYTES = b"SWKH".ljust(20, b"\x00")

# UDP port the handshake socket is bound to.
_HANDSHAKE_SERVER_PORT = 9978

# UDP port the TFTP socket is bound to.
_TFTP_SERVER_PORT = 69

# time.strftime() format used for the timestamps in log lines.
_TIME_FMT = "%c"

# Block size, in bytes, the server starts out with.
_DEFAULT_BLOCK_SIZE = 512
# Localised user-facing messages, keyed first by language code and then by
# message id.  Values are %-style format strings.
MESSAGES = {
    "en": {
        "setting_block_size": "Setting block size to %d",
        "serving": "Serving %d-byte %s (block size %d, %d blocks)",
        "error_reading_file": "Error: can't read %s",
        "file_not_found": "Please download/move it to the current working directory.",
        "server_interrupted": "Server interrupted by user. Shutting down.",
    },
    "it": {
        "setting_block_size": "Impostazione della dimensione del blocco a %d",
        "serving": "Servizio file di %d byte %s (dimensione blocco %d, %d blocchi)",
        "error_reading_file": "Errore: impossibile leggere %s",
        "file_not_found": "Si prega di scaricare/spostare il file nella directory di lavoro attuale.",
        "server_interrupted": "Server interrotto dall'utente. Spegnimento.",
    },
}
class Error(Exception):
    """Application-level failure raised by this script (e.g. a socket that cannot be bound)."""
class Server(object):
    """UDP server that echoes a fixed handshake and serves one file over TFTP.

    Two datagram sockets are bound at construction time: one for the
    handshake exchange and one for TFTP.  ``run_forever`` multiplexes both
    with ``select``.  No per-client state is kept: an acknowledgement for
    block N simply triggers sending block N + 1, and the current block size
    is shared by every peer.

    NOTE: TFTP block numbers are 16 bits wide, so at most 65535 blocks can be
    addressed; ``_tftp_send_data`` refuses to send blocks beyond that.
    """

    # TFTP opcodes (RFC 1350; OACK comes from RFC 2347).
    _TFTP_OPCODE_RRQ = 1
    _TFTP_OPCODE_DATA = 3
    _TFTP_OPCODE_ACK = 4
    _TFTP_OPCODE_OACK = 6
    # Leading two bytes of every ACK packet.
    _TFTP_ACK_PREFIX = struct.pack(">H", _TFTP_OPCODE_ACK)
    # Name of the block-size option and its legal range (RFC 2348).
    _TFTP_BLKSIZE_OPTION = b"blksize"
    _MIN_BLOCK_SIZE = 8
    _MAX_BLOCK_SIZE = 65464
    # Largest block number that fits the 16-bit field of a DATA packet.
    _MAX_BLOCK_NUMBER = 0xFFFF

    def __init__(self, handshake_addr, tftp_addr, filename, file_contents, lang):
        """Bind both sockets and prepare to serve ``file_contents``.

        Args:
            handshake_addr: ``(host, port)`` for the handshake socket.
            tftp_addr: ``(host, port)`` for the TFTP socket.
            filename: Name clients must request (``str`` or ``bytes``); also
                shown in status messages.
            file_contents: Bytes of the file to serve.
            lang: Key into ``MESSAGES`` selecting the message language.

        Raises:
            Error: If either socket cannot be bound.
        """
        self._file_contents = file_contents
        self._filename = filename
        self._language = lang
        # A read request is "<RRQ opcode><filename>\0<mode>\0[options]"; only
        # requests for exactly the configured file name are accepted.
        if isinstance(filename, bytes):
            name_bytes = filename
        else:
            name_bytes = str(filename).encode("utf-8")
        self._tftp_rrq_prefix = (
            struct.pack(">H", self._TFTP_OPCODE_RRQ) + name_bytes + b"\x00"
        )
        self._handshake_sock = self._bind(handshake_addr)
        try:
            self._tftp_sock = self._bind(tftp_addr)
        except Error:
            # Do not leave the first socket open if the second bind fails.
            self._handshake_sock.close()
            raise
        self._set_block_size(_DEFAULT_BLOCK_SIZE)

    def _bind(self, addr):
        """Return a new UDP socket bound to ``addr``.

        Raises:
            Error: If binding fails; the socket is closed before raising.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            sock.bind(addr)
        except socket.error as e:
            sock.close()
            raise Error(f"Unable to bind to the address {addr}: {str(e)}") from e
        return sock

    def _set_block_size(self, block_size):
        """Record ``block_size`` and recompute how many DATA blocks are needed."""
        print(MESSAGES[self._language]['setting_block_size'] % block_size)
        self._block_size = block_size
        # A TFTP transfer ends with a block shorter than the block size, so a
        # file that is an exact multiple of the block size (including an empty
        # file) needs one extra, empty, final block.
        self._total_blocks = len(self._file_contents) // block_size + 1
        print(MESSAGES[self._language]['serving'] % (len(self._file_contents), self._filename, block_size, self._total_blocks))

    def run_forever(self):
        """Dispatch incoming datagrams until interrupted, then close the sockets."""
        try:
            while True:
                readable, _, _ = select.select([self._handshake_sock, self._tftp_sock], [], [])
                for sock in readable:
                    if sock is self._handshake_sock:
                        self._handle_handshake()
                    elif sock is self._tftp_sock:
                        self._handle_tftp()
        except KeyboardInterrupt:
            print(MESSAGES[self._language]['server_interrupted'])
        finally:
            # Release the sockets on any exit path, not only on Ctrl-C.
            self.close()

    def _handle_handshake(self):
        """Read one handshake datagram and echo it back if it matches ``HANDSHAKE_BYTES``."""
        try:
            data, addr = self._handshake_sock.recvfrom(1024)  # Buffer size of 1024 bytes
            if data == HANDSHAKE_BYTES:
                self._handshake_sock.sendto(data, addr)  # Echo back the received handshake data
                print(f"{time.strftime(_TIME_FMT)}: Handshake acknowledged to {addr}.")
            else:
                print(f"{time.strftime(_TIME_FMT)}: Received non-handshake data from {addr}.")
        except socket.error as e:
            print(f"Error receiving handshake: {e}")

    def _handle_tftp(self):
        """Read one TFTP datagram and route it to the RRQ or ACK handler."""
        try:
            data, addr = self._tftp_sock.recvfrom(65536)  # Maximum size for TFTP packet
            if data.startswith(self._tftp_rrq_prefix):
                # Handle Read Request
                self._process_read_request(data, addr)
            elif data.startswith(self._TFTP_ACK_PREFIX):
                # Handle Acknowledgement
                if len(data) < 4:
                    # Too short to carry a block number; unpacking would raise.
                    print(f"{time.strftime(_TIME_FMT)}: Received malformed TFTP ACK from {addr}.")
                    return
                # Read exactly the two block-number bytes so that trailing
                # padding does not break the unpack.
                (block_number,) = struct.unpack(">H", data[2:4])
                self._process_acknowledgement(block_number, addr)
            else:
                print(f"{time.strftime(_TIME_FMT)}: Received unknown TFTP packet from {addr}.")
        except socket.error as e:
            print(f"Error receiving TFTP data: {e}")

    def _parse_options(self, data):
        """Return the options of a read request as ``{name: value}``.

        ``data`` is the raw RRQ packet.  After the 2-byte opcode it holds
        NUL-terminated fields: filename, mode, then zero or more
        option-name / option-value pairs.  Names are lower-cased because
        option names are case-insensitive; names and values are ``bytes``.
        """
        fields = data[2:].split(b"\x00")
        # Skip filename and mode; the split also yields a trailing empty
        # field after the final NUL, which zip() drops as an unpaired name.
        pairs = fields[2:]
        options = {}
        for name, value in zip(pairs[0::2], pairs[1::2]):
            if name:
                options[name.lower()] = value
        return options

    def _negotiated_block_size(self, raw_value):
        """Return ``raw_value`` as a usable block size, or ``None`` if unusable.

        ``raw_value`` is the ASCII-decimal ``bytes`` value of a ``blksize``
        option, or ``None`` when the option was absent.
        """
        if raw_value is None:
            return None
        try:
            size = int(raw_value.decode("ascii"))
        except ValueError:  # also covers UnicodeDecodeError
            return None
        if self._MIN_BLOCK_SIZE <= size <= self._MAX_BLOCK_SIZE:
            return size
        return None

    def _process_read_request(self, data, addr):
        """Start a transfer in response to a read request.

        Without a usable ``blksize`` option the default block size is used
        and block 1 is sent immediately.  With one, the block size is adopted
        and confirmed with an OACK; the peer then acknowledges block 0, which
        makes ``_process_acknowledgement`` send block 1.
        """
        options = self._parse_options(data)
        block_size = self._negotiated_block_size(options.get(self._TFTP_BLKSIZE_OPTION))
        if block_size is None:
            if self._block_size != _DEFAULT_BLOCK_SIZE:
                # Drop a block size negotiated by an earlier request.
                self._set_block_size(_DEFAULT_BLOCK_SIZE)
            # Begin sending data blocks starting from block 1
            self._tftp_send_data(1, addr)
            return
        if block_size != self._block_size:
            self._set_block_size(block_size)
        oack = (
            struct.pack(">H", self._TFTP_OPCODE_OACK)
            + self._TFTP_BLKSIZE_OPTION + b"\x00"
            + str(block_size).encode("ascii") + b"\x00"
        )
        self._tftp_sock.sendto(oack, addr)

    def _process_acknowledgement(self, block_number, addr):
        """Send the block following ``block_number``, or report completion."""
        # Send the next block, if any
        next_block = block_number + 1
        if next_block <= self._total_blocks:
            self._tftp_send_data(next_block, addr)
        else:
            print(f"{time.strftime(_TIME_FMT)}: All blocks sent successfully to {addr}.")

    def _tftp_send_data(self, block_number, addr):
        """Send DATA block ``block_number`` (1-based) to ``addr``."""
        if block_number > self._MAX_BLOCK_NUMBER:
            # The block-number field is 16 bits; packing a larger value would
            # raise struct.error and take the whole server down.
            print(f"{time.strftime(_TIME_FMT)}: Cannot send block {block_number} to {addr}: exceeds the 16-bit TFTP block counter.")
            return
        start_byte = (block_number - 1) * self._block_size
        end_byte = start_byte + self._block_size
        block_data = self._file_contents[start_byte:end_byte]
        packet = struct.pack(">HH", self._TFTP_OPCODE_DATA, block_number) + block_data
        self._tftp_sock.sendto(packet, addr)
        print(f"{time.strftime(_TIME_FMT)}: Sent block {block_number} to {addr}.")

    def close(self):
        """Close both sockets."""
        self._handshake_sock.close()
        self._tftp_sock.close()
if __name__ == "__main__":
    # Parse the command line first so that --help and argument errors are
    # handled before the user is asked anything interactively.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--filename", default="digicap.dav", help="File to serve; used both to read from the local disk and for the filename to expect from client.")
    parser.add_argument("--server-ip", default="192.168.0.128", help="IP address to serve from.")
    parser.add_argument("--lang", choices=sorted(MESSAGES), default=None, help="Language for status messages; asked interactively when omitted.")
    args = parser.parse_args()

    # Fall back to the interactive prompt when no language was given.
    # Answers are normalised so that e.g. " EN" is accepted.
    lang = args.lang
    if lang is None:
        lang = input("Please choose a language (English 'en', Italian 'it'): ").strip().lower()
        while lang not in MESSAGES:
            print("Invalid language. Choose 'en' for English or 'it' for Italian.")
            lang = input("Choose a language (en/it): ").strip().lower()

    # Load the whole file up front; the server slices blocks out of memory.
    try:
        with open(args.filename, mode="rb") as file:
            file_contents = file.read()
    except IOError as e:
        print(MESSAGES[lang]['error_reading_file'] % args.filename)
        if e.errno == errno.ENOENT:
            print(MESSAGES[lang]['file_not_found'])
        sys.exit(1)

    # Error covers failures such as a socket that cannot be bound.
    try:
        server = Server((args.server_ip, _HANDSHAKE_SERVER_PORT), (args.server_ip, _TFTP_SERVER_PORT), args.filename, file_contents, lang)
        server.run_forever()
    except Error as e:
        print(e)
        sys.exit(1)