不喜勿喷!大神请绕道!

这一篇我们来讲服务注册中心的实现,什么是服务注册中心?首先你得明白 etcd 的作用是什么?它的作用在博客:etcd 套路(一)什么是 etcd 这篇博文当中有详细的描述,服务注册发现是它的特色之一,那就看看是如何实现的,篇幅较长内容较多,很多东西我都直接写到代码注释里面供大家查看,因为还不完善没有实现 RoundRobin 服务发现以及 etcd 实现负载均衡(我们利用 etcd 要实现的目标是服务注册与发现以及负载均衡功能) 所以撒 先提供部分代码供参考,比这代码一步步进行起码这篇博文能让你知道如何试下 etcd 服务注册中心,这也是核心!
上代码:

Ademo.proto 内容:
syntax = "proto3";
package demo;
service DemoService {
    rpc DemoHandler(DemoRequest) returns (DemoResponse){};
}
message DemoRequest{
    string name = 1;
}
message DemoResponse{
    string name = 1;
}
利用这个 proto 文件如何生成 demo.pd.go 这里就不说了,前边的博文都讲过了,在 grpc 的大章节中好几篇是在讲它!
B: 有了 demo.pd.go 接口文件我们得要去实现啊 家里 grpc 服务端
demoserverimp.go 实现类文件(其实没有类你就把它理解为类文件就行了)
package rpcserverimpl
import (
"context"
"grpclb/rpcfile"
)
type DemoServiceServerImpl struct {}
func (s *DemoServiceServerImpl) DemoHandler(_ context.Context,request *demo.DemoRequest) (*demo.DemoResponse,error) {
    return &demo.DemoResponse{
        Name: request.Name,
    },nil
}
c. 现在我们来看最为核心的 etcd 当中服务注册中心的代码,也没啥男的!
package register

import (
    "context"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
    "log"
    "time"
)

type (
    Register struct {
        key           string          //前缀
        client3       *clientv3.Client //etcd的链接
        serverAddress string          //服务地址
        stop          chan bool       //假设一个rpc服务宕机了我们需要从etcd当中删除 利用channel管道技术传递bool值
        interval      time.Duration   //心跳周期  我们需要跟etcd保持联系 隔一段时间就去联系一下证明自己还活着
        leaseTime     int64           //租赁的时间 都是有时间限制的
    }
)

func NewRegister(key string,client3 *clientv3.Client,serverAddress string) *Register{
    return &Register{
        key:           key,
        client3:      client3,
        serverAddress: serverAddress,
        //心跳的周期一定要小于租赁的周期  不然会存在真空期
        interval:    3 * time.Second,
        leaseTime: 15,
        stop:          make(chan bool,1),
    }
}

//注册
func (r *Register) Reg() {
    k := r.makeKye()
    //心跳
    go func() {
        //每次心跳周期设置的时间都会往通道里面塞入值
        t := time.NewTicker(r.interval)
        //这里起一个死循环 不停的循环
        for {
            //租赁 生成租赁合同哦
            lgs, err := r.client3.Grant(context.TODO(), r.leaseTime)
            if nil != err {
                panic(err)
            }
            //判断key是否存在 存在则更新 不存在则写入
            if _, err := r.client3.Get(context.TODO(), k);err != nil {
                //如果没有发现key值的存在
                if err == rpctypes.ErrKeyNotFound {
                    //没有发现key那就往里面添加喽  k就是key+服务器地址   value就是服务器地址 然后再来个租赁周期(得有个过期时间啊)
                    if _, err := r.client3.Put(context.TODO(), k, r.serverAddress,clientv3.WithLease(lgs.ID));err != nil {
                        //如果有错误那么我们就直接退出了
                        panic(err)
                    }
                }else {
                        //既然没发现次key的存在 err还不为空 那就是未知错误来处理
                        panic(err)
                }
            }else{
                //有key的存在那么我们就去更新这个key
                if _, err := r.client3.Put(context.TODO(), k, r.serverAddress, clientv3.WithLease(lgs.ID));err != nil {
                    panic(err)
                }
            }
            //这里需要对select有深入的理解https://studygolang.com/articles/7203 <-t.C 以及 <-r.stop会一直等待
            select {
            //通道里面没有值 程序会阻塞在这里
            case ttl := <-t.C:
                log.Println(ttl)
            //如果收到了停止信号 则整个协程结束 即心跳结束
            case <-r.stop:
                return
            }

        }
    }()
}

//取消注册
//比如我的服务端挂掉了 我需要取消key即删除掉key
func (r *Register) UnReg() {
    //1.首先要停止心跳
    r.stop <- true
    //为了防止多线程下出现死锁的问题  channel管道就是为了协程和协程之间的通讯 上边设置了true那么注册中心里面的心跳程序就死掉了 初始化一下r.stop
    r.stop = make(chan bool,1)
    k := r.makeKye()
    if _, e := r.client3.Delete(context.TODO(), k);e != nil {
        panic(e)
    }else {
        //打印哥日志看看喽
        log.Printf("%s unreg success",k)
    }

    return
}

