GRPC


GRPC

rpc

1. rpc之hello world

Go语言的RPC包的路径为net/rpc,也就是放在了net包目录下面。因此我们可以猜测该RPC包是建立在net包基础之上的。在第一章“Hello, World”革命一节最后,我们基于http实现了一个打印例子。下面我们尝试基于rpc实现一个类似的例子。

1. 服务端:

package main

import (
	"net"
	"net/rpc"
)

type HelloService struct {}
func (s *HelloService) Hello(request string, reply *string) error {
	*reply = "hello "+ request
	return nil
}

func main(){
	_ = rpc.RegisterName("HelloService", &HelloService{})
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		panic("监听端口失败")
	}
	conn, err := listener.Accept()
	if err != nil {
		panic("建立链接失败")
	}
	rpc.ServeConn(conn)

}

其中Hello方法必须满足Go语言的RPC规则:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法

然后就可以将HelloService类型的对象注册为一个RPC服务:(TCP RPC服务)。

其中rpc.Register函数调用会将对象类型中所有满足RPC规则的对象方法注册为RPC函数,所有注册的方法会放在“HelloService”服务空间之下。然后我们建立一个唯一的TCP链接,并且通过rpc.ServeConn函数在该TCP链接上为对方提供RPC服务。

2. 客户端

func main() {
    client, err := rpc.Dial("tcp", "localhost:1234")
    if err != nil {
        log.Fatal("dialing:", err)
    }

    var reply string
    err = client.Call("HelloService.Hello", "hello", &reply)
    if err != nil {
        log.Fatal(err)
    }

    fmt.Println(reply)
}

首先是通过rpc.Dial拨号RPC服务,然后通过client.Call调用具体的RPC方法。在调用client.Call时,第一个参数是用点号链接的RPC服务名字和方法名字,第二和第三个参数分别我们定义RPC方法的两个参数

2. rpc支持json

Go语言的RPC框架有两个比较有特色的设计:一个是RPC数据打包时可以通过插件实现自定义的编码和解码;另 一个是RPC建立在抽象的io.ReadWriteCloser接口之上的,我们可以将RPC架设在不同的通讯协议之上。这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言的RPC。

首先是基于json编码重新实现RPC服务:

服务端

package main

import (
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

type HelloService struct {}

func (s *HelloService) Hello(request string, reply *string) error {
	*reply = "hello "+ request
	return nil
}

func main(){
	rpc.RegisterName("HelloService", new(HelloService))
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		panic("启动错误")
	}
	for {
		conn, err := listener.Accept()
		if err != nil {
			panic("接收")
		}
		go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
	}
}

代码中最大的变化是用rpc.ServeCodec函数替代了rpc.ServeConn函数,传入的参数是针对服务端的json编解码器。

客户端

package main

import (
	"fmt"
	"net"
	"net/rpc"
	"net/rpc/jsonrpc"
)

func main(){
	conn, err := net.Dial("tcp", "localhost:1234")
	if err != nil {
		panic("连接错误")
	}
	client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
	var reply string
	err = client.Call("HelloService.Hello", "imooc", &reply)
	if err != nil {
		panic("调用错误")
	}
	fmt.Println(reply)
}

3. 基于http的rpc

服务端

func main() {
    rpc.RegisterName("HelloService", new(HelloService))
    http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
        var conn io.ReadWriteCloser = struct {
            io.Writer
            io.ReadCloser
        }{
            ReadCloser: r.Body,
            Writer:     w,
        }
        rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
    })
    http.ListenAndServe(":1234", nil)
}

4. 进一步改进rpc调用过程

前面的rpc调用虽然简单,但是和普通的http的调用差异不大,这次我们解决下面的问题:

1. serviceName统一和名称冲突的问题

    1. server端和client端如何统一serviceName
    2. 多个server的包中serviceName同名的问题

新建handler/handler.go文件内容如下: 为什么要新建一个文件? - 解耦

package handler

const HelloServiceName = "handler/HelloService"
1. 服务端
package main

import (
	"net"
	"net/rpc"
	"start/rpc_ch01/handler"
)



type HelloService struct {}
func (s *HelloService) Hello(request string, reply *string) error {
	*reply = "hello "+ request
	return nil
}

