c# - 为什么 IServerStreamWriter 不发送通知响应?

我从事跨平台应用程序的工作。对于它们之间的连接,我使用了 gRPC 技术。当客户端连接到服务器时,它被添加到位于服务器实现中的观察者列表中。当客户端连接时,我想向其余连接的客户端发送一条消息,告诉他们新客户端已连接。问题是,当我想使用我列表中的观察者向新客户端连接的客户端发送响应时,出现以下异常:

Grpc.Core.RpcException: 'Status(StatusCode=Unknown, Detail="Exception was thrown by handler.")'

这是我声明服务器的原型(prototype)文件:

syntax = "proto3";

package com.example.grpc.chat;

message ChatMessage {
    string from = 1;
    string message = 2;
}

message ChatMessageFromServer {
    ChatMessage message = 2;
}

service ChatService {
    rpc Login(ChatMessage ) returns (stream ChatMessageFromServer);
}

服务器代码:

 public class ChatServiceImpl : ChatService.ChatServiceBase
    {
        private static HashSet<IServerStreamWriter<ChatMessageFromServer>> responseStreams = new HashSet<IServerStreamWriter<ChatMessageFromServer>>();

        /*
         * if the stream object (from "for" statement inside this method) isn't the responseStream object given in the list with parameters,
         * the rest of clients aren't notified when a new login request is pushed.
         */
        public override async Task Login(global::Com.Example.Grpc.Chat.ChatMessage request,
            IServerStreamWriter<global::Com.Example.Grpc.Chat.ChatMessageFromServer> responseStream, 
            ServerCallContext context)
        {
            Console.WriteLine("Login method from server");
            responseStreams.Add(responseStream);

            // Create a server message that wraps the client message
            var message = new ChatMessageFromServer
            {
                Message = new ChatMessage
                {
                    From = "login",
                    Message = "hello"
                }
            };
            // If stream variable isn't equal to responseStream from list of parameters, the client corresponding to that stream isn't notified and it's thrown the above exception
            foreach (var stream in responseStreams)
            {
                await stream.WriteAsync(message);
            }
        }

    }

客户端发送登录请求的客户端代码:

public partial class ChatForm : Form
    {
        private const string Host = "localhost";
        private const int Port = 9090;

        private ChatService.ChatServiceClient _chatService;

        public ChatForm()
        {
            //InitializeComponent();
            InitializeGrpc();
        }

        private void InitializeGrpc()
        {
            // Create a channel
            var channel = new Channel(Host + ":" + Port, ChannelCredentials.Insecure);

            // Create a client with the channel
            _chatService = new ChatService.ChatServiceClient(channel);
        }

        private async void ChatForm_Load(object sender, EventArgs e)
        {

            var message = new ChatMessage
            {
                From = "Unknown",
                Message = "Login text"
            };

            // Open a connection to the server
            try
            {
                using (var call = _chatService.Login(message))
                {
                    // Read messages from the response stream
                    while (await call.ResponseStream.MoveNext(CancellationToken.None))
                    {
                        var serverMessage = call.ResponseStream.Current;
                        var otherClientMessage = serverMessage.Message;
                        var displayMessage = string.Format("{0}:{1}{2}", otherClientMessage.From, otherClientMessage.Message, Environment.NewLine);
                        chatTextBox.Text += displayMessage;
                    }

                }
            }
            catch (RpcException )
            {

                throw;
            }
        }
    }

最佳答案

您的 notifyObservers 方法是异步的,但具有 void 返回类型,这意味着您不能等待它。您正在有效地启动该方法,并在您遇到第一个使用不完整可等待对象(第一个 WriteAsync 调用)的 await 运算符时立即返回,可能)。

然后您使用 ReservationResponse 返回任务,操作完成。

当第一个等待调用完成时,notifyObservers 将继续,但此时操作已经完成,因此当您尝试写入响应流时,系统将抛出错误重新看到。

我强烈怀疑你应该从 notifyObservers 返回一个 Task 并从你的主入口方法等待它:

// Names changed to be conventional C#
public override async Task<ReservationResponse> SaveReservation(
    global::Res.Protocol.ReservationRequest request, ServerCallContext context)
{
    // some code for saving my reservation in repository database
    ReservationResponse response = new Res.Protocol.ReservationResponse
    {
        Type = ReservationResponse.Types.Type.Savereservation,
        Journey = GetProtoJourney(journey)
    };

    await NotifyObserversAsync(response);

    // Note: no Task.FromResult, as you're in an async method. The response
    // will already be wrapped in a task.
    return new ReservationResponse
    {
        Type = ReservationResponse.Types.Type.Savereservation
    };
}

public async Task NotifyObserversAsync(Res.Protocol.ReservationResponse response)
{
    foreach (var ob in responseStreams)
    {
        await ob.WriteAsync(response);
    }
}

https://stackoverflow.com/questions/55965723/

相关文章:

azure-active-directory - Azure B2C 访问 token 缺少 ten

python - 自定义记录器类 python : How to make it work acro

javascript - 使用 cheerio 在两个标签之间进行网页抓取

amazon-dynamodb - AWS DynamoDB 中没有任何注释的普通 POJO

java - @PathVariable 在路径变量 {} 前附加 "/"时出现 404 错误

python-3.x - aiohttp异步请求的任务异常

php - 电子商务 : how to display custom success notice

http - Telegram: Bad Request: failed to get HTTP U

node.js - jenkins 中私有(private) git npm 存储库的主机 key

angular - 重新分配通过异步管道使用的 Observable