GRPC
rpc
1. rpc之hello world
Go语言的RPC包的路径为net/rpc,也就是放在了net包目录下面。因此我们可以猜测该RPC包是建立在net包基础之上的。在第一章“Hello, World”革命一节最后,我们基于http实现了一个打印例子。下面我们尝试基于rpc实现一个类似的例子。
1. 服务端:
package main
import (
"net"
"net/rpc"
)
type HelloService struct {}
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello "+ request
return nil
}
func main(){
_ = rpc.RegisterName("HelloService", &HelloService{})
listener, err := net.Listen("tcp", ":1234")
if err != nil {
panic("监听端口失败")
}
conn, err := listener.Accept()
if err != nil {
panic("建立链接失败")
}
rpc.ServeConn(conn)
}
其中Hello方法必须满足Go语言的RPC规则
:方法只能有两个可序列化的参数,其中第二个参数是指针类型,并且返回一个error类型,同时必须是公开的方法
。
然后就可以将HelloService类型的对象注册为一个RPC服务:(TCP RPC服务)。
其中rpc.Register
函数调用会将对象类型中所有满足RPC规则的对象方法注册为RPC函数,所有注册的方法会放在“HelloService”服务空间之下。然后我们建立一个唯一的TCP链接,并且通过rpc.ServeConn函数在该TCP链接上为对方提供RPC服务。
2. 客户端
func main() {
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
log.Fatal("dialing:", err)
}
var reply string
err = client.Call("HelloService.Hello", "hello", &reply)
if err != nil {
log.Fatal(err)
}
fmt.Println(reply)
}
首先是通过rpc.Dial
拨号RPC服务,然后通过client.Call
调用具体的RPC方法。在调用client.Call
时,第一个参数是用点号链接的RPC服务名字和方法名字,第二和第三个参数分别我们定义RPC方法的两个参数
。
2. rpc支持json
Go语言的RPC框架有两个比较有特色的设计:一个是RPC数据打包时可以通过插件实现自定义的编码和解码;另 一个是RPC建立在抽象的io.ReadWriteCloser接口之上的,我们可以将RPC架设在不同的通讯协议之上。这里我们将尝试通过官方自带的net/rpc/jsonrpc扩展实现一个跨语言的RPC。
首先是基于json编码重新实现RPC服务:
服务端
package main
import (
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
type HelloService struct {}
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello "+ request
return nil
}
func main(){
rpc.RegisterName("HelloService", new(HelloService))
listener, err := net.Listen("tcp", ":1234")
if err != nil {
panic("启动错误")
}
for {
conn, err := listener.Accept()
if err != nil {
panic("接收")
}
go rpc.ServeCodec(jsonrpc.NewServerCodec(conn))
}
}
代码中最大的变化是用rpc.ServeCodec函数替代了rpc.ServeConn函数,传入的参数是针对服务端的json编解码器。
客户端
package main
import (
"fmt"
"net"
"net/rpc"
"net/rpc/jsonrpc"
)
func main(){
conn, err := net.Dial("tcp", "localhost:1234")
if err != nil {
panic("连接错误")
}
client := rpc.NewClientWithCodec(jsonrpc.NewClientCodec(conn))
var reply string
err = client.Call("HelloService.Hello", "imooc", &reply)
if err != nil {
panic("调用错误")
}
fmt.Println(reply)
}
3. 基于http的rpc
服务端
func main() {
rpc.RegisterName("HelloService", new(HelloService))
http.HandleFunc("/jsonrpc", func(w http.ResponseWriter, r *http.Request) {
var conn io.ReadWriteCloser = struct {
io.Writer
io.ReadCloser
}{
ReadCloser: r.Body,
Writer: w,
}
rpc.ServeRequest(jsonrpc.NewServerCodec(conn))
})
http.ListenAndServe(":1234", nil)
}
4. 进一步改进rpc调用过程
前面的rpc调用虽然简单,但是和普通的http的调用差异不大,这次我们解决下面的问题:
1. serviceName统一和名称冲突的问题
- server端和client端如何统一serviceName
- 多个server的包中serviceName同名的问题
新建handler/handler.go文件内容如下: 为什么要新建一个文件? - 解耦
package handler
const HelloServiceName = "handler/HelloService"
1. 服务端
package main
import (
"net"
"net/rpc"
"start/rpc_ch01/handler"
)
type HelloService struct {}
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello "+ request
return nil
}
func main(){
_ = rpc.RegisterName(handler.HelloServiceName, &HelloService{})
listener, err := net.Listen("tcp", ":1234")
if err != nil {
panic("监听端口失败")
}
conn, err := listener.Accept()
if err != nil {
panic("建立链接失败")
}
rpc.ServeConn(conn)
}
2. 客户端
package main
import (
"fmt"
"net/rpc"
"start/rpc_ch01/handler"
)
func main() {
client, err := rpc.Dial("tcp", "localhost:1234")
if err != nil {
panic("连接到服务器失败")
}
var reply string
err = client.Call(handler.HelloServiceName+".Hello", "imooc", &reply)
if err != nil {
panic("服务调用失败")
}
fmt.Println(reply)
}
2. 继续屏蔽HelloServiceName和Hello函数名称
1. handler源码
package handler
type HelloService struct{}
func (s *HelloService) Hello(request string, reply *string) error {
*reply = "hello " + request
return nil
}
2. 服务端代理
package server_proxy
import "net/rpc"
const HelloServiceName = "handler/HelloService"
type HelloServiceInterface interface {
Hello(request string, reply *string) error
}
func RegisterHelloService(srv HelloServiceInterface) error {
return rpc.RegisterName(HelloServiceName, srv)
}
3. 服务端
package main
import (
"net"
"net/rpc"
"start/rpc_ch01/handler"
"start/rpc_ch01/server_proxy"
)
func main(){
hellohandler := &handler.HelloService{}
_ = server_proxy.RegisterHelloService(hellohandler)
listener, err := net.Listen("tcp", ":1234")
if err != nil {
panic("监听端口失败")
}
conn, err := listener.Accept()
if err != nil {
panic("建立链接失败")
}
rpc.ServeConn(conn)
}
4. 客户端代理
package client_proxy
import "net/rpc"
const HelloServiceName = "handler/HelloService"
type HelloServiceClient struct{
*rpc.Client
}
func NewClient(address string) HelloServiceClient {
conn, err := rpc.Dial("tcp", address)
if err != nil {
panic("连接服务器错误")
}
return HelloServiceClient{conn}
}
func (c *HelloServiceClient) Hello(request string, reply *string) error {
err := c.Call(HelloServiceName+".Hello", request, reply)
if err != nil {
return err
}
return nil
}
5. 客户端
package main
import (
"fmt"
"start/rpc_ch01/client_proxy"
)
func main(){
client := client_proxy.NewClient("localhost:1234")
var reply string
err := client.Hello("bobby",&reply)
if err != nil {
panic("调用失败")
}
fmt.Println(reply)
}
grpc
gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计。目前提供 C、Java 和 Go 语言版本,分别是:grpc, grpc-java, grpc-go. 其中 C 版本支持 C, C++, Node.js, Python, Ruby, Objective-C, PHP 和 C# 支持.