//生成key策略
func (r *Register) makeKye() string {
    return fmt.Sprintf("%s_%s",r.key,r.serverAddress)
}

套路我们再 golang 的基础博文当中就讲过了,你就把他理解为一个类(虽然 golang 当中没有类的概念),创建一个 newxxxx 的函数暴露给外界调用,在这个 newxxxx 函数当中调起 struct 结构体就完事!多么简单的套路撒!

里面注释已经写得很清楚了,核心我说一下,其实就一个心跳,有些新朋友会问什么是心跳啊?就是你的心脏在不停的跳!程序也是一样的,比如这里弄了个定时器 3s 一请求,当然这里用到了 channel 管道 协程间通讯用的 不到时间 <-t.C 就会阻塞等着 其实跟 sleep (3) 一样一样的效果!这不为了装逼才这样写的吗!心跳去请求 etcd 判断当前的具有一定策略的 key 是否存在,这个 key 不是乱写的哈 自己去看里面的策略!如果存在则更新租赁周期 如果不存在则创建并且设置租赁周期,当然一个细节需要注意的就是心跳的周期一定是要小于租赁的周期的,因为如果是大于就会出现真空期!那不就扯犊子了!要说的就这些,核心也就这么多叮叮东西!

d. 再来看我们的 server.go 服务 哟了注册中心你得链接啊 你的起来 rpc 服务啊你得去 etcd 服务啊 草,好麻烦 看代码吧:
package main

import (
    "flag"
    "fmt"
    "go.etcd.io/etcd/clientv3"
    "google.golang.org/grpc"
    "grpclb/register"
    demo1 "grpclb/rpcfile"
    "grpclb/rpcserverimpl"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
)
var (
    port  = flag.Int("p",50001,"server port")
)
const (
    key string = "vector_rpc_server"
)
func main(){
    //解析标签
    flag.Parse()
    //形成服务地址端着端口
    serverAddress := fmt.Sprintf("127.0.0.1:%d",*port)

    //=================================================================================================
    //A.监听rpc服务
    listen, err := net.Listen("tcp", serverAddress)
    if err != nil {
        panic(err)
    }
    //A1.服务监听成功打印日志
    log.Printf("start demo rpc listen on %d",*port)
    //A2.老样子 grpc必须的一步
    server := grpc.NewServer()
    //A3.老样子 RegisterDemoServiceServer是形成的 pb.go文件里面的哦  注册服务 第二个参数是你实现接口的struct结构体
    demo1.RegisterDemoServiceServer(server,&rpcserverimpl.DemoServiceServerImpl{})

    //=================================================================================================

    //B.注册etcd
    //B1.服务列表 切片类型
    endpoints := []string{
        "127.0.0.1:2379",
    }
    //B2.实例化etcd服务
    client3,err := clientv3.New(
        clientv3.Config{
            Endpoints:            endpoints,
            //你还可以设置更过的参数 如果开启了auth验证 也可以配置username和password
        },
    )
    // conn fail
    if err != nil {
        panic(err)
    }
    //B3.注册rpc服务到etcd中心
    reg := register.NewRegister(key, client3, serverAddress)
    reg.Reg()

    //=================================================================================================

    //C.善后工作
    //我们的rpc服务有没有挂掉 服务器有没有宕机......
    //所以我们需要建立一个通道 告诉etcd我的服务挂掉了 服务器宕机了...... 赶紧把我的合同解约删掉key 好让接下里的请求不再调取我的rpc服务
    //所以 我们要接收来自系统的信号
    ch := make(chan os.Signal,1)
    //接收系统信号写到ch管道当中去
    signal.Notify(ch,syscall.SIGTERM,syscall.SIGINT,syscall.SIGKILL,syscall.SIGHUP,syscall.SIGQUIT)
    go func() {
        s := <-ch
        log.Printf("signal.notify %v",s)
        //服务停掉之后 删除掉注册进etcd中心的key 我的服务都停了还留着你干啥用呢!
        reg.UnReg()
    }()

    //=================================================================================================

    //A4.启动rpc服务   这个地方我们不需要for死循环 grpc当中会自动帮助我们实时监听 所以主程序也不会死掉的!!! 一直在监听在跑!
    server.Serve(listen)

    //因为我们调用的reg()注册中心里面使用的是协程 所以这里我们需要防止主程序死亡导致协程结束 所以让它睡眠20s
    //time.Sleep(20 * time.Second)
    //fmt.Println("ok")

}

说两句核心,正如上边代码里面注释讲的很明白了如果还是看不懂只能说明基础不牢地动山摇!
核心无法就是一个监听 rpc 服务 注册 etcd 服务 以及 善后工作对系统服务的监控比如宕机、rpc 服务挂掉、进程被无情的杀死……
最后的 server.Serve(listen) 还是比较有讲究的,因为服务注册中心当中用到了协程,主进程一死所有的协程统统得死!所以这句话那会一直监听着 rpc 端口 也就保证了主进程不死 那么也就保证了服务注册中心的子协程不死!你不是我不死大家都不死!就万事大吉了!

就说到这里吧!下边给大家提供一下我的文件目录,自己写的时候可以参考一下:

转自:https://learnku.com/articles/48876