gRPC流、异步通信

背景

gRPC是谷歌开源的一套基于Http实现的rpc通讯框架,用 protobuf 实现传输数据的定义,其格式类似json。由于其实现了多平台,在不同编程语言上用一套 probuf 数据的定义就可以生成不同rpc调用 客户端、服务端,不用自己去实现服务端的监听数据校检等等,是一个非常方便的工具。

我这里需要采用该工具开发一套 异步并发的远端调用的程序,所以快速的学了下其工作原理,写了个简单的异步通信的模板。

学习基础四种通信方式调用

主要是看这篇博客官方Python gRPC教程,学习了Python的gRPC的使用方式。相比官网给的例程,他还给出了 一个类似心跳传呼的机制 用服务端主动向客户端发送数据。原文观看需要验证码,我这里有打印出的pdf

gRPC的rpc调用有四种工作模式:

  • 基础的RPC,其中客户端使用存根向服务器发送请求 并等待响应返回,就像正常的函数调用一样。

    rpc GetFeature(Point) returns (Feature) {}
    
  • 响应流式处理 RPC,其中客户机向服务器发送一个请求并获得一个流以读取返回的消息序列。客户端从返回的流中读取,直到没有更多的消息

    rpc ListFeatures(Rectangle) returns (stream Feature) {}
    
  • 请求流式 RPC,其中客户端写入一系列消息并将它们发送到服务器,同样使用提供的流。一旦客户机完成了消息的写入,它将等待服务器读取所有消息并返回其响应。

    rpc RecordRoute(stream Point) returns (RouteSummary) {}
    
  • 双向流式传输 RPC,这两个流是独立运行的,因此客户机和服务器可以按照它们喜欢的任何顺序进行读写:例如,服务器可以等待接收所有客户机消息后再写入其响应,或者它可以交替读取一条消息,然后再写入一条消息,或者其他读写的组合。保留每个流中的消息顺序。

    rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
    

基础的使用我这里就不过多赘述,只看看他这里流的使用。不过博客的代码直接使用有些问题,我这边实际测试后的代码贴出。

代码结构和他一样

.
├── contact
│   ├── contact.proto
│   └── __init__.py
├── contact_client.py
└── contact_server.py

