
[gRPC] Lecture 12 – P1: Upload file in chunks with client-streaming gRPC – Golang


Welcome back, everyone. In this lecture, we will learn how to use client-streaming RPC to upload an image file to the server in multiple chunks. In this first part of the lecture, we will implement it in Golang. OK, let’s start!

First we will define the RPC in the laptop_service.proto file. We need an UploadImageRequest message. The idea is to divide the image file into multiple chunks and send them one by one to the server, each in its own request message. I use a oneof field here because the first request will only contain the metadata, or some basic information about the image, and the following requests will contain the image data chunks. The ImageInfo will have 2 fields: the laptop ID and the image type, such as “.jpg” or “.png”.

Then we define an UploadImageResponse message, which will be returned to the client once the server has received all chunks of the image. It will contain the ID of the image, generated by the server, and the total size of the uploaded image in bytes.

OK, now we define the UploadImage RPC in the LaptopService. It takes a stream of UploadImageRequest as input and returns 1 single UploadImageResponse.

Alright, now let’s run make gen to generate
code. The code is successfully generated, and we see an error here because the laptop server hasn’t implemented the UploadImage method that is required by the LaptopServiceServer interface. So let’s open the laptop_server.go file and add that function to the LaptopServer struct. We can easily find the function signature inside the generated laptop_service.pb.go file; we just need to copy and paste it here. Let’s return nil for now, and you can see that the error is gone. We will come back to that function later.

Now we need to implement a new image store
first. The ImageStore interface has 1 function to save a laptop image. It will take 3 input parameters: the laptop ID, the image type, and the image data. And it will return the ID of the saved image, or an error.

Next we will implement a DiskImageStore that will save the image file to the disk and store its information in memory. Similar to the laptop store, we need a mutex to handle concurrency. Then we need the path of the folder to save laptop images. And finally a map whose key is the image ID and whose value is some information about the image. The ImageInfo will contain 3 fields: the ID of the laptop, the type of the image (or its file extension), and the path to the image file on disk.

OK, let’s write a function to create a new DiskImageStore. It has only 1 input, which is the image folder. And inside, we just need to initialize the map.

Now we have to implement the Save function, which is required by the ImageStore interface. First we have to generate a new random UUID for the image. If the error is not nil, just wrap and return it. Else, we make the path to store the image by joining the image folder, image ID, and image type. Then we call os.Create to create the file. If there’s an error, just wrap and return it. Otherwise, we write the image data to the created file. Wrap and return an error if it occurs.

If the file is written successfully, we need to save its information to the in-memory map. So we have to acquire the write lock of the store. We save the image information to the map, where the key is the ID of the image and the value contains the laptop ID, the image type, and the path to the image file. Finally we return the image ID and no error. That’s it, we’re done with the image store.

Now let’s go back to the server. We need to add the new image store to the
LaptopServer struct, so I will change this store field to laptopStore, and add the imageStore as the second parameter of this NewLaptopServer function.

Alright, now some new errors show up because of this change. Let’s open the laptop_client_test.go file. First, the public Store field is replaced by the laptopStore. So let’s extract this new in-memory laptop store to a separate variable, and replace this call with it. Now we no longer need the laptopServer object, so let’s remove it from the startTestLaptopServer() function, and add imageStore as its input parameter. Now we can pass the 2 stores into the NewLaptopServer() function. At the end, we only need to return the server address.

Then in this create laptop test, we just pass nil as the image store because the test doesn’t do anything with it. Similarly for the search laptop test. No more errors in this file. We do the same for the laptop_server_test.go file. Finally, in the laptop_server.go file, we just change the call from Store to laptopStore, and all errors are gone. To be sure, I will run all unit tests of this package. They passed, so we’re good.

Now in this main.go file of the server, we also need to pass 2 stores into the NewLaptopServer function. One is the laptop store, and the other is the image store. I will create a new “img” folder to save the uploaded images. OK, looks like everything is good now.

Let’s implement the UploadImage function
on the server. First we call stream.Recv() to receive the first request, which contains the information of the image. If there’s an error, we write a log and return the status code Unknown to the client. Actually this looks a bit long and repetitive, so I will define a logError() function to log the error before returning it. It only prints a log if the error is not nil, and always returns the error to the caller. Now with this function, we can simplify the error handling block like this.

If there’s no error, we can get the laptop ID from the request, as well as the image type. Let’s write a log here saying that we have received the upload-image request with this laptop ID and image type.

Next, we have to make sure that the laptop ID exists. So we call server.laptopStore.Find() to find the laptop by ID. If we get an error, just log and return it with the Internal status code. Else, if the laptop is nil, which means it is not found, we log and return an error with status code InvalidArgument. Or you might use code NotFound if you want.

Now if everything goes well and the laptop
is found, we can start receiving the image chunk data. So let’s create a new byte buffer to store them, and also a variable to keep track of the total image size. Since we’re going to receive many requests from the stream, I will use a for loop here. And inside it, let’s write a log saying we’re waiting for chunk data.

Similar to before, we call stream.Recv() to get the request. But this time, we first check if the error is EOF or not. If it is, this means that no more data will be sent, and we can safely break the loop. Else, if the error is still not nil, we return it with Unknown status code to the client. Otherwise, if there’s no error, we can get the chunk data from the request. We get its size using the len() function and add this size to the total image size.

Let’s say we don’t want the client to send too large an image, so we will check if the image size is greater than the maximum size. I will define a constant for the max image size of 1 megabyte. Now if this happens, we can return an error with InvalidArgument status code and a message saying the image is too large. Else, we can append the chunk to the image data with the Write() function. Also log and return Internal status code if an error occurs.

After the for loop, we have collected all data of the image in the buffer. Now we can call imageStore.Save to save the image data to the store and get back the image ID. If there’s an error, we log and return it with Internal status code. If the image is saved successfully, we create a response object with the image ID and image size. Then we call stream.SendAndClose() to send the response to the client. Return any error that occurs with Unknown status code. And finally we can write a log saying that the image is successfully saved with this
ID and size. Then we’re done with the server.

Now let’s implement the client. First I will refactor the code a bit. Let’s make laptop a parameter of this createLaptop function, and pass a sample laptop to it from outside, like in this for loop. Then I’m going to create a separate function for the search laptop RPC test that we wrote in the last lecture. Let’s copy this block of code and paste it into the function. Let’s add another function for the create laptop RPC test as well.

OK, now we will write a new function to test the upload image RPC, and call it from the main function. In this testUploadImage() function, we first generate a random laptop and call createLaptop() to create it on the server. Then we will write a new uploadImage() function to upload an image of this laptop to the server. That function will have 3 input parameters: the laptop client, the laptop ID, and the path to the laptop image.

First we call os.Open() to open the image file. If there’s an error, we write a fatal log. Else, we use defer to close the file afterward. Then we create a context with a timeout of 5 seconds, and we call laptopClient.UploadImage() with that context. It will return a stream object and an error. If the error is not nil, we write a fatal log. Otherwise, we create the first request to send some image information to the server, which includes the laptop ID and the image type, or the extension of the image file.

OK, now we call stream.Send() to send the first request to the server. If we get an error, write a fatal log. Else, we will create a buffered reader to read the content of the image file in chunks. Let’s say each chunk will be 1 kilobyte, or 1024 bytes. Now we will read the image data chunks in this for loop. Just call reader.Read() to read the data into the buffer. It will return the number of bytes read and an error. If the error is EOF, then it’s the end of the file, and we simply break the loop. Else, if the error is not nil, we write a fatal log. Otherwise, we create a new request with the chunk data. Make sure that the chunk only contains the first n bytes of the buffer. Then we call stream.Send() to send it to the server. Again, write a fatal log here if an error occurs.

Finally, after the for loop, we call stream.CloseAndRecv() to receive a response from the server. If there’s an error, write a fatal log. Else, we write a log saying that the image is successfully uploaded with this
ID and size. And that’s it. The client is done.

Now let’s run the server, and run the client. There’s an error: cannot open image file laptop.jpg. It’s because I forgot to put the file in the tmp folder. So let’s do that. I have a laptop image file in the Download folder. I will drag it into the tmp folder. OK, the file is ready. Now let’s rerun the client.

We got another error: Cannot send chunk to server: EOF. This error message is not very useful since it doesn’t tell us exactly why. So let’s look at the client code. We know that the message comes from this log. But this error is EOF because when an error occurs, the server will close the stream, and thus the client cannot send more data to it. To get the real error that contains the gRPC status code, we must call stream.RecvMsg() with a nil parameter. Now we can print out this error as well, and rerun the client to see what happens.

Now we can see that the real error is InvalidArgument, laptop doesn’t exist. And it is because the laptop ID is empty. OK, now let’s simplify this error log a bit, and add it to this log as well. The laptop ID is empty because it is set to empty in the createLaptop() function. So let’s remove this line, and rerun the client. This time it works. The image is uploaded successfully.

On the server side, we see a bunch of logs: waiting to receive more data. It doesn’t look very nice, so let’s write one more line of log here, saying that we have received a new data chunk
with this size. Alright, now if we open the img folder, we can see the laptop image is saved there. Excellent!

OK, now let’s see what happens if a timeout occurs. Suppose that somehow the server is writing the data very slowly. Here I sleep 1 second before writing the chunk to the buffer. OK, let’s try it. Run the server, and run the client. After 5 seconds, we see an error log on the server. However, the status code is Unknown, and it also contains another DeadlineExceeded error, which is not very nice.

So let’s fix this by checking the context error before calling receive on the stream. I will extract this context error checking block from the create laptop RPC and make it a separate function. Let’s use a switch statement here to make it more concise and easier to read. In case the context error is Canceled, we log it and return the error. In case of DeadlineExceeded, we do the same. And for the default case, just return nil. OK, now go back to our for loop. Here we call the contextError() function with the stream context. If the error is not nil, we return it immediately. Alright, let’s run the server, and run the client. Now on the server side, we see a better error log with status code DeadlineExceeded. Perfect!

Let’s try another case where the uploaded image is larger than the maximum allowed size. I will change this constant to 1 kilobyte instead of 1 megabyte. Then rerun the server and the client. This time we got InvalidArgument: image is too large. On the server side, it only receives 2 data chunks before the same error log is printed. So it works! I’m gonna undo this change to make it 1 megabyte as before, and also comment out this time.Sleep statement.

OK, now let’s learn how to write a test for
this client-streaming RPC. The test function is TestClientUploadImage. For this test, I’m gonna use tmp as the image folder. The first thing we need to do is to create a new in-memory laptop store, and create a new disk image store with the tmp image folder. We generate a sample laptop and save it to the laptop store. Then we start the test server and make a new client.

The image we’re gonna upload is the laptop.jpg file inside the tmp folder. So let’s open the file, check that there’s no error, and defer closing the file. Then we call laptopClient.UploadImage to get the stream. Require no error. Now we get the image type from the file extension. Actually the rest of the test is very similar to what we’ve done in the client main.go file, so I’m just gonna copy and paste to save time.

OK, this laptop ID should be changed to laptop.GetId(), and this image type should be just imageType. We replace this block with require.NoError(). The same for this error. We also want to keep track of the total image size, so let’s define a size variable here, and add n to the size here. Replace this block with require.NoError(), and the same for this one.

Now we check that the returned ID is not a zero-value, and that the returned image size equals size. We also want to check that the image is saved to the correct folder on the server. It should be inside the test image folder, with the image ID as the file name and the image type as the file extension. We can use the require.FileExists() function to check that. And finally we need to remove the file at the end of the test.

Alright, let’s run it. It passed! Let’s run the whole test set. All passed. Excellent!

And that’s it for today’s lecture about client-streaming RPC. In the next video, we will learn how to implement it in Java. Thank you for watching and I will see you later.
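
For reference, the chunking logic from this lecture can be tried out without any gRPC code. The sketch below is not the lecture's implementation: the names sendInChunks, collector and simulateUpload are made up for illustration, a plain callback stands in for stream.Send() and stream.Recv(), and an ordinary Go error stands in for the InvalidArgument status. It only shows the two ideas described above: the client reads the file into a 1024-byte buffer and sends the first n bytes each time, and the server appends chunks to a byte buffer until the size limit is exceeded.

```go
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

const (
	chunkSize    = 1024    // 1 kilobyte per chunk, as on the client
	maxImageSize = 1 << 20 // 1 megabyte limit, as on the server
)

var errImageTooLarge = errors.New("image is too large")

// sendInChunks plays the client's role: it reads r through a buffered
// reader and passes each chunk to send, which stands in for stream.Send().
func sendInChunks(r io.Reader, send func(chunk []byte) error) error {
	reader := bufio.NewReader(r)
	buffer := make([]byte, chunkSize)
	for {
		n, err := reader.Read(buffer)
		if n > 0 {
			// Only the first n bytes of the buffer hold fresh data.
			if sendErr := send(buffer[:n]); sendErr != nil {
				return sendErr
			}
		}
		if err == io.EOF {
			return nil // end of file: no more chunks
		}
		if err != nil {
			return err
		}
	}
}

// collector plays the server's role: it appends chunks to a byte buffer
// and rejects the upload once the total would exceed maxSize.
type collector struct {
	data    bytes.Buffer
	chunks  int
	maxSize int
}

func (c *collector) receive(chunk []byte) error {
	c.chunks++
	if c.data.Len()+len(chunk) > c.maxSize {
		return errImageTooLarge
	}
	_, err := c.data.Write(chunk) // Write copies, so reusing the buffer is safe
	return err
}

// simulateUpload sends an n-byte dummy image and reports how many bytes
// were stored and how many chunks the receiving side saw.
func simulateUpload(n, maxSize int) (size, chunks int, err error) {
	c := &collector{maxSize: maxSize}
	err = sendInChunks(strings.NewReader(strings.Repeat("x", n)), c.receive)
	return c.data.Len(), c.chunks, err
}

func main() {
	fmt.Println(simulateUpload(2500, maxImageSize)) // 2500 3 <nil>
	fmt.Println(simulateUpload(2500, chunkSize))    // 1024 2 image is too large
}
```

With the limit lowered to 1 kilobyte, the receiving side sees exactly 2 chunks before it rejects the upload, which matches what we observed on the server in the experiment above.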

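The disk image store described earlier can also be sketched with the standard library only. This is a rough approximation under a few assumptions, not the code from the lecture: the image data is taken as a plain byte slice, the random ID comes from a hypothetical newImageID() helper built on crypto/rand instead of a UUID library, and saveAndReadBack() exists only to exercise the store in a temporary folder.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
	"path/filepath"
	"sync"
)

// ImageInfo holds what the store remembers about each saved image.
type ImageInfo struct {
	LaptopID string
	Type     string
	Path     string
}

// DiskImageStore saves image files to a folder and keeps their info in memory.
type DiskImageStore struct {
	mutex       sync.RWMutex
	imageFolder string
	images      map[string]*ImageInfo
}

// NewDiskImageStore only needs the image folder; the map starts empty.
func NewDiskImageStore(imageFolder string) *DiskImageStore {
	return &DiskImageStore{
		imageFolder: imageFolder,
		images:      make(map[string]*ImageInfo),
	}
}

// newImageID is a stand-in for the random UUID (an assumption of this sketch).
func newImageID() (string, error) {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		return "", err
	}
	return hex.EncodeToString(b), nil
}

// Save writes the image to <folder>/<id><type> and records its info.
func (store *DiskImageStore) Save(laptopID, imageType string, imageData []byte) (string, error) {
	imageID, err := newImageID()
	if err != nil {
		return "", fmt.Errorf("cannot generate image id: %w", err)
	}

	imagePath := filepath.Join(store.imageFolder, imageID+imageType)
	file, err := os.Create(imagePath)
	if err != nil {
		return "", fmt.Errorf("cannot create image file: %w", err)
	}
	defer file.Close()

	if _, err := file.Write(imageData); err != nil {
		return "", fmt.Errorf("cannot write image to file: %w", err)
	}

	// Take the write lock only while touching the shared map.
	store.mutex.Lock()
	defer store.mutex.Unlock()
	store.images[imageID] = &ImageInfo{LaptopID: laptopID, Type: imageType, Path: imagePath}

	return imageID, nil
}

// saveAndReadBack saves data into a temporary folder, reads the file back
// and returns its size. The folder is removed afterward.
func saveAndReadBack(data []byte) (int, error) {
	folder, err := os.MkdirTemp("", "img")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(folder)

	store := NewDiskImageStore(folder)
	imageID, err := store.Save("laptop-1", ".jpg", data)
	if err != nil {
		return 0, err
	}
	saved, err := os.ReadFile(store.images[imageID].Path)
	return len(saved), err
}

func main() {
	fmt.Println(saveAndReadBack([]byte("fake image bytes"))) // 16 <nil>
}
```

The lock is taken only after the file has been written, so slow disk writes do not block other goroutines that use the map.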