一、HTTP/HTTPS 通信
1. 客户端示例(requests 库)
- import requests
- # HTTP GET
- response = requests.get('http://httpbin.org/get')
- print(response.text)
- # HTTPS POST
- response = requests.post(
- 'https://httpbin.org/post',
- data={'key': 'value'},
- verify=True # 验证 SSL 证书(默认)
- )
- print(response.json())
复制代码 2. 服务端示例(Flask)
- from flask import Flask, request
- app = Flask(__name__)
- @app.route('/api', methods=['GET', 'POST'])
- def handle_request():
- if request.method == 'GET':
- return {'message': 'GET received'}
- elif request.method == 'POST':
- return {'data': request.json}
- if __name__ == '__main__':
- app.run(ssl_context='adhoc') # 启用 HTTPS
复制代码 二、UDP 通信
1. 服务端
- import socket
- server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- server.bind(('0.0.0.0', 9999))
- while True:
- data, addr = server.recvfrom(1024)
- print(f"Received from {addr}: {data.decode()}")
- server.sendto(b'UDP response', addr)
复制代码
2. 客户端
- import socket
- client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- client.sendto(b'Hello UDP', ('localhost', 9999))
- response, addr = client.recvfrom(1024)
- print(f"Received: {response.decode()}")
复制代码 三、WebSocket 通信
需要安装库:1. 服务端
- import asyncio
- import websockets
- async def handler(websocket):
- async for message in websocket:
- print(f"Received: {message}")
- await websocket.send(f"Echo: {message}")
- async def main():
- async with websockets.serve(handler, "localhost", 8765):
- await asyncio.Future() # 永久运行
- asyncio.run(main())
复制代码 2. 客户端
- import asyncio
- import websockets
- async def client():
- async with websockets.connect("ws://localhost:8765") as ws:
- await ws.send("Hello WebSocket!")
- response = await ws.recv()
- print(f"Received: {response}")
- asyncio.run(client())
复制代码 四、Server-Sent Events (SSE)
需要安装库:
1. 服务端(Flask 实现)
- from flask import Flask, Response
- app = Flask(__name__)
- @app.route('/stream')
- def stream():
- def event_stream():
- for i in range(5):
- yield f"data: Message {i}\n\n"
- return Response(event_stream(), mimetype="text/event-stream")
- if __name__ == '__main__':
- app.run()
复制代码 2. 客户端
- import requests
- from sseclient import SSEClient
- url = 'http://localhost:5000/stream'
- response = requests.get(url, stream=True)
- client = SSEClient(response)
- for event in client.events():
- print(f"Received event: {event.data}")
复制代码 关键点说明
- HTTP/HTTPS:最常用的请求-响应模型
- UDP:无连接协议,适合实时性要求高的场景
- WebSocket:全双工通信协议,适合实时双向通信
- SSE:服务器到客户端的单向推送技术
- 安全建议:
- HTTPS 使用 requests 的验证证书
- WebSocket 使用安全协议
- 生产环境应使用正式 SSL 证书
根据具体需求选择协议:
- 简单数据请求:HTTP/HTTPS
- 实时游戏/视频:UDP
- 聊天应用:WebSocket
- 实时通知:SSE
建议根据实际场景配合使用异步框架(如 aiohttp、FastAPI)以获得更好的性能。
到此这篇关于Python实现常见网络通信的示例详解的文章就介绍到这了,更多相关Python网络通信内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
来源:https://www.jb51.net/python/3398112xz.htm
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |
|