本文主要介绍一个基于 socket 的上传下载程序,支持动态更换 IP 和端口、上传下载、进度条显示、正则校验文件名以及 hash 校验文件完整性,希望能为大家解决编程问题提供一定的参考价值,需要的开发者们可以跟随小编一起学习!
支持动态更换 IP 和端口、上传下载、进度条显示、正则校验文件名、hash 校验文件完整性
另有已封装好的exe文件
客户端
# coding=utf-8
from socket import *
import json
import struct
import os,re
import hashlib

# Server address used by connection(); run() overwrites both with user input.
ip = 0
port = 0


# Print the progress bar
def progress(percent, symbol='█', width=40):
    """Draw a one-line text progress bar on standard output.

    Args:
        percent: Completed fraction; values outside 0..1 are clamped.
        symbol: Character used for the filled part of the bar.
        width: Number of character cells between the two bar borders.
    """
    # Clamp so the bar can neither overflow nor report a negative percentage.
    percent = min(max(percent, 0), 1)
    show_progress = ("▌%%-%ds▌" % width) % (int(percent * width) * symbol)
    # "\r" returns to the start of the line so every call redraws in place.
    # No newline is written, so flush explicitly to make the update visible.
    print("\r%s %.2f%%" % (show_progress, percent * 100), end='', flush=True)


# Hash check
def Hash_md5(file_path: str):
    """Return the transfer checksum for the file at ``file_path``.

    NOTE: the digest is the MD5 of the file *size* written as decimal text,
    not of the file contents, so it only detects truncated or over-long
    transfers. The server script in this article derives its value the same
    way; both ends must change together to switch to a content hash.

    Args:
        file_path: Path of an existing file.

    Returns:
        The hexadecimal MD5 digest string.

    Raises:
        OSError: If the file size cannot be read.
    """
    size_text = str(os.path.getsize(file_path))
    return hashlib.md5(size_text.encode("utf-8")).hexdigest()


# Connect
def connection():
    """Open a TCP connection to the module-level ``ip`` and ``port``.

    Returns:
        The connected client socket.

    Raises:
        Exception: Whatever ``connect`` raises is re-raised after the
            half-created socket has been closed.
    """
    client = socket(AF_INET, SOCK_STREAM)
    try:
        client.connect((ip, port))
    except Exception:
        # Do not leak the descriptor when the server is unreachable.
        client.close()
        raise
    return client


