Service Registration and Discovery in Microservices

Service registration and discovery keep service consumers and providers communicating normally while service instances come online, go offline, or change address.

With service registration and discovery mechanisms, consumers don’t need to know the actual physical addresses of service providers to make calls, nor do they need to know how many providers are available. Service providers simply register with the registry to expose their services, without needing to know which specific services are consuming them.
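
To make the decoupling concrete, here is a minimal in-memory sketch of a registry. It is not how etcd or go-zero implement it; the `Registry` type and its methods are hypothetical names introduced only for illustration. Providers call `Register` and `Deregister`, and consumers call `Discover` with nothing but the service name.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Registry is a hypothetical in-memory stand-in for a real registry such as etcd.
type Registry struct {
	mu        sync.RWMutex
	instances map[string]map[string]struct{} // service name -> set of addresses
}

func NewRegistry() *Registry {
	return &Registry{instances: make(map[string]map[string]struct{})}
}

// Register is called by a provider when one of its instances comes online.
func (r *Registry) Register(service, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.instances[service] == nil {
		r.instances[service] = make(map[string]struct{})
	}
	r.instances[service][addr] = struct{}{}
}

// Deregister is called when an instance goes offline.
func (r *Registry) Deregister(service, addr string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.instances[service], addr)
}

// Discover returns the current addresses of a service in sorted order.
// The consumer never needs to know them in advance.
func (r *Registry) Discover(service string) []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	addrs := make([]string, 0, len(r.instances[service]))
	for addr := range r.instances[service] {
		addrs = append(addrs, addr)
	}
	sort.Strings(addrs)
	return addrs
}

func main() {
	reg := NewRegistry()
	reg.Register("user.rpc", "10.0.0.1:8081")
	reg.Register("user.rpc", "10.0.0.2:8081")
	fmt.Println(reg.Discover("user.rpc"))

	reg.Deregister("user.rpc", "10.0.0.1:8081")
	fmt.Println(reg.Discover("user.rpc"))
}
```

The consumer only ever holds the name `user.rpc`; the set of addresses behind it can grow or shrink without the consumer changing.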

RPC Configuration

Name: user.rpc
ListenOn: 0.0.0.0:8081
Etcd:
  Hosts:
  - 127.0.0.1:2379
  Key: user.rpc
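
As a rough sketch of how this configuration drives registration, the structs below mirror only the fields shown in the YAML above. They are simplified, hypothetical stand-ins for the real go-zero config types, which carry more fields; the `HasEtcd` check reflects the idea that registration only happens when both the hosts and the key are set.

```go
package main

import "fmt"

// EtcdConf mirrors only the two etcd fields used in the YAML above.
type EtcdConf struct {
	Hosts []string
	Key   string
}

// RpcServerConf is a simplified, hypothetical mirror of the RPC server config.
type RpcServerConf struct {
	Name     string
	ListenOn string
	Etcd     EtcdConf
}

// HasEtcd reports whether enough etcd settings are present to register the service.
func (c RpcServerConf) HasEtcd() bool {
	return len(c.Etcd.Hosts) > 0 && len(c.Etcd.Key) > 0
}

func main() {
	c := RpcServerConf{
		Name:     "user.rpc",
		ListenOn: "0.0.0.0:8081",
		Etcd: EtcdConf{
			Hosts: []string{"127.0.0.1:2379"},
			Key:   "user.rpc",
		},
	}
	fmt.Println("register with etcd:", c.HasEtcd())
}
```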

Callee - Service Registration

  • Source code at mall/user/rpc/user.go:
package main

import (
	"flag"
	"fmt"

	"go-zero-demo-rpc/mall/user/rpc/internal/config"
	"go-zero-demo-rpc/mall/user/rpc/internal/server"
	"go-zero-demo-rpc/mall/user/rpc/internal/svc"
	"go-zero-demo-rpc/mall/user/rpc/types/user"

	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/core/service"
	"github.com/zeromicro/go-zero/zrpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/reflection"
)

var configFile = flag.String("f", "etc/user.yaml", "the config file")

func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)
	ctx := svc.NewServiceContext(c)
	svr := server.NewUserServer(ctx)

	s := zrpc.MustNewServer(c.RpcServerConf, func(grpcServer *grpc.Server) {
		user.RegisterUserServer(grpcServer, svr)

		if c.Mode == service.DevMode || c.Mode == service.TestMode {
			reflection.Register(grpcServer)
		}
	})
	defer s.Stop()

	fmt.Printf("Starting rpc server at %s...\n", c.ListenOn)
	s.Start()
}
  • MustNewServer internally calls NewServer, which creates an internal.Server instance via internal.NewRpcPubServer when etcd is configured:
if c.HasEtcd() {
    server, err = internal.NewRpcPubServer(c.Etcd, c.ListenOn, serverOptions...)
    if err != nil {
        return nil, err
    }
}
  • internal.NewRpcPubServer
func NewRpcPubServer(etcd discov.EtcdConf, listenOn string, opts ...ServerOption) (Server, error) {
	registerEtcd := func() error {
		pubListenOn := figureOutListenOn(listenOn)
		// ... etcd configuration handling
		pubClient := discov.NewPublisher(etcd.Hosts, etcd.Key, pubListenOn, pubOpts...)
		return pubClient.KeepAlive()
	}
	// ... server initialization
}
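  • figureOutListenOn resolves the address that is actually published to the registry:
    • Treats a wildcard host such as 0.0.0.0 as a special case, because callers cannot connect to it
    • Substitutes the machine's internal IP when needed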
func figureOutListenOn(listenOn string) string {
	// ... address resolution logic
}
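
The body of figureOutListenOn is elided above, so the following is only a sketch of the kind of resolution the bullets describe, written against the standard library. `resolveListenOn` and `internalIP` are hypothetical names, and the IP lookup is passed in as a function so the logic can be exercised without depending on the host's network interfaces.

```go
package main

import (
	"fmt"
	"net"
)

// internalIP returns the first non-loopback IPv4 address of this machine,
// or "" if none can be found.
func internalIP() string {
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}
	for _, a := range addrs {
		ipNet, ok := a.(*net.IPNet)
		if !ok || ipNet.IP.IsLoopback() {
			continue
		}
		if ip4 := ipNet.IP.To4(); ip4 != nil {
			return ip4.String()
		}
	}
	return ""
}

// resolveListenOn (hypothetical) turns a listen address into one that callers
// can reach: a concrete host is kept as-is, while an empty or 0.0.0.0 host is
// replaced with the IP returned by lookupIP.
func resolveListenOn(listenOn string, lookupIP func() string) string {
	host, port, err := net.SplitHostPort(listenOn)
	if err != nil {
		return listenOn // not host:port, leave untouched
	}
	if host != "" && host != "0.0.0.0" {
		return listenOn
	}
	ip := lookupIP()
	if ip == "" {
		return listenOn // nothing better to publish
	}
	return net.JoinHostPort(ip, port)
}

func main() {
	// The printed address depends on the machine this runs on.
	fmt.Println(resolveListenOn("0.0.0.0:8081", internalIP))
}
```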
  • The registration process in Publisher.KeepAlive():
    • Creates an etcd lease
    • Writes the service as a key:value pair bound to that lease
    • Keeps the lease alive with a periodic heartbeat
func (p *Publisher) KeepAlive() error {
	// ... etcd client initialization
	// ... lease management
	return p.keepAliveAsync(cli)
}
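
The lease, put, and heartbeat steps can be illustrated without a running etcd. The sketch below is an in-memory model, not the etcd client API: `leaseStore` and its methods are hypothetical, and time is a plain counter in seconds so the behavior is deterministic. The key layout `{Key}/{leaseID}` used in `main` is an assumption about how the registered key is composed.

```go
package main

import "fmt"

// leaseStore is a hypothetical in-memory model of lease-bound keys.
// Time is a logical clock in seconds supplied by the caller.
type leaseStore struct {
	ttl     int64
	nextID  int64
	expires map[int64]int64  // lease ID -> expiry time
	values  map[string]string // key -> value
	owner   map[string]int64  // key -> lease ID
}

func newLeaseStore(ttl int64) *leaseStore {
	return &leaseStore{
		ttl:     ttl,
		expires: make(map[int64]int64),
		values:  make(map[string]string),
		owner:   make(map[string]int64),
	}
}

// Grant creates a lease that expires ttl seconds after now.
func (s *leaseStore) Grant(now int64) int64 {
	s.nextID++
	s.expires[s.nextID] = now + s.ttl
	return s.nextID
}

// Put writes key=value and binds the key to the given lease.
func (s *leaseStore) Put(key, value string, lease int64) {
	s.values[key] = value
	s.owner[key] = lease
}

// KeepAlive is the heartbeat: it pushes the expiry forward and returns
// false if the lease is unknown or has already expired.
func (s *leaseStore) KeepAlive(lease, now int64) bool {
	exp, ok := s.expires[lease]
	if !ok || now > exp {
		return false
	}
	s.expires[lease] = now + s.ttl
	return true
}

// Get returns the value only while the owning lease is still alive.
func (s *leaseStore) Get(key string, now int64) (string, bool) {
	lease, ok := s.owner[key]
	if !ok || now > s.expires[lease] {
		return "", false
	}
	return s.values[key], true
}

func main() {
	store := newLeaseStore(10)
	lease := store.Grant(0)
	key := fmt.Sprintf("%s/%d", "user.rpc", lease) // assumed key layout
	store.Put(key, "192.168.1.10:8081", lease)

	store.KeepAlive(lease, 8) // heartbeat extends the lease
	_, alive := store.Get(key, 15)
	fmt.Println("visible after a heartbeat:", alive)

	_, alive = store.Get(key, 30) // no heartbeat since t=8
	fmt.Println("visible after heartbeats stop:", alive)
}
```

The design point is that a crashed provider needs no explicit deregistration: once its heartbeats stop, the lease expires and the entry disappears on its own.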
  • [Figure: registered service entry in etcd]

Caller - Service Discovery

  • Caller code at order/api/order.go:
package main

import (
	"flag"
	"fmt"

	"go-zero-demo-rpc/order/api/internal/config"
	"go-zero-demo-rpc/order/api/internal/handler"
	"go-zero-demo-rpc/order/api/internal/svc"

	"github.com/zeromicro/go-zero/core/conf"
	"github.com/zeromicro/go-zero/rest"
)

var configFile = flag.String("f", "etc/order.yaml", "the config file")

func main() {
	flag.Parse()

	var c config.Config
	conf.MustLoad(*configFile, &c)

	server := rest.MustNewServer(c.RestConf)
	defer server.Stop()

	ctx := svc.NewServiceContext(c)
	handler.RegisterHandlers(server, ctx)

	fmt.Printf("Starting server at %s:%d...\n", c.Host, c.Port)
	server.Start()
}
  • zrpc.MustNewClient initializes service discovery:
    • Builds an etcd-based target of the form discov://{hosts}/{serviceKey}
    • Registers a resolver that watches etcd for changes
func (b *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, _ resolver.BuildOptions) (
	resolver.Resolver, error) {
	// ... etcd subscriber initialization
	sub.AddListener(update)
	update()
	return &nopResolver{cc: cc}, nil
}
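
The discov://{hosts}/{serviceKey} target mentioned above is just a string, so its shape is easy to show. The helpers below are hypothetical and are not taken from go-zero; joining multiple hosts with commas is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

const discovScheme = "discov"

// buildTarget (hypothetical helper) formats discov://{hosts}/{serviceKey}.
// Multiple etcd hosts are joined with commas, which is an assumption.
func buildTarget(hosts []string, key string) string {
	return fmt.Sprintf("%s://%s/%s", discovScheme, strings.Join(hosts, ","), key)
}

// parseTarget reverses buildTarget, which is roughly what a resolver has to do
// before it can subscribe to the key on the given etcd hosts.
func parseTarget(target string) (hosts []string, key string, err error) {
	prefix := discovScheme + "://"
	if !strings.HasPrefix(target, prefix) {
		return nil, "", fmt.Errorf("unexpected scheme in %q", target)
	}
	rest := strings.TrimPrefix(target, prefix)
	i := strings.Index(rest, "/")
	if i <= 0 || i == len(rest)-1 {
		return nil, "", fmt.Errorf("malformed target %q", target)
	}
	return strings.Split(rest[:i], ","), rest[i+1:], nil
}

func main() {
	target := buildTarget([]string{"127.0.0.1:2379"}, "user.rpc")
	fmt.Println(target)

	hosts, key, err := parseTarget(target)
	fmt.Println(hosts, key, err)
}
```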
  • Discovery process details:
    • Loads the currently available endpoints once at startup
    • Watches etcd continuously for changes
    • Updates the endpoint list dynamically as instances are added or removed
func (c *cluster) monitor(key string, l UpdateListener) error {
	// ... etcd client initialization
	c.load(cli, key)  // initial snapshot of registered endpoints
	c.watch(cli, key) // then follow subsequent changes
	return nil
}
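
The load-then-watch pattern can be sketched independently of etcd. In the model below, a map stands in for the initial snapshot and a channel stands in for the watch stream; `endpointSet`, `event`, and the other names are hypothetical. Every change triggers the update callback with the full, current address list, which is the role the listener plays in the excerpt above.

```go
package main

import (
	"fmt"
	"sort"
)

type eventKind int

const (
	eventPut eventKind = iota
	eventDelete
)

// event is a hypothetical stand-in for one watch notification.
type event struct {
	kind  eventKind
	key   string
	value string
}

// endpointSet tracks registry key -> address and notifies on every change.
type endpointSet struct {
	byKey    map[string]string
	onUpdate func(addrs []string)
}

func newEndpointSet(onUpdate func(addrs []string)) *endpointSet {
	return &endpointSet{byKey: make(map[string]string), onUpdate: onUpdate}
}

// addrs returns the current addresses in sorted order.
func (s *endpointSet) addrs() []string {
	out := make([]string, 0, len(s.byKey))
	for _, addr := range s.byKey {
		out = append(out, addr)
	}
	sort.Strings(out)
	return out
}

// load applies the initial snapshot of registered instances.
func (s *endpointSet) load(snapshot map[string]string) {
	for k, v := range snapshot {
		s.byKey[k] = v
	}
	s.onUpdate(s.addrs())
}

// watch applies changes one by one until the event stream is closed.
func (s *endpointSet) watch(events <-chan event) {
	for ev := range events {
		switch ev.kind {
		case eventPut:
			s.byKey[ev.key] = ev.value
		case eventDelete:
			delete(s.byKey, ev.key)
		}
		s.onUpdate(s.addrs())
	}
}

func main() {
	set := newEndpointSet(func(addrs []string) {
		fmt.Println("endpoints:", addrs)
	})
	set.load(map[string]string{"user.rpc/1": "10.0.0.1:8081"})

	events := make(chan event, 2)
	events <- event{kind: eventPut, key: "user.rpc/2", value: "10.0.0.2:8081"}
	events <- event{kind: eventDelete, key: "user.rpc/1"}
	close(events)
	set.watch(events)
}
```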
  • [Figure: discovered service endpoints in etcd]

Q&A

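Q: Why not use Redis as the registry?
A: Key reasons include:

  1. No versioning: etcd assigns a revision to every change, while Redis keys carry no version
  2. Redis pub/sub is fire-and-forget, so a subscriber that is disconnected misses updates, which makes it unsuitable for production discovery
  3. Higher latency compared to etcd’s watch
  4. Different clustering approaches: etcd replicates through Raft consensus, whereas Redis Cluster relies on asynchronous replication
  5. Data persistence differences: etcd persists every committed write, while Redis persistence is configurable and can lose recent writes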
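
To illustrate why versioning matters for a registry, here is a toy model of a revisioned change log. It is not etcd's implementation; `revisionLog` and `change` are hypothetical. The point is that a watcher which remembers the last revision it saw can ask for everything after it when it reconnects, whereas a fire-and-forget channel has nothing to replay.

```go
package main

import "fmt"

// change is one versioned write.
type change struct {
	rev   int64
	key   string
	value string
}

// revisionLog is a hypothetical model of a store that versions every write.
type revisionLog struct {
	changes []change
}

// put appends a write and returns the revision assigned to it.
func (l *revisionLog) put(key, value string) int64 {
	rev := int64(len(l.changes) + 1)
	l.changes = append(l.changes, change{rev: rev, key: key, value: value})
	return rev
}

// since returns every change with a revision greater than after, so a
// reconnecting watcher can catch up on what it missed.
func (l *revisionLog) since(after int64) []change {
	var out []change
	for _, c := range l.changes {
		if c.rev > after {
			out = append(out, c)
		}
	}
	return out
}

func main() {
	log := &revisionLog{}
	seen := log.put("user.rpc/1", "10.0.0.1:8081") // the watcher saw this one

	// The watcher is disconnected while two more instances register.
	log.put("user.rpc/2", "10.0.0.2:8081")
	log.put("user.rpc/3", "10.0.0.3:8081")

	// On reconnect it replays everything after the last revision it saw.
	for _, c := range log.since(seen) {
		fmt.Println(c.rev, c.key, c.value)
	}
}
```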