In my last article here I explained how to write your own gRPC service (please go through it before starting this tutorial).
However, gRPC has a number of features that can be used for better performance. One such feature is streaming, and we are going to implement streaming in gRPC using generators in Python.
Generators in Python
Generators are iterators which return values only on demand. They use the keyword yield for returning values. They’re evaluated lazily and do not block upon invocation.
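To make that concrete, here is a tiny toy example of mine (not related to the gRPC code we will write later) showing how yield hands values back one at a time:

def greetings():
    # each yield hands one value back to the caller and pauses
    # the function until the next value is requested
    yield 'hello'
    yield 'bonjour'
    yield 'hola'

gen = greetings()    # nothing inside the function body has run yet
print(next(gen))     # prints: hello
print(next(gen))     # prints: bonjour
print(list(gen))     # prints: ['hola']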
Implementing Fibonacci with Generators
You could do almost everything that a generator does with a loop; however, in some cases generators end up being more performant than a loop. Consider the code below:
def fibonacci(number):
    fibo_sequence = []
    i = 0
    while i < number:
        i = i + 1
        if len(fibo_sequence) < 2:
            fibo_sequence.append(1)
            continue
        fibo_sequence.append(fibo_sequence[-1] + fibo_sequence[-2])
    return fibo_sequence
This is a simple function which returns however many Fibonacci numbers you ask for, as a list:
In [1]: fibonacci(10)
Out[1]: [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

In [2]: fibonacci(15)
Out[2]: [1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610]
This evaluates the Fibonacci numbers as soon as the function is invoked. Essentially, your program execution blocks until this function has run to completion.
Now we implement the above function using a generator:
def fibonacci_gen(number):
    fibo_sequence = []
    i = 0
    while i < number:
        i = i + 1
        if len(fibo_sequence) < 2:
            fibo_sequence.append(1)
            yield fibo_sequence[-1]
            continue
        new_number = sum(fibo_sequence)
        fibo_sequence.append(new_number)
        fibo_sequence.pop(0)
        yield new_number
Invoking this function returns a generator object, which you then iterate over to get the results:
In [40]: fibonacci_gen(10)
Out[40]: <generator object fibonacci_gen at 0x10e68dd00>

In [42]: for i in fibonacci_gen(10):
    ...:     print(i)
    ...:
1
1
2
3
5
8
13
21
34
55
As you can see, you get a generator object (which can be iterated over in Python). Only once you iterate over it do you get the results.
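As a quick aside (this snippet is just for illustration and is not part of the repository code), you can also pull values out of the generator one at a time with next(), which shows the on-demand evaluation clearly:

fib = fibonacci_gen(10)

# each call to next() resumes the generator just long enough
# to compute one more Fibonacci number
print(next(fib))    # prints: 1
print(next(fib))    # prints: 1
print(next(fib))    # prints: 2

# the remaining values haven't been computed yet;
# exhausting the generator produces them on demand
print(list(fib))    # prints: [3, 5, 8, 13, 21, 34, 55]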
Using Generators with gRPC
You can clone the gRPC example from here. We will be making changes from where we left off in the last tutorial.
We will be implementing a service which takes a sentence, digests it, and streams the digest of each word separately (in order, though).
Edit the Proto definition to add an additional RPC
We start by editing our proto file, digestor.proto.
Currently the file has only a single rpc declared in it. It looks like this:
syntax = "proto3"; // You can ignore these for now //option java_multiple_files = true; //option java_package = "example-digestor.resource.grpc.digestor"; //option java_outer_classname = "DigestorProto"; //option objc_class_prefix = "DIGEST"; package digestor; service Digestor{ rpc GetDigestor(DigestMessage) returns (DigestedMessage) {} } message DigestMessage{ string ToDigest = 1; } message DigestedMessage{ string Digested = 1; bool WasDigested = 2; }
We need to add another rpc which supports streaming and implement it in the server as well as the client.
To declare a streaming rpc in gRPC, we mark the response type with the stream keyword in the proto file.
syntax = "proto3"; // You can ignore these for now //option java_multiple_files = true; //option java_package = "example-digestor.resource.grpc.digestor"; //option java_outer_classname = "DigestorProto"; //option objc_class_prefix = "DIGEST"; package digestor; service Digestor{ rpc GetDigestor(DigestMessage) returns (DigestedMessage) {} rpc GetDStream(DigestMessage) returns (stream DigestedMessage) {} } message DigestMessage{ string ToDigest = 1; } message DigestedMessage{ string Digested = 1; bool WasDigested = 2; }
Since we are reusing the message types that are already declared, we don't need to declare anything other than the new rpc itself.
Generate protocol definition for the new proto file
Use the command below to generate the gRPC stubs for Python:
python -m grpc_tools.protoc --proto_path=. ./digestor.proto --python_out=. --grpc_python_out=.
Check out the previous tutorial here to see how to set this up.
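As a side note, if you would rather trigger stub generation from a Python script than from the shell (purely optional, and assuming the grpc_tools package is installed in your virtualenv), the same compiler can be invoked programmatically:

from grpc_tools import protoc

# equivalent to the shell command above; returns 0 on success
protoc.main([
    'grpc_tools.protoc',   # argv[0], the program name
    '--proto_path=.',
    '--python_out=.',
    '--grpc_python_out=.',
    './digestor.proto',
])

Either way, you should end up with digestor_pb2.py and digestor_pb2_grpc.py next to your proto file, which the server and client below import.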
Implement the actual rpc in the server
This is where we write the actual implementation of the GetDStream rpc. digest_server.py is where the implementations of all the rpcs live.
import grpc
import time
import hashlib

import digestor_pb2
import digestor_pb2_grpc

from concurrent import futures


class DigestorServicer(digestor_pb2_grpc.DigestorServicer):
    """
    gRPC server for Digestor Service
    """

    def __init__(self, *args, **kwargs):
        self.server_port = 46001

    def GetDigestor(self, request, context):
        """
        Implementation of the rpc GetDigestor declared in the proto file above.
        """
        # get the string from the incoming request
        to_be_digested = request.ToDigest

        # digest and get the string representation
        # from the digestor
        hasher = hashlib.sha256()
        hasher.update(to_be_digested.encode())
        digested = hasher.hexdigest()

        # print the output here
        print(digested)

        result = {'Digested': digested, 'WasDigested': True}
        return digestor_pb2.DigestedMessage(**result)

    # code specific to our new service
    def GetDStream(self, request, context):
        """
        RPC that streams the digest of every word in the incoming sentence.
        """
        # get the sentence that needs to be processed
        to_be_digested_message = request.ToDigest

        # get all the words in the sentence
        word_list = to_be_digested_message.split(' ')

        for word in word_list:
            yield digestor_pb2.DigestedMessage(**self.get_hash(word))

    def get_hash(self, data):
        """
        Helper for returning the hash of a string
        """
        hasher = hashlib.sha256()
        hasher.update(data.encode())
        digested = hasher.hexdigest()
        return {'Digested': digested, 'WasDigested': True}

    def start_server(self):
        """
        Function which actually starts the gRPC server, and preps
        it for serving incoming connections
        """
        # declare a server object with desired number
        # of thread pool workers.
        digestor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

        # register the Digestor servicer with the server
        digestor_pb2_grpc.add_DigestorServicer_to_server(DigestorServicer(), digestor_server)

        # bind the server to the port defined above
        digestor_server.add_insecure_port('[::]:{}'.format(self.server_port))

        # start the server
        digestor_server.start()
        print('Digestor Server running ...')

        try:
            # need an infinite loop since the above
            # code is non blocking, and if I don't do this
            # the program will exit
            while True:
                time.sleep(60 * 60 * 60)
        except KeyboardInterrupt:
            digestor_server.stop(0)
            print('Digestor Server Stopped ...')


curr_server = DigestorServicer()
curr_server.start_server()
Adding the rpc to the client
Once we have added the implementation to the server, we also need to add a caller method to the client. If we don't do this, we won't be able to invoke the remote procedure call from the client. Modify digestor_client.py to ensure the contents look like this:
import grpc

import digestor_pb2
import digestor_pb2_grpc


class DigestorClient(object):
    """
    Client for accessing the gRPC functionality
    """

    def __init__(self):
        # configure the host and the port to which
        # the client should connect to.
        self.host = 'localhost'
        self.server_port = 46001

        # instantiate a communication channel
        self.channel = grpc.insecure_channel(
            '{}:{}'.format(self.host, self.server_port))

        # bind the client to the server channel
        self.stub = digestor_pb2_grpc.DigestorStub(self.channel)

    def get_digest(self, message):
        """
        Client function to call the rpc for GetDigestor
        """
        to_digest_message = digestor_pb2.DigestMessage(ToDigest=message)
        return self.stub.GetDigestor(to_digest_message)

    # function to invoke our newly implemented RPC
    def get_streaming_digest(self, message):
        """
        Client function to call the rpc for GetDStream
        """
        to_digest_message = digestor_pb2.DigestMessage(ToDigest=message)
        digested_words = self.stub.GetDStream(to_digest_message)
        for digested_word in digested_words:
            print(digested_word)
Testing out the streaming rpc
With the server running in another console (but in the same virtualenv), fire up the Python interpreter and write the following:
from digestor_client import DigestorClient

currs_client = DigestorClient()
Call the get_streaming_digest function on the client object to invoke the rpc. For example:
currs_client.get_streaming_digest('This is a sample test where I get streaming responses')
This should print the following:
In [9]: currs_client.get_streaming_digest('This is a sample test where I get streaming responses')
Digested: "86e1de74820a9b252ba33b2eed445b0cd02c445b5f4b8007205aff1762d7301a"
Digested: "fa51fd49abf67705d6a35d18218c115ff5633aec1f9ebfdc9d5d4956416f57f6"
Digested: "ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb"
Digested: "af2bdbe1aa9b6ec1e2ade1d694f41fc71a831d0268e9891562113d8a62add1bf"
Digested: "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
Digested: "b48111c10c65fc119368edafb19f97451759ee90b3f44647368135ca47aa4753"
Digested: "a83dd0ccbffe39d071cc317ddf6e97f5c6b1c87af91919271f9fa140b0508c6c"
Digested: "2998b3232d29e8dc5a78d97a32ce83f556f3ed31b057077503df05641dd79158"
Digested: "b304adc33719d6cddb96d6c4f1aa46628d927547b49b70c981c7fea953e0600a"
Digested: "e30b3910ffd943c99f7edd7543c56c608522784ec521e85a91cf296f14a13e72"
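Each item the client iterates over is a full DigestedMessage, so you are not limited to printing it. As a rough sketch (this is a hypothetical variation, not a method that exists in the repository), you could add something like this to DigestorClient to collect the digests into a list instead:

def collect_streaming_digest(self, message):
    """
    Hypothetical variant of get_streaming_digest that returns the
    digests as a list instead of printing each message.
    """
    to_digest_message = digestor_pb2.DigestMessage(ToDigest=message)
    # the stub call returns an iterator; each iteration waits only
    # until the next streamed DigestedMessage arrives from the server
    return [response.Digested
            for response in self.stub.GetDStream(to_digest_message)]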
You can find the code here. It is under the branch streaming.
Further
This is an excellent way to build long-running network calls which are iterative in nature, as well as to make them performant.
I have faced some random GOAWAY errors with gRPC while using it in production. I would love to hear from you in case you have experienced the same.