diff --git a/contrib/registry/file/file_discovery.go b/contrib/registry/file/file_discovery.go index e6b4560b3..36065cc11 100644 --- a/contrib/registry/file/file_discovery.go +++ b/contrib/registry/file/file_discovery.go @@ -94,6 +94,7 @@ func (r *Registry) getServices(ctx context.Context) (services []gsvc.Service, er `service "%s" is expired, update at: %s, current: %s, sub duration: %s`, s.GetKey(), updateAt.String(), nowTime.String(), subDuration.String(), ) + _ = gfile.Remove(filePath) continue } services = append(services, s) diff --git a/contrib/rpc/grpcx/grpcx_grpc_client.go b/contrib/rpc/grpcx/grpcx_grpc_client.go index 48015da4f..4aeb05856 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_client.go +++ b/contrib/rpc/grpcx/grpcx_grpc_client.go @@ -8,12 +8,14 @@ package grpcx import ( "fmt" - "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/gsel" "github.com/gogf/gf/v2/net/gsvc" + "github.com/gogf/gf/v2/text/gstr" ) // DefaultGrpcDialOptions returns the default options for creating grpc client connection. @@ -25,13 +27,37 @@ func (c modClient) DefaultGrpcDialOptions() []grpc.DialOption { } // NewGrpcClientConn creates and returns a client connection for given service `appId`. -func (c modClient) NewGrpcClientConn(serviceName string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func (c modClient) NewGrpcClientConn(serviceNameOrAddress string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { autoLoadAndRegisterFileRegistry() var ( - service = gsvc.NewServiceWithName(serviceName) + dialAddress = serviceNameOrAddress grpcClientOptions = make([]grpc.DialOption, 0) ) + if isServiceName(serviceNameOrAddress) { + dialAddress = fmt.Sprintf( + `%s://%s`, + gsvc.Schema, gsvc.NewServiceWithName(serviceNameOrAddress).GetKey(), + ) + } else { + addressParts := gstr.Split(serviceNameOrAddress, gsvc.EndpointHostPortDelimiter) + switch len(addressParts) { + case 2: + if addressParts[0] == "" { + return nil, gerror.NewCodef( + gcode.CodeInvalidParameter, + `invalid address "%s" for client, missing host`, + serviceNameOrAddress, + ) + } + default: + return nil, gerror.NewCodef( + gcode.CodeInvalidParameter, + `invalid address "%s" for client`, + serviceNameOrAddress, + ) + } + } grpcClientOptions = append(grpcClientOptions, c.DefaultGrpcDialOptions()...) if len(opts) > 0 { grpcClientOptions = append(grpcClientOptions, opts...) @@ -43,7 +69,7 @@ func (c modClient) NewGrpcClientConn(serviceName string, opts ...grpc.DialOption grpcClientOptions = append(grpcClientOptions, c.ChainStream( c.StreamTracing, )) - conn, err := grpc.Dial(fmt.Sprintf(`%s://%s`, gsvc.Schema, service.GetKey()), grpcClientOptions...) + conn, err := grpc.Dial(dialAddress, grpcClientOptions...) if err != nil { return nil, err } @@ -52,8 +78,8 @@ func (c modClient) NewGrpcClientConn(serviceName string, opts ...grpc.DialOption // MustNewGrpcClientConn creates and returns a client connection for given service `appId`. // It panics if any error occurs. -func (c modClient) MustNewGrpcClientConn(serviceName string, opts ...grpc.DialOption) *grpc.ClientConn { - conn, err := c.NewGrpcClientConn(serviceName, opts...) +func (c modClient) MustNewGrpcClientConn(serviceNameOrAddress string, opts ...grpc.DialOption) *grpc.ClientConn { + conn, err := c.NewGrpcClientConn(serviceNameOrAddress, opts...) if err != nil { panic(err) } @@ -75,3 +101,9 @@ func (c modClient) ChainUnary(interceptors ...grpc.UnaryClientInterceptor) grpc. func (c modClient) ChainStream(interceptors ...grpc.StreamClientInterceptor) grpc.DialOption { return grpc.WithChainStreamInterceptor(interceptors...) } + +// isServiceName checks and returns whether given input parameter is service name or not. +// It checks by whether the parameter is address by containing port delimiter character ':'. +func isServiceName(serviceNameOrAddress string) bool { + return !gstr.Contains(serviceNameOrAddress, gsvc.EndpointHostPortDelimiter) +} diff --git a/contrib/rpc/grpcx/grpcx_grpc_server.go b/contrib/rpc/grpcx/grpcx_grpc_server.go index c99fd433e..a44f5f118 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server.go @@ -129,11 +129,14 @@ func (s *GrpcServer) doSignalListen() { var ctx = context.Background() gproc.AddSigHandlerShutdown(func(sig os.Signal) { s.Logger().Infof(ctx, "signal received: %s, gracefully shutting down", sig.String()) + // Deregister services when shutdown signal triggers. s.doServiceDeregister() time.Sleep(time.Second) s.Stop() }) gproc.Listen() + // Deregister services when process ends. + s.doServiceDeregister() } // Logger is alias of GetLogger. @@ -195,6 +198,7 @@ func (s *GrpcServer) doServiceDeregister() { s.Logger().Errorf(ctx, `%+v`, err) } } + s.services = s.services[:0] } // Start starts the server in no-blocking way. @@ -213,6 +217,7 @@ func (s *GrpcServer) Wait() { // Stop gracefully stops the server. func (s *GrpcServer) Stop() { + s.doServiceDeregister() s.Server.GracefulStop() } diff --git a/contrib/rpc/grpcx/grpcx_registry_file.go b/contrib/rpc/grpcx/grpcx_registry_file.go index c1b0961fe..4605b94e8 100644 --- a/contrib/rpc/grpcx/grpcx_registry_file.go +++ b/contrib/rpc/grpcx/grpcx_registry_file.go @@ -23,10 +23,15 @@ func autoLoadAndRegisterFileRegistry() { return } var ( - ctx = gctx.GetInitCtx() - fileRegistry = file.New(gfile.Temp("gsvc")) + ctx = gctx.GetInitCtx() + directoryPath = gfile.Temp("gsvc") + fileRegistry = file.New(directoryPath) ) - g.Log().Debug(ctx, `set default registry using file registry as no custom registry set`) + g.Log().Debugf( + ctx, + `set default registry using file registry as no custom registry set, path: %s`, + directoryPath, + ) Resolver.Register(fileRegistry) } diff --git a/contrib/rpc/grpcx/grpcx_unit_ctx_test.go b/contrib/rpc/grpcx/grpcx_unit_z_ctx_test.go similarity index 100% rename from contrib/rpc/grpcx/grpcx_unit_ctx_test.go rename to contrib/rpc/grpcx/grpcx_unit_z_ctx_test.go diff --git a/contrib/rpc/grpcx/grpcx_unit_z_grpc_server_basic_test.go b/contrib/rpc/grpcx/grpcx_unit_z_grpc_server_basic_test.go new file mode 100644 index 000000000..f3ffb137b --- /dev/null +++ b/contrib/rpc/grpcx/grpcx_unit_z_grpc_server_basic_test.go @@ -0,0 +1,63 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package grpcx_test + +import ( + "fmt" + "testing" + "time" + + "github.com/gogf/gf/contrib/rpc/grpcx/v2" + "github.com/gogf/gf/contrib/rpc/grpcx/v2/testdata/controller" + "github.com/gogf/gf/contrib/rpc/grpcx/v2/testdata/protobuf" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/gipv4" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/util/guid" +) + +func Test_Grpcx_Grpc_Server_Basic(t *testing.T) { + c := grpcx.Server.NewConfig() + c.Name = guid.S() + s := grpcx.Server.New(c) + controller.Register(s) + s.Start() + time.Sleep(time.Millisecond * 100) + defer s.Stop() + + // use service discovery. + gtest.C(t, func(t *gtest.T) { + var ( + ctx = gctx.New() + conn = grpcx.Client.MustNewGrpcClientConn(c.Name) + client = protobuf.NewGreeterClient(conn) + ) + res, err := client.SayHello(ctx, &protobuf.HelloRequest{Name: "World"}) + if err != nil { + g.Log().Error(ctx, err) + return + } + t.Assert(res.Message, `Hello World`) + }) + + // use direct address. + gtest.C(t, func(t *gtest.T) { + var ( + ctx = gctx.New() + address = fmt.Sprintf(`%s:%d`, gipv4.MustGetIntranetIp(), s.GetListenedPort()) + conn = grpcx.Client.MustNewGrpcClientConn(address) + client = protobuf.NewGreeterClient(conn) + ) + res, err := client.SayHello(ctx, &protobuf.HelloRequest{Name: "World"}) + if err != nil { + g.Log().Error(ctx, err) + return + } + t.Assert(res.Message, `Hello World`) + }) +} diff --git a/contrib/rpc/grpcx/grpcx_grpc_server_config_test.go b/contrib/rpc/grpcx/grpcx_unit_z_grpc_server_config_test.go similarity index 100% rename from contrib/rpc/grpcx/grpcx_grpc_server_config_test.go rename to contrib/rpc/grpcx/grpcx_unit_z_grpc_server_config_test.go diff --git a/contrib/rpc/grpcx/internal/balancer/balancer_builder.go b/contrib/rpc/grpcx/internal/balancer/balancer_builder.go index e87d7f3a0..6e2de3210 100644 --- a/contrib/rpc/grpcx/internal/balancer/balancer_builder.go +++ b/contrib/rpc/grpcx/internal/balancer/balancer_builder.go @@ -33,6 +33,13 @@ func (b *Builder) Build(info base.PickerBuildInfo) balancer.Picker { ) for conn, subConnInfo := range info.ReadySCs { svc, _ := subConnInfo.Address.Attributes.Value(rawSvcKeyInSubConnInfo).(gsvc.Service) + if svc == nil && subConnInfo.Address.Addr != "" { + // It might be a direct address without service name, it so creates a default service. + svc = &gsvc.LocalService{ + Name: subConnInfo.Address.ServerName, + Endpoints: gsvc.NewEndpoints(subConnInfo.Address.Addr), + } + } if svc == nil { g.Log().Noticef(ctx, `empty service read from: %+v`, subConnInfo.Address) continue diff --git a/contrib/rpc/grpcx/testdata/controller/helloworld.go b/contrib/rpc/grpcx/testdata/controller/helloworld.go new file mode 100644 index 000000000..94a6bf441 --- /dev/null +++ b/contrib/rpc/grpcx/testdata/controller/helloworld.go @@ -0,0 +1,27 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package controller + +import ( + "context" + + "github.com/gogf/gf/contrib/rpc/grpcx/v2" + "github.com/gogf/gf/contrib/rpc/grpcx/v2/testdata/protobuf" +) + +type Controller struct { + protobuf.UnimplementedGreeterServer +} + +func Register(s *grpcx.GrpcServer) { + protobuf.RegisterGreeterServer(s.Server, &Controller{}) +} + +// SayHello implements helloworld.GreeterServer +func (s *Controller) SayHello(ctx context.Context, in *protobuf.HelloRequest) (*protobuf.HelloReply, error) { + return &protobuf.HelloReply{Message: "Hello " + in.GetName()}, nil +} diff --git a/contrib/rpc/grpcx/testdata/protobuf/helloworld.pb.go b/contrib/rpc/grpcx/testdata/protobuf/helloworld.pb.go new file mode 100644 index 000000000..8af3b213b --- /dev/null +++ b/contrib/rpc/grpcx/testdata/protobuf/helloworld.pb.go @@ -0,0 +1,217 @@ +// protoc --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. *.proto + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: helloworld.proto + +package protobuf + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// The request message containing the user's name. +type HelloRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` +} + +func (x *HelloRequest) Reset() { + *x = HelloRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_helloworld_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HelloRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HelloRequest) ProtoMessage() {} + +func (x *HelloRequest) ProtoReflect() protoreflect.Message { + mi := &file_helloworld_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HelloRequest.ProtoReflect.Descriptor instead. +func (*HelloRequest) Descriptor() ([]byte, []int) { + return file_helloworld_proto_rawDescGZIP(), []int{0} +} + +func (x *HelloRequest) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +// The response message containing the greetings +type HelloReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Message string `protobuf:"bytes,1,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *HelloReply) Reset() { + *x = HelloReply{} + if protoimpl.UnsafeEnabled { + mi := &file_helloworld_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HelloReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HelloReply) ProtoMessage() {} + +func (x *HelloReply) ProtoReflect() protoreflect.Message { + mi := &file_helloworld_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HelloReply.ProtoReflect.Descriptor instead. +func (*HelloReply) Descriptor() ([]byte, []int) { + return file_helloworld_proto_rawDescGZIP(), []int{1} +} + +func (x *HelloReply) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_helloworld_proto protoreflect.FileDescriptor + +var file_helloworld_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x22, 0x22, 0x0a, 0x0c, + 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x22, 0x26, 0x0a, 0x0a, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x18, + 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x45, 0x0a, 0x07, 0x47, 0x72, 0x65, 0x65, + 0x74, 0x65, 0x72, 0x12, 0x3a, 0x0a, 0x08, 0x53, 0x61, 0x79, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, + 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x14, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, + 0x75, 0x66, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x42, + 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, + 0x67, 0x66, 0x2f, 0x67, 0x66, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, + 0x6c, 0x65, 0x2f, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x77, 0x6f, 0x72, 0x6c, 0x64, 0x2f, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_helloworld_proto_rawDescOnce sync.Once + file_helloworld_proto_rawDescData = file_helloworld_proto_rawDesc +) + +func file_helloworld_proto_rawDescGZIP() []byte { + file_helloworld_proto_rawDescOnce.Do(func() { + file_helloworld_proto_rawDescData = protoimpl.X.CompressGZIP(file_helloworld_proto_rawDescData) + }) + return file_helloworld_proto_rawDescData +} + +var file_helloworld_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_helloworld_proto_goTypes = []interface{}{ + (*HelloRequest)(nil), // 0: protobuf.HelloRequest + (*HelloReply)(nil), // 1: protobuf.HelloReply +} +var file_helloworld_proto_depIdxs = []int32{ + 0, // 0: protobuf.Greeter.SayHello:input_type -> protobuf.HelloRequest + 1, // 1: protobuf.Greeter.SayHello:output_type -> protobuf.HelloReply + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_helloworld_proto_init() } +func file_helloworld_proto_init() { + if File_helloworld_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_helloworld_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HelloRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_helloworld_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HelloReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_helloworld_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_helloworld_proto_goTypes, + DependencyIndexes: file_helloworld_proto_depIdxs, + MessageInfos: file_helloworld_proto_msgTypes, + }.Build() + File_helloworld_proto = out.File + file_helloworld_proto_rawDesc = nil + file_helloworld_proto_goTypes = nil + file_helloworld_proto_depIdxs = nil +} diff --git a/contrib/rpc/grpcx/testdata/protobuf/helloworld.proto b/contrib/rpc/grpcx/testdata/protobuf/helloworld.proto new file mode 100644 index 000000000..9f4f4864f --- /dev/null +++ b/contrib/rpc/grpcx/testdata/protobuf/helloworld.proto @@ -0,0 +1,24 @@ +// protoc --go_out=paths=source_relative:. --go-grpc_out=paths=source_relative:. *.proto + +syntax = "proto3"; + +package protobuf; + +option go_package = "github.com/gogf/gf/grpc/example/helloworld/protobuf"; + + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/contrib/rpc/grpcx/testdata/protobuf/helloworld_grpc.pb.go b/contrib/rpc/grpcx/testdata/protobuf/helloworld_grpc.pb.go new file mode 100644 index 000000000..af29539f4 --- /dev/null +++ b/contrib/rpc/grpcx/testdata/protobuf/helloworld_grpc.pb.go @@ -0,0 +1,107 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: helloworld.proto + +package protobuf + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// GreeterClient is the client API for Greeter service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type GreeterClient interface { + // Sends a greeting + SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) +} + +type greeterClient struct { + cc grpc.ClientConnInterface +} + +func NewGreeterClient(cc grpc.ClientConnInterface) GreeterClient { + return &greeterClient{cc} +} + +func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) { + out := new(HelloReply) + err := c.cc.Invoke(ctx, "/protobuf.Greeter/SayHello", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// GreeterServer is the server API for Greeter service. +// All implementations must embed UnimplementedGreeterServer +// for forward compatibility +type GreeterServer interface { + // Sends a greeting + SayHello(context.Context, *HelloRequest) (*HelloReply, error) + mustEmbedUnimplementedGreeterServer() +} + +// UnimplementedGreeterServer must be embedded to have forward compatible implementations. +type UnimplementedGreeterServer struct { +} + +func (UnimplementedGreeterServer) SayHello(context.Context, *HelloRequest) (*HelloReply, error) { + return nil, status.Errorf(codes.Unimplemented, "method SayHello not implemented") +} +func (UnimplementedGreeterServer) mustEmbedUnimplementedGreeterServer() {} + +// UnsafeGreeterServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to GreeterServer will +// result in compilation errors. +type UnsafeGreeterServer interface { + mustEmbedUnimplementedGreeterServer() +} + +func RegisterGreeterServer(s grpc.ServiceRegistrar, srv GreeterServer) { + s.RegisterService(&Greeter_ServiceDesc, srv) +} + +func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HelloRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(GreeterServer).SayHello(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/protobuf.Greeter/SayHello", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(GreeterServer).SayHello(ctx, req.(*HelloRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Greeter_ServiceDesc is the grpc.ServiceDesc for Greeter service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Greeter_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "protobuf.Greeter", + HandlerType: (*GreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SayHello", + Handler: _Greeter_SayHello_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "helloworld.proto", +} diff --git a/net/ghttp/ghttp.go b/net/ghttp/ghttp.go index 7359679c1..8a85eb040 100644 --- a/net/ghttp/ghttp.go +++ b/net/ghttp/ghttp.go @@ -10,6 +10,7 @@ package ghttp import ( "net/http" "reflect" + "sync" "time" "github.com/gorilla/websocket" @@ -42,6 +43,7 @@ type ( statusHandlerMap map[string][]HandlerFunc // Custom status handler map. sessionManager *gsession.Manager // Session manager. openapi *goai.OpenApiV3 // The OpenApi specification management object. + serviceMu sync.Mutex // Concurrent safety for operations of attribute service. service gsvc.Service // The service for Registry. registrar gsvc.Registrar // Registrar for service register. } diff --git a/net/ghttp/ghttp_server.go b/net/ghttp/ghttp_server.go index 782781693..2726cf862 100644 --- a/net/ghttp/ghttp_server.go +++ b/net/ghttp/ghttp_server.go @@ -451,6 +451,7 @@ func Wait() { go handleProcessSignal() <-allShutdownChan + // Remove plugins. serverMapping.Iterator(func(k string, v interface{}) bool { s := v.(*Server) diff --git a/net/ghttp/ghttp_server_admin.go b/net/ghttp/ghttp_server_admin.go index 032a74609..2928493a4 100644 --- a/net/ghttp/ghttp_server_admin.go +++ b/net/ghttp/ghttp_server_admin.go @@ -84,9 +84,8 @@ func (s *Server) EnableAdmin(pattern ...string) { // Shutdown shuts down current server. func (s *Server) Shutdown() error { - var ( - ctx = context.TODO() - ) + var ctx = context.TODO() + s.doServiceDeregister() // Only shut down current servers. // It may have multiple underlying http servers. for _, v := range s.servers { diff --git a/net/ghttp/ghttp_server_admin_process.go b/net/ghttp/ghttp_server_admin_process.go index e1fec2fe1..9bfead594 100644 --- a/net/ghttp/ghttp_server_admin_process.go +++ b/net/ghttp/ghttp_server_admin_process.go @@ -258,7 +258,9 @@ func shutdownWebServersGracefully(ctx context.Context, signal os.Signal) { } serverMapping.RLockFunc(func(m map[string]interface{}) { for _, v := range m { - for _, s := range v.(*Server).servers { + server := v.(*Server) + server.doServiceDeregister() + for _, s := range server.servers { s.shutdown(ctx) } } diff --git a/net/ghttp/ghttp_server_registry.go b/net/ghttp/ghttp_server_registry.go index 015e1b3a3..5a8ae5aff 100644 --- a/net/ghttp/ghttp_server_registry.go +++ b/net/ghttp/ghttp_server_registry.go @@ -22,6 +22,8 @@ func (s *Server) doServiceRegister() { if s.registrar == nil { return } + s.serviceMu.Lock() + defer s.serviceMu.Unlock() var ( ctx = gctx.GetInitCtx() protocol = gsvc.DefaultProtocol @@ -56,11 +58,17 @@ func (s *Server) doServiceDeregister() { if s.registrar == nil { return } + s.serviceMu.Lock() + defer s.serviceMu.Unlock() + if s.service == nil { + return + } var ctx = gctx.GetInitCtx() s.Logger().Debugf(ctx, `service deregister: %+v`, s.service) if err := s.registrar.Deregister(ctx, s.service); err != nil { s.Logger().Errorf(ctx, `%+v`, err) } + s.service = nil } func (s *Server) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoints { diff --git a/net/gsel/gsel_selector_round_robin.go b/net/gsel/gsel_selector_round_robin.go index 5c73ca42a..bbc06b754 100644 --- a/net/gsel/gsel_selector_round_robin.go +++ b/net/gsel/gsel_selector_round_robin.go @@ -37,6 +37,9 @@ func (s *selectorRoundRobin) Update(ctx context.Context, nodes Nodes) error { func (s *selectorRoundRobin) Pick(ctx context.Context) (node Node, done DoneFunc, err error) { s.mu.Lock() defer s.mu.Unlock() + if len(s.nodes) == 0 { + return + } node = s.nodes[s.next] s.next = (s.next + 1) % len(s.nodes) intlog.Printf(ctx, `Picked node: %s`, node.Address())