• 宁陵县:法院家事的实践与创新 2019-03-24
  • 日照:前5个月日照税收比重居山东省第二位 2019-03-22
  • 武警部队“长城-2018”反恐国际论坛在京开幕 2019-03-22
  • 税费“红包”助推高质量发展 2019-03-21
  • 电影人共聚“微博电影之夜” 张艺谋:如果不当导演就当守门员 2019-03-21
  • 陕西拍摄到野化放归林麝活动影像 放归林麝已度过危险期 2019-03-15
  • 国宝级黄腹角雉住进三清山 2019-03-15
  • 王石田朴珺罕见亲密写真曝光 女方喂男方冰淇淋娇羞甜蜜 2018-11-22
  • 世界很多国家想拥有核弹,但迫于种种原因而没能实现。 2018-11-21
  • 人事 江西两设区市任免一批领导干部 2018-11-20
  • 自由的生活_软路由论坛

     找回密码
     注册

    QQ登录

    只需一步,快速开始

    搜索
    查看: 411|回复: 0
    打印 上一主题 下一主题

    黑龙江体彩十一选5直选三码遗漏: controller侧与agent侧分布式通迅实现-睿云智合

    [复制链接]
    跳转到指定楼层
    1#
    发表于 2018-11-14 18:19:43 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式

    马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

    广西11选5历史 www.jqel.net 您需要 登录 才可以下载或查看,没有帐号?注册

    x
    .背景
    在k8s分布式系统中,通迅成为重要的部分。 本文分享一下如何使用通迅中间件。
    本文代码相关技术如下:
    rabbitmq
    redis
    golang
    k8s集群与集群之间通讯, 我们都可以使用相同的中间件rabbitmq。

    本文使用最简单的模式 LB,单实例的RPC调用。
    .分布式调用结构

    2.1 rabbitmq lb模式调用


                                   
    登录/注册后可看大图

    . agent1 agent2 agent3 同时上报自己在线时, rabbitmq自动调用 controller1 或 controller2 中其中一个实例,再由controller X 写入redis中去。当controller1、controller2需要所有agent状态时, 读取数据都是redis,所以都是一致的。

    . agent1 agent2 agent3 获取配置信息时, rabbitmq也自动调用controller1或 controller2 其中一个实例。再由controller X 读取redis或者mysql数据,再返回给agent。不论是调用到controller1还是controller2,返回的数据都是一致。


    2.2 rabbitmq 单实例模式调用


                                   
    登录/注册后可看大图
    controller 实例下发配置信息时:
    setp1. 获得当前在线的agent。
    setp2. 单实例模式rpc调用。向所有的agent发送配置信息。
    setp3. 可以明确了解有没有agent时下发配置失败的。如果都失败,则本次调用失败.。只要有一个失败,就可以认为需要重发一次命令。
    .代码实现
    3.1. rabbitmq rpc 调用 客户端实现
    package ingress
    import (
    "fmt"
    "time"
    "context"
    "github.com/wzhliang/xing"
    "wise2c/wisecloud-ingress-agent/communicate"
    "wise2c/wisecloud-ingress-agent/log"
    "wise2c/wisecloud-ingress-agent/common"
    )
    type ControllerClient struct {
    Producer  *xing.Client
    Client    communicate.ControllerHelperClient
    }
    func NewControllerClient() *ControllerClient {
    agent := &ControllerClient{}
    //amqp_url := "amqp://guest:[email protected]:5672/"
    amqp_url := fmt.Sprintf("amqp://%s:%[email protected]%s:%d",
    common.MQUser,
    common.MQPassword,
    common.MQHost,
    common.MQPort,
    )
    var err error
    agent.Producer, err = xing.NewClient(
    globalRPCAgentName,
    amqp_url,
    xing.SetIdentifier(&xing.NoneIdentifier{}),
    xing.SetSerializer(&xing.JSONSerializer{}),
    )
    if err != nil {
    log.Error("xing.NewClient() is failed.%s", err.Error())
    return agent
    }
    //LB RPC
    target := fmt.Sprint("ingress.controller")
    agent.Client = communicate.NewControllerHelperClient(target, agent.Producer)
    return agent
    }
    func (this *ControllerClient) Close() {
    if this.Producer == nil {
    return
    }
    this.Producer.Close()
    //this.closed = true
    }
    func (this *ControllerClient) OnlineAgent(name string) error {

    ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
    defer cancel()
    log.Debug("OnlineAgent(%s)", name)
    _, err := this.Client.OnlineAgent(ctx,
    &communicate.OnlineAgentRequest{Name: name, })
    if err != nil {
    return err
    }
    return nil
    }
    func (this *ControllerClient) GetIngressConfigs(uuid string) (string, error){

    ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
    defer cancel()
    response, err := this.Client.GetIngressConfigs(ctx,
    &communicate.GetIngressConfigsRequest{
    Uuid:   uuid,
    })
    if err != nil {
    return "error", err
    }
    return response.Content, err
    }
    func AgentHeartbeatToController() {
    if globalControllerClient == nil {
    return
    }
    var err error
    content := ""
    for {
    err = globalControllerClient.OnlineAgent(globalRPCAgentName)
    if err != nil {
    log.Error(err.Error())
    }
    //1 time / 2 second.
    time.Sleep(time.Millisecond*2000)
    }
    }
    3.2. rabbitmq rpc 调用 服务端实现
    package ingress
    import (
    "fmt"
    "context"
    "github.com/wzhliang/xing"
    "wise2c/wisecloud-ingress-controller/communicate"
    "wise2c/wisecloud-ingress-controller/log"
    "wise2c/wisecloud-ingress-controller/common"
    )
    type ControllerServerImp struct{}
    func (g *ControllerServerImp) OnlineAgent(ctx context.Context, req *communicate.OnlineAgentRequest, rsp *communicate.Void) error {
    log.Debug("OnlineAgent(%s)", req.Name)
    err := globalAgentClient.manager.OnlineAgent(req.Name)
    if err != nil {
    return err
    }
    return nil
    }
    func (g *ControllerServerImp) GetIngressConfigs(ctx context.Context, req *communicate.GetIngressConfigsRequest, rsp *communicate.GetIngressConfigsResponse) error {
    log.Info("GetIngressConfigs(%s)", req.Uuid)
    rsp.Content = "ok"
    return nil
    }
    func RunRPCServer() {
    //globalRPCControllerName = fmt.Sprintf("host.controller.%s", common.GetGuid())
    //LB RPC.
    globalRPCControllerName = fmt.Sprintf("ingress.controller")
    //amqp_url := "amqp://guest:[email protected]:5672/"
    amqp_url := fmt.Sprintf("amqp://%s:%[email protected]%s:%d",
    common.MQUser,
    common.MQPassword,
    common.MQHost,
    common.MQPort,
    )
    svc, err := xing.NewService(
    globalRPCControllerName,
    amqp_url,
    xing.SetSerializer(&xing.JSONSerializer{}),
    xing.SetBrokerTimeout(15, 5),
    )
    if err != nil {
    log.Error(fmt.Sprintf("MQURL=%s NewService is failed. %s", amqp_url, err.Error()))
    }
    communicate.RegisterControllerHelperHandler(svc, &ControllerServerImp{})
    log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqp_url)
    err = svc.Run()
    if err != nil {
    log.Error(err.Error())
    }
    }
    3.3. rabbitmq 单实例rpc调用 客户端实现

    1. rpc lb调用实时上报agent是否在线,实现了类似consul的服务发现的功能.

    2. ClientManager可以通过redis中的实时数据,管理所有的rpc client. 当 agent 下线,或者3秒之间没有上报状态,则清除指定的rpc client.

    3. 这样每次下发配置时,可以实时发送到每个rpc单实例服务器实例.

    package ingress
    import (
    "fmt"
    "sync"
    "time"
    "errors"
    "context"
    "github.com/wzhliang/xing"
    "github.com/astaxie/beego/utils"
    "wise2c/wisecloud-ingress-controller/communicate"
    "wise2c/wisecloud-ingress-controller/log"
    "wise2c/wisecloud-ingress-controller/common"
    )
    type AgentHelper struct {
    mutex         *sync.Mutex
    closed       bool
    Helper        communicate.AgentHelperClient
    Producer      *xing.Client
    }
    func NewAgentHelper(agent_name string) *AgentHelper {
    var err error
    agent := &AgentHelper{
    mutex:   new(sync.Mutex),
    closed:  false,
    }
    //amqp_url := "amqp://guest:[email protected]:5672/"
    amqp_url := fmt.Sprintf("amqp://%s:%[email protected]%s:%d",
    common.MQUser,
    common.MQPassword,
    common.MQHost,
    common.MQPort,
    )
    agent.Producer, err = xing.NewClient(
    globalRPCControllerName,
    amqp_url,
    xing.SetIdentifier(&xing.NoneIdentifier{}),
    xing.SetSerializer(&xing.JSONSerializer{}),
    )
    if err != nil {
    log.Error(fmt.Sprintf("MQURL=%s NewClient is failed. %s",
    amqp_url,
    err.Error()))
    return agent
    }
    //target := fmt.Sprint("ingress.agent.%s", agent_name)
    agent.Helper = communicate.NewAgentHelperClient(agent_name, agent.Producer)
    return agent
    }
    func (this *AgentHelper) Close() {
    this.mutex.Lock()
    defer this.mutex.Unlock()
    if this.Producer != nil {
    this.Producer.Close()
    }
    this.closed = true
    }
    func (this *AgentHelper) SetIngressConfig(content string) error  {
    this.mutex.Lock()
    defer this.mutex.Unlock()
    if this.closed {
    return errors.New("the client is closed.")
    }
    ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
    defer cancel()
    log.Info("SetIngressConfig(%s)", content)
    _, err := this.Helper.SetIngressConfig(ctx,
    &communicate.SetIngressConfigRequest{
    content,
    })
    return err
    }
    func (this *AgentHelper) DelIngressConfig(uuid string) error {
    this.mutex.Lock()
    defer this.mutex.Unlock()
    if this.closed {
    return errors.New("the client is closed.")
    }
    ctx, cancel := context.WithTimeout(context.Background(), 5000*time.Millisecond)
    defer cancel()
    log.Info("DelIngressConfig(%s)", uuid)
    _, err := this.Helper.DelIngressConfig(ctx,
    &communicate.DelIngressConfigRequest{
    uuid,
    })
    return err
    }
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    type ClientManager struct {
    mutex           *sync.Mutex
    Pool   *utils.BeeMap
    }
    func NewClientManager() *ClientManager {
    return &ClientManager{
    mutex:      new(sync.Mutex),
    Pool:       utils.NewBeeMap(),
    }
    }
    func (p *ClientManager) Init() {
    go p.RunConnect()
    }
    func (p *ClientManager) RunConnect() {
    for {
    names, err := globalRedisClient.GetAgentNames()
    if err != nil {
    log.Error(err.Error())
    }
    name_map := map[interface{}]int{}
    for _, name := range names {
    name_map[name] = 1
    }
    for key, v := range p.Pool.Items() {
    //log.Error("key=%s", key)
    _, ok := name_map[key]
    if ok {
    //log.Warning("find the %s", key)
    continue
    }
    if v != nil {
    log.Warning("Close the AgentHelper %s", key)
    v.(*AgentHelper).Close()
    }
    log.Warning("Delete the Pool %s", key)
    p.Pool.Delete(key)
    }
    for _, name := range names {
    if !p.Pool.Check(name) {
    log.Warning("New the AgentHelper %s", name)
    p.Pool.Set(name, NewAgentHelper(name))
    }
    }
    time.Sleep(time.Second * 1)
    }
    }
    func (p *ClientManager) GetClients() map[interface{}]interface{} {
    return p.Pool.Items()
    }
    func (p *ClientManager) OnlineAgent(agent_name string) error {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    globalRedisClient.OnlineAgent(agent_name, "1")
    if !p.Pool.Check(agent_name) {
    p.Pool.Set(agent_name, NewAgentHelper(agent_name))
    }
    return nil
    }
    func (p *ClientManager) OfflineAgent(agent_name string) error {
    p.mutex.Lock()
    defer p.mutex.Unlock()
    globalRedisClient.OfflineAgent(agent_name)
    agent_helper :=  p.Pool.Get(agent_name)
    if agent_helper != nil {
    agent_helper.(*AgentHelper).Close()
    p.Pool.Delete(agent_name)
    }
    return nil
    }
    ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    type AgentClient struct {
    manager *ClientManager
    }
    func NewAgentClient() *AgentClient {
    return &AgentClient{
    manager:    NewClientManager(),
    }
    }
    func (client *AgentClient) Init() {
    client.manager.Init()
    }
    type AgentHandlerCallback func(request, response interface{}, helper *AgentHelper) error
    func (client *AgentClient) AgentHandler(request, response interface{}, callback AgentHandlerCallback) (err error) {
    maps := client.manager.GetClients()
    count := 0
    for k, v := range maps {
    name := k.(string)
    if v == nil {
    continue
    }
    helper := v.(*AgentHelper)
    if callback == nil {
    continue
    }
    err = callback(request, response, helper)
    if err != nil {
    return errors.New(fmt.Sprintf("%s %s", name, err.Error()))
    }
    count += 1
    }
    if count > 0 {
    return nil
    }
    return errors.New("no agent online.")
    }
    func (this *AgentClient) SetIngressConfig(request, response interface{}) error {
    return this.AgentHandler(request, response, func(request, response interface{}, helper *AgentHelper) error {
    return helper.SetIngressConfig(request.(string))
    })
    }
    func (this *AgentClient) DelIngressConfig(request, response interface{}) error {
    return this.AgentHandler(request, response, func(request, response interface{}, helper *AgentHelper) error {
    return helper.DelIngressConfig(request.(string))
    })
    }

    3.4. rabbitmq 单实例rpc调用.服务端实现
    package ingress
    import (
    "fmt"
    "context"
    "github.com/wzhliang/xing"
    "wise2c/wisecloud-ingress-agent/communicate"
    "wise2c/wisecloud-ingress-agent/log"
    "wise2c/wisecloud-ingress-agent/common"
    )
    type AgentServerImp struct{}
    func (g *AgentServerImp) SetIngressConfig(ctx context.Context, req *communicate.SetIngressConfigRequest, rsp *communicate.Void) error {
    log.Info("SetIngressConfig(%s)", req.Content)
    config := &Wise2cIngressConfig{}
    err := config.Parse([]byte(req.Content))
    if err != nil {
    log.Error(err.Error())
    return err
    }
    globalIngressProcess.SetIngressConfig(config)
    return nil
    }
    func (g *AgentServerImp) DelIngressConfig(ctx context.Context, req *communicate.DelIngressConfigRequest, rsp *communicate.Void) error {
    log.Info("DelIngressConfig(%s)", req.Uuid)
    globalIngressProcess.DelIngressConfig(req.Uuid)
    return nil
    }
    func RunRPCServer() {
    //amqp_url := "amqp://guest:[email protected]:5672/"
    amqp_url := fmt.Sprintf("amqp://%s:%[email protected]%s:%d",
    common.MQUser,
    common.MQPassword,
    common.MQHost,
    common.MQPort,
    )
    svc, err := xing.NewService(
    globalRPCAgentName,
    amqp_url,
    xing.SetSerializer(&xing.JSONSerializer{}),
    )
    if err != nil {
    log.Error(fmt.Sprintf("MQURL=%s, NewService() is failed. %s", amqp_url,  err.Error()))
    }
    communicate.RegisterAgentHelperHandler(svc, &AgentServerImp{})
    log.Info("RPC Server is starting. Connect to the rabbitmq[%s].", amqp_url)
    go LoopRPC(svc)
    }
    func LoopRPC(svc *xing.Client) {
    err := svc.Run()
    if err != nil {
    log.Error(err.Error())
    }
    }
    .总结
    ● 通过 rabbitmq lb调用方式,可以实现从agent侧上报数据到controller侧或者agent侧拉取需要的数据。
    ● 通过rabbitmq 单实例调用方式,由于有了之前lb上报agent状态,或者使用第三方 consul.etcd中服务发现功能。我们可以实现从controller侧下发配置到每一个agent,在每个agent实例中完成相同的功能。

    routeros
    您需要登录后才可以回帖 登录 | 注册

    本版积分规则

    QQ|小黑屋|手机版|Archiver|广西11选5历史 ( 渝ICP备15001194号-1,渝公网安备 50011602500124号 )

    GMT+8, 2019-3-26 18:36 , Processed in 0.152714 second(s), 21 queries , Gzip On, MemCache On.

    Powered by Discuz! X3.4 Licensed

    © 2001-2017 Comsenz Inc.

    快速回复 广西11选5历史 返回列表
  • 宁陵县:法院家事的实践与创新 2019-03-24
  • 日照:前5个月日照税收比重居山东省第二位 2019-03-22
  • 武警部队“长城-2018”反恐国际论坛在京开幕 2019-03-22
  • 税费“红包”助推高质量发展 2019-03-21
  • 电影人共聚“微博电影之夜” 张艺谋:如果不当导演就当守门员 2019-03-21
  • 陕西拍摄到野化放归林麝活动影像 放归林麝已度过危险期 2019-03-15
  • 国宝级黄腹角雉住进三清山 2019-03-15
  • 王石田朴珺罕见亲密写真曝光 女方喂男方冰淇淋娇羞甜蜜 2018-11-22
  • 世界很多国家想拥有核弹,但迫于种种原因而没能实现。 2018-11-21
  • 人事 江西两设区市任免一批领导干部 2018-11-20