commit
f2778b6436
@ -0,0 +1,244 @@
|
||||
package consul
|
||||
|
||||
import (
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rpcxio/libkv"
|
||||
"github.com/rpcxio/libkv/store"
|
||||
"github.com/rpcxio/libkv/store/consul"
|
||||
"github.com/smallnest/rpcx/client"
|
||||
"github.com/smallnest/rpcx/log"
|
||||
)
|
||||
|
||||
func init() {
|
||||
consul.Register()
|
||||
}
|
||||
|
||||
// ConsulDiscovery is a consul service discovery.
|
||||
// It always returns the registered servers in consul.
|
||||
type ConsulDiscovery struct {
|
||||
basePath string
|
||||
kv store.Store
|
||||
pairsMu sync.RWMutex
|
||||
pairs []*client.KVPair
|
||||
chans []chan []*client.KVPair
|
||||
mu sync.Mutex
|
||||
// -1 means it always retry to watch until zookeeper is ok, 0 means no retry.
|
||||
RetriesAfterWatchFailed int
|
||||
|
||||
filter client.ServiceDiscoveryFilter
|
||||
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// NewConsulDiscovery returns a new ConsulDiscovery.
|
||||
func NewConsulDiscovery(basePath, servicePath string, consulAddr []string, options *store.Config) (*ConsulDiscovery, error) {
|
||||
kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
|
||||
if err != nil {
|
||||
log.Infof("cannot create store: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewConsulDiscoveryStore(basePath+"/"+servicePath, kv)
|
||||
}
|
||||
|
||||
// NewConsulDiscoveryStore returns a new ConsulDiscovery with specified store.
|
||||
func NewConsulDiscoveryStore(basePath string, kv store.Store) (*ConsulDiscovery, error) {
|
||||
if basePath[0] == '/' {
|
||||
basePath = basePath[1:]
|
||||
}
|
||||
|
||||
if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
|
||||
basePath = basePath[:len(basePath)-1]
|
||||
}
|
||||
|
||||
d := &ConsulDiscovery{basePath: basePath, kv: kv}
|
||||
d.stopCh = make(chan struct{})
|
||||
|
||||
ps, err := kv.List(basePath)
|
||||
if err != nil && err != store.ErrKeyNotFound {
|
||||
log.Infof("cannot get services of from registry: %v, err: %v", basePath, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pairs := make([]*client.KVPair, 0, len(ps))
|
||||
prefix := d.basePath + "/"
|
||||
for _, p := range ps {
|
||||
if path.Dir(p.Key) != d.basePath {
|
||||
continue
|
||||
}
|
||||
k := strings.TrimPrefix(p.Key, prefix)
|
||||
pair := &client.KVPair{Key: k, Value: string(p.Value)}
|
||||
if d.filter != nil && !d.filter(pair) {
|
||||
continue
|
||||
}
|
||||
pairs = append(pairs, pair)
|
||||
}
|
||||
d.pairsMu.Lock()
|
||||
d.pairs = pairs
|
||||
d.pairsMu.Unlock()
|
||||
d.RetriesAfterWatchFailed = -1
|
||||
go d.watch()
|
||||
return d, nil
|
||||
}
|
||||
|
||||
// NewConsulDiscoveryTemplate returns a new ConsulDiscovery template.
|
||||
func NewConsulDiscoveryTemplate(basePath string, consulAddr []string, options *store.Config) (*ConsulDiscovery, error) {
|
||||
if basePath[0] == '/' {
|
||||
basePath = basePath[1:]
|
||||
}
|
||||
|
||||
if len(basePath) > 1 && strings.HasSuffix(basePath, "/") {
|
||||
basePath = basePath[:len(basePath)-1]
|
||||
}
|
||||
|
||||
kv, err := libkv.NewStore(store.CONSUL, consulAddr, options)
|
||||
if err != nil {
|
||||
log.Infof("cannot create store: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewConsulDiscoveryStore(basePath, kv)
|
||||
}
|
||||
|
||||
// Clone clones this ServiceDiscovery with new servicePath.
|
||||
func (d *ConsulDiscovery) Clone(servicePath string) (client.ServiceDiscovery, error) {
|
||||
return NewConsulDiscoveryStore(d.basePath+"/"+servicePath, d.kv)
|
||||
}
|
||||
|
||||
// SetFilter sets the filer.
|
||||
func (d *ConsulDiscovery) SetFilter(filter client.ServiceDiscoveryFilter) {
|
||||
d.filter = filter
|
||||
}
|
||||
|
||||
// GetServices returns the servers
|
||||
func (d *ConsulDiscovery) GetServices() []*client.KVPair {
|
||||
d.pairsMu.RLock()
|
||||
defer d.pairsMu.RUnlock()
|
||||
return d.pairs
|
||||
}
|
||||
|
||||
// WatchService returns a nil chan.
|
||||
func (d *ConsulDiscovery) WatchService() chan []*client.KVPair {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
ch := make(chan []*client.KVPair, 10)
|
||||
d.chans = append(d.chans, ch)
|
||||
return ch
|
||||
}
|
||||
|
||||
func (d *ConsulDiscovery) RemoveWatcher(ch chan []*client.KVPair) {
|
||||
d.mu.Lock()
|
||||
defer d.mu.Unlock()
|
||||
|
||||
var chans []chan []*client.KVPair
|
||||
for _, c := range d.chans {
|
||||
if c == ch {
|
||||
continue
|
||||
}
|
||||
|
||||
chans = append(chans, c)
|
||||
}
|
||||
|
||||
d.chans = chans
|
||||
}
|
||||
|
||||
func (d *ConsulDiscovery) watch() {
|
||||
defer func() {
|
||||
d.kv.Close()
|
||||
}()
|
||||
for {
|
||||
var err error
|
||||
var c <-chan []*store.KVPair
|
||||
var tempDelay time.Duration
|
||||
|
||||
retry := d.RetriesAfterWatchFailed
|
||||
for d.RetriesAfterWatchFailed < 0 || retry >= 0 {
|
||||
c, err = d.kv.WatchTree(d.basePath, d.stopCh)
|
||||
if err != nil {
|
||||
if d.RetriesAfterWatchFailed > 0 {
|
||||
retry--
|
||||
}
|
||||
if tempDelay == 0 {
|
||||
tempDelay = 1 * time.Second
|
||||
} else {
|
||||
tempDelay *= 2
|
||||
}
|
||||
if max := 30 * time.Second; tempDelay > max {
|
||||
tempDelay = max
|
||||
}
|
||||
log.Warnf("can not watchtree (with retry %d, sleep %v): %s: %v", retry, tempDelay, d.basePath, err)
|
||||
time.Sleep(tempDelay)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("can't watch %s: %v", d.basePath, err)
|
||||
return
|
||||
}
|
||||
|
||||
prefix := d.basePath + "/"
|
||||
|
||||
readChanges:
|
||||
for {
|
||||
select {
|
||||
case <-d.stopCh:
|
||||
log.Info("discovery has been closed")
|
||||
return
|
||||
case ps, ok := <-c:
|
||||
if !ok {
|
||||
break readChanges
|
||||
}
|
||||
var pairs []*client.KVPair // latest servers
|
||||
if ps == nil {
|
||||
d.pairsMu.Lock()
|
||||
d.pairs = pairs
|
||||
d.pairsMu.Unlock()
|
||||
continue
|
||||
}
|
||||
for _, p := range ps {
|
||||
if !strings.HasPrefix(p.Key, prefix) { // avoid prefix issue of consul List
|
||||
continue
|
||||
}
|
||||
k := strings.TrimPrefix(p.Key, prefix)
|
||||
pair := &client.KVPair{Key: k, Value: string(p.Value)}
|
||||
if d.filter != nil && !d.filter(pair) {
|
||||
continue
|
||||
}
|
||||
pairs = append(pairs, pair)
|
||||
}
|
||||
d.pairsMu.Lock()
|
||||
d.pairs = pairs
|
||||
d.pairsMu.Unlock()
|
||||
|
||||
d.mu.Lock()
|
||||
for _, ch := range d.chans {
|
||||
ch := ch
|
||||
go func() {
|
||||
defer func() {
|
||||
recover()
|
||||
}()
|
||||
select {
|
||||
case ch <- pairs:
|
||||
case <-time.After(time.Minute):
|
||||
log.Warn("chan is full and new change has been dropped")
|
||||
}
|
||||
}()
|
||||
}
|
||||
d.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
log.Warn("chan is closed and will rewatch")
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ConsulDiscovery) Close() {
|
||||
close(d.stopCh)
|
||||
}
|
Loading…
Reference in new issue