func main(){
	_ = rpc.RegisterName(handler.HelloServiceName, &HelloService{})
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		panic("监听端口失败")
	}
	conn, err := listener.Accept()
	if err != nil {
		panic("建立链接失败")
	}
	rpc.ServeConn(conn)

}
2. 客户端
package main

import (
	"fmt"
	"net/rpc"
	"start/rpc_ch01/handler"
)

func main() {
	client, err := rpc.Dial("tcp", "localhost:1234")
	if err != nil {
		panic("连接到服务器失败")
	}

	var reply string
	err = client.Call(handler.HelloServiceName+".Hello", "imooc", &reply)
	if err != nil {
		panic("服务调用失败")
	}

	fmt.Println(reply)
}

2. 继续屏蔽HelloServiceName和Hello函数名称

1. handler源码
package handler

type HelloService struct{}

func (s *HelloService) Hello(request string, reply *string) error {
	*reply = "hello " + request
	return nil
}
2. 服务端代理
package server_proxy

import "net/rpc"

const HelloServiceName = "handler/HelloService"

type HelloServiceInterface interface {
	Hello(request string, reply *string) error
}

func RegisterHelloService(srv HelloServiceInterface) error {
	return rpc.RegisterName(HelloServiceName, srv)
}
3. 服务端
package main

import (
	"net"
	"net/rpc"
	"start/rpc_ch01/handler"
	"start/rpc_ch01/server_proxy"
)

func main(){
	hellohandler := &handler.HelloService{}
	_ = server_proxy.RegisterHelloService(hellohandler)
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		panic("监听端口失败")
	}
	conn, err := listener.Accept()
	if err != nil {
		panic("建立链接失败")
	}
	rpc.ServeConn(conn)

}
4. 客户端代理
package client_proxy

import "net/rpc"

const HelloServiceName = "handler/HelloService"

type HelloServiceClient struct{
	*rpc.Client
}
func NewClient(address string) HelloServiceClient {
	conn, err := rpc.Dial("tcp", address)
	if err != nil {
		panic("连接服务器错误")
	}
	return HelloServiceClient{conn}
}

func (c *HelloServiceClient) Hello(request string, reply *string) error {
	err := c.Call(HelloServiceName+".Hello", request, reply)
	if err != nil {
		return err
	}
	return nil
}
5. 客户端
package main

import (
	"fmt"
	"start/rpc_ch01/client_proxy"
)

func main(){
	client := client_proxy.NewClient("localhost:1234")
	var reply string
	err := client.Hello("bobby",&reply)
	if err != nil {
		panic("调用失败")
	}
	fmt.Println(reply)
}

grpc

gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHPC# 支持.

image.png

protobuf

java中的dubbo dubbo/rmi/hessian messagepack 如果你懂了协议完全有能力自己去实现一个协议

  • 习惯用 Json、XML 数据存储格式的你们,相信大多都没听过Protocol Buffer

  • Protocol Buffer 其实 是 Google出品的一种轻量 & 高效的结构化数据存储格式,性能比 Json、XML 真的强!太!多!

  • protobuf经历了protobuf2和protobuf3,pb3比pb2简化了很多,目前主流的版本是pb3

protobuf

grpc开发体验

1. 下载工具

protobuf下载

如果觉得下载较慢可以点击这里下载:

protoc-3.13.0-win64.zip

protoc-3.13.0-linux-x86_64.zip

下载完成后解压后将路径添加到环境变量中

2. 下载go的依赖包

go get github.com/golang/protobuf/protoc-gen-go

3. proto文件

syntax = "proto3";
option go_package = ".;proto";
service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply);
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

4. 生成go文件

protoc -I . goods.proto --go_out=plugins=grpc:.

5. 服务端代码

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpc_demo/hello"
    "net"
)

type Server struct {
}


func (s *Server)  SayHello(ctx context.Context,request *hello.HelloRequest)(*hello.HelloReply,error){
    return &hello.HelloReply{Message:"Hello "+request.Name},nil
}

func main()  {
    g := grpc.NewServer()
    s := Server{}
    hello.RegisterGreeterServer(g,&s)
    lis, err := net.Listen("tcp", fmt.Sprintf(":8080"))
    if err != nil {
        panic("failed to listen: "+err.Error())
    }
    g.Serve(lis)
}

