From 02cf3c74a5bbab7d52846f4ecdfaf141a56b3c09 Mon Sep 17 00:00:00 2001 From: what Date: Mon, 5 Jun 2023 13:37:32 +0800 Subject: [PATCH] =?UTF-8?q?[feat]=20job=20=E5=92=8C=20grpc=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- controller.go | 133 ++++++++++++++++++++++++++++++++++++++++++-------- go.mod | 12 +++-- go.sum | 21 +++++--- grpc.go | 44 +++++++++++++++++ job.go | 8 +++ 5 files changed, 185 insertions(+), 33 deletions(-) create mode 100644 job.go diff --git a/controller.go b/controller.go index 0900fa3..2067fab 100644 --- a/controller.go +++ b/controller.go @@ -8,50 +8,143 @@ import ( ) type Controller interface { - AuthDB() ResAuthDB - Execute(GlobalParams) any - ExecuteWs(WsClient, GlobalParams) error - WsClientId(GlobalParams) WsClientID - WsClientGroup(GlobalParams) WsClientGroup - Call(code string, params map[string]any, category ...RouteCategory) (HttpResponse, error) + Container() *do.Injector + // 派遣一个任务 + Dispatch(job string, payload any, u User) error } type GRpController interface { + Controller + // 获取 gRPC 服务的描述信息 GetGRpcServiceDesc() *grpc.ServiceDesc } +type JobController interface { + Controller + // 任务处理 + Handle(any) error +} + +type HttpController interface { + Controller + // 获取用户信息 + User() User + // 获取请求信息 + Request() *http.Request + // 请求处理 + Execute(GlobalParams) any + // 路由信息 + Route() Route + // 内部调用 + Call(code string, params map[string]any, category ...RouteCategory) (HttpResponse, error) +} + +type WsController interface { + HttpController + // 获取 Ws 客户端 + WsClient() WsClient + // 获取 Ws 标识 + WsClientId(GlobalParams) WsClientID + // 获取 Ws 分组 + WsClientGroup(GlobalParams) WsClientGroup +} + type BaseController struct { - User User - Container *do.Injector - Request *http.Request - request *http.Request - Route Route + container *do.Injector +} + +type GRpcBaseController struct { + Controller +} + +type JobBaseController struct { + Controller +} + +type HttpBaseController struct { + Controller + request *http.Request +} + +type WsBaseController struct { + HttpController + ws WsClient } var defaultWsClientGroup WsClientGroup = "__DEFAULT__" var wsClientID WsClientID = 0 -func (BaseController) AuthDB() ResAuthDB { - return ResAuthOff +func (this BaseController) Container() *do.Injector { + return this.container } -func (BaseController) Execute(params GlobalParams) any { +func (this BaseController) Dispatch(job string, payload any, u User) error { + return do.MustInvoke[Job](this.Container()).Dispatch(job, payload, u) +} + +func NewHttpBaseController(container *do.Injector, request *http.Request) HttpController { + return &HttpBaseController{ + Controller: &BaseController{container}, + request: request, + } +} + +func (this HttpBaseController) Request() *http.Request { + return this.request +} + +func (this HttpBaseController) Route() Route { + return this.Request().Context().Value(RouteCtx{Name: "Route"}).(Route) +} + +func (this HttpBaseController) User() User { + return this.Request().Context().Value(RouteCtx{Name: "User"}).(User) +} + +func (this HttpBaseController) Call(code string, params map[string]any, category ...RouteCategory) (HttpResponse, error) { + return do.MustInvoke[Router](this.Container()).Call(this.Request(), code, params, category...) +} + +func (HttpBaseController) Execute(params GlobalParams) any { return nil } -func (BaseController) ExecuteWs(wc WsClient, params GlobalParams) error { - return nil +func NewWsBaseController(ws WsClient, container *do.Injector, request *http.Request) WsController { + return &WsBaseController{ + HttpController: NewHttpBaseController(container, request), + ws: ws, + } } -func (BaseController) WsClientId(GlobalParams) WsClientID { +func (this WsBaseController) WsClient() WsClient { + return this.ws +} + +func (WsBaseController) WsClientId(GlobalParams) WsClientID { wsClientID++ return wsClientID } -func (BaseController) WsClientGroup(GlobalParams) WsClientGroup { +func (WsBaseController) WsClientGroup(GlobalParams) WsClientGroup { return defaultWsClientGroup } -func (this BaseController) Call(code string, params map[string]any, category ...RouteCategory) (HttpResponse, error) { - return do.MustInvoke[Router](this.Container).Call(this.Request, code, params, category...) +func NewGRpBaseController(container *do.Injector) JobController { + return &JobBaseController{ + Controller: &BaseController{container}, + } +} + +func (JobBaseController) Handle(any) error { + return nil +} + +func NewJobBaseController(container *do.Injector) GRpController { + return &GRpcBaseController{ + Controller: &BaseController{container}, + } +} + +func (GRpcBaseController) GetGRpcServiceDesc() *grpc.ServiceDesc { + return nil } diff --git a/go.mod b/go.mod index 16cebda..1feedcc 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,10 @@ go 1.18 require ( git.fsdpf.net/go/db v0.0.0-20230412075825-59b3faa171bc github.com/go-chi/chi v1.5.4 + github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 github.com/gorilla/websocket v1.5.0 + github.com/jhump/protoreflect v1.15.1 github.com/lestrrat-go/jwx v1.2.25 github.com/samber/do v1.6.0 github.com/samber/lo v1.38.1 @@ -18,11 +20,11 @@ require ( ) require ( + github.com/bufbuild/protocompile v0.4.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.0-20210816181553-5444fa50b93d // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-sql-driver/mysql v1.7.0 // indirect github.com/goccy/go-json v0.9.7 // indirect - github.com/golang/protobuf v1.5.2 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/lestrrat-go/backoff/v2 v2.0.8 // indirect github.com/lestrrat-go/blackmagic v1.0.0 // indirect @@ -42,11 +44,11 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect golang.org/x/exp v0.0.0-20220303212507-bbda1eaf7a17 // indirect - golang.org/x/net v0.4.0 // indirect - golang.org/x/sys v0.3.0 // indirect - golang.org/x/text v0.5.0 // indirect + golang.org/x/net v0.7.0 // indirect + golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.7.0 // indirect google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect - google.golang.org/protobuf v1.28.1 // indirect + google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8086cd2..61e4833 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ git.fsdpf.net/go/db v0.0.0-20230412075825-59b3faa171bc h1:d3ZqjNGbsgVknkbvATV96r git.fsdpf.net/go/db v0.0.0-20230412075825-59b3faa171bc/go.mod h1:397Sdx1cJS0OlHtTX1bVl//9k3Xn0Klnc6jC4MAkb6w= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/bufbuild/protocompile v0.4.0 h1:LbFKd2XowZvQ/kajzguUp2DC9UEIQhIq77fZZlaQsNA= +github.com/bufbuild/protocompile v0.4.0/go.mod h1:3v93+mbWn/v3xzN+31nwkJfrEpAUwp+BagBSZWx+TP8= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= @@ -141,6 +143,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= +github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c= +github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= @@ -302,8 +306,8 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.4.0 h1:Q5QPcMlvfxFTAPV0+07Xz/MpK9NTXu2VDUuy0FeMfaU= -golang.org/x/net v0.4.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE= +golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g= +golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -323,6 +327,7 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -359,8 +364,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= -golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -369,8 +374,8 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.5.0 h1:OLmvp0KP+FVG99Ct/qFiL/Fhk4zp4QQnZ7b2U+5piUM= -golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -519,8 +524,8 @@ google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGj google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743 h1:yqElulDvOF26oZ2O+2/aoX7mQ8DY/6+p39neytrycd8= +google.golang.org/protobuf v1.28.2-0.20230222093303-bc1253ad3743/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/grpc.go b/grpc.go index 451a00c..b49b4a5 100644 --- a/grpc.go +++ b/grpc.go @@ -1,7 +1,51 @@ package contracts +import ( + "encoding/json" + "io" + + "github.com/golang/protobuf/jsonpb" + "github.com/jhump/protoreflect/desc" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + type GRpc interface { Start() error Stop() error Restart() error } + +type GRpcResponse interface { + // 获取响应头 + Header() metadata.MD + // 发送 stream 数据 + Send(data string) error + // 接收 unary / stream 数据 + Recv() io.Reader + // 接收 unary / stream 数据, 并转 json 对象 + RecvDecoder() *json.Decoder + // 关闭 stream 数据 + Close() error +} + +type GRpcall interface { + // 接口请求 + Invoke(service, method, data string, headers []string) (GRpcResponse, error) + // 获取客户端连接 + GetClientConn() *grpc.ClientConn + // 获取服务端, 服务列表 + GetServices() ([]string, error) + // 获取服务端, 服务下的方法列表 + GetServiceMethods(name string) ([]string, error) + // 获取服务所有类型 + GetAnyResolver() (jsonpb.AnyResolver, error) + // 获取 proto 文件描述 + GetAllFilesDescriptor() ([]*desc.FileDescriptor, error) + // 获取服务描述 + GetServiceDescriptor(service string) (*desc.ServiceDescriptor, error) + // 获取服务方法描述 + GetServiceMethodDescriptor(service, method string) (*desc.MethodDescriptor, error) + // 关闭客户端连接 + Close() error +} diff --git a/job.go b/job.go new file mode 100644 index 0000000..9462737 --- /dev/null +++ b/job.go @@ -0,0 +1,8 @@ +package contracts + +type Job interface { + Start() error + Stop() error + Restart() error + Dispatch(job string, payload any, u User) error +}