3 Min. Lesezeit

gRPC Streaming with Rust

gRPC Streaming with Rust

gRPC Streaming with Rust 

In this article, we will create a streaming service in Rust. There are two parts to a streaming service, the server and the client.

Dependencies 

When implementing anything related to gRPC in Rust, the following crates are usually used:

Crate Description
prost A protobuf implementation for Rust
prost-types Well-known types for protobuf
tonic A Rust implementation of gRPC
tonic-prost Prost codec implementation for tonic
tokio An asynchronous framework for network applications
tokio-stream Stream utilities for Tokio

gRPC service and messages 

Wa are going to implement a server streaming service, that streams the server local time to the connected clients.

syntax = "proto3";
import "google/protobuf/timestamp.proto";

package clock;

service ClockService {
rpc Observe(ClockRequest) returns (stream ClockResponse);
}

message ClockRequest {
uint64 interval_seconds = 1;
}

message ClockResponse {
google.protobuf.Timestamp time_stamp = 1;
}

The service ClockService.Observe returns a stream of the message ClockResponse. In our Rust example, we use tonic to implement the server and an example client.

Rust implementation 

Our Rust project contains two binaries, one for the gRPC server and one for the client example.
The directory looks like this:

.
├── build
.rs
├── Cargo
.toml
├── proto
│ └── clock
│ └── service
.proto
├── src
│ ├── client
.rs
│ ├── lib
.rs
│ └── server
.rs
└── target
└── generated

The Cargo.toml with all the necessary dependencies and binary definitions should look like this:

[package]
name = "grpc-streaming-example"
version = "0.1.0"
edition = "2024"

[dependencies]
chrono = "0.4"
prost = "0.14"
prost-types = "0.14"
tokio = { version = "1.47", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
tonic = "0.14"
tonic-prost = "0.14"


[build-dependencies]
tonic-prost-build = {version = "0.14", features=["default"]}

[[bin]]
name = "server"
path = "src/server.rs"

[[bin]]
name = "client"
path = "src/client.rs"

Generating the protobuf stubs 

We need Rust stubs for all the gRPC and protobuf structures. To generate those, we use the crate tonic-prost-build

fn main() -> Result<(), Box<dyn std::error::Error>> {
let basepath = "proto";
let out_dir = "target/generated/proto";

// create the out_dir
std::fs::create_dir_all(out_dir)?;

tonic_prost_build::configure()
// Using an outdir makes it easier to find the generated files
.out_dir(out_dir)
// Build the server stubs
.build_server(true)
// Build the client stubs
.build_client(true)
.compile_protos(
&[format!("{}/clock/service.proto", basepath)],
&[basepath.into()],
)?;

Ok(())
}

After building, there will be a file called clock.rs in the directory target/generated/proto.
To be able to use any of those generated structures, we need to provide them to our code as a module. In our src/lib.rs we add a module for the gRPC/protobuf structures. This way, both, our server and the client have access to them.

pub mod clock {
include!("../target/generated/proto/clock.rs");
}

Server implementation 

We have to implement a trait that is provided by generated code. It is called ClockService. Its fully qualified name is grpc_streaming_example::clock::clock_service_server::ClockService. So we create a struct and let this struct implement the trait:

use grpc_streaming_example::{
clock::{
ClockRequest, ClockResponse,
clock_service_server::{ClockService, ClockServiceServer},
},
format_time
,
};
use prost_types::Timestamp;
use std::{
pin::Pin, result::Result, time::{Duration, SystemTime, UNIX_EPOCH}
};
use tokio::sync::mpsc;
use tokio_stream::{Stream, wrappers::ReceiverStream};
use tonic::{Response, Status, transport::Server};

pub struct ClockServiceImpl;

type ClockResult = Result<ClockResponse, Status>;
type ClockStream = dyn Stream<Item = ClockResult> + Send;

#[tonic::async_trait]
impl ClockService for ClockServiceImpl {
type ObserveStream = Pin<Box<ClockStream>>;

async fn observe(
&self,
request
: tonic::Request<ClockRequest>,
) -> std::result::Result<tonic::Response<Self::ObserveStream>, tonic::Status> {
// Server side implementation goes here
}
  • The observe function takes a tonic request for a ClockRequest as a parameter and returns a standard Result of a tonic response for a ObserveStream.
  • The ObserveStream is a type alias from the trait, that we have to define. It is defined as a pinned box of a ClockStream.
  • Pin and Box are some commonly used wrappers or smart pointers in rust.
  • ClockStream is a stream of ClockResult items, that can be sent between threads.
  • ClockResult is an alias for a Result of a ClockResponse and a tonic Status.

Demo server code 

Every connection will spawn a thread that loops until the remote side closes the channel. Here is the full implementation for the service observe:

async fn observe(
&self,
request
: tonic::Request<ClockRequest>,
) -> Result<tonic::Response<Self::ObserveStream>, tonic::Status> {
let seconds = request.into_inner().interval_seconds;

let (tx, rx) = mpsc::channel(1);
tokio::spawn(async move {
loop {
let server_time = Result::<_, Status>::Ok(ClockResponse {
time_stamp
: Some(now()),
});
match tx.send(server_time.clone()).await {
Ok(_) => {
if let Some(event) = server_time.ok() {
if let Some(time_stamp) = event.time_stamp {
println!("Sent {}", format_time(time_stamp));
}
}

std::thread::sleep(Duration::from_secs(seconds));
}
Err(e) => {
println!("Send got an error: {e}");
break;
}
}
}
});

let stream = ReceiverStream::new(rx);
Ok(Response::new(Box::pin(stream) as Self::ObserveStream))
}

The server creates a mpsc::channel (using the tokio version, not the standard one), and spawns a thread, that sends the current time as a ClockResponse to the tx side of the channel. The thread sleeps for the configured amount of seconds, before continuing.

It then returns the rx-side of the channel as a ReceiverStream.

Client example 

The client connects to the server and requests to observe the server stream.
It uses the generated ClockServiceClient to connect to the server:

let mut client = ClockServiceClient::connect("http://[::1]:50051").await?;

It then calls the gRPC service:

let request = ClockRequest {
interval_seconds
: 1,
};
let mut stream = client.observe(request).await?.into_inner();

We then can iterate over the stream like this:

while let Some(event) = stream.next().await {
if let Ok(event) = event {
if let Some(time_stamp) = event.time_stamp {
print!("\rServer time: {}", format_time(time_stamp));
std::io::stdout().flush()?;
}
}
}

Running the examples 

  • To start the server, execute: cargo run --bin server
  • To run the client, execute: cargo run --bin client

The code of this can be found on GitLab

Datenschutz und KI: Herausforderungen und Lösungen

Datenschutz und KI: Herausforderungen und Lösungen

Die HR-Welt ändert sich gerade schneller denn je. Die Digitalisierung und der Fachkräftemangel zwingen sie in eine strategische Ausrichtung mit dem...

Mehr lesen
Wie die Digitalisierung im Personalwesen gelingt

Wie die Digitalisierung im Personalwesen gelingt

Die Digitalisierung revolutioniert das Personalwesen und birgt zahlreiche Vorteile, aber auch Herausforderungen. Erfahre hier, wie Du diese meistern...

Mehr lesen
Moderne Zeiterfassung: Effiziente Lösungen für Dein Unternehmen

Moderne Zeiterfassung: Effiziente Lösungen für Dein Unternehmen

In der heutigen Geschäftswelt ist eine präzise und effiziente Zeiterfassung entscheidend für den Erfolg Deines Unternehmens. Warum moderne...

Mehr lesen