执行如下命令python -m grpc_tools.protoc -I . --python_out=. --grpc_python_out=. contact/*.proto生成 contact_pb2_grpc包含了客户端、服务端的调用函数,contact_pb2.py 就是message数据的定义。python里为了可以实现类似 流的 for in 操作使用yield来把流数据封装。 Python yield 使用浅析

它例子的双向流我就不介绍了,还蛮有意思的的看看他python流的实现。stub就是客户端,server就是 grpc.server。

  • contact\contact.proto:

    syntax = "proto3";
    
    // 定义一个服务
    service Contact {
        // 客户端通信给服务端,通信方式可以随意选择,这里我选择第4种通信方式
        rpc sendStatus (stream ClientMsg) returns (stream Result);
        // 客户端发送一个空消息给服务端,服务端就能给客户端通信了
        rpc getTask (Empty) returns (stream ServerMsg);
        // 客户端接受完服务端消息处理完后,再告诉服务端。这个tellResult也可以不要,看具体需求
        rpc tellResult (stream Result) returns (Empty);
    }
    
    message ClientMsg {
        string msg = 1;
    }
    
    message ServerMsg {
        string task = 1;
    }
    
    message Empty {
    
    }
    
    message Result {
        string ret = 1;
    }
    
  • contact_client.py

    import logging
    import random
    import threading
    import time
    import grpc
    from contact import contact_pb2
    from contact import contact_pb2_grpc
    
    # 先制造一些客户端能发送的数据
    def make_some_data():
        for i in range(10):
            print(f"发送当前状态:{i}")
            yield contact_pb2.ClientMsg(msg=f"数据:{i}")
    
    def send_status(stub):
        # 开启一个线程每隔60s发送当前流10组的状态包
        try:
            while True:
                status_response = stub.sendStatus(make_some_data())
                for ret in status_response:
                    print(ret.ret)
                print("发送完毕")
                time.sleep(60)
        except Exception as e:
            print(f'err in send_status:{e}')
            return
    
    # 接收服务端发送过来的任务
    def get_task(stub):
        try:
            for task in stub.getTask(contact_pb2.Empty()):
                print(f"客户端已接收到服务端任务:{task.task}\n")
                # 顺便再告诉服务端我已经接收到你发的任务,你不用担心我没接收到它
                yield contact_pb2.Result(
                    ret=f"客户端接收到任务:{task.task}"
                )
        except Exception as e:
            print(f'err:{e}')
            return
    
    def run():
        with grpc.insecure_channel('localhost:55555') as channel:
            stub = contact_pb2_grpc.ContactStub(channel)
            threading.Thread(target=send_status, args=(stub,), daemon=True).start()
            while True:
                try:
                    send_status(stub)
                    # result = get_task(stub)
                    # stub.tellResult(result)
                except grpc.RpcError as e:
                    print(f"server connected out, please retry:{e.code()},{e.details()}")
                except Exception as e:
                    print(f'unknown err:{e}')
                finally:
                    time.sleep(2)
    
    if __name__ == '__main__':
        run()
    
  • contact_server.py (基本没问题,把sendStatus的返回值的Result改为ret即可)

    import logging
    import random
    import time
    from concurrent import futures
    import grpc
    from contact import contact_pb2_grpc
    from contact import contact_pb2
    
    class MyserverClass:
        def __init__(self) -> None:
            self.tasks = []
    
        # 注意服务端的具体实现函数是在类里面
        def sendStatus(self, request_iterator, context):
            for note in request_iterator:
                ret=f"服务端接收到消息:{note.msg}"
                print(ret)
                yield contact_pb2.Result(ret = "来自服务端的响应: " + ret)
    
        # 在类初试化的时候定义了一个列表self.tasks来充当任务队列
        def getTask(self, request_iterator, context):
            print("服务端已接收到客户端上线通知,开始发送任务给客户端\n")
            last_index = 0
            while True:
                print("服务端开始发送任务给客户端了。。。。。。\n")
                while len(self.tasks) > last_index:
                    n = self.tasks[last_index]
                    last_index += 1
                    yield n
                    print(f'服务端发送给了客户端任务:{n.task}##########\n')
    
            # 顺便制造些服务端的任务数据用来填充到任务队列里面
                for i in range(10):
                    num = random.randint(100, 200)
                    self.tasks.append(contact_pb2.ServerMsg(
                        task=f"任务:{num}"
                    ))
                time.sleep(40)
    
        def tellResult(self, request_iterator, context):
            for response in request_iterator:
                print(f"我已经知道客户端接收到我发过去的任务:{response.ret}")
            return contact_pb2.Empty()
    
    if __name__ == '__main__':
        server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
        contact_pb2_grpc.add_ContactServicer_to_server(
            MyserverClass(), server)
        server.add_insecure_port('[::]:55555')
        server.start()
        server.wait_for_termination()
    

学习异步调用

由于Python的多线程实际上是类似协程的实现,所以对于CPU密集型任务,哪怕使用并发多线程其性能由于同步堵塞也相当于单线程性能,这个缺陷导致我这边一个工具性能达到瓶颈,所以需要调用rpc时异步不堵塞,之后再采集结果。

官网的教程中就有其执行方式:

route_summary_future = stub.RecordRoute.future(point_iterator)
route_summary = route_summary_future.result()

对于 future result 的api 例如等待超时等参数可以在这里查看使用方式 Search — gRPC Python 1.46.2 documentation

  • 服务端添加send_cmd实现,用随机值模拟不同cmd执行耗时:

    def remoteCMD(self, request: contact_pb2.cmdRequest, context):
        try:
            t = random.randint(3, 10)
            print(f"执行 '{request.cmdStr}' 需要花费{t}s")
            time.sleep(t)
            print(f"response index: {request.index}")
            return contact_pb2.cmdResponse(
                    index = request.index,
                    success = True,
                    res = request.log,
                    # res = request.cmd + 报错信息,
                )
        except Exception as e:
            print(e)
            return
    
  • 客户端添加send_cmd实现,其是异步执行的:

    def send_cmd(stub: contact_pb2_grpc.ContactStub):
        try:
            func = []
            for i in range(10):
                req = contact_pb2.cmdRequest(
                        index = i, 
                        cmdStr = f"ip link add veth{i}", 
                        log = f"PTP n1 - n{i}",
                    )
                print(f"发送指令 index:{req.index}")
                # response = stub.remoteCMD(req)  # 普通同步堵塞执行
                response_future = stub.remoteCMD.future(req)
                func.append(response_future)
            # func[0].result()   # 调用result时如果该request收到response就直接打印否则就堵塞等待其执行完成
            # for i in range(10):
            #     a = func[i].result()
            #     print(int(a.index))
        except Exception as e:
            print(e)
            return
    
    def run():
        with grpc.insecure_channel('localhost:55555') as channel:
            stub = contact_pb2_grpc.ContactStub(channel)
            while True:
                try:
                    send_cmd(stub)
                    time.sleep(100) # 不等待一会,客户端直接结束服务端不会返回值了
                except grpc.RpcError as e:
                    print(f"server connected out, please retry:{e.code()},{e.details()}")
                except Exception as e:
                    print(f'unknown err:{e}')
                finally:
                    time.sleep(2)
    

所以,由此简易demo可以知道,先发送异步的request。等需要调用其结果时在获取 response_future的result即可。

python logging

这里额外提一嘴 python的 logging过滤器

参考这篇博客就可以实现简单的对 level、前缀之类的数据过滤

# 继承 logging.Filter 类并重载filter函数即可,filter() 为 true表示不输出
class stringFilter(logging.Filter):
    def filter(self, record):
        if record.msg.find('123') == -1:
            return True
        return False

logging.root.addFilter(logging.Filter(json)) # logging默认执行的handler是root

当然 record 有用的参数还有这些 可以自定义许多有效的 过滤器,name指的执行该处的用户名

  • record.filename
  • record.funcName
  • record.levelname
  • record.module
  • record.msg
  • record.name
  • record.pathname
  • record.processName

之前配置logging参数(参数解析教程)的时候都是 直接创建对应的 handler ,然后用 getLogger 取出handler,logger = logging.getLogger("console_logger")

现在才发现不用这么麻烦,每次导一个自定义的logger。因为系统默认的 logging.info('info message') 就会找root的handlers,我们把自定义的 handler加入root组中即可,这样 直接 logging.info 的调用。配置如下所示:

{
	"version": 1,
	"handlers": {
		"console": {
			"class": "logging.StreamHandler",
			"formatter": "default",
			"level": "INFO",
			"stream": "ext://sys.stdout"
		},
		"lktest": {
			"class": "logging.handlers.RotatingFileHandler",
			"formatter": "default",
			"level": "INFO",
			"filename": "/home/lk233/core/test.log",
    		"maxBytes": 10485760,
    		"backupCount": 2
		}
	},
	"formatters": {
		"default": {
			"format": "%(asctime)s - %(levelname)s - %(module)s:%(funcName)s - %(message)s"
		}
	},
	"root": {
		"level": "DEBUG",
		"handlers": ["console", "lktest"]
	}
}