python - 如何处理来自 binance websocket 的多流数据?

我正在使用 unicorn_binance_websocket_api 从 2 个不同的时间范围流式传输 100 个加密货币的价格数据, 我想处理这些数据以存储不同加密货币各自时间范围的收盘价,然后执行我的策略以查看我需要交易的加密货币和时间范围

我将分享有关我如何能够为单个加密货币和单个时间范围编写策略的代码

from unicorn_binance_websocket_api.unicorn_binance_websocket_api_manager import
BinanceWebSocketApiManager
import json, numpy, talib

binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures")

binance_websocket_api_manager.create_stream('kline_1m', 'btcusdt')


closes =[]

RSI_PERIOD = 14
RSI_OVERBOUGHT = 70
RSI_OVERSOLD = 30

while True:
received_stream_data_json = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
if received_stream_data_json:
    json_data = json.loads(received_stream_data_json)
    candle_data = json_data.get('data',{})
    candle = candle_data.get('k', {})

    symboll = candle.get('s',{})
    timeframe = candle.get('i',{})
    close_prices = candle.get('c',{})
    open_prices = candle.get('o',{})
    is_candle_closed = candle.get('x',{})

    if is_candle_closed:
        closes.append(float(close_prices))

    if len(closes) > RSI_PERIOD:
        np_closes = numpy.array(closes)
        rsi = talib.RSI(np_closes,RSI_PERIOD)
        
    if (rsi[-1] > RSI_OVERBOUGHT):
        print("SELL")

    elif (rsi[-1] < RSI_OVERSOLD):
        print('BUY')

最佳答案

您只需使用 subscribe_to_stream 函数并附加您想要观看的其他 channel 和市场。我试图通过 python-binance 库手动编写它,但它看起来很粗糙、笨拙且效率低下。所以我发现了你的问题并决定改用这个 unicorn 库,我得说,它非常棒。这是我的解决方案,你不需要使用 asyncio btw

   class BinanceWs:

    def __init__(self, channels, markets):
        market = 'btcusdt'
        tf = 'kline_1w'
        self.binance_websocket_api_manager = BinanceWebSocketApiManager(exchange="binance.com-futures")
        stream = self.binance_websocket_api_manager.create_stream(tf, market)
        self.binance_websocket_api_manager.subscribe_to_stream(stream, channels, markets)

    async def run(self):
        while True:
            received_stream_data_json = self.binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
            if received_stream_data_json:
                json_data = json.loads(received_stream_data_json)
                candle_data = json_data.get('data', {})

                candle = candle_data.get('k', {})
                symbol = candle.get('s', {})
                timeframe = candle.get('i', {})
                close_prices = candle.get('c', {})
                open_prices = candle.get('o', {})
                is_candle_closed = candle.get('x', {})
                print(candle_data)
                # do stuff with data ... 

async def main():
    tasks = []

    channels = ['kline_1m', 'kline_5m', 'kline_15m', 'kline_30m', 'kline_1h', 'kline_12h', 'miniTicker']
    markets = {'btcusdt', 'ethusdt', 'ltcusdt'}

    print(f'Main starting streams ... ')
    kl_socket = BinanceWs(channels=channels, markets=markets)
    task = await kl_socket.run()
    tasks.append(task)
    print(tasks)
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

https://stackoverflow.com/questions/67434123/

相关文章:

visual-studio-code - 删除 VSCode 中的 Sublime 文本主题

javascript - 使用 ts-node 的 Typescript 声明合并无法按预期工作

python - 有没有办法缩短多个 if 语句?

python - 基于字典值映射Python列表

r - 使用数值条件对一系列列进行编码

regex - 如何将 RegEx token 传递给 RegEx 替换中的 PowerShell

amazon-web-services - CloudFormation YAML - 带有条件语句

reactjs - 创建一个可以通过函数调用显示的 React 组件(如 react-toastif

fortran - Fortran 中的嵌套名单

r - 将列中以冒号分隔的字符串拆分为 R 中的不同列