In this article, we will create a streaming service in Rust. There are two parts to a streaming service, the server and the client.
When implementing anything related to gRPC in Rust, the following crates are usually used:
Crate | Description |
---|---|
prost | A protobuf implementation for Rust |
prost-types | Well-known types for protobuf |
tonic | A Rust implementation of gRPC |
tonic-prost | Prost codec implementation for tonic |
tokio | An asynchronous framework for network applications |
tokio-stream | Stream utilities for Tokio |
We are going to implement a server-streaming service that streams the server's local time to the connected clients. The service is defined in proto/clock/service.proto:
syntax = "proto3";
import "google/protobuf/timestamp.proto";
package clock;
service ClockService {
  rpc Observe(ClockRequest) returns (stream ClockResponse);
}
message ClockRequest {
  uint64 interval_seconds = 1;
}
message ClockResponse {
  google.protobuf.Timestamp time_stamp = 1;
}
The service ClockService.Observe returns a stream of the message ClockResponse. In our Rust example, we use tonic to implement the server and an example client.
Our Rust project contains two binaries, one for the gRPC server and one for the client example.
The directory looks like this:
.
├── build.rs
├── Cargo.toml
├── proto
│   └── clock
│       └── service.proto
├── src
│   ├── client.rs
│   ├── lib.rs
│   └── server.rs
└── target
    └── generated
The Cargo.toml with all the necessary dependencies and binary definitions should look like this:
[package]
name = "grpc-streaming-example"
version = "0.1.0"
edition = "2024"
[dependencies]
chrono = "0.4"
prost = "0.14"
prost-types = "0.14"
tokio = { version = "1.47", features = ["macros", "rt-multi-thread", "time"] }
tokio-stream = "0.1"
tonic = "0.14"
tonic-prost = "0.14"
[build-dependencies]
tonic-prost-build = {version = "0.14", features=["default"]}
[[bin]]
name = "server"
path = "src/server.rs"
[[bin]]
name = "client"
path = "src/client.rs"
We need Rust stubs for all the gRPC and protobuf structures. To generate those, we use the crate tonic-prost-build in our build.rs:
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let basepath = "proto";
    let out_dir = "target/generated/proto";
    // Create the out_dir
    std::fs::create_dir_all(out_dir)?;
    tonic_prost_build::configure()
        // Using an out_dir makes it easier to find the generated files
        .out_dir(out_dir)
        // Build the server stubs
        .build_server(true)
        // Build the client stubs
        .build_client(true)
        .compile_protos(
            &[format!("{}/clock/service.proto", basepath)],
            &[basepath.into()],
        )?;
    Ok(())
}
After building, there will be a file called clock.rs in the directory target/generated/proto.
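As a rough orientation, the message types in the generated clock.rs have approximately the following shape. This is a simplified, hand-written sketch, not the generated file: the real code carries prost derive macros and field attributes, uses prost_types::Timestamp (the local Timestamp struct below is only a stand-in with the same two fields), and additionally contains the clock_service_server and clock_service_client modules.

```rust
// Stand-in for prost_types::Timestamp (assumption: only its two public fields matter here).
#[derive(Clone, Debug, PartialEq, Default)]
pub struct Timestamp {
    pub seconds: i64, // whole seconds since the Unix epoch
    pub nanos: i32,   // sub-second part in nanoseconds
}

// Shape of the request message: proto3 `uint64` maps to Rust `u64`.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct ClockRequest {
    pub interval_seconds: u64,
}

// Shape of the response message: message-typed fields are wrapped in `Option`,
// which is why the server later writes `time_stamp: Some(now())`.
#[derive(Clone, Debug, PartialEq, Default)]
pub struct ClockResponse {
    pub time_stamp: Option<Timestamp>,
}
```

The Option wrapper explains why both the server and the client unpack time_stamp with if let Some(...) before formatting it.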
To use any of the generated structures, we need to expose them to our code as a module. In src/lib.rs, we add a module for the gRPC/protobuf structures. This way, both the server and the client have access to them:
pub mod clock {
    include!("../target/generated/proto/clock.rs");
}
We have to implement a trait that is provided by the generated code. It is called ClockService, and its fully qualified name is grpc_streaming_example::clock::clock_service_server::ClockService. So we create a struct and let it implement the trait:
use grpc_streaming_example::{
    clock::{
        ClockRequest, ClockResponse,
        clock_service_server::{ClockService, ClockServiceServer},
    },
    format_time,
};
use prost_types::Timestamp;
use std::{
    pin::Pin,
    result::Result,
    time::{Duration, SystemTime, UNIX_EPOCH},
};
use tokio::sync::mpsc;
use tokio_stream::{Stream, wrappers::ReceiverStream};
use tonic::{Response, Status, transport::Server};

pub struct ClockServiceImpl;

// One stream item: either a response message or a gRPC status (error)
type ClockResult = Result<ClockResponse, Status>;
// Any stream of such items that may be moved to another thread
type ClockStream = dyn Stream<Item = ClockResult> + Send;

#[tonic::async_trait]
impl ClockService for ClockServiceImpl {
    type ObserveStream = Pin<Box<ClockStream>>;

    async fn observe(
        &self,
        request: tonic::Request<ClockRequest>,
    ) -> Result<tonic::Response<Self::ObserveStream>, tonic::Status> {
        // Placeholder so that the skeleton compiles
        todo!("server-side implementation goes here")
    }
}
- The observe function takes a tonic request for a ClockRequest as a parameter and returns a standard Result of a tonic response for an ObserveStream.
- ObserveStream is an associated type of the trait that we have to define. It is defined as a pinned box of a ClockStream.
- ClockStream is a stream of ClockResult items that can be sent between threads.
- ClockResult is an alias for a Result of a ClockResponse and a tonic Status.

Every connection spawns a task that loops until the remote side closes the channel. Here is the full implementation of the observe method:
async fn observe(
    &self,
    request: tonic::Request<ClockRequest>,
) -> Result<tonic::Response<Self::ObserveStream>, tonic::Status> {
    let seconds = request.into_inner().interval_seconds;
    let (tx, rx) = mpsc::channel(1);
    // Produce timestamps in a background task until the client disconnects
    tokio::spawn(async move {
        loop {
            let server_time = Result::<_, Status>::Ok(ClockResponse {
                time_stamp: Some(now()),
            });
            match tx.send(server_time.clone()).await {
                Ok(_) => {
                    if let Some(event) = server_time.ok() {
                        if let Some(time_stamp) = event.time_stamp {
                            println!("Sent {}", format_time(time_stamp));
                        }
                    }
                    // Async sleep (needs tokio's "time" feature): unlike
                    // std::thread::sleep, it does not block the runtime's worker thread
                    tokio::time::sleep(Duration::from_secs(seconds)).await;
                }
                Err(e) => {
                    // The receiver is gone, i.e. the client has disconnected
                    println!("Send got an error: {e}");
                    break;
                }
            }
        }
    });
    let stream = ReceiverStream::new(rx);
    Ok(Response::new(Box::pin(stream) as Self::ObserveStream))
}
The server creates an mpsc::channel (the tokio version, not the one from the standard library) and spawns a task that sends the current time as a ClockResponse to the tx side of the channel. The task sleeps for the configured number of seconds before continuing. The function then returns the rx side of the channel wrapped in a ReceiverStream.
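The loop ends because sending fails once the receiving half of the channel has been dropped. The same pattern can be sketched with only the standard library, using a thread and a bounded std::sync::mpsc channel in place of the tokio task and channel. This is an analogy under that assumption, not the server code, and spawn_clock is a hypothetical name.

```rust
use std::sync::mpsc::{sync_channel, Receiver};
use std::thread::{self, JoinHandle};
use std::time::{Duration, SystemTime};

/// Spawns a producer thread that sends the current time at a fixed interval.
/// Returns the receiving half and a handle to the producer thread.
pub fn spawn_clock(interval: Duration) -> (Receiver<SystemTime>, JoinHandle<()>) {
    // Capacity 1 mirrors `mpsc::channel(1)` in the server code:
    // the producer cannot run far ahead of the consumer.
    let (tx, rx) = sync_channel(1);
    let handle = thread::spawn(move || {
        loop {
            // `send` returns an error only when the receiver has been dropped.
            if tx.send(SystemTime::now()).is_err() {
                break;
            }
            // Blocking sleep is fine here because this is a dedicated thread.
            thread::sleep(interval);
        }
    });
    (rx, handle)
}
```

Dropping the returned receiver plays the role of the client disconnecting: the next send returns an error and the producer thread exits, so joining the handle afterwards returns.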
The client connects to the server and requests to observe the server stream. It uses the generated ClockServiceClient to connect to the server:
let mut client = ClockServiceClient::connect("http://[::1]:50051").await?;
It then calls the gRPC service:
let request = ClockRequest {
    interval_seconds: 1,
};
let mut stream = client.observe(request).await?.into_inner();
We then can iterate over the stream like this:
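// `next()` comes from tokio_stream::StreamExt, `flush()` from std::io::Write
use std::io::Write;
use tokio_stream::StreamExt;

while let Some(event) = stream.next().await {
    if let Ok(event) = event {
        if let Some(time_stamp) = event.time_stamp {
            // `\r` returns to the start of the line, so each update overwrites the previous one
            print!("\rServer time: {}", format_time(time_stamp));
            std::io::stdout().flush()?;
        }
    }
}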
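Both binaries call format_time, and the server also calls now(), but neither helper is shown above. The following is a minimal, std-only sketch of what they could look like, not the project's actual code. The Timestamp struct is a local stand-in for prost_types::Timestamp (same seconds and nanos fields), and the UTC HH:MM:SS output format is an assumption; the real project lists chrono as a dependency and may well format the local time instead.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Stand-in for prost_types::Timestamp (assumption: same two public fields).
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct Timestamp {
    pub seconds: i64,
    pub nanos: i32,
}

/// Returns the current system time as a protobuf-style timestamp.
pub fn now() -> Timestamp {
    // If the system clock is set before 1970, this sketch falls back to the epoch.
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    Timestamp {
        seconds: since_epoch.as_secs() as i64,
        nanos: since_epoch.subsec_nanos() as i32,
    }
}

/// Formats the time-of-day part of a timestamp as HH:MM:SS in UTC.
pub fn format_time(ts: Timestamp) -> String {
    // rem_euclid keeps the value in 0..86_400 even for timestamps before 1970.
    let secs_of_day = ts.seconds.rem_euclid(86_400);
    format!(
        "{:02}:{:02}:{:02}",
        secs_of_day / 3600,
        (secs_of_day % 3600) / 60,
        secs_of_day % 60
    )
}
```

In the actual project, prost_types::Timestamp can also be created from a SystemTime through its From implementation, which would shorten now() to a single conversion.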
To try it out, start the server in one terminal and the client in another:
cargo run --bin server
cargo run --bin client
The code for this example can be found on GitLab.