架构
需要一台服务器运行服务器脚本,将所有客户端的数据包互相转发。
依赖 PyAudio 。
安装 PyAudio (以树莓派 Raspberry Pi OS 为例)
运行环境
- Python 3.9
- Raspberry Pi OS
安装步骤(使用 pip )
- 安装 portaudio.dev
$ sudo apt install portaudio.dev
- 安装依赖库
$ sudo apt install libportaudio0 libportaudio2 libportaudiocpp0 portaudio19-dev
- 安装 pyaudio
$ sudo pip3 install pyaudio
服务器端
详细代码如下。
import socket import time import threading class Server: def __init__(self): self.ip = '192.168.8.150' while True: try: self.port = 9808 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.s.bind((self.ip, self.port)) self.s2 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.s2.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) break except: print("Couldn't bind to that port") self.connections = [] self.accept_connections() def accept_connections(self): self.s.listen(32) #max suspend queue while True: c, addr = self.s.accept() self.connections.append(c) print('Found client at {}'.format(addr)) threading.Thread(target=self.handle_client, args=(c, addr,)).start() def broadcast(self, sock, data): for client in self.connections: if client != self.s and client != sock: try: client.send(data) except: pass def handle_client(self, c, addr): while 1: try: data = c.recv(512) self.broadcast(c, data) except socket.error: c.close() server = Server()
服务器端通过主线程运行监听死循环,在客户端建立连接后随即开启新的线程用于接收客户端数据,并在每次接收后转发给客户端。
客户端
详细代码如下。
class Client: def __init__(self,ip): self.target_ip = ip self.target_port = 9808 self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.p = pyaudio.PyAudio() self.streaming = False def start_streaming(self): self.streaming = True while 1: try: self.s.connect((self.target_ip, self.target_port)) break except: print("Couldn't connect to server") time.sleep(2) chunk_size = 512 # 512 audio_format = pyaudio.paInt16 channels = 1 rate = 20000 self.playing_stream = self.p.open(format=audio_format, channels=channels, rate=rate, output=True, frames_per_buffer=chunk_size) self.recording_stream = self.p.open(format=audio_format, channels=channels, rate=rate, input=True, frames_per_buffer=chunk_size) print("Connected to Server") # start threads receive_thread = threading.Thread(target=self.receive_server_data).start() send_thread = threading.Thread(target=self.send_data_to_server).start() def stop_streaming(self): self.streaming = False time.sleep(1) self.playing_stream.stop_stream() self.playing_stream.close() self.recording_stream.stop_stream() self.recording_stream.close() self.p.terminate() print("finished clean up") def receive_server_data(self): print("started reciving data.") while self.streaming: try: data = self.s.recv(512) self.playing_stream.write(data) except: pass print("stopped reciving data.") def send_data_to_server(self): print("started sending data.") while self.streaming: try: data = self.recording_stream.read(512) self.s.sendall(data) except: pass print("stopped sending data.")
与服务端不同的是,客户端定义了一个标识符,用于停止交换数据。
使用时,首先创建 Client 对象。
client = Client(address[0])
在需要交换数据时调用 start_streaming() 方法。
client.start_streaming()
在需要停止交换数据时调用 stop_streaming() 方法。
client.stop_streaming()