Writing Streaming services with gRPC

In my last article here I explained how to write your own gRPC service (please go through it before starting this tutorial).

However, gRPC has a number of features that can be used for better performance. One such feature is streaming, and we are going to implement streaming in gRPC using generators in Python.

Generators in Python

Generators are iterators which return values only on demand. They use the keyword yield for returning values. They’re evaluated lazily and do not block upon invocation.
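As a quick illustration of that on-demand behaviour (this countdown function is just an example, not part of the tutorial code):

```python
def countdown(n):
    """Yield the numbers from n down to 1, one at a time."""
    while n > 0:
        yield n  # execution pauses here until the next value is requested
        n -= 1

gen = countdown(3)
print(next(gen))  # prints 3 -- computed only now, on demand
print(next(gen))  # prints 2
print(list(gen))  # prints [1] -- the remaining values
```

Nothing inside the function body runs until a value is actually requested with next() or a loop.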

Implementing Fibonacci with Generators

You could do almost everything that a generator does with a plain loop; however, in some cases generators actually end up being more performant than a loop. Consider the code below:

def fibonacci(number):
    fibo_sequence = []
    i = 0

    while i < number:
        i = i + 1

        if len(fibo_sequence) < 2:
            fibo_sequence.append(1)
            continue

        fibo_sequence.append(fibo_sequence[-1] + fibo_sequence[-2])

    return fibo_sequence

This is a simple function which returns as many Fibonacci numbers as requested.

In [1]: fibonacci(10)
Out[1]: [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

In [2]: fibonacci(15)
Out[2]: [1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89, 144, 233, 377, 610]

This evaluates all the Fibonacci numbers as soon as the function is invoked. Essentially, your program execution will block until this function executes completely.

Now we implement the above function using a generator:

def fibonacci_gen(number):
    fibo_sequence = []
    i = 0
    while i < number:
        i = i + 1
        if len(fibo_sequence) < 2:
            fibo_sequence.append(1)
            yield fibo_sequence[-1]
            continue
        new_number = sum(fibo_sequence)
        fibo_sequence.append(new_number)
        fibo_sequence.pop(0)
        yield new_number

Invoking this function returns a generator object that needs to be iterated over to get results:

In [40]: fibonacci_gen(10)
Out[40]: <generator object fibonacci_gen at 0x10e68dd00>
In [42]: for i in fibonacci_gen(10):
 ...:        print(i)
 ...:
1
1
2
3
5
8
13
21
34
55

As you can see, you get a generator object (which can be iterated over in Python). Once you iterate over it, you get all the results.
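You can also pull values out one at a time with next(), which makes the on-demand evaluation explicit (the fibonacci_gen definition is repeated here so the snippet is self-contained):

```python
def fibonacci_gen(number):
    # same generator as defined above
    fibo_sequence = []
    i = 0
    while i < number:
        i = i + 1
        if len(fibo_sequence) < 2:
            fibo_sequence.append(1)
            yield fibo_sequence[-1]
            continue
        new_number = sum(fibo_sequence)
        fibo_sequence.append(new_number)
        fibo_sequence.pop(0)
        yield new_number

gen = fibonacci_gen(10)
print(next(gen))  # prints 1 -- only the first value has been computed
print(next(gen))  # prints 1
print(next(gen))  # prints 2 -- each call computes exactly one more value
```

This is exactly the behaviour gRPC exploits for streaming: each response is produced only when the stream asks for it.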

Using Generators with gRPC

You can clone the gRPC example from here. We will be making changes from where we left off in the last tutorial.

We will be implementing a service which takes a sentence, digests it, and streams the digest for each word separately (in order, though).
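Stripped of the gRPC plumbing, the core of that service is just a generator over per-word digests. A minimal sketch (digest_words is an illustrative name, not part of the tutorial code):

```python
import hashlib

def digest_words(sentence):
    """Yield the SHA-256 hex digest of each word in the sentence, in order."""
    for word in sentence.split(' '):
        yield hashlib.sha256(word.encode()).hexdigest()

for digest in digest_words('hello world'):
    print(digest)  # one 64-character hex digest per word
```

The rest of this tutorial wraps this idea in a gRPC service definition, a server, and a client.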

Edit the Proto definition to add an additional RPC

We start by editing our proto file, digestor.proto.

Currently the file has only a single rpc declared in it. It looks like this:

syntax = "proto3";

// You can ignore these for now
//option java_multiple_files = true;
//option java_package = "example-digestor.resource.grpc.digestor";
//option java_outer_classname = "DigestorProto";
//option objc_class_prefix = "DIGEST";

package digestor;

service Digestor{
 rpc GetDigestor(DigestMessage) returns (DigestedMessage) {}
}

message DigestMessage{
 string ToDigest = 1;
}

message DigestedMessage{
 string Digested = 1;
 bool WasDigested = 2;
}

We need to add another rpc which supports streaming and implement it in the server as well as the client.

To specify a streaming RPC in gRPC, we use the keyword stream in the proto file:

syntax = "proto3";

// You can ignore these for now
//option java_multiple_files = true;
//option java_package = "example-digestor.resource.grpc.digestor";
//option java_outer_classname = "DigestorProto";
//option objc_class_prefix = "DIGEST";

package digestor;

service Digestor{
 rpc GetDigestor(DigestMessage) returns (DigestedMessage) {}
 rpc GetDStream(DigestMessage) returns (stream DigestedMessage) {}
}

message DigestMessage{
 string ToDigest = 1;
}

message DigestedMessage{
 string Digested = 1;
 bool WasDigested = 2;
}

Since we are reusing the same protocol buffer messages that are already declared, we don't need to add anything else other than the new rpc declaration.

Generate protocol definition for the new proto file

Use the command below to generate the gRPC stubs for Python:

python -m grpc_tools.protoc --proto_path=. ./digestor.proto --python_out=. --grpc_python_out=.

Check out the previous tutorial here to see how to set this up.

Implement the actual rpc in the server

This is where we write the actual implementation of the GetDStream rpc. The digest_server.py file is where the implementations of all the RPCs live.

import grpc
import time
import hashlib
import digestor_pb2
import digestor_pb2_grpc
from concurrent import futures

class DigestorServicer(digestor_pb2_grpc.DigestorServicer):
    """
    gRPC server for Digestor Service
    """
    def __init__(self, *args, **kwargs):
        self.server_port = 46001

    def GetDigestor(self, request, context):
        """
        Implementation of the rpc GetDigest declared in the proto
        file above.
        """
        # get the string from the incoming request
        to_be_digested = request.ToDigest

        # digest and get the string representation
        # from the digestor
        hasher = hashlib.sha256()
        hasher.update(to_be_digested.encode())
        digested = hasher.hexdigest()

        # print the output here
        print(digested)

        result = {'Digested': digested, 'WasDigested': True}

        return digestor_pb2.DigestedMessage(**result)

    # code specific to our new service
    def GetDStream(self, request, context):
        """
        RPC that streams back the digest of each
        word in the incoming sentence
        """
        # get the sentence that needs to be processed
        to_be_digested_message = request.ToDigest

        # get all the words in the sentence
        word_list = to_be_digested_message.split(' ')

        for word in word_list:
            yield digestor_pb2.DigestedMessage(**self.get_hash(word))

    def get_hash(self, data):
        """
        Helper method for returning the hash of a string
        """
        hasher = hashlib.sha256()
        hasher.update(data.encode())
        digested = hasher.hexdigest()
        return {'Digested': digested, 'WasDigested': True}

    def start_server(self):
        """
        Function which actually starts the gRPC server, and preps
        it for serving incoming connections
        """
        # declare a server object with desired number
        # of thread pool workers.
        digestor_server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))

        # register the servicer class with the server
        digestor_pb2_grpc.add_DigestorServicer_to_server(DigestorServicer(), digestor_server)

        # bind the server to the port defined above
        digestor_server.add_insecure_port('[::]:{}'.format(self.server_port))

        # start the server
        digestor_server.start()
        print ('Digestor Server running ...')

        try:
            # need an infinite loop since the above
            # code is non blocking, and if I don't do this
            # the program will exit
            while True:
                time.sleep(60*60*60)
        except KeyboardInterrupt:
            digestor_server.stop(0)
            print('Digestor Server Stopped ...')

curr_server = DigestorServicer()
curr_server.start_server()

Adding the rpc to the client

Once we have added the implementation to the server, we also need to add the caller method to the client. If we don't do this, we won't be able to invoke our remote procedure call. Modify digestor_client.py to ensure the contents look like this:

import grpc
import digestor_pb2
import digestor_pb2_grpc

class DigestorClient(object):
    """
    Client for accessing the gRPC functionality
    """

    def __init__(self):
        # configure the host and the
        # the port to which the client should connect
        # to.
        self.host = 'localhost'
        self.server_port = 46001

        # instantiate a communication channel
        self.channel = grpc.insecure_channel(
            '{}:{}'.format(self.host, self.server_port))

        # bind the client to the server channel
        self.stub = digestor_pb2_grpc.DigestorStub(self.channel)

    def get_digest(self, message):
        """
        Client function to call the rpc for GetDigest
        """
        to_digest_message = digestor_pb2.DigestMessage(ToDigest=message)
        return self.stub.GetDigestor(to_digest_message)

    # function to invoke our newly implemented RPC
    def get_streaming_digest(self, message):
        """
        Client function to call the rpc for GetDStream
        """
        to_digest_message = digestor_pb2.DigestMessage(ToDigest=message)
        digested_words = self.stub.GetDStream(to_digest_message)
        for digested_word in digested_words:
            print(digested_word)

Testing out the streaming rpc

With the server running in another console (but in the same virtualenv), fire up the Python interpreter and write the following:

from digestor_client import DigestorClient
currs_client = DigestorClient()

Call the get_streaming_digest function on the client object to invoke the gRPC. For example:

currs_client.get_streaming_digest('This is a sample test where I get streaming responses')

This should return the following:

In [9]: currs_client.get_streaming_digest('This is a sample test where I get streaming responses')

Digested: "86e1de74820a9b252ba33b2eed445b0cd02c445b5f4b8007205aff1762d7301a"
Digested: "fa51fd49abf67705d6a35d18218c115ff5633aec1f9ebfdc9d5d4956416f57f6"
Digested: "ca978112ca1bbdcafac231b39a23dc4da786eff8147c4e72b9807785afee48bb"
Digested: "af2bdbe1aa9b6ec1e2ade1d694f41fc71a831d0268e9891562113d8a62add1bf"
Digested: "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
Digested: "b48111c10c65fc119368edafb19f97451759ee90b3f44647368135ca47aa4753"
Digested: "a83dd0ccbffe39d071cc317ddf6e97f5c6b1c87af91919271f9fa140b0508c6c"
Digested: "2998b3232d29e8dc5a78d97a32ce83f556f3ed31b057077503df05641dd79158"
Digested: "b304adc33719d6cddb96d6c4f1aa46628d927547b49b70c981c7fea953e0600a"
Digested: "e30b3910ffd943c99f7edd7543c56c608522784ec521e85a91cf296f14a13e72"

You can find the code here. It is under the branch streaming.

Further

This is an excellent way to build long-running network calls which are iterative in nature, as well as to make them performant.

I have faced a few random GOAWAY errors with gRPC while using it in production. I would love to hear from you in case anyone has experienced the same.