6. 客户端

package main

import (
    "context"
    "fmt"
    "google.golang.org/grpc"
    "grpc_demo/proto"
)

func main()  {
    conn,err := grpc.Dial("127.0.0.1:8080",grpc.WithInsecure())
    if err!=nil{
        panic(err)
    }
    defer conn.Close()
    c := hello.NewGreeterClient(conn)
    r,err := c.SayHello(context.Background(),&hello.HelloRequest{Name:"bobby"})
    if err!=nil{
        panic(err)
    }
    fmt.Println(r.Message)
}

grpc的四种数据流

srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。

简单模式(Simple RPC)

这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,所以不再详细介绍。

服务端数据流模式(Server-side streaming RPC)

这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。

客户端数据流模式(Client-side streaming RPC)

与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。

双向数据流模式(Bidirectional streaming RPC)

顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。

proto

syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc

//声明 包名
option go_package=".;proto";

//声明grpc服务
service Greeter {
    /*
    以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
    */
    rpc GetStream (StreamReqData) returns (stream StreamResData){}
    rpc PutStream (stream StreamReqData) returns (StreamResData){}
    rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}


//stream请求结构
message StreamReqData {
    string data = 1;
}
//stream返回结构
message StreamResData {
    string data = 1;
}

服务端

package main

import (
	"fmt"
	"google.golang.org/grpc"
	"log"
	"net"
	"start/new_stream/proto"
	"sync"
	"time"
)

const PORT  = ":50052"

type server struct {
}

//服务端 单向流
func (s *server)GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error{
	i:= 0
	for{
		i++
		res.Send(&proto.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
		time.Sleep(1*time.Second)
		if i >10 {
			break
		}
	}
	return nil
}

//客户端 单向流
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {

	for {
		if tem, err := cliStr.Recv(); err == nil {
			log.Println(tem)
		} else {
			log.Println("break, err :", err)
			break
		}
	}

	return nil
}

//客户端服务端 双向流
func(s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {

	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		for {
			data, _ := allStr.Recv()
			log.Println(data)
		}
		wg.Done()
	}()

	go func() {
		for {
			allStr.Send(&proto.StreamResData{Data:"ssss"})
			time.Sleep(time.Second)
		}
		wg.Done()
	}()

	wg.Wait()
	return nil
}

func main(){
	//监听端口
	lis,err := net.Listen("tcp",PORT)
	if err != nil{
		panic(err)
		return
	}
	//创建一个grpc 服务器
	s := grpc.NewServer()
	//注册事件
	proto.RegisterGreeterServer(s,&server{})
	//处理链接
	err = s.Serve(lis)
	if err != nil {
		panic(err)
	}
}

客户端

package main

import (
	"google.golang.org/grpc"

	"context"
	_ "google.golang.org/grpc/balancer/grpclb"
	"log"
	"start/new_stream/proto"
	"time"
)

const (
	ADDRESS = "localhost:50052"
)


func main(){
	//通过grpc 库 建立一个连接
	conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
	if err != nil{
		return
	}
	defer conn.Close()
	//通过刚刚的连接 生成一个client对象。
	c := proto.NewGreeterClient(conn)
	//调用服务端推送流
	reqstreamData := &proto.StreamReqData{Data:"aaa"}
	res,_ := c.GetStream(context.Background(),reqstreamData)
	for {
		aa,err := res.Recv()
		if err != nil {
			log.Println(err)
			break
		}
		log.Println(aa)
	}
	//客户端 推送 流
	putRes, _ := c.PutStream(context.Background())
	i := 1
	for {
		i++
		putRes.Send(&proto.StreamReqData{Data:"ss"})
		time.Sleep(time.Second)
		if i > 10 {
			break
		}
	}
	//服务端 客户端 双向流
	allStr,_ := c.AllStream(context.Background())
	go func() {
		for {
			data,_ := allStr.Recv()
			log.Println(data)
		}
	}()

	go func() {
		for {
			allStr.Send(&proto.StreamReqData{Data:"ssss"})
			time.Sleep(time.Second)
		}
	}()

	select {
	}

}

文章作者: wmg
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 wmg !
  目录