extern crate hbb_common;
#[cfg(feature = "webrtc")]
use hbb_common::webrtc::WebRTCStream;
use std::io::Write;
use anyhow::Result;
use bytes::Bytes;
use clap::{Arg, Command};
use tokio::time::Duration;
// Fallback entry point compiled when the `webrtc` feature is disabled.
// It only tells the user how to get the real example and exits successfully.
#[cfg(not(feature = "webrtc"))]
#[tokio::main]
async fn main() -> Result<()> {
    let notice = "The webrtc feature is not enabled. \
        Please enable the webrtc feature to run this example.";
    println!("{}", notice);
    Ok(())
}
// Entry point of the WebRTC stream example (compiled only with the `webrtc` feature).
//
// Flow:
// 1. Parse `--debug` / `--offer` from the command line.
// 2. Create a `WebRTCStream` from the endpoint passed via `--offer`
//    (an empty string when the option is absent).
// 3. Without `--offer`: print the command to run on the second peer and wait
//    for that peer's endpoint to be pasted on stdin.
//    With `--offer`: print the local endpoint so it can be pasted to the first peer.
// 4. Spawn the read and write loops and keep running until ctrl-c.
//
// Returns an error if the stream cannot be created, the endpoints cannot be
// exchanged, stdin yields no line, or the ctrl-c handler cannot be installed.
#[cfg(feature = "webrtc")]
#[tokio::main]
async fn main() -> Result<()> {
    let matches = Command::new("webrtc-stream")
        .about("An example of webrtc stream using hbb_common and webrtc-rs")
        .arg(
            Arg::new("debug")
                .long("debug")
                .short('d')
                .action(clap::ArgAction::SetTrue)
                .help("Prints debug log information"),
        )
        .arg(
            Arg::new("offer")
                .long("offer")
                .short('o')
                .help("set offer from other endpoint"),
        )
        .get_matches();

    // `SetTrue` flags carry an implicit default of `false`, so `contains_id`
    // reports `true` even when the flag was not passed; `get_flag` reads the
    // actual value.
    if matches.get_flag("debug") {
        println!("Debug log enabled");
        env_logger::Builder::new()
            .format(|buf, record| {
                writeln!(
                    buf,
                    "{}:{} [{}] {} - {}",
                    record.file().unwrap_or("unknown"),
                    record.line().unwrap_or(0),
                    record.level(),
                    chrono::Local::now().format("%H:%M:%S.%6f"),
                    record.args()
                )
            })
            .filter(Some("hbb_common"), log::LevelFilter::Debug)
            .init();
    }

    // An empty remote endpoint means this process starts the exchange.
    let remote_endpoint = matches
        .get_one::<String>("offer")
        .cloned()
        .unwrap_or_default();

    // NOTE(review): the meaning of `false` and `30000` is defined by
    // `WebRTCStream::new` (presumably a flag and a timeout in ms) — confirm there.
    let webrtc_stream = WebRTCStream::new(&remote_endpoint, false, 30000).await?;

    // The local endpoint is what the other peer needs in order to connect.
    let local_endpoint = webrtc_stream.get_local_endpoint().await?;
    if remote_endpoint.is_empty() {
        println!();
        println!(
            "Start new terminal run: \ncargo r --features webrtc --example webrtc -- --offer {local_endpoint} \ncopy remote endpoint and paste here"
        );
        // Reading stdin blocks, so do it on the blocking pool instead of
        // stalling an async worker thread.
        let pasted = tokio::task::spawn_blocking(|| -> Result<String> {
            let line = std::io::stdin()
                .lines()
                .next()
                .ok_or_else(|| anyhow::anyhow!("No input received"))??;
            Ok(line)
        })
        .await??;
        // Drop stray whitespace picked up while copy/pasting the endpoint.
        webrtc_stream.set_remote_endpoint(pasted.trim()).await?;
    } else {
        println!(
            "Copy local endpoint and paste to the other peer: \n{}",
            local_endpoint
        );
    }

    // Each loop gets its own clone of the stream; their results are ignored
    // because both loops report their exit reason on stdout themselves.
    let reader = webrtc_stream.clone();
    tokio::spawn(async move {
        let _ = read_loop(reader).await;
    });
    let writer = webrtc_stream.clone();
    tokio::spawn(async move {
        let _ = write_loop(writer).await;
    });

    println!("Press ctrl-c to stop");
    // Surface a failure to install the signal handler instead of exiting
    // silently as if ctrl-c had been pressed.
    tokio::signal::ctrl_c().await?;
    println!();
    Ok(())
}
// read_loop shows how to read from the datachannel directly.
//
// Prints every received message until the stream ends (`next()` yields `None`)
// or reports a read error, then returns `Ok(())` in both cases.
// Payloads are decoded lossily: invalid UTF-8 bytes are shown as U+FFFD rather
// than aborting the loop, since the caller discards this function's result and
// the failure would otherwise go unnoticed.
#[cfg(feature = "webrtc")]
async fn read_loop(mut stream: WebRTCStream) -> Result<()> {
    while let Some(res) = stream.next().await {
        match res {
            Ok(data) => {
                // Borrow the payload instead of copying it into a new Vec.
                println!("Message from stream: {}", String::from_utf8_lossy(&data));
            }
            Err(e) => {
                println!("WebRTC stream read error: {}; Exit the read_loop", e);
                return Ok(());
            }
        }
    }
    println!("WebRTC stream closed; Exit the read_loop");
    Ok(())
}
// write_loop shows how to write to the webrtc stream directly.
//
// Every 5 seconds a random 15-letter message is sent over the stream and the
// outcome is printed. The loop stops after the first failed send and the
// function then returns `Ok(())`.
#[cfg(feature = "webrtc")]
async fn write_loop(mut stream: WebRTCStream) -> Result<()> {
    loop {
        tokio::time::sleep(Duration::from_secs(5)).await;

        let message = webrtc::peer_connection::math_rand_alpha(15);
        let outcome = stream.send_bytes(Bytes::from(message.clone())).await;
        let sent = outcome.is_ok();
        println!("Sent '{message}' {}", sent);

        if !sent {
            break;
        }
    }
    println!("WebRTC stream write failed; Exit the write_loop");
    Ok(())
}