# Download
def _recv_exact(sock, size):
    """Read exactly ``size`` bytes from ``sock`` or raise ``ConnectionError``."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ConnectionError("connection closed by peer")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def download(client):
    """Interactively download files from the server over ``client``.

    Sends the mode byte ``"1"``, then loops: prompts for the remote file
    path, a local directory and a local file name, requests the file, and
    stores it. The server answers either with the 4-byte marker ``4044``
    (file not found) or with a 4-byte packed header length followed by a
    JSON header (``file_size``, ``hash``) and the raw file bytes.

    Args:
        client: Connected socket to the server.

    The loop ends when the user enters ``q`` at the first prompt or when any
    exception occurs (the exception is printed).
    """
    client.send("1".encode("utf-8"))
    while 1:
        try:
            file_path = input("Please enter the file path(q/exit)>>").strip()
            if file_path.lower() == "q":
                break
            if len(file_path) == 0:
                continue
            to_path = input("Please enter the save directory(q/back)>>").strip()
            if to_path.lower() == "q":
                continue
            if not os.path.isdir(to_path):
                print("not find")
                continue
            file_name = input("Please enter filename(q/back)>>").strip()
            if file_name.lower() == "q":
                continue
            if re.search(r'[/|:*"<>\\]', file_name):
                print(r'Filenames cannot have these symbols:/|:*"<>\\')
                continue
            goal_path = os.path.join(to_path, file_name)

            client.sendall(file_path.encode("utf-8"))
            bytes_4 = _recv_exact(client, 4)
            # Compare raw bytes: a packed header length of 128 or more is not
            # valid UTF-8, so decoding it first would raise.
            if bytes_4 == b"4044":
                print("not find")
                continue
            header_bytes_len = struct.unpack("i", bytes_4)[0]
            header_bytes = _recv_exact(client, header_bytes_len)
            header_dic = json.loads(header_bytes.decode("utf-8"))
            date_len = header_dic["file_size"]
            hash_md5 = header_dic["hash"]

            recv_len = 0
            with open(goal_path, "wb") as f:
                # Never ask for more than what is left of this file, and stop
                # if the peer closes early instead of spinning on b"".
                while recv_len < date_len:
                    date = client.recv(min(1024, date_len - recv_len))
                    if not date:
                        break
                    recv_len += len(date)
                    f.write(date)
                    progress(recv_len / date_len, width=40)

            if hash_md5 == Hash_md5(goal_path):  # checksum verification
                print("\nHash auth succeed\nFile saved...")
                continue
            os.remove(goal_path)  # discard the file when verification fails
            print("Hash auth failed!!")
        except Exception as E:
            print(E)
            break


# Upload
def uploading(client):
    """Interactively upload local files to the server over ``client``.

    Sends the mode byte ``"2"``, then loops: prompts for the local file, the
    destination directory and the new file name, asks the server whether the
    directory exists (reply ``4044`` means it does not), and on success sends
    a 4-byte packed header length, a JSON header and the raw file bytes.

    Args:
        client: Connected socket to the server.

    The loop ends when the user enters ``q`` at the first prompt or when any
    exception occurs (the exception is printed).
    """
    client.send("2".encode("utf-8"))
    while 1:
        try:
            file_path = input("Please enter the path of the uploaded file(q/exit)>>").strip()
            if file_path.lower() == "q":
                break
            file_path = os.path.normpath(file_path)
            if not os.path.isfile(file_path):
                print("not find")
                continue
            goal_path = input("Please enter the destination path(q/back)>>").strip()
            if goal_path.lower() == "q":
                continue
            goal_path = os.path.normpath(goal_path)

            # Collect and validate the name BEFORE contacting the server: once
            # the server accepts the directory it waits for the header, so
            # backing out after that point would desynchronise the protocol.
            file_name = input("Please name the new file(q/back)>>").strip()
            if file_name.lower() == "q":
                continue
            if re.search(r'[/|:*"<>\\]', file_name):
                print(r'Filenames cannot have these symbols:/|:*"<>\\')
                continue

            client.sendall(goal_path.encode("utf-8"))
            bytes_4 = b""
            while len(bytes_4) < 4:  # the 4-byte reply may arrive in pieces
                part = client.recv(4 - len(bytes_4))
                if not part:
                    raise ConnectionError("connection closed by peer")
                bytes_4 += part
            if bytes_4 == b"4044":
                print("not find")
                continue

            goal_file_path = os.path.join(goal_path, file_name)
            file_size = os.path.getsize(file_path)
            header_dic = {
                "file_name": os.path.basename(file_path),
                "file_size": file_size,
                "hash": Hash_md5(file_path),
                "file_path": goal_file_path,
            }
            header_bytes = json.dumps(header_dic).encode("utf-8")
            # sendall: plain send() may transmit only part of the buffer.
            client.sendall(struct.pack("i", len(header_bytes)))
            client.sendall(header_bytes)

            send_len = 0
            with open(file_path, "rb") as f:
                # Fixed-size chunks; iterating "lines" of binary data can
                # yield arbitrarily large pieces.
                for chunk in iter(lambda: f.read(1024), b""):
                    client.sendall(chunk)
                    send_len += len(chunk)
                    progress(send_len / file_size, width=40)
            print("\nsuccessfully upload!")
        except Exception as E:
            print(E)
            break


# Menu entries: key -> [label, handler]. Entry "3" is intercepted by run()
# before dispatch, so its handler is never invoked.
func_dic = {
    "1": ["download ", download],
    "2": ["upload", uploading],
    "3": ["IP Settings", download],
}


# Connect to the server's IP and port, then choose a function
def run():
    """Client entry point: ask for the server address, connect, show the menu.

    The outer loop prompts for IP and port (``q`` exits) and stores them in
    the module-level ``ip``/``port`` read by ``connection()``. After a
    successful connection the menu from ``func_dic`` is shown repeatedly;
    choosing ``3`` returns to the address prompt.
    """
    global ip, port
    while True:
        print("Please enter the correct IP and port")
        ip = input("Please enter IP(q/exit)>>").strip()
        if ip.lower() == "q":
            break
        port = input("Please enter PORT(q/exit)>>").strip()
        if port.lower() == "q":
            break
        if not port.isdigit():
            print("Please enter the number")
            continue
        port = int(port)

        try:
            client = connection()  # verify that the connection can be made
        except Exception as E:
            print(E)
            continue

        try:
            while 1:
                for k, v in func_dic.items():
                    print(f"{k} : {v[0]}")
                select = input("Please select function>>")
                if select == "3":
                    break
                if select in func_dic:
                    func_dic[select][1](client)
        finally:
            # Release the old connection before a new address is entered.
            client.close()


# Guarded so that importing this module does not start the interactive loop.
if __name__ == "__main__":
    run()
服务端
# coding=utf-8
from socket import *
import json
import struct
import os,hashlib# 绑定服务端地址
def connection():
    """Create the listening TCP socket bound to the module-level ``ip``/``port``.

    Returns:
        The listening server socket (backlog of 5).

    Raises:
        Exception: Whatever ``bind``/``listen`` raises is re-raised after the
            socket has been closed.
    """
    server = socket(AF_INET, SOCK_STREAM)
    try:
        server.bind((ip, port))
        server.listen(5)
    except Exception:
        # Do not leak the descriptor when the address is invalid or in use.
        server.close()
        raise
    return server


# Accept connections and dispatch the selected function
def recv_send(server):
    """Accept clients one at a time and run the mode each one selects.

    The first byte a client sends selects the handler: ``1`` runs
    ``download`` and ``2`` runs ``uploading``; any other value is ignored.
    The connection is closed once its handler returns.

    Args:
        server: Listening socket returned by ``connection()``.
    """
    while 1:
        print("connection...")
        conn, addr = server.accept()
        print(f"from {addr} conn")
        try:
            # Compare raw bytes so an arbitrary first byte cannot raise a
            # decode error and take down the accept loop.
            select = conn.recv(1)
            if select == b"1":
                download(conn)
            elif select == b"2":
                uploading(conn)
        except OSError as E:
            print(E)
        finally:
            conn.close()


# Client download
def download(conn):
    """Serve file-download requests arriving on ``conn``.

    Each request is a UTF-8 file path. The reply is either the 4-byte marker
    ``4044`` when the path is not a file, or a 4-byte packed header length,
    a JSON header (``file_name``, ``file_size``, ``hash``) and the raw file
    bytes. ``hash`` is the MD5 of the file size written as decimal text.

    NOTE(review): any path the client names is served as-is; restrict it if
    the peer is not trusted.

    Args:
        conn: Connected client socket.

    The loop ends when the client disconnects or an exception occurs (the
    exception is printed).
    """
    while 1:
        try:
            request = conn.recv(1024)
            if not request:
                break  # empty read: the client closed the connection
            file_path = os.path.normpath(request.decode("utf-8"))
            if not os.path.isfile(file_path):
                conn.sendall("4044".encode("utf-8"))
                continue

            file_size = os.path.getsize(file_path)
            md5 = hashlib.md5(str(file_size).encode("utf-8")).hexdigest()
            header_dic = {
                "file_name": os.path.basename(file_path),
                "file_size": file_size,
                "hash": md5,
            }
            header_bytes = json.dumps(header_dic).encode("utf-8")
            # sendall: plain send() may transmit only part of the buffer.
            conn.sendall(struct.pack("i", len(header_bytes)))
            conn.sendall(header_bytes)
            with open(file_path, "rb") as f:
                # Fixed-size chunks; iterating "lines" of binary data can
                # yield arbitrarily large pieces.
                for chunk in iter(lambda: f.read(4096), b""):
                    conn.sendall(chunk)
        except Exception as E:
            print(E)
            break


# Client upload
def _recv_exact(conn, size):
    """Read exactly ``size`` bytes from ``conn`` or raise ``ConnectionError``."""
    chunks = []
    remaining = size
    while remaining > 0:
        chunk = conn.recv(remaining)
        if not chunk:
            raise ConnectionError("connection closed by peer")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def uploading(conn):
    """Receive files uploaded by the client on ``conn``.

    Each round starts with a UTF-8 directory path. The reply is ``4044`` when
    it is not an existing directory, otherwise ``4444``, after which the
    client sends a 4-byte packed header length, a JSON header (``file_size``,
    ``file_path``) and exactly ``file_size`` bytes that are written to
    ``file_path``.

    NOTE(review): ``file_path`` comes from the client and is written as-is;
    restrict it if the peer is not trusted.

    Args:
        conn: Connected client socket.

    The loop ends when the client disconnects or an exception occurs (the
    exception is printed).
    """
    while 1:
        try:
            request = conn.recv(1024)
            if not request:
                break  # empty read: the client closed the connection
            dir_path = os.path.normpath(request.decode("utf-8"))
            if not os.path.isdir(dir_path):
                conn.sendall("4044".encode("utf-8"))
                continue
            conn.sendall("4444".encode("utf-8"))

            header_bytes_len = struct.unpack("i", _recv_exact(conn, 4))[0]
            header_bytes = _recv_exact(conn, header_bytes_len)
            header_dic = json.loads(header_bytes.decode("utf-8"))
            date_len = header_dic["file_size"]
            goal_file_path = header_dic["file_path"]

            recv_len = 0
            with open(goal_file_path, "wb") as f:
                # Read at most what is left of this file so the next request
                # is not swallowed, and fail on disconnect instead of
                # spinning on b"". A zero-size file skips the loop entirely.
                while recv_len < date_len:
                    date = conn.recv(min(1024, date_len - recv_len))
                    if not date:
                        raise ConnectionError("connection closed during upload")
                    recv_len += len(date)
                    f.write(date)
        except Exception as E:
            print(E)
            break


def run():
    """Server entry point: ask for the bind address, then serve clients.

    Prompts for IP and port (``q`` exits), stores them in the module-level
    ``ip``/``port`` read by ``connection()``, and hands the listening socket
    to ``recv_send``. A failed bind is printed and the prompt repeats.
    """
    global ip, port
    while True:
        print("Please enter the correct IP and port")
        ip = input("Please enter IP(q/exit)>>").strip()
        if ip.lower() == "q":
            break
        port = input("Please enter PORT(q/exit)>>").strip()
        if port.lower() == "q":
            break
        if not port.isdigit():
            print("Please enter the number")
            continue
        port = int(port)

        try:
            server = connection()
        except Exception as E:
            print(E)
            continue
        try:
            recv_send(server)
        finally:
            # Release the listening socket if the accept loop is interrupted.
            server.close()


# Guarded so that importing this module does not start the interactive loop.
if __name__ == "__main__":
    run()
这篇关于上传下载程序(支持动态更换 IP 和端口、上传下载、进度条显示、正则校验文件名、hash 校验文件完整性)的文章就介绍到这儿,希望我们推荐的文章对程序员们有所帮助!