我正在尝试使用 python 的 asyncio 编写非阻塞代码。关于这个主题有几个线程,但我仍然没有设法使它们适应代码。这将是一个基于 this 的最小示例:
import asyncio
import websockets
async def ws_rec(websocket, path):
while True:
data = await websocket.recv()
print(data)
start_server = websockets.serve(ws_rec, 'localhost', 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
print("ok")
如何调整代码以打印“ok”?为什么我什至需要 asyncio?
最佳答案
非常感谢您的回答。 我找到了使用 simple-websocket 的解决方法
from simple_websocket_server import WebSocketServer, WebSocket
class SimpleEcho(WebSocket):
def handle(self):
# echo message back to client
print(self.data)
self.send_message(self.data)
def connected(self):
print(self.address, 'connected')
def handle_close(self):
print(self.address, 'closed')
def run():
server = WebSocketServer('localhost', 8765, SimpleEcho)
server.serve_forever()
from threading import Thread
ws_run = Thread(target=run)
ws_run.start()
print("ok")
现在,这对我来说似乎是一个可行的解决方案。
https://stackoverflow.com/questions/56438301/
相关文章:
junit - 覆盖本地函数中的 Mockito 语句不适用于 PER CLASS 模式?
ios - 在应用程序生命周期的中间将本地 Realm 转换为同步 Realm (在 Swift 中
spring-cloud - Spring Cloud Gateway 阻止路由发现请求
ionic-framework - LocalStorage 中设置的键值可供其他应用程序访问
csv - 在 Java 中解析大型 CSV 文件的最快最有效的方法
azure-ad-b2c - Azure AD B2C 中的自定义策略不允许为属性保存空字符串
python-3.x - 如何在 Visual Studio Code 中使用 activate.b