You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
service/lib/consul/consul_discovery.go

245 lines
5.5 KiB

package consul
import (
"path"
"strings"
"sync"
"time"
"github.com/rpcxio/libkv"
"github.com/rpcxio/libkv/store"
"github.com/rpcxio/libkv/store/consul"
"github.com/smallnest/rpcx/client"
"github.com/smallnest/rpcx/log"
)
// init registers the consul backend with libkv at package load time, so that
// the libkv.NewStore(store.CONSUL, ...) calls in this file can resolve it.
func init() {
consul.Register()
}
// ConsulDiscovery is a consul service discovery.
// It always returns the registered servers in consul.
//
// A background goroutine (see watch) keeps the cached server list up to date
// and forwards every change to the channels handed out by WatchService.
type ConsulDiscovery struct {
// basePath is the normalized key prefix that is listed and watched in the store.
basePath string
// kv is the store used to list and watch basePath; the watch goroutine
// closes it when it exits.
kv store.Store
// pairsMu guards pairs.
pairsMu sync.RWMutex
// pairs is the most recently observed server list, with keys relative to basePath.
pairs []*client.KVPair
// chans holds the channels registered through WatchService.
chans []chan []*client.KVPair
// mu guards chans.
mu sync.Mutex
// -1 means it always retry to watch until consul is ok, 0 means no retry.
// NewConsulDiscoveryStore initializes it to -1.
RetriesAfterWatchFailed int
// filter, when non-nil, decides which pairs reported by the watch are kept.
filter client.ServiceDiscoveryFilter
// stopCh is closed by Close to stop the watch goroutine.
stopCh chan struct{}
}
// NewConsulDiscovery returns a new ConsulDiscovery that tracks the servers
// registered under basePath + "/" + servicePath.
//
// It creates a consul-backed store from consulAddr and options and passes it
// to NewConsulDiscoveryStore. When the discovery cannot be built, the store
// created here is closed before the error is returned, because no caller
// would otherwise hold a reference to it.
func NewConsulDiscovery(basePath, servicePath string, consulAddr []string, options *store.Config) (*ConsulDiscovery, error) {
	kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
	if err != nil {
		log.Infof("cannot create store: %v", err)
		return nil, err
	}

	d, err := NewConsulDiscoveryStore(basePath+"/"+servicePath, kv)
	if err != nil {
		// The watch goroutine that normally owns and closes kv was never
		// started, so release the store here.
		kv.Close()
		return nil, err
	}
	return d, nil
}
// NewConsulDiscoveryStore returns a new ConsulDiscovery with specified store.
//
// basePath is normalized by dropping one leading "/" and, when more than one
// character remains, one trailing "/". An empty basePath is accepted and is
// treated the same way as "/".
//
// The current children of basePath are listed once to seed the cached server
// list; store.ErrKeyNotFound is treated as an empty list. No filter is applied
// to this initial listing, because a filter can only be installed through
// SetFilter on the returned value. On success a goroutine is started that
// watches basePath until Close is called, and RetriesAfterWatchFailed is set
// to -1.
//
// kv is not closed when an error is returned; it still belongs to the caller.
func NewConsulDiscoveryStore(basePath string, kv store.Store) (*ConsulDiscovery, error) {
	// TrimPrefix rather than indexing basePath[0], which panics on "".
	basePath = strings.TrimPrefix(basePath, "/")
	if len(basePath) > 1 {
		basePath = strings.TrimSuffix(basePath, "/")
	}

	d := &ConsulDiscovery{
		basePath:                basePath,
		kv:                      kv,
		stopCh:                  make(chan struct{}),
		RetriesAfterWatchFailed: -1,
	}

	ps, err := kv.List(basePath)
	if err != nil && err != store.ErrKeyNotFound {
		log.Infof("cannot get services of from registry: %v, err: %v", basePath, err)
		return nil, err
	}

	pairs := make([]*client.KVPair, 0, len(ps))
	prefix := d.basePath + "/"
	for _, p := range ps {
		// Keep direct children only; List may also return keys that merely
		// share the prefix or live in nested directories.
		if path.Dir(p.Key) != d.basePath {
			continue
		}
		pairs = append(pairs, &client.KVPair{
			Key:   strings.TrimPrefix(p.Key, prefix),
			Value: string(p.Value),
		})
	}

	// d is not shared with any other goroutine yet, so no locking is needed;
	// the go statement below orders this write before the watcher's reads.
	d.pairs = pairs

	go d.watch()
	return d, nil
}
// NewConsulDiscoveryTemplate returns a new ConsulDiscovery template rooted at
// basePath, i.e. without a service path appended. Clone derives per-service
// discoveries from it.
//
// basePath is normalized by dropping one leading "/" and, when more than one
// character remains, one trailing "/"; this is done without indexing the
// string, so an empty basePath does not panic in this function. The store is
// created from consulAddr and options, and is closed again if the discovery
// cannot be built.
func NewConsulDiscoveryTemplate(basePath string, consulAddr []string, options *store.Config) (*ConsulDiscovery, error) {
	basePath = strings.TrimPrefix(basePath, "/")
	if len(basePath) > 1 {
		basePath = strings.TrimSuffix(basePath, "/")
	}

	kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
	if err != nil {
		log.Infof("cannot create store: %v", err)
		return nil, err
	}

	d, err := NewConsulDiscoveryStore(basePath, kv)
	if err != nil {
		// Nothing else references kv on failure, so release it here.
		kv.Close()
		return nil, err
	}
	return d, nil
}
// Clone clones this ServiceDiscovery with new servicePath.
//
// The clone watches d.basePath + "/" + servicePath and reuses d's store. The
// filter set on d is not carried over to the clone.
//
// On failure the returned interface is a true nil. Returning the
// *ConsulDiscovery result directly would wrap a nil pointer in a non-nil
// client.ServiceDiscovery value.
func (d *ConsulDiscovery) Clone(servicePath string) (client.ServiceDiscovery, error) {
	cloned, err := NewConsulDiscoveryStore(d.basePath+"/"+servicePath, d.kv)
	if err != nil {
		return nil, err
	}
	return cloned, nil
}
// SetFilter sets the filter.
// The watch goroutine consults the filter for every pair it receives and drops
// the pairs for which the filter returns false.
// NOTE(review): the field is written here without holding a lock while the
// watch goroutine reads it — confirm callers only set it before updates matter.
func (d *ConsulDiscovery) SetFilter(filter client.ServiceDiscoveryFilter) {
d.filter = filter
}
// GetServices returns the server list most recently stored by the discovery.
// The returned slice is the discovery's own slice, not a copy.
func (d *ConsulDiscovery) GetServices() []*client.KVPair {
	d.pairsMu.RLock()
	services := d.pairs
	d.pairsMu.RUnlock()

	return services
}
// WatchService registers and returns a new channel, buffered to 10 entries,
// on which the watch goroutine publishes each updated server list.
func (d *ConsulDiscovery) WatchService() chan []*client.KVPair {
	updates := make(chan []*client.KVPair, 10)

	d.mu.Lock()
	d.chans = append(d.chans, updates)
	d.mu.Unlock()

	return updates
}
// RemoveWatcher unregisters ch so that it no longer receives updates.
// Every registration of ch is dropped; the channel itself is not closed.
func (d *ConsulDiscovery) RemoveWatcher(ch chan []*client.KVPair) {
	d.mu.Lock()
	defer d.mu.Unlock()

	var remaining []chan []*client.KVPair
	for i := range d.chans {
		if registered := d.chans[i]; registered != ch {
			remaining = append(remaining, registered)
		}
	}
	d.chans = remaining
}
// watch runs in its own goroutine and keeps d.pairs in sync with the store.
//
// It repeatedly opens a tree watch on d.basePath. Opening is retried with an
// exponential backoff (1s, doubling, capped at 30s) according to
// RetriesAfterWatchFailed: a negative value retries forever, N >= 0 allows N
// retries after the first failure. Once the retries are exhausted the
// goroutine logs the error and exits.
//
// Every update received from the watch replaces d.pairs and is forwarded to
// all channels registered through WatchService. When the watch channel is
// closed, a new watch is opened. The goroutine exits once stopCh is closed,
// including while it is waiting out a backoff delay. The store is closed on
// exit.
func (d *ConsulDiscovery) watch() {
	defer func() {
		d.kv.Close()
	}()

	for {
		var (
			err       error
			c         <-chan []*store.KVPair
			tempDelay time.Duration
		)

		retry := d.RetriesAfterWatchFailed
		for d.RetriesAfterWatchFailed < 0 || retry >= 0 {
			c, err = d.kv.WatchTree(d.basePath, d.stopCh)
			if err == nil {
				break
			}

			// Count this failure against a bounded budget. This must include
			// 0 ("no retry"): leaving the counter untouched in that case
			// would keep the loop condition true forever.
			if d.RetriesAfterWatchFailed >= 0 {
				retry--
				if retry < 0 {
					// Budget exhausted: give up without a pointless sleep.
					break
				}
			}

			if tempDelay == 0 {
				tempDelay = 1 * time.Second
			} else {
				tempDelay *= 2
			}
			if maxDelay := 30 * time.Second; tempDelay > maxDelay {
				tempDelay = maxDelay
			}

			log.Warnf("can not watchtree (with retry %d, sleep %v): %s: %v", retry, tempDelay, d.basePath, err)

			// Wait out the backoff, but stop at once if Close is called.
			select {
			case <-d.stopCh:
				log.Info("discovery has been closed")
				return
			case <-time.After(tempDelay):
			}
		}

		if err != nil {
			log.Errorf("can't watch %s: %v", d.basePath, err)
			return
		}

		prefix := d.basePath + "/"

	readChanges:
		for {
			select {
			case <-d.stopCh:
				log.Info("discovery has been closed")
				return
			case ps, ok := <-c:
				if !ok {
					// The watch channel was closed; open a new watch.
					break readChanges
				}

				var pairs []*client.KVPair // latest servers
				if ps == nil {
					// A nil update clears the cached list. Registered
					// watchers are not notified in this case.
					d.pairsMu.Lock()
					d.pairs = pairs
					d.pairsMu.Unlock()
					continue
				}

				for _, p := range ps {
					if !strings.HasPrefix(p.Key, prefix) { // avoid prefix issue of consul List
						continue
					}

					pair := &client.KVPair{
						Key:   strings.TrimPrefix(p.Key, prefix),
						Value: string(p.Value),
					}
					if d.filter != nil && !d.filter(pair) {
						continue
					}
					pairs = append(pairs, pair)
				}

				d.pairsMu.Lock()
				d.pairs = pairs
				d.pairsMu.Unlock()

				// Deliver from one goroutine per watcher so that a slow or
				// full channel cannot stall this loop or the other watchers.
				d.mu.Lock()
				for _, ch := range d.chans {
					ch := ch
					go func() {
						// Best effort: swallow a panic raised by the send
						// (presumably a channel the consumer has closed).
						defer func() {
							recover()
						}()

						select {
						case ch <- pairs:
						case <-time.After(time.Minute):
							log.Warn("chan is full and new change has been dropped")
						}
					}()
				}
				d.mu.Unlock()
			}
		}

		log.Warn("chan is closed and will rewatch")
	}
}
// Close stops the watch goroutine by closing stopCh.
//
// It is safe to call Close more than once and from several goroutines: the
// check and the close happen under d.mu, so stopCh is closed exactly once
// instead of panicking on a repeated close.
func (d *ConsulDiscovery) Close() {
	d.mu.Lock()
	defer d.mu.Unlock()

	select {
	case <-d.stopCh:
		// Already closed by an earlier call.
	default:
		close(d.stopCh)
	}
}