protobuf
java中的dubbo dubbo/rmi/hessian messagepack 如果你懂了协议完全有能力自己去实现一个协议
习惯用
Json、XML
数据存储格式的你们,相信大多都没听过Protocol Buffer
Protocol Buffer
其实 是Google
出品的一种轻量 & 高效的结构化数据存储格式,性能比Json、XML
真的强!太!多!protobuf经历了protobuf2和protobuf3,pb3比pb2简化了很多,目前主流的版本是pb3
grpc开发体验
1. 下载工具
如果觉得下载较慢可以点击这里下载:
protoc-3.13.0-linux-x86_64.zip
下载完成后解压后将路径添加到环境变量中
2. 下载go的依赖包
go get github.com/golang/protobuf/protoc-gen-go
3. proto文件
syntax = "proto3";
option go_package = ".;proto";
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply);
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
4. 生成go文件
protoc -I . goods.proto --go_out=plugins=grpc:.
5. 服务端代码
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc_demo/hello"
"net"
)
type Server struct {
}
func (s *Server) SayHello(ctx context.Context,request *hello.HelloRequest)(*hello.HelloReply,error){
return &hello.HelloReply{Message:"Hello "+request.Name},nil
}
func main() {
g := grpc.NewServer()
s := Server{}
hello.RegisterGreeterServer(g,&s)
lis, err := net.Listen("tcp", fmt.Sprintf(":8080"))
if err != nil {
panic("failed to listen: "+err.Error())
}
g.Serve(lis)
}
6. 客户端
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"grpc_demo/proto"
)
func main() {
conn,err := grpc.Dial("127.0.0.1:8080",grpc.WithInsecure())
if err!=nil{
panic(err)
}
defer conn.Close()
c := hello.NewGreeterClient(conn)
r,err := c.SayHello(context.Background(),&hello.HelloRequest{Name:"bobby"})
if err!=nil{
panic(err)
}
fmt.Println(r.Message)
}
grpc的四种数据流
srteam 顾名思义 就是 一种 流,可以源源不断的 推送 数据,很适合 传输一些大数据,或者 服务端 和 客户端 长时间 数据交互,比如 客户端 可以向 服务端 订阅 一个数据,服务端 就 可以利用 stream ,源源不断地 推送数据。
简单模式(Simple RPC)
这种模式最为传统,即客户端发起一次请求,服务端响应一个数据,这和大家平时熟悉的RPC没有什么大的区别,所以不再详细介绍。
服务端数据流模式(Server-side streaming RPC)
这种模式是客户端发起一次请求,服务端返回一段连续的数据流。典型的例子是客户端向服务端发送一个股票代码,服务端就把该股票的实时数据源源不断的返回给客户端。
客户端数据流模式(Client-side streaming RPC)
与服务端数据流模式相反,这次是客户端源源不断的向服务端发送数据流,而在发送结束后,由服务端返回一个响应。典型的例子是物联网终端向服务器报送数据。
双向数据流模式(Bidirectional streaming RPC)
顾名思义,这是客户端和服务端都可以向对方发送数据流,这个时候双方的数据可以同时互相发送,也就是可以实现实时交互。典型的例子是聊天机器人。
proto
syntax = "proto3";//声明proto的版本 只能 是3,才支持 grpc
//声明 包名
option go_package=".;proto";
//声明grpc服务
service Greeter {
/*
以下 分别是 服务端 推送流, 客户端 推送流 ,双向流。
*/
rpc GetStream (StreamReqData) returns (stream StreamResData){}
rpc PutStream (stream StreamReqData) returns (StreamResData){}
rpc AllStream (stream StreamReqData) returns (stream StreamResData){}
}
//stream请求结构
message StreamReqData {
string data = 1;
}
//stream返回结构
message StreamResData {
string data = 1;
}
服务端
package main
import (
"fmt"
"google.golang.org/grpc"
"log"
"net"
"start/new_stream/proto"
"sync"
"time"
)
const PORT = ":50052"
type server struct {
}
//服务端 单向流
func (s *server)GetStream(req *proto.StreamReqData, res proto.Greeter_GetStreamServer) error{
i:= 0
for{
i++
res.Send(&proto.StreamResData{Data:fmt.Sprintf("%v",time.Now().Unix())})
time.Sleep(1*time.Second)
if i >10 {
break
}
}
return nil
}
//客户端 单向流
func (s *server) PutStream(cliStr proto.Greeter_PutStreamServer) error {
for {
if tem, err := cliStr.Recv(); err == nil {
log.Println(tem)
} else {
log.Println("break, err :", err)
break
}
}
return nil
}
//客户端服务端 双向流
func(s *server) AllStream(allStr proto.Greeter_AllStreamServer) error {
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
for {
data, _ := allStr.Recv()
log.Println(data)
}
wg.Done()
}()
go func() {
for {
allStr.Send(&proto.StreamResData{Data:"ssss"})
time.Sleep(time.Second)
}
wg.Done()
}()
wg.Wait()
return nil
}
func main(){
//监听端口
lis,err := net.Listen("tcp",PORT)
if err != nil{
panic(err)
return
}
//创建一个grpc 服务器
s := grpc.NewServer()
//注册事件
proto.RegisterGreeterServer(s,&server{})
//处理链接
err = s.Serve(lis)
if err != nil {
panic(err)
}
}
客户端
package main
import (
"google.golang.org/grpc"
"context"
_ "google.golang.org/grpc/balancer/grpclb"
"log"
"start/new_stream/proto"
"time"
)
const (
ADDRESS = "localhost:50052"
)
func main(){
//通过grpc 库 建立一个连接
conn ,err := grpc.Dial(ADDRESS,grpc.WithInsecure())
if err != nil{
return
}
defer conn.Close()
//通过刚刚的连接 生成一个client对象。
c := proto.NewGreeterClient(conn)
//调用服务端推送流
reqstreamData := &proto.StreamReqData{Data:"aaa"}
res,_ := c.GetStream(context.Background(),reqstreamData)
for {
aa,err := res.Recv()
if err != nil {
log.Println(err)
break
}
log.Println(aa)
}
//客户端 推送 流
putRes, _ := c.PutStream(context.Background())
i := 1
for {
i++
putRes.Send(&proto.StreamReqData{Data:"ss"})
time.Sleep(time.Second)
if i > 10 {
break
}
}
//服务端 客户端 双向流
allStr,_ := c.AllStream(context.Background())
go func() {
for {
data,_ := allStr.Recv()
log.Println(data)
}
}()
go func() {
for {
allStr.Send(&proto.StreamReqData{Data:"ssss"})
time.Sleep(time.Second)
}
}()
select {
}
}