etcd 安装部署及常见使用场景

    xiaoxiao2021-03-25  125

    安装 ETCD_VER=v3.1.2  DOWNLOAD_URL=https://github.com/coreos/etcd/releases/download  curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz mkdir -p /tmp/test-etcd && tar xzvf /tmp/etcd-${ETCD_VER}-linux-amd64.tar.gz -C /tmp/test-etcd --strip-components=1 /tmp/test-etcd/etcd --version 启动 /tmp/test-etcd/etcd 启动后默认监听2379端口 进入交互模式 export ETCDCTL_API=3 #用3这个版本的api ./etcdctl put foo bar  ./etcdctl get foo 集群搭建 下面将在以下三台主机上搭建etcd集群, node0 10.16.77.95 node1 10.16.77.94 node2 10.16.77.93 配置项说明: --name etcd集群中的节点名,这里可以随意,可区分且不重复就行   --listen-peer-urls 监听的用于节点之间通信的url,可监听多个,集群内部将通过这些url进行数据交互(如选举,数据同步等) --initial-advertise-peer-urls 建议用于节点之间通信的url,节点间将以该值进行通信。 --listen-client-urls 监听的用于客户端通信的url,同样可以监听多个。 --advertise-client-urls 建议使用的客户端通信url,该值用于etcd代理或etcd成员与etcd节点通信。 --initial-cluster-token etcd-cluster-1 节点的token值,设置该值后集群将生成唯一id,并为每个节点也生成唯一id,当使用相同配置文件再启动一个集群时,只要该token值不一样,etcd集群就不会相互影响。 --initial-cluster 也就是集群中所有的initial-advertise-peer-urls 的合集 --initial-cluster-state new 新建集群的标志 如下的配置信息分别启动三个etcd ./etcd --name node0 \ --listen-peer-urls http://10.16.77.95:2380 \ --listen-client-urls http://10.16.77.95:2379,http://127.0.0.1:2379 \ --initial-advertise-peer-urls http://10.16.77.95:2380 \ --advertise-client-urls http://10.16.77.95:2379 \ --initial-cluster-token etcd-cluster-1 \ --initial-cluster node0=http://10.16.77.95:2380,node1=http://10.16.77.94:2380,node2=http://10.16.77.93:2380 \ --initial-cluster-state new ./etcd --name node1 \ --listen-peer-urls http://10.16.77.94:2380 \ --listen-client-urls http://10.16.77.94:2379,http://127.0.0.1:2379 \ --initial-advertise-peer-urls http://10.16.77.94:2380 \ --advertise-client-urls http://10.16.77.94:2379 \ --initial-cluster-token etcd-cluster-1 \ --initial-cluster node0=http://10.16.77.95:2380,node1=http://10.16.77.94:2380,node2=http://10.16.77.93:2380 \ --initial-cluster-state new ./etcd --name node2 \ --listen-peer-urls http://10.16.77.93:2380 \ --listen-client-urls http://10.16.77.93:2379,http://127.0.0.1:2379 \ --initial-advertise-peer-urls http://10.16.77.93:2380 \ --advertise-client-urls http://10.16.77.93:2379 \ --initial-cluster-token etcd-cluster-1 \ --initial-cluster node0=http://10.16.77.95:2380,node1=http://10.16.77.94:2380,node2=http://10.16.77.93:2380 \ --initial-cluster-state new 按如上配置分别启动集群,启动集群后,将会进入集群选举状态,若出现大量超时,则需要检查主机的防火墙是否关闭,或主机之间是否能通过2380端口通信,集群建立后通过以下命令检查集群状态。 查看集群成员 ./etcdctl  member list 判断leader和followers curl http://127.0.0.1:2379/v2/stats/leader 基本操作 版本 curl -L http://127.0.0.1:2379/version 设定键值 etcdctl put /message Hello etcdctl get /message curl -L -X PUT http://127.0.0.1:2379/v2/keys/k1 -d value="v1" curl -L http://127.0.0.1:2379/v2/keys/k1 删除key etcdctl del /message curl -L -X DELETE http://127.0.0.1:2379/v2/keys/message

     

    一些使用场景:

    下面的代码基于github.com/coreos/etcd, 拷贝到$GOPATH/github.com/etcd

     

    key的读写

     

    package main import ( //"net/http" "github.com/etcd/client" "golang.org/x/net/context" "log" "time" ) func main() { //创建client(协程安全) cfg := client.Config{Endpoints:[]string{"http://10.16.77.94:2379"},Transport: client.DefaultTransport} c, err := client.New(cfg) if err != nil { log.Fatalln(err) } kAPI := client.NewKeysAPI(c) //create _, err = kAPI.Create(context.Background(), "/foo1", "bar1") if err != nil { log.Println(err) } bg := context.Background() //设置key _, err = kAPI.Set(bg, "/foo1", "bar3", nil) if err != nil { log.Println(err) } //获取key值 resp, err := kAPI.Get(bg, "/foo1", nil) if err != nil { log.Println(err) } else { log.Println("value:", resp.Node.Value) log.Println("ttl:", resp.Node.TTL) } //设置ttl opt := client.SetOptions{TTL:time.Duration(300)*time.Second} _, err = kAPI.Set(bg, "/foo1", "bar4", &opt) if err != nil { log.Println(err) } resp, err = kAPI.Get(bg, "/foo1", nil) if err != nil { log.Println(err) } else { log.Println("value:", resp.Node.Value) log.Println("ttl:", resp.Node.TTL) } //删除key _, err = kAPI.Delete(bg, "/foo1", nil) if err != nil { log.Println("del key failed:", err) } resp, err = kAPI.Get(bg, "/foo1", nil) if err != nil { log.Println("after del, error:", err) } else { log.Println("after del, value:", resp.Node.Value) } }

     

     

    服务发现:

     

     

    代码的思路很简单, worker启动时向etcd注册自己的信息,并设置一个过期时间TTL,每隔一段时间更新这个TTL,如果该worker挂掉了,这个TTL就会expire. master则监听 workers/ 这个etcd directory, 根据检测到的不同action来增加, 更新, 或删除worker.

     

    package main import ( "github.com/etcd/client" "golang.org/x/net/context" "log" "time" ) type Worker struct { Addr string Api client.KeysAPI Key string Interval time.Duration TTL time.Duration } func NewWorker(addr string, api client.KeysAPI, key string, interval time.Duration, ttl time.Duration) *Worker { w := &Worker{Addr:addr, Api:api, Key:key, Interval:interval, TTL:ttl} go w.HeartBeat() return w } func (this *Worker) HeartBeat() { for { key := this.Key + this.Addr _, err := this.Api.Set(context.Background(), key, this.Addr, &client.SetOptions{TTL:this.TTL}) if err != nil { log.Println("worker set failed:", err) } else { log.Println("worker set once") } time.Sleep(this.Interval) } } func (this *Worker) Work() { for { //do something time.Sleep(this.Interval) } } type Master struct { M map[string]string Api client.KeysAPI Key string Interval time.Duration } func NewMaster(api client.KeysAPI, key string, interval time.Duration) *Master { m := &Master{M:map[string]string{}, Api:api, Key:key, Interval:interval} go m.WatchWorkers() return m } func (this *Master) WatchWorkers() { watcher := this.Api.Watcher(this.Key, &client.WatcherOptions{Recursive: true}) for { time.Sleep(this.Interval) res, err := watcher.Next(context.Background()) if err != nil { log.Println("master watch worker failed:", err) continue } value := res.Node.Value var prevalue string if res.PrevNode != nil { prevalue = res.PrevNode.Value } log.Println("master new event, action:", res.Action, "value:", value, "prevalue:", prevalue) if res.Action == "set" { this.M[value] = "" } if res.Action == "update" { delete(this.M, prevalue) this.M[value] = "" } if res.Action == "delete" || res.Action == "expire" { delete(this.M, prevalue) } } } func (this *Master) PrintWorkers() { //log.Println("workers:", this.WorkerAddrs) for { time.Sleep(time.Duration(1)*time.Second) log.Println(this.M) } } func startWorker() { cfg := client.Config{Endpoints:[]string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"},Transport: client.DefaultTransport} wrokerClient, err := client.New(cfg) if err != nil { log.Fatalln("create worker client failed:", err) } wrokerApi := client.NewKeysAPI(wrokerClient) interval := time.Duration(1) * time.Second ttl := time.Duration(10) * time.Second worker := NewWorker("127.0.0.1:80", wrokerApi, "workers/", interval, ttl) worker.Work() } func startMaster() { cfg := client.Config{Endpoints:[]string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"},Transport: client.DefaultTransport} masterClient, err := client.New(cfg) if err != nil { log.Fatalln("create master client failed:", err) } masterApi := client.NewKeysAPI(masterClient) interval := time.Duration(0) * time.Second m := NewMaster(masterApi, "workers/", interval) m.PrintWorkers() } func main() { go startWorker() go startMaster() select{} }

     

    发布订阅:

     

     下面的代码就是一个简单的发布/订阅的实现。集群中每台机器都用同一套配置文件,在启动阶段,都会到该节点上获取配置信息,同时客户端还需要在在节点注册一个数据变更的watcher监听,一旦配置文件发生变更,就会受到通知信息。

     

    package main import ( "github.com/etcd/client" "golang.org/x/net/context" "log" "time" "sync" "fmt" ) type App struct { Api client.KeysAPI Key string Config string ConfigLock sync.RWMutex } func NewApp(api client.KeysAPI, key string) *App { app := &App{Api:api, Key:key} res, err := app.Api.Get(context.Background(), app.Key, nil) if err == nil { app.Config = res.Node.Value } go app.WatchConfig() return app } func (this *App) WatchConfig() { watcher := this.Api.Watcher(this.Key, &client.WatcherOptions{Recursive: true}) for { res, err := watcher.Next(context.Background()) if err != nil { log.Println("watch config failed:", err) continue } if res.Action == "set" || res.Action == "update" { this.ConfigLock.Lock() this.Config = res.Node.Value this.ConfigLock.Unlock() log.Println("config update once") } } } func (this *App) Run() { for { time.Sleep(time.Duration(1)*time.Second) this.ConfigLock.RLock() log.Println("config:", this.Config) this.ConfigLock.RUnlock() } } func startApp() { cfg := client.Config{Endpoints:[]string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"},Transport: client.DefaultTransport} c, err := client.New(cfg) if err != nil { log.Fatalln("create client failed:", err) } api := client.NewKeysAPI(c) app := NewApp(api, "config") app.Run() } type ConfManager struct { Api client.KeysAPI } func NewConfManager(api client.KeysAPI) *ConfManager { cm := &ConfManager{Api:api} return cm } func (this *ConfManager) UpdateConfig(key string, config string) error{ _, err := this.Api.Set(context.Background(), key, config, nil) return err } func updateConfig() { cfg := client.Config{Endpoints:[]string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"},Transport: client.DefaultTransport} c, err := client.New(cfg) if err != nil { log.Fatalln("create client failed:", err) } api := client.NewKeysAPI(c) confManager := NewConfManager(api) var i int for i=0; i<100; i++ { time.Sleep(time.Duration(2)*time.Second) confManager.UpdateConfig("config", fmt.Sprintf("%d", i)) } } func main() { go startApp() go updateConfig() select{} }

     

     

     

    分布式通知与协调:

    下面的代码等到所有work执行完毕再执行进一步操作。

     

    package main import ( "github.com/etcd/client" "golang.org/x/net/context" "log" "time" "fmt" ) type Worker struct { Hostname string Endpoints []string Api client.KeysAPI Key string SleepDuration time.Duration } func NewWorker(hostname string, endpoints []string, key string, duration time.Duration) (*Worker, error) { cfg := client.Config{Endpoints:endpoints,Transport: client.DefaultTransport} c, err := client.New(cfg) if err != nil { return nil, err } api := client.NewKeysAPI(c) w := &Worker{Hostname:hostname, Endpoints:endpoints, Key:key, Api:api, SleepDuration:duration} return w, nil } func (this *Worker) Work() { var i int for i=1; i<=20; i++ { time.Sleep(this.SleepDuration) this.Api.Set(context.Background(), this.Key + this.Hostname, fmt.Sprintf("%d", i), nil) log.Println("worker:", this.Hostname, "set to", fmt.Sprintf("%d", i)) } } func waitWorkesDone() { cfg := client.Config{Endpoints:[]string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"},Transport: client.DefaultTransport} c, err := client.New(cfg) if err != nil { log.Fatalln("create client failed:", err) } api := client.NewKeysAPI(c) key := "workerProcess/" watcher := api.Watcher(key, &client.WatcherOptions{Recursive: true}) m := map[string]string{} for { res, err := watcher.Next(context.Background()) if err != nil { log.Println("watch failed:", err) continue } if res.Action == "set" || res.Action == "update" { m[res.Node.Key] = res.Node.Value neqTag := false for _, v := range m { if v != "20" { neqTag = true break } } if neqTag == false{ log.Println("all done") return } } } } func main() { endpoints := []string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"} w1, _:= NewWorker("hostname1", endpoints, "workerProcess/", time.Duration(1)*time.Second) w2, _:= NewWorker("hostname2", endpoints, "workerProcess/", time.Duration(2)*time.Second) w3, _:= NewWorker("hostname3", endpoints, "workerProcess/", time.Duration(3)*time.Second) go w1.Work() go w2.Work() go w3.Work() waitWorkesDone() }

     

    分布式锁:

     

    实现思路:通过设置set操作的PrevExist值,保证多个节点同时创建目录时,只有一个成功,即该用户获得了锁。并设置ttl防止连接丢失造成死锁。

     

     

    package main import ( "github.com/etcd/client" "golang.org/x/net/context" "log" "time" ) type Worker struct { Hostname string Endpoints []string Api client.KeysAPI Key string SleepDuration time.Duration } func NewWorker(hostname string, endpoints []string, key string, duration time.Duration) (*Worker, error) { cfg := client.Config{Endpoints:endpoints,Transport: client.DefaultTransport} c, err := client.New(cfg) if err != nil { return nil, err } api := client.NewKeysAPI(c) w := &Worker{Hostname:hostname, Endpoints:endpoints, Key:key, Api:api, SleepDuration:duration} return w, nil } func (this *Worker) Work() { duration := time.Duration(10)*time.Second for { _, err := this.Api.Set(context.Background(), this.Key, "", &client.SetOptions{PrevExist:"false", TTL:duration}) if err != nil { continue } log.Println("hostname:", this.Hostname, "lock succed") time.Sleep(this.SleepDuration) log.Println("hostname:", this.Hostname, "exec finish") this.Api.Delete(context.Background(), this.Key, nil) } } func main() { endpoints := []string{"http://10.16.77.94:2379", "http://10.16.77.93:2379"} w1, _:= NewWorker("hostname1", endpoints, "lock", time.Duration(1)*time.Second) w2, _:= NewWorker("hostname2", endpoints, "lock", time.Duration(2)*time.Second) w3, _:= NewWorker("hostname3", endpoints, "lock", time.Duration(3)*time.Second) go w1.Work() go w2.Work() go w3.Work() select{} }

     

     

     

     

     

     

     

     

    转载请注明原文地址: https://ju.6miu.com/read-5270.html

    最新回复(0)