• 宁陵县:法院家事的实践与创新 2019-03-24
  • 日照:前5个月日照税收比重居山东省第二位 2019-03-22
  • 武警部队“长城-2018”反恐国际论坛在京开幕 2019-03-22
  • 税费“红包”助推高质量发展 2019-03-21
  • 电影人共聚“微博电影之夜” 张艺谋:如果不当导演就当守门员 2019-03-21
  • 陕西拍摄到野化放归林麝活动影像 放归林麝已度过危险期 2019-03-15
  • 国宝级黄腹角雉住进三清山 2019-03-15
  • 王石田朴珺罕见亲密写真曝光 女方喂男方冰淇淋娇羞甜蜜 2018-11-22
  • 世界很多国家想拥有核弹,但迫于种种原因而没能实现。 2018-11-21
  • 人事 江西两设区市任免一批领导干部 2018-11-20
  • 自由的生活_软路由论坛

     找回密码
     注册

    QQ登录

    只需一步,快速开始

    搜索
    查看: 543|回复: 0
    打印 上一主题 下一主题

    排列五开奖结果: 分布式组件etcd应用-睿云智合技术漫谈分享

    [复制链接]
    跳转到指定楼层
    1#
    发表于 2018-11-14 18:22:40 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

    马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

    广西11选5历史 www.jqel.net 您需要 登录 才可以下载或查看,没有帐号?注册

    x

    一、搭建etcd环境

    我将使用 etcd v3版本, so,本地先建个单机版本ETCD环境。

    docker pull xieyanze/etcd3:latest
    docker run —name etcd-v3.0.9 -d -v /tmp:/data \\
              -p 2379:2379 -p 2380:2380 -p 4001:4001 -p 7001:7001 xieyanze/etcd3:latest
             
    docker exec -it etcd-v3.0.9 sh
    当前默认还是v2版本通过设定环境变量export ETCDCTL_API=3,设置成V3版本。
    export ETCDCTL_API=3
    etcdctl put /test/ok 11
    etcdctl put /test/ok 22
    etcdctl del  /test/gg
    #删除所有/test前缀的节点
    etcdctl del  /test --prefix
    etcdctl get /test/ok
    # 前缀查询
    etcdctl get /test/ok --prefix

    .软件逻辑结构


                                   
    登录/注册后可看大图

    1. k8s master cluster
    dev-7
    dev-8

    2. k8s slave cluster 1 env1
    dev-1
    dev-2
    dev-3

    3. k8s slave cluster 2 env2
    dev-4
    dev-5
    dev-6

    . controller agent 服务注册与发现


    实现原理:

    注意: etcd v3版本, k/v 的超时间时TTL最小5秒种.
    1.每2秒钟,每个服务向etcd发送一次心跳包,证明自己还活着.
    2.当服务退出时, 主动删除etcd的key. 或者等到TTL超时之后,自动下线.
    3.controller需要获得agent的状态,直接GET [ingress/agent/${env_uuid}/]就能获得当前agent在线状态
    4.agent需要获得controller的状态,直接GET [ingress/controller]就能获得当前controller在线状态


                                   
    登录/注册后可看大图

    controller 目录

    目录

    TTL

    ingress/controller/dev7_xxx

    {"ip":xxx}

    5

    ingress/controller/dev8_xxx

    {"ip":xxx}

    5

    agent 目录

    目录

    TTL

    ingress/agent/env1/dev1_xxx

    {"ip":xxx}

    5

    ingress/agent/env1/dev2_xxx

    {"ip":xxx}

    5

    ingress/agent/env1/dev3_yyy

    {"ip":xxx}

    5

    ingress/agent/env2/dev4_xxx

    {"ip":xxx}

    5

    ingress/agent/env2/dev5_xxx

    {"ip":xxx}

    5

    ingress/agent/env2/dev6_yyy

    {"ip":xxx}

    5

    .软件业务的实现.

    4.1 controller side:

    1. 客户端调用controller restful api.controller 直接写入ETCD,同时写入副本到mysql.

    2. controller 如果关注于agent的变化.只需要watch ingress/agent这个目录

    3. controller 是无状态,不需要同步多个实例之间的数据,可以任意的scale它的实例数.

    4. 如果controller挂掉之后,重启加载mysql的数据库同步到etcd中.

    4.2 controller需要了解规则执行状态


                                   
    登录/注册后可看大图

    etcd 目录

    目录

    TTL

    ingress/ingress_config/env1/${config_uuid1}/status/dev1_xxx

    1

    5

    ingress/ingress_config/env1/${config_uuid1}/status/dev2_xxx

    1

    5

    ingress/ingress_config/env1/${config_uuid1}/status/dev3_xxx

    1

    5

    ingress/ingress_config/env1/${config_uuid2}/status/dev1_xxx

    1

    5

    ingress/ingress_config/env1/${config_uuid2}/status/dev2_xxx

    1

    5

    ingress/ingress_config/env1/${config_uuid2}/status/dev3_xxx

    1

    5

    ingress/ingress_config/env2/${config_uuid3}/status/dev4_xxx

    1

    5

    ingress/ingress_config/env2/${config_uuid3}/status/dev5_xxx

    1

    5

    ingress/ingress_config/env2/${config_uuid3}/status/dev6_xxx

    1

    5

    agent的执行状态直接写入配置状态中,
    先获得当前ingress/agent/env1目录下的agent列表,对比ingress/ingress_agent/env1/${config_uuid1}/status目录下的规则完成之后反馈列表, 每一个都存在时,则全部执行成功.

    4.3 agent side:

    1. 不同集群agent 通过etcd的watch功能在第一时间,获得监听到所有数据的变化 新建,删除,更新

    2. 不同集群agent 定时3分钟获得自已环境下的列表信息,同步处理相关信息

    3. 如果agent挂了之后, 重启加载一次etcd中所有的ingress_conifg.

    . 代码实例

    5.1 etcd clientv3 的封装

    1.连接管理,支持TLS
    2.增,删,查, 支持自动超时的设值
    3.watch 监听目录或KEY的值的变化(PUT,DELETE)

    package main
    import (
    "fmt"
    "time"
    // "github.com/coreos/etcd/pkg/transport"
    "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    // "sync"
    )
    type EtcdData struct {
    Key              string
    Value            string
    }
    type EtcdHelper struct {
    RequestTimeout   time.Duration
    Client           *clientv3.Client
    }
    func NewEtcdHelper() *EtcdHelper {
    //tlsInfo := transport.TLSInfo{
    // CertFile:      "/tmp/test-certs/test-name-1.pem",
    // KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
    // TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
    //}
    //tlsInfo := transport.TLSInfo{
    // CertFile:      "./tls/apiserver.crt",
    // KeyFile:       "./tls/apiserver.key",
    //}
    //tlsConfig, err := tlsInfo.ClientConfig()
    //if err != nil {
    // fmt.Printf("%s", err.Error())
    // return nil
    //}
    //cli, err := clientv3.New(clientv3.Config{
    // Endpoints: []string{"dev-7:2379"},
    // DialTimeout: 3 * time.Second,
    // TLS:         tlsConfig,
    //})
    cli, err := clientv3.New(clientv3.Config{
    Endpoints: []string{"127.0.0.1:2379"},
    //Endpoints: []string{"//dev-7:2379"},
    DialTimeout: 3 * time.Second,
    })
    if err != nil {
    fmt.Printf("%s", err.Error())
    return nil
    }
    return &EtcdHelper{
    RequestTimeout: 5 *time.Second,
    Client: cli,
    }
    }
    func (c *EtcdHelper) Release() {
    if c.Client != nil {
    c.Client.Close()
    }
    }
    func (c *EtcdHelper) PutValue(key string, value string, ttl int64) error {
    ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
    defer cancel()
    // minimum lease TTL is 5-second
    resp, err := c.Client.Grant(context.TODO(), ttl)
    if err != nil {
    fmt.Printf("%s\\n", err.Error())
    return err
    }
    _, err = c.Client.Put(ctx, key, value, clientv3.WithLease(resp.ID))
    if err != nil {
    fmt.Printf("%s\\n", err.Error())
    return err
    }
    return nil
    }
    func (c *EtcdHelper) SetValue(key string, value string) error {
    ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
    defer cancel()
    _, err := c.Client.Put(ctx, key, value)
    if err != nil {
    fmt.Printf("%s\\n", err.Error())
    return err
    }
    return nil
    }
    func (c *EtcdHelper) GetValue(key string) []EtcdData {
    ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
    defer cancel()
    resp, err := c.Client.Get(ctx, key, clientv3.WithPrefix())
    if err != nil {
    fmt.Printf("%s\\n", err.Error())
    return nil
    }
    var kv_slice []EtcdData
    for _, ev := range resp.Kvs {
    //fmt.Printf("%s : %s\\n", ev.Key, ev.Value)
    kv := EtcdData{string(ev.Key), string(ev.Value)}
    kv_slice = append(kv_slice, kv)
    }
    return kv_slice
    }
    func (c *EtcdHelper) DelValue(key string) error {
    ctx, cancel := context.WithTimeout(context.Background(), c.RequestTimeout)
    defer cancel()
    _, err := c.Client.Delete(ctx, key, clientv3.WithPrefix())
    if err != nil {
    fmt.Printf("%s\\n", err.Error())
    return err
    }
    return nil
    }
    func (c *EtcdHelper) Watch(key string) {
    rch := c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
    for wresp := range rch {
    for _, ev := range wresp.Events {
    fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
    }
    }
    func (c *EtcdHelper) Listen(key string) clientv3.WatchChan {
    return c.Client.Watch(context.Background(), key, clientv3.WithPrefix())
    }

    5.2 controller 的代码实现

    1.controller上线,下线功能
    2.controller定时发送心跳包到etcd.
    3.controller监听agent的变化.(1-3)完成服务注册与发现
    4.controller通过下发配置到etcd,通知所有watch ingress_config变化的agent

    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    type ControllerClient struct {
    Period   time.Duration
    Name     string
    IP       string
    Helper   *EtcdHelper
    StopCha  chan int
    //Lock     *sync.Mutex
    }
    func NewControllerClient(name string, host_ip string) *ControllerClient {
    return &ControllerClient{
    Period:  2,
    Name:    name,
    IP:      host_ip,
    Helper:  NewEtcdHelper(),
    StopCha: make(chan int, 10),
    //Lock:    new(sync.Mutex),
    }
    }
    func (cc *ControllerClient) Init(display bool) {
    go func() {
    cc.OnLine()
    for {
    select {
    case <-cc.StopCha:
    fmt.Printf("online goroutinue is exited.")
    return
    case <-time.After(time.Second * cc.Period):
    cc.OnLine()
    }
    }
    }()
    if display {
    go func() {
    watch_chan := cc.Helper.Listen("/ingress/agent")
    for wresp := range watch_chan {
    for _, ev := range wresp.Events {
    fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
    }
    }()
    }
    }
    func (cc *ControllerClient) OnLine() {
    key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
    //cc.Lock.Lock()
    //defer cc.Lock.Unlock()
    err := cc.Helper.PutValue(key, "1", 5)
    if err != nil  {
    fmt.Printf(err.Error())
    }
    }
    func (cc *ControllerClient) OffLine() {
    close(cc.StopCha)
    key := fmt.Sprintf("/ingress/controller/%s", cc.Name)
    //cc.Lock.Lock()
    //defer cc.Lock.Unlock()
    err := cc.Helper.DelValue(key)
    if err != nil  {
    fmt.Printf(err.Error())
    }
    }
    func (cc *ControllerClient) GetIngressConfig(env_uuid string, uuid string) []EtcdData {
    //TODO. first save to mysql.
    key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
    //cc.Lock.Lock()
    //defer cc.Lock.Unlock()
    return cc.Helper.GetValue(key)
    }
    func (cc *ControllerClient) SetIngressConfig(env_uuid string, uuid string, config string) {
    //TODO. first save to mysql.
    key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
    //cc.Lock.Lock()
    //defer cc.Lock.Unlock()
    err := cc.Helper.SetValue(key, config)
    if err != nil  {
    fmt.Printf(err.Error())
    }
    }
    func (cc *ControllerClient) DelIngressConfig(env_uuid string, uuid string) {
    //TODO. first update to mysql.
    key := fmt.Sprintf("/ingress/ingress_config/%s/%s", env_uuid, uuid)
    //cc.Lock.Lock()
    //defer cc.Lock.Unlock()
    err := cc.Helper.DelValue(key)
    if err != nil {
    fmt.Printf(err.Error())
    }
    }

    5.3 agent代码实现

    1.agent上线,下线功能
    2.agent定时发送心跳包到etcd.
    3.agent监听(watch) controller的变化.(1-3)完成服务注册与发现
    4.agnet监听(watch) ingress_config变化的agent,实时完成更新或设置配置,删除配置功能.

    ////////////////////////////////////////////////////////////////////////////////////////////
    type AgentClient struct {
    LivePeriod         time.Duration
    FirstConfigPerid   time.Duration
    SyncConfigPeriod   time.Duration
    Name         string
    EnvUUID      string
    IP           string
    Helper       *EtcdHelper
    StopCha      chan struct{}
    }
    func NewAgentClient(name string, env_uuid string, host_ip string) *AgentClient {
    return &AgentClient{
    LivePeriod:         2,
    FirstConfigPerid:   3,
    SyncConfigPeriod:   60,
    Name:           name,
    EnvUUID:        env_uuid,
    IP:             host_ip,
    Helper:         NewEtcdHelper(),
    StopCha:        make(chan struct{}, 1),
    }
    }
    func (ac *AgentClient) Init(display bool) {
    //我还活着,不要干掉我.
    go func() {
    ac.OnLine()
    for {
    select {
    case <-ac.StopCha:
    return
    case <-time.After(time.Second *ac.LivePeriod):
    ac.OnLine()
    }
    }
    }()
    //if display {
    // go func() {
    // watch_chan := cc.Helper.Listen("/ingress/agent")
    // for wresp := range watch_chan {
    // for _, ev := range wresp.Events {
    // fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    // }
    // }
    // }()
    //}
    //重启之后,第一次同步 和 定期同步.
    //go func() {
    //
    // time.Sleep(time.Second * ac.FirstConfigPerid)
    // ac.SyncIngressConfigs()
    //
    // for {
    // select {
    // case <-ac.StopCha:
    // return
    // case <-time.After(time.Second * ac.SyncConfigPeriod):
    // ac.SyncIngressConfigs()
    // }
    // }
    //}()
    if display {
    //监听controller变化(等待处理掉线自动重连后,重监听)
    go func() {
    watch_chan := ac.Helper.Listen("/ingress/controller")
    for wresp := range watch_chan {
    for _, ev := range wresp.Events {
    fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
    }
    }()
    }
    //监听本环境下ingress_config的变化(等待处理掉线自动重连, 重监听)
    go func() {
    key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
    watch_chan := ac.Helper.Listen(key)
    for wresp := range watch_chan {
    for _, ev := range wresp.Events {
    fmt.Printf("watch %s %q : %q\\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    switch ev.Type.String() {
    case "PUT":
    fmt.Printf("agent=%s SetIngressConfig(%s, %s)\\n", ac.Name, ev.Kv.Key, ev.Kv.Value)
    //TODO: SetIngressConfig(key, value)
    break
    case "DELETE":
    fmt.Printf("agent=%s DelIngressConfig(%s)\\n", ac.Name,  ev.Kv.Key)
    //TODO: DelIngressConfig(key)
    break
    }
    }
    }
    }()
    }
    func (ac *AgentClient) OnLine() {
    key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
    err := ac.Helper.PutValue(key, fmt.Sprintf(`{"name":"%s", "env_uuid":"%s", "ip":"%s"}`, ac.Name, ac.EnvUUID, ac.IP), 5)
    if err != nil  {
    fmt.Printf(err.Error())
    }
    }
    func (ac *AgentClient) OffLine() {
    //ac.StopCha <- 1
    close(ac.StopCha)
    key := fmt.Sprintf("/ingress/agent/%s/%s", ac.EnvUUID, ac.Name)
    err := ac.Helper.DelValue(key)
    if err != nil  {
    fmt.Printf(err.Error())
    }
    }
    func (ac *AgentClient) UpdateIngressStatus(uuid string) {
    key := fmt.Sprintf("/ingress/ingress_config_status/%s/%s/%s", ac.EnvUUID, uuid, ac.Name)
    err := ac.Helper.DelValue(key)
    if err != nil  {
    fmt.Printf(err.Error())
    }
    }
    //服务重启之后,第一次先调用 并用 定时同步
    func (ac *AgentClient) SyncIngressConfigs() {
    key := fmt.Sprintf("/ingress/ingress_config/%s", ac.EnvUUID)
    kv_slice := ac.Helper.GetValue(key)
    if kv_slice != nil {
    //TODO: ingressConfig.SyncIngressConfigs(kv_slice)
    for _, kv := range kv_slice {
    fmt.Printf("name=%s, key:%s-----value:%s\\n", ac.Name, kv.Key, kv.Value)
    }
    }
    }
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    func main() {
    controller1 := NewControllerClient("dev-7_001", "192.168.0.10")
    controller1.Init(false)
    controller2 := NewControllerClient("dev-8_002", "192.168.0.11")
    controller2.Init(false)
    controller3 := NewControllerClient("dev-8_003", "192.168.0.12")
    controller3.Init(false)
    agent1 := NewAgentClient("dev-1_001", "1", "192.168.0.1")
    agent1.Init(false)
    agent2 := NewAgentClient("dev-2_001", "1", "192.168.0.2")
    agent2.Init(false)
    agent3 := NewAgentClient("dev-3_001", "1", "192.168.0.3")
    agent3.Init(false)
    agent4 := NewAgentClient("dev-4_001", "1", "192.168.0.4")
    agent4.Init(false)
    agent5 := NewAgentClient("dev-5_001", "1", "192.168.0.5")
    agent5.Init(false)
    agent6 := NewAgentClient("dev-6_001", "1", "192.168.0.6")
    agent6.Init(false)
    agent7 := NewAgentClient("dev-7_001", "1", "192.168.0.7")
    agent7.Init(false)
    agent8 := NewAgentClient("dev-8_001", "1", "192.168.0.8")
    agent8.Init(false)
    agent9 := NewAgentClient("dev-9_001", "1", "192.168.0.9")
    agent9.Init(false)
    agent10 := NewAgentClient("dev-10_001", "1", "192.168.0.10")
    agent10.Init(false)
    time.Sleep(time.Second*1)
    controller3.SetIngressConfig("1", "0001", `{"config":"helloworld"}`)
    controller3.DelIngressConfig("1", "0001")
    controller3.SetIngressConfig("1", "0002", `{"config":"helloworld"}`)
    controller3.DelIngressConfig("1", "0002")
    controller3.SetIngressConfig("1", "0003", `{"config":"helloworld"}`)
    controller3.DelIngressConfig("1", "0003")
    controller3.SetIngressConfig("1", "0004", `{"config":"helloworld"}`)
    controller3.DelIngressConfig("1", "0004")
    controller3.SetIngressConfig("1", "0005", `{"config":"helloworld"}`)
    controller3.DelIngressConfig("1", "0005")
    forever := make(chan struct{})
    <-forever
    }

    routeros
    您需要登录后才可以回帖 登录 | 注册

    本版积分规则

    QQ|小黑屋|手机版|Archiver|广西11选5历史 ( 渝ICP备15001194号-1,渝公网安备 50011602500124号 )

    GMT+8, 2019-3-26 17:56 , Processed in 0.165786 second(s), 21 queries , Gzip On, MemCache On.

    Powered by Discuz! X3.4 Licensed

    © 2001-2017 Comsenz Inc.

    快速回复 广西11选5历史 返回列表
  • 宁陵县:法院家事的实践与创新 2019-03-24
  • 日照:前5个月日照税收比重居山东省第二位 2019-03-22
  • 武警部队“长城-2018”反恐国际论坛在京开幕 2019-03-22
  • 税费“红包”助推高质量发展 2019-03-21
  • 电影人共聚“微博电影之夜” 张艺谋:如果不当导演就当守门员 2019-03-21
  • 陕西拍摄到野化放归林麝活动影像 放归林麝已度过危险期 2019-03-15
  • 国宝级黄腹角雉住进三清山 2019-03-15
  • 王石田朴珺罕见亲密写真曝光 女方喂男方冰淇淋娇羞甜蜜 2018-11-22
  • 世界很多国家想拥有核弹,但迫于种种原因而没能实现。 2018-11-21
  • 人事 江西两设区市任免一批领导干部 2018-11-20