etcd:Go API 学习

etcd Go API 客户端下载:https://github.com/etcd-io/etcd/tree/master/clientv3 ;另见 godoc 文档。

connect

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main creates an etcd v3 client for the local endpoint, prints the client
// value and a success message, and closes the client when it returns.
// If the client cannot be created the error is printed and main returns.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	fmt.Println(cli)
	fmt.Println("connect successfully!")
}

put

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package main

import (
"context"
"fmt"
"time"

"go.etcd.io/etcd/clientv3"
)

// main writes the value "byte" to /cron/jobs/job1 and prints the revision
// reported in the response header. The put is issued with WithPrevKV, so
// when the response carries a previous key-value its value is printed too.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	store := clientv3.NewKV(cli)

	resp, err := store.Put(context.TODO(), "/cron/jobs/job1", "byte", clientv3.WithPrevKV())
	if err != nil {
		fmt.Println(err)
		return
	}

	fmt.Println(resp.Header.Revision)
	// PrevKv is nil when the response carries no previous key-value.
	if prev := resp.PrevKv; prev != nil {
		fmt.Println("PrevValue", string(prev.Value))
	}
}

get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main reads /cron/jobs/job1 from the local etcd endpoint and prints the
// returned key-value list together with the match count.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	store := clientv3.NewKV(cli)

	resp, err := store.Get(context.TODO(), "/cron/jobs/job1")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(resp.Kvs, resp.Count)
}

遍历目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main writes /cron/jobs/job0, prints the resulting revision, and then lists
// every key that starts with /cron/jobs by issuing a Get with WithPrefix.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	store := clientv3.NewKV(cli)

	written, err := store.Put(context.TODO(), "/cron/jobs/job0", "job0")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(written.Header.Revision)

	// Use WithPrefix to walk the "directory", i.e. all keys sharing the prefix.
	listing, err := store.Get(context.TODO(), "/cron/jobs", clientv3.WithPrefix())
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(listing.Kvs, listing.Count)
}

delete

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main writes /cron/jobs/job0, deletes it again, and prints the value,
// create revision and mod revision of every key-value the delete response
// reports as removed (the delete is issued with WithPrevKV).
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	store := clientv3.NewKV(cli)

	written, err := store.Put(context.TODO(), "/cron/jobs/job0", "job0")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(written.Header.Revision)

	// To delete several keys at once, consider the clientv3.WithPrefix() option.
	removed, err := store.Delete(context.TODO(), "/cron/jobs/job0", clientv3.WithPrevKV())
	if err != nil {
		fmt.Println(err)
		return
	}

	// Ranging over an empty PrevKvs slice is a no-op, so no length guard is needed.
	for _, old := range removed.PrevKvs {
		fmt.Println(string(old.Value), old.CreateRevision, old.ModRevision)
	}
}

lease

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main demonstrates etcd leases:
//
//  1. it grants a lease (TTL argument 10),
//  2. starts keep-alive renewals bound to a context that times out after
//     five seconds, so renewals stop once that context expires,
//  3. writes /cron/lock/job1 attached to the lease, and
//  4. polls the key every two seconds until it is no longer present.
//
// Any error is printed and ends the program.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	lease := clientv3.NewLease(cli)

	grant, err := lease.Grant(context.TODO(), 10)
	if err != nil {
		fmt.Println(err)
		return
	}
	leaseID := grant.ID

	// Automatic renewal. The context deliberately times out after 5s so the
	// renewals stop; the cancel func is kept and deferred so the context's
	// resources are released on every return path (go vet: lostcancel).
	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
	defer cancel()

	keepAliveCh, err := lease.KeepAlive(ctx, leaseID)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Report every keep-alive response; a nil response (which is what a
	// receive on a closed channel yields) means renewals have stopped.
	go func() {
		for {
			resp := <-keepAliveCh
			if resp == nil {
				fmt.Println("租约已尽失效了")
				return
			}
			fmt.Println("续租正常", resp.ID)
		}
	}()

	kv := clientv3.NewKV(cli)

	// Create the key bound to the lease ID.
	putResp, err := kv.Put(context.TODO(), "/cron/lock/job1", "hello world", clientv3.WithLease(leaseID))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入成功", putResp.Header.Revision)

	// Poll until the key has expired.
	for {
		getResp, err := kv.Get(context.TODO(), "/cron/lock/job1", clientv3.WithCountOnly())
		if err != nil {
			fmt.Println(err)
			return
		}
		if getResp.Count == 0 {
			fmt.Println("/cron/lock/job1过期了")
			// Stop polling once the key is gone; otherwise the message
			// would be printed every two seconds forever.
			break
		}
		time.Sleep(2 * time.Second)
	}
}

watch

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/mvcc/mvccpb"
"time"
)

