package client import ( "context" "errors" "fmt" "git.oa00.com/supply-chain/service/config" consulClient "github.com/rpcxio/rpcx-consul/client" "github.com/smallnest/rpcx/client" "log" "reflect" "strings" "sync" "time" ) var mClient = sync.Map{} var mutex = sync.Mutex{} var basePkgPath string type empty int func init() { pkgPath := reflect.TypeOf(empty(0)).PkgPath() basePkgPath = pkgPath[:len(pkgPath)-6] } type RpcClient struct { baseName string serviceName string client client.XClient } // Call @Title 调用接口 func (r RpcClient) Call(ctx context.Context, serviceMethod string, args interface{}, reply interface{}) (err error) { if config.RpcConfig.BeforeHandel != nil { ctx, err = config.RpcConfig.BeforeHandel(ctx, r.baseName, r.serviceName, serviceMethod, args, reply) if err != nil { return err } } // 调用请求 err = r.client.Call(ctx, serviceMethod, args, reply) if config.RpcConfig.AfterHandel != nil { err = config.RpcConfig.AfterHandel(ctx, r.baseName, r.serviceName, serviceMethod, args, reply, err) } return err } // GetClient @Title 获取RPC客户的 func GetClient(s interface{}) (*RpcClient, error) { path := strings.TrimPrefix(reflect.ValueOf(s).Elem().Type().PkgPath(), basePkgPath) actionName := reflect.ValueOf(s).Elem().Type().Name() key := path + "/" + actionName split := strings.SplitN(key, "/", 2) basePath := split[0] servicePath := split[1] xClient, ok := mClient.Load(key) if !ok { mutex.Lock() xClient, ok = mClient.Load(key) if !ok { d, err := consulClient.NewConsulDiscovery(basePath, servicePath, config.RpcConfig.RegistryServer, nil) if err != nil { return nil, errors.New("系统异常") } option := client.DefaultOption option.Retries = 3 option.GenBreaker = func() client.Breaker { return client.NewConsecCircuitBreaker(2, 30*time.Second) } xClient = client.NewXClient(servicePath, client.Failover, client.RoundRobin, d, option) mClient.Store(key, xClient) } mutex.Unlock() } return &RpcClient{client: xClient.(client.XClient), baseName: basePath, serviceName: servicePath}, nil } // GetClientName @Title 根据服务名获取客户端 func GetClientName(base, service string) (*RpcClient, error) { key := fmt.Sprintf("%s/%s", base, service) xClient, ok := mClient.Load(key) if !ok { mutex.Lock() xClient, ok = mClient.Load(key) if !ok { d, err := consulClient.NewConsulDiscovery(base, service, config.RpcConfig.RegistryServer, nil) if err != nil { log.Println(err) return nil, errors.New("系统异常") } option := client.DefaultOption option.Retries = 3 option.GenBreaker = func() client.Breaker { return client.NewConsecCircuitBreaker(2, 30*time.Second) } xClient = client.NewXClient(service, client.Failover, client.RoundRobin, d, option) mClient.Store(key, xClient) } mutex.Unlock() } return &RpcClient{client: xClient.(client.XClient), baseName: base, serviceName: service}, nil }