grpcall/response.go

65 lines
1.3 KiB
Go
Raw Normal View History

2023-05-30 17:30:06 +08:00
package grpcall
import (
"context"
"encoding/json"
2023-05-30 17:30:06 +08:00
"fmt"
"io"
2023-05-30 17:30:06 +08:00
"github.com/jhump/protoreflect/desc"
"google.golang.org/grpc/metadata"
)
type Response struct {
method *desc.MethodDescriptor
header *metadata.MD
recv *io.PipeReader
send chan []byte
sendCompleted chan error
cancel context.CancelFunc
2023-05-30 17:30:06 +08:00
}
func (this Response) Header() metadata.MD {
return *this.header
}
func (this Response) Recv() io.Reader {
return this.recv
}
func (this Response) RecvDecoder() *json.Decoder {
return json.NewDecoder(this.Recv())
2023-05-30 17:30:06 +08:00
}
// 发送数据流
func (this Response) Send(data string) error {
2023-05-30 17:30:06 +08:00
if !this.method.IsClientStreaming() || !this.method.IsServerStreaming() {
return fmt.Errorf("%q is %s, not streaming gRPC", this.method.GetFullyQualifiedName(), this.GetCategory())
2023-05-30 17:30:06 +08:00
}
this.send <- []byte(data)
2023-05-30 17:30:06 +08:00
return <-this.sendCompleted
2023-05-30 17:30:06 +08:00
}
// 取消接收数据
func (this Response) Close() error {
2023-05-30 17:30:06 +08:00
if this.method.IsServerStreaming() {
this.cancel()
}
return this.recv.Close()
2023-05-30 17:30:06 +08:00
}
func (this Response) GetCategory() string {
if this.method.IsClientStreaming() && this.method.IsServerStreaming() {
return "bidi-streaming"
} else if this.method.IsClientStreaming() {
return "client-streaming"
} else if this.method.IsServerStreaming() {
return "server-streaming"
} else {
return "unary"
}
}