// main demonstrates etcd watches. A background goroutine puts and deletes
// /cron/jobs/jpb7 once per second; the main goroutine reads the key, starts
// a watch at the revision following that read, prints every PUT/DELETE event
// it receives, and cancels the watch context after five seconds.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	kv := clientv3.NewKV(cli)

	// Generate a steady stream of changes to watch. Failures are reported
	// instead of being silently dropped; the loop keeps going so a transient
	// error does not end the demo.
	go func() {
		for {
			if _, err := kv.Put(context.TODO(), "/cron/jobs/jpb7", "i am job7"); err != nil {
				fmt.Println(err)
			}
			if _, err := kv.Delete(context.TODO(), "/cron/jobs/jpb7"); err != nil {
				fmt.Println(err)
			}
			time.Sleep(1 * time.Second)
		}
	}()

	getResp, err := kv.Get(context.TODO(), "/cron/jobs/jpb7")
	if err != nil {
		fmt.Println(err)
		return
	}
	if len(getResp.Kvs) != 0 {
		fmt.Println("当前值", string(getResp.Kvs[0].Value))
	}

	// Start watching from the revision right after the one the Get observed.
	watchStartRevision := getResp.Header.Revision + 1

	watcher := clientv3.NewWatcher(cli)
	fmt.Println("从该版本向后监听", watchStartRevision)

	ctx, cancelFunc := context.WithCancel(context.TODO())
	watchRespChan := watcher.Watch(ctx, "/cron/jobs/jpb7", clientv3.WithRev(watchStartRevision))

	// Cancel the watch context after five seconds.
	time.AfterFunc(5*time.Second, func() {
		cancelFunc()
	})

	for watchResp := range watchRespChan {
		for _, event := range watchResp.Events {
			switch event.Type {
			case mvccpb.PUT:
				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
			case mvccpb.DELETE:
				fmt.Println("删除了", event.Kv.ModRevision)
			}
		}
	}
}

op

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main demonstrates the generic Op interface: it builds a put Op and a get
// Op for /cron/job3/job8, executes each with kv.Do, and prints the write
// revision followed by the stored key's mod revision and value.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	kv := clientv3.NewKV(cli)

	putOp := clientv3.OpPut("/cron/job3/job8", "job8")
	opResp, err := kv.Do(context.TODO(), putOp)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("写入Revision:", opResp.Put().Header.Revision)

	getOp := clientv3.OpGet("/cron/job3/job8")
	if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
		fmt.Println(err)
		return
	}

	// The put and the get are separate requests, so the key may be gone by
	// the time it is read (e.g. removed by another client). Guard the index
	// instead of panicking on an empty result.
	kvs := opResp.Get().Kvs
	if len(kvs) == 0 {
		fmt.Println("/cron/job3/job8 不存在")
		return
	}
	fmt.Println("数据修改版本:", kvs[0].ModRevision)
	fmt.Println("数据Value:", string(kvs[0].Value))
}

txn

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package main

import (
"context"
"fmt"
"go.etcd.io/etcd/clientv3"
"time"
)

// main demonstrates a lease-backed lock built on a transaction:
//
//  1. grant a lease (TTL argument 5) and keep it alive in the background,
//  2. in one transaction, create /cron/lock/job9 bound to the lease if its
//     create revision is 0, otherwise read the existing key,
//  3. if the transaction did not succeed, print the current value and exit,
//  4. otherwise simulate work for ten seconds.
//
// On return the deferred calls revoke the lease, cancel the keep-alive
// context and close the client, in that order.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println(err)
		return
	}
	defer cli.Close()

	leaser := clientv3.NewLease(cli)
	grant, err := leaser.Grant(context.TODO(), 5)
	if err != nil {
		fmt.Println(err)
		return
	}
	id := grant.ID

	// Automatic renewal, stopped through the cancellable context.
	keepCtx, stopKeepAlive := context.WithCancel(context.TODO())
	defer stopKeepAlive()
	defer leaser.Revoke(context.TODO(), id)

	renewals, err := leaser.KeepAlive(keepCtx, id)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Print each renewal; a nil response ends the goroutine.
	go func() {
		for {
			ack := <-renewals
			if ack == nil {
				fmt.Println("租约已尽失效了")
				return
			}
			fmt.Println("续租正常", ack.ID)
		}
	}()

	store := clientv3.NewKV(cli)

	// If the key has never been created, put it under the lease; else read it.
	notCreated := clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)
	acquire := clientv3.OpPut("/cron/lock/job9", "", clientv3.WithLease(id))
	inspect := clientv3.OpGet("/cron/lock/job9")

	result, err := store.Txn(context.TODO()).If(notCreated).Then(acquire).Else(inspect).Commit()
	if err != nil {
		fmt.Println(err)
		return
	}

	// Did we get the lock?
	if !result.Succeeded {
		holder := result.Responses[0].GetResponseRange().Kvs[0]
		fmt.Println("锁被占用", string(holder.Value))
		return
	}

	// Do the work while holding the lock.
	fmt.Println("处理任务中")
	time.Sleep(10 * time.Second)
}