new version full rebuild with claude opus 4.6 and own rdp daemon

This commit is contained in:
felixg
2026-02-28 21:19:52 +01:00
parent fac33c27b4
commit 6e9956bce9
19 changed files with 2185 additions and 326 deletions

22
rdpd/Cargo.toml Normal file
View File

@@ -0,0 +1,22 @@
[package]
name = "rdpd"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-tungstenite = "0.21"
futures-util = "0.3"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
image = { version = "0.25", default-features = false, features = ["jpeg"] }
libc = "0.2"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
x11rb = { version = "0.13", features = ["allow-unsafe-code", "xtest"] }
dashmap = "6"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
[profile.release]
lto = true
strip = true

57
rdpd/Dockerfile Normal file
View File

@@ -0,0 +1,57 @@
# syntax=docker/dockerfile:1
#
# Multi-stage build for rdpd — custom RDP daemon using xfreerdp3 + Xvfb.
# Uses Ubuntu 24.04 for both stages since freerdp3-x11 is only in Ubuntu repos.
#
# ---- Build stage ----
FROM ubuntu:24.04 AS builder
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
curl \
ca-certificates \
build-essential \
pkg-config \
libx11-dev \
libxcb1-dev \
libxtst-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Rust toolchain
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain stable
ENV PATH="/root/.cargo/bin:${PATH}"
WORKDIR /app
COPY rdpd/Cargo.toml rdpd/Cargo.lock* ./
COPY rdpd/src ./src
RUN cargo build --release
# ---- Runtime stage ----
FROM ubuntu:24.04
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get update && apt-get install -y --no-install-recommends \
xvfb \
xdotool \
xclip \
x11-utils \
x11-xserver-utils \
freerdp3-x11 \
libxcb1 \
libx11-6 \
libxtst6 \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/rdpd /usr/local/bin/rdpd
ENV RDPD_LISTEN=0.0.0.0:7777
ENV RDPD_LOG_LEVEL=info
EXPOSE 7777
ENTRYPOINT ["/usr/local/bin/rdpd"]

134
rdpd/src/capture.rs Normal file
View File

@@ -0,0 +1,134 @@
use image::codecs::jpeg::JpegEncoder;
use image::ExtendedColorType;
use std::io::Cursor;
use tracing::{debug, trace};
use x11rb::connection::Connection;
use x11rb::protocol::xproto::{self, ConnectionExt, ImageFormat};
use x11rb::rust_connection::RustConnection;
use xxhash_rust::xxh3::xxh3_64;
/// Connects to the X11 display and provides framebuffer capture.
pub struct FrameCapture {
conn: RustConnection,
root: u32,
width: u16,
height: u16,
prev_hash: u64,
}
impl FrameCapture {
/// Connect to the X display specified by `display_num` (e.g. 10 → ":10").
/// Retries a few times since Xvfb may take a moment to start.
pub async fn connect(display_num: u32) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let display_str = format!(":{}", display_num);
let mut last_err = String::new();
for attempt in 0..30 {
match RustConnection::connect(Some(&display_str)) {
Ok((conn, screen_num)) => {
let screen = &conn.setup().roots[screen_num];
let root = screen.root;
let width = screen.width_in_pixels;
let height = screen.height_in_pixels;
debug!(
display = %display_str,
width,
height,
attempt,
"connected to X11 display"
);
return Ok(Self {
conn,
root,
width,
height,
prev_hash: 0,
});
}
Err(e) => {
last_err = e.to_string();
trace!(attempt, err = %last_err, "X11 connect attempt failed, retrying...");
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
}
Err(format!(
"failed to connect to X display {} after 30 attempts: {}",
display_str, last_err
)
.into())
}
/// Capture the full framebuffer. Returns Some(jpeg_bytes) if the frame
/// changed since last capture, None if unchanged.
///
/// TODO: implement dirty region tracking — capture only changed tiles and
/// send them with (x, y, w, h) metadata to reduce bandwidth.
pub fn capture_frame(&mut self, quality: u8) -> Result<Option<Vec<u8>>, Box<dyn std::error::Error + Send + Sync>> {
let reply = self
.conn
.get_image(
ImageFormat::Z_PIXMAP,
self.root,
0,
0,
self.width,
self.height,
u32::MAX,
)?
.reply()?;
let pixels = &reply.data;
// Fast hash comparison to skip unchanged frames
let hash = xxh3_64(pixels);
if hash == self.prev_hash {
return Ok(None);
}
self.prev_hash = hash;
// X11 ZPixmap gives us BGRA (or BGRx) — convert to RGB for JPEG
let pixel_count = (self.width as usize) * (self.height as usize);
let mut rgb = Vec::with_capacity(pixel_count * 3);
for i in 0..pixel_count {
let base = i * 4;
if base + 2 < pixels.len() {
rgb.push(pixels[base + 2]); // R
rgb.push(pixels[base + 1]); // G
rgb.push(pixels[base]); // B
}
}
// Encode as JPEG
let mut jpeg_buf = Cursor::new(Vec::with_capacity(256 * 1024));
let mut encoder = JpegEncoder::new_with_quality(&mut jpeg_buf, quality);
encoder.encode(&rgb, self.width as u32, self.height as u32, ExtendedColorType::Rgb8)?;
let jpeg_bytes = jpeg_buf.into_inner();
trace!(
size = jpeg_bytes.len(),
width = self.width,
height = self.height,
"frame captured and encoded"
);
Ok(Some(jpeg_bytes))
}
pub fn width(&self) -> u16 {
self.width
}
pub fn height(&self) -> u16 {
self.height
}
/// Update the capture dimensions (e.g. after xrandr resize).
pub fn set_dimensions(&mut self, width: u16, height: u16) {
self.width = width;
self.height = height;
// Reset hash to force next capture to be sent
self.prev_hash = 0;
}
}

231
rdpd/src/input.rs Normal file
View File

@@ -0,0 +1,231 @@
use tracing::{debug, trace, warn};
use x11rb::connection::{Connection, RequestConnection};
use x11rb::protocol::xproto::{self, ConnectionExt};
use x11rb::protocol::xtest::ConnectionExt as XTestConnectionExt;
use x11rb::rust_connection::RustConnection;
/// Handles X11 input injection via the XTEST extension.
pub struct InputInjector {
conn: RustConnection,
root: u32,
#[allow(dead_code)]
screen_num: usize,
}
impl InputInjector {
/// Connect to the X display for input injection.
pub async fn connect(display_num: u32) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let display_str = format!(":{}", display_num);
let mut last_err = String::new();
for attempt in 0..30 {
match RustConnection::connect(Some(&display_str)) {
Ok((conn, screen_num)) => {
let screen = &conn.setup().roots[screen_num];
let root = screen.root;
// Verify XTEST extension is available
match conn.extension_information(x11rb::protocol::xtest::X11_EXTENSION_NAME) {
Ok(Some(_)) => {
debug!(display = %display_str, attempt, "input injector connected with XTEST");
}
_ => {
return Err("XTEST extension not available on X display".into());
}
}
return Ok(Self {
conn,
root,
screen_num,
});
}
Err(e) => {
last_err = e.to_string();
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
}
}
}
Err(format!(
"input injector: failed to connect to X display {} after 30 attempts: {}",
display_str, last_err
)
.into())
}
/// Move the mouse pointer to (x, y).
pub fn mouse_move(&self, x: i32, y: i32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.conn.xtest_fake_input(
xproto::MOTION_NOTIFY_EVENT,
0, // detail (unused for motion)
x11rb::CURRENT_TIME,
self.root,
x as i16,
y as i16,
0,
)?;
self.conn.flush()?;
trace!(x, y, "mouse_move injected");
Ok(())
}
/// Press a mouse button. Button mapping: 1=left, 2=middle, 3=right.
pub fn mouse_down(&self, button: u8, x: i32, y: i32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Move first, then press
self.mouse_move(x, y)?;
self.conn.xtest_fake_input(
xproto::BUTTON_PRESS_EVENT,
button,
x11rb::CURRENT_TIME,
self.root,
0,
0,
0,
)?;
self.conn.flush()?;
trace!(button, x, y, "mouse_down injected");
Ok(())
}
/// Release a mouse button.
pub fn mouse_up(&self, button: u8, x: i32, y: i32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.mouse_move(x, y)?;
self.conn.xtest_fake_input(
xproto::BUTTON_RELEASE_EVENT,
button,
x11rb::CURRENT_TIME,
self.root,
0,
0,
0,
)?;
self.conn.flush()?;
trace!(button, x, y, "mouse_up injected");
Ok(())
}
/// Scroll the mouse wheel. In X11, button 4 = scroll up, button 5 = scroll down.
pub fn mouse_scroll(&self, delta: i32, x: i32, y: i32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
self.mouse_move(x, y)?;
let (button, count) = if delta < 0 {
(5u8, (-delta) as u32) // scroll down
} else {
(4u8, delta as u32) // scroll up
};
for _ in 0..count.min(10) {
self.conn.xtest_fake_input(
xproto::BUTTON_PRESS_EVENT,
button,
x11rb::CURRENT_TIME,
self.root,
0,
0,
0,
)?;
self.conn.xtest_fake_input(
xproto::BUTTON_RELEASE_EVENT,
button,
x11rb::CURRENT_TIME,
self.root,
0,
0,
0,
)?;
}
self.conn.flush()?;
trace!(delta, button, count, x, y, "mouse_scroll injected");
Ok(())
}
/// Convert an X11 KeySym to a keycode on this display, then inject a key press.
pub fn key_down(&self, keysym: u32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let keycode = self.keysym_to_keycode(keysym)?;
self.conn.xtest_fake_input(
xproto::KEY_PRESS_EVENT,
keycode,
x11rb::CURRENT_TIME,
self.root,
0,
0,
0,
)?;
self.conn.flush()?;
trace!(keysym, keycode, "key_down injected");
Ok(())
}
/// Inject a key release.
pub fn key_up(&self, keysym: u32) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let keycode = self.keysym_to_keycode(keysym)?;
self.conn.xtest_fake_input(
xproto::KEY_RELEASE_EVENT,
keycode,
x11rb::CURRENT_TIME,
self.root,
0,
0,
0,
)?;
self.conn.flush()?;
trace!(keysym, keycode, "key_up injected");
Ok(())
}
/// Map a KeySym to a keycode on the current display.
fn keysym_to_keycode(&self, keysym: u32) -> Result<u8, Box<dyn std::error::Error + Send + Sync>> {
let setup = self.conn.setup();
let min_keycode = setup.min_keycode;
let max_keycode = setup.max_keycode;
let mapping = self
.conn
.get_keyboard_mapping(min_keycode, max_keycode - min_keycode + 1)?
.reply()?;
let keysyms_per_keycode = mapping.keysyms_per_keycode as usize;
for i in 0..=(max_keycode - min_keycode) as usize {
for j in 0..keysyms_per_keycode {
let idx = i * keysyms_per_keycode + j;
if idx < mapping.keysyms.len() && mapping.keysyms[idx] == keysym {
return Ok(min_keycode + i as u8);
}
}
}
// If keysym not found in the current mapping, try to use xdotool as fallback
warn!(keysym, "keysym not found in keyboard mapping");
Err(format!("keysym 0x{:x} not found in keyboard mapping", keysym).into())
}
// TODO: clipboard monitoring — watch X11 CLIPBOARD selection changes via
// XFixes SelectionNotify and forward to the browser as clipboardRead messages
/// Set the X11 clipboard content (CLIPBOARD selection).
/// Uses xclip as a simple approach; a pure X11 implementation would need
/// to become a selection owner which requires an event loop.
pub fn set_clipboard(&self, display_num: u32, text: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
use std::process::{Command, Stdio};
use std::io::Write;
let display = format!(":{}", display_num);
let mut child = Command::new("xclip")
.args(["-selection", "clipboard"])
.env("DISPLAY", &display)
.stdin(Stdio::piped())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn()?;
if let Some(ref mut stdin) = child.stdin {
stdin.write_all(text.as_bytes())?;
}
child.wait()?;
debug!(len = text.len(), "clipboard set via xclip");
Ok(())
}
}

151
rdpd/src/main.rs Normal file
View File

@@ -0,0 +1,151 @@
mod capture;
mod input;
mod protocol;
mod session;
mod session_manager;
mod xfreerdp;
use crate::protocol::{ClientMessage, ServerMessage};
use crate::session::Session;
use crate::session_manager::SessionManager;
use futures_util::StreamExt;
use std::env;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio_tungstenite::accept_async;
use tokio_tungstenite::tungstenite::Message;
use tracing::{error, info, warn};
#[tokio::main]
async fn main() {
// Initialize tracing
let log_level = env::var("RDPD_LOG_LEVEL").unwrap_or_else(|_| "info".to_string());
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_new(&log_level)
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let listen_addr = env::var("RDPD_LISTEN").unwrap_or_else(|_| "0.0.0.0:7777".to_string());
let addr: SocketAddr = listen_addr
.parse()
.expect("RDPD_LISTEN must be a valid socket address");
let manager = SessionManager::new();
let listener = TcpListener::bind(&addr)
.await
.expect("failed to bind TCP listener");
info!(addr = %addr, "rdpd listening for WebSocket connections");
loop {
let (stream, peer) = match listener.accept().await {
Ok(v) => v,
Err(e) => {
error!(err = %e, "TCP accept error");
continue;
}
};
let manager = manager.clone();
tokio::spawn(async move {
info!(peer = %peer, "new TCP connection");
let ws = match accept_async(stream).await {
Ok(ws) => ws,
Err(e) => {
warn!(peer = %peer, err = %e, "WebSocket handshake failed");
return;
}
};
handle_connection(ws, peer, manager).await;
});
}
}
async fn handle_connection(
mut ws: tokio_tungstenite::WebSocketStream<tokio::net::TcpStream>,
peer: SocketAddr,
manager: Arc<SessionManager>,
) {
// The first message must be a "connect" JSON message
let connect_msg = match ws.next().await {
Some(Ok(Message::Text(text))) => match serde_json::from_str::<ClientMessage>(&text) {
Ok(ClientMessage::Connect {
host,
port,
username,
password,
domain,
width,
height,
security,
}) => {
info!(
peer = %peer,
host = %host,
port,
username = %username,
width,
height,
security = %security,
"connect request received"
);
(host, port, username, password, domain, width, height, security)
}
Ok(_) => {
warn!(peer = %peer, "first message must be a connect request");
let msg = ServerMessage::Error {
message: "First message must be a connect request".to_string(),
};
let _ = futures_util::SinkExt::send(
&mut ws,
Message::Text(msg.to_json()),
).await;
return;
}
Err(e) => {
warn!(peer = %peer, err = %e, "invalid connect message");
let msg = ServerMessage::Error {
message: format!("Invalid connect message: {}", e),
};
let _ = futures_util::SinkExt::send(
&mut ws,
Message::Text(msg.to_json()),
).await;
return;
}
},
Some(Ok(Message::Close(_))) | None => {
info!(peer = %peer, "client disconnected before sending connect");
return;
}
Some(Ok(_)) => {
warn!(peer = %peer, "expected text message with connect request");
return;
}
Some(Err(e)) => {
warn!(peer = %peer, err = %e, "WebSocket error before connect");
return;
}
};
let (host, port, username, password, domain, width, height, security) = connect_msg;
// Allocate a display number and register the session
let display_num = manager.allocate_display();
manager.register(display_num, host.clone(), username.clone());
// Run the session — this blocks until the session ends
Session::run(display_num, ws, host, port, username, password, domain, width, height, security).await;
// Cleanup
manager.unregister(display_num);
info!(peer = %peer, display_num, "session ended");
}

77
rdpd/src/protocol.rs Normal file
View File

@@ -0,0 +1,77 @@
use serde::{Deserialize, Serialize};
// ---------------------------------------------------------------------------
// Client → Daemon messages (JSON text frames)
// ---------------------------------------------------------------------------
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
#[allow(non_snake_case)]
pub enum ClientMessage {
#[serde(rename = "connect")]
Connect {
host: String,
port: u16,
username: String,
password: String,
#[serde(default)]
domain: String,
#[serde(default = "default_width")]
width: u16,
#[serde(default = "default_height")]
height: u16,
/// RDP security mode: "tls", "nla", "rdp", or "any". Defaults to "tls".
#[serde(default = "default_security")]
security: String,
},
#[serde(rename = "mouseMove")]
MouseMove { x: i32, y: i32 },
#[serde(rename = "mouseDown")]
MouseDown { button: u8, x: i32, y: i32 },
#[serde(rename = "mouseUp")]
MouseUp { button: u8, x: i32, y: i32 },
#[serde(rename = "mouseScroll")]
MouseScroll { delta: i32, x: i32, y: i32 },
#[serde(rename = "keyDown")]
KeyDown { keySym: u32 },
#[serde(rename = "keyUp")]
KeyUp { keySym: u32 },
#[serde(rename = "clipboardWrite")]
ClipboardWrite { text: String },
#[serde(rename = "resize")]
Resize { width: u16, height: u16 },
}
fn default_width() -> u16 {
1280
}
fn default_height() -> u16 {
720
}
fn default_security() -> String {
"nla".to_string()
}
// ---------------------------------------------------------------------------
// Daemon → Client messages (JSON text frames)
// Binary frames (JPEG) are sent directly, not through this enum.
// ---------------------------------------------------------------------------
#[derive(Debug, Serialize)]
#[serde(tag = "type")]
pub enum ServerMessage {
#[serde(rename = "connected")]
Connected,
#[serde(rename = "error")]
Error { message: String },
#[serde(rename = "disconnected")]
Disconnected,
#[serde(rename = "clipboardRead")]
ClipboardRead { text: String },
}
impl ServerMessage {
pub fn to_json(&self) -> String {
serde_json::to_string(self).expect("ServerMessage serialization cannot fail")
}
}

307
rdpd/src/session.rs Normal file
View File

@@ -0,0 +1,307 @@
use crate::capture::FrameCapture;
use crate::input::InputInjector;
use crate::protocol::{ClientMessage, ServerMessage};
use crate::xfreerdp;
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_tungstenite::tungstenite::Message;
use tokio_tungstenite::WebSocketStream;
use tracing::{error, info, warn};
/// Frame capture rate — ~15 fps.
const FRAME_INTERVAL_MS: u64 = 66;
/// JPEG quality (1-100).
const JPEG_QUALITY: u8 = 85;
type WsSink = Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>;
/// Represents a single active RDP session.
pub struct Session;
impl Session {
/// Start a new RDP session: launch Xvfb, launch xfreerdp3, wire up frame
/// capture and input injection, and proxy everything over the WebSocket.
pub async fn run(
display_num: u32,
ws: WebSocketStream<TcpStream>,
host: String,
port: u16,
username: String,
password: String,
domain: String,
width: u16,
height: u16,
security: String,
) {
let (ws_sink, mut ws_stream) = ws.split();
let sink: WsSink = Arc::new(Mutex::new(ws_sink));
// Helper to send a JSON control message
let send_msg = |sink: WsSink, msg: ServerMessage| async move {
let mut s = sink.lock().await;
let _ = s.send(Message::Text(msg.to_json())).await;
};
// 1. Start Xvfb
let screen = format!("{}x{}x24", width, height);
let display_arg = format!(":{}", display_num);
info!(display_num, %screen, "starting Xvfb");
let xvfb_result = tokio::process::Command::new("Xvfb")
.args([
&display_arg,
"-screen", "0", &screen,
"-ac", // disable access control
"-nolisten", "tcp",
"+extension", "XTEST",
])
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.spawn();
let mut xvfb = match xvfb_result {
Ok(child) => child,
Err(e) => {
error!(err = %e, "failed to start Xvfb");
send_msg(sink.clone(), ServerMessage::Error {
message: format!("Failed to start Xvfb: {}", e),
}).await;
return;
}
};
// Give Xvfb a moment to initialize
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
// Check Xvfb is still running
match xvfb.try_wait() {
Ok(Some(status)) => {
error!(?status, "Xvfb exited prematurely");
send_msg(sink.clone(), ServerMessage::Error {
message: format!("Xvfb exited with status: {}", status),
}).await;
return;
}
Ok(None) => { /* still running, good */ }
Err(e) => {
error!(err = %e, "failed to check Xvfb status");
}
}
// 2. Start xfreerdp3
info!(display_num, host = %host, port, "starting xfreerdp3");
let xfreerdp_result = xfreerdp::spawn_xfreerdp(
display_num, &host, port, &username, &password, &domain, width, height, &security,
).await;
let mut xfreerdp = match xfreerdp_result {
Ok(child) => child,
Err(e) => {
error!(err = %e, "failed to start xfreerdp3");
send_msg(sink.clone(), ServerMessage::Error {
message: format!("Failed to start xfreerdp3: {}", e),
}).await;
let _ = xvfb.kill().await;
return;
}
};
// Wait a bit for xfreerdp to connect before we start capturing
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
// Check xfreerdp is still running (connection failure shows up here)
match xfreerdp.try_wait() {
Ok(Some(status)) => {
// Capture stderr for diagnostics
let stderr_msg = if let Some(mut stderr) = xfreerdp.stderr.take() {
use tokio::io::AsyncReadExt;
let mut buf = Vec::new();
let _ = stderr.read_to_end(&mut buf).await;
String::from_utf8_lossy(&buf).to_string()
} else {
String::new()
};
error!(?status, stderr = %stderr_msg, "xfreerdp3 exited prematurely");
let user_msg = if stderr_msg.contains("LOGON_FAILURE") {
"RDP authentication failed — check username/password".to_string()
} else if stderr_msg.contains("CONNECT_TRANSPORT_FAILED") || stderr_msg.contains("connect_rdp") {
"Could not reach RDP host — check host/port".to_string()
} else {
format!("xfreerdp3 exited (code {}): {}", status, stderr_msg.lines().filter(|l| l.contains("ERROR")).collect::<Vec<_>>().join("; "))
};
send_msg(sink.clone(), ServerMessage::Error { message: user_msg }).await;
let _ = xvfb.kill().await;
return;
}
Ok(None) => { /* still running */ }
Err(e) => {
error!(err = %e, "failed to check xfreerdp3 status");
}
}
// 3. Connect frame capture and input injector to the X display
let capture_result = FrameCapture::connect(display_num).await;
let input_result = InputInjector::connect(display_num).await;
let mut capture = match capture_result {
Ok(c) => c,
Err(e) => {
error!(err = %e, "failed to connect frame capture");
send_msg(sink.clone(), ServerMessage::Error {
message: format!("Failed to connect to X display: {}", e),
}).await;
xfreerdp::kill_xfreerdp(&mut xfreerdp).await;
let _ = xvfb.kill().await;
return;
}
};
let input = match input_result {
Ok(i) => i,
Err(e) => {
error!(err = %e, "failed to connect input injector");
send_msg(sink.clone(), ServerMessage::Error {
message: format!("Failed to connect input injector: {}", e),
}).await;
xfreerdp::kill_xfreerdp(&mut xfreerdp).await;
let _ = xvfb.kill().await;
return;
}
};
// Notify client that we're connected
send_msg(sink.clone(), ServerMessage::Connected).await;
info!(display_num, "RDP session connected, entering proxy mode");
let input = Arc::new(input);
// 4. Spawn frame capture loop
let frame_sink = sink.clone();
let frame_shutdown = Arc::new(AtomicBool::new(false));
let frame_shutdown_rx = frame_shutdown.clone();
let capture_display = display_num;
let capture_handle = tokio::task::spawn_blocking(move || {
let rt = tokio::runtime::Handle::current();
loop {
// Check for shutdown
if frame_shutdown_rx.load(Ordering::Relaxed) {
break;
}
match capture.capture_frame(JPEG_QUALITY) {
Ok(Some(jpeg_bytes)) => {
let sink = frame_sink.clone();
rt.block_on(async {
let mut s = sink.lock().await;
if s.send(Message::Binary(jpeg_bytes)).await.is_err() {
// WebSocket closed
}
});
}
Ok(None) => {
// Frame unchanged, skip
}
Err(e) => {
warn!(err = %e, display = capture_display, "frame capture error");
break;
}
}
std::thread::sleep(std::time::Duration::from_millis(FRAME_INTERVAL_MS));
}
});
// 5. Process incoming WebSocket messages (input events)
let input_ref = input.clone();
let ws_display = display_num;
while let Some(msg_result) = ws_stream.next().await {
match msg_result {
Ok(Message::Text(text)) => {
match serde_json::from_str::<ClientMessage>(&text) {
Ok(client_msg) => {
if let Err(e) = handle_input(&input_ref, ws_display, client_msg) {
warn!(err = %e, "input injection error");
}
}
Err(e) => {
warn!(err = %e, raw = %text, "failed to parse client message");
}
}
}
Ok(Message::Close(_)) => {
info!(display_num, "client sent close frame");
break;
}
Ok(Message::Ping(data)) => {
let mut s = sink.lock().await;
let _ = s.send(Message::Pong(data)).await;
}
Ok(_) => { /* ignore binary from client, pong, etc */ }
Err(e) => {
warn!(err = %e, "WebSocket receive error");
break;
}
}
}
// 6. Cleanup
info!(display_num, "session ending, cleaning up");
// Signal capture loop to stop
frame_shutdown.store(true, Ordering::Relaxed);
let _ = capture_handle.await;
// Send disconnected message
{
let mut s = sink.lock().await;
let _ = s.send(Message::Text(ServerMessage::Disconnected.to_json())).await;
let _ = s.close().await;
}
// Kill child processes
xfreerdp::kill_xfreerdp(&mut xfreerdp).await;
let _ = xvfb.kill().await;
info!(display_num, "session cleanup complete");
}
}
#[allow(non_snake_case)]
fn handle_input(
input: &InputInjector,
display_num: u32,
msg: ClientMessage,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
match msg {
ClientMessage::MouseMove { x, y } => input.mouse_move(x, y),
ClientMessage::MouseDown { button, x, y } => input.mouse_down(button, x, y),
ClientMessage::MouseUp { button, x, y } => input.mouse_up(button, x, y),
ClientMessage::MouseScroll { delta, x, y } => input.mouse_scroll(delta, x, y),
ClientMessage::KeyDown { keySym } => input.key_down(keySym),
ClientMessage::KeyUp { keySym } => input.key_up(keySym),
ClientMessage::ClipboardWrite { text } => input.set_clipboard(display_num, &text),
ClientMessage::Resize { width, height } => {
// TODO: implement dynamic resolution change via xrandr:
// 1. xrandr --output default --mode {width}x{height} (on the Xvfb display)
// 2. Send /size:{width}x{height} to xfreerdp3 via its control pipe
// 3. Update capture dimensions
warn!(width, height, "resize requested but not yet implemented");
Ok(())
}
ClientMessage::Connect { .. } => {
// Connect is handled at session start, not here
Ok(())
}
}
}

View File

@@ -0,0 +1,57 @@
use dashmap::DashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
use tracing::{info, warn};
/// Tracks active sessions and allocates X display numbers.
///
/// Display numbers start at 10 and increment. Released numbers are NOT reused
/// to avoid races with lingering Xvfb processes. Since each number is a u32,
/// we can run ~4 billion sessions before wrapping — effectively unlimited.
pub struct SessionManager {
/// Maps display_num → session metadata (currently just a marker).
sessions: DashMap<u32, SessionInfo>,
/// Next display number to allocate.
next_display: AtomicU32,
}
pub struct SessionInfo {
pub host: String,
pub username: String,
}
impl SessionManager {
pub fn new() -> Arc<Self> {
Arc::new(Self {
sessions: DashMap::new(),
next_display: AtomicU32::new(10),
})
}
/// Allocate a new display number for a session.
pub fn allocate_display(&self) -> u32 {
let num = self.next_display.fetch_add(1, Ordering::Relaxed);
info!(display_num = num, "allocated display number");
num
}
/// Register a session as active.
pub fn register(&self, display_num: u32, host: String, username: String) {
self.sessions.insert(display_num, SessionInfo { host, username });
info!(display_num, active = self.sessions.len(), "session registered");
}
/// Remove a session when it ends.
pub fn unregister(&self, display_num: u32) {
if self.sessions.remove(&display_num).is_some() {
info!(display_num, active = self.sessions.len(), "session unregistered");
} else {
warn!(display_num, "attempted to unregister unknown session");
}
}
/// Number of active sessions.
pub fn active_count(&self) -> usize {
self.sessions.len()
}
}

97
rdpd/src/xfreerdp.rs Normal file
View File

@@ -0,0 +1,97 @@
use std::process::Stdio;
use tokio::process::{Child, Command};
use tracing::{info, warn};
/// Spawn an xfreerdp3 process targeting the given X display.
///
/// Returns the child process handle. The caller is responsible for killing it
/// on session teardown.
pub async fn spawn_xfreerdp(
display_num: u32,
host: &str,
port: u16,
username: &str,
password: &str,
domain: &str,
width: u16,
height: u16,
security: &str,
) -> std::io::Result<Child> {
let display_str = format!(":{}", display_num);
let geometry = format!("{}x{}", width, height);
let mut cmd = Command::new("xfreerdp3");
cmd.env("DISPLAY", &display_str);
// Connection target
cmd.arg(format!("/v:{}:{}", host, port));
cmd.arg(format!("/u:{}", username));
cmd.arg(format!("/p:{}", password));
if !domain.is_empty() {
cmd.arg(format!("/d:{}", domain));
}
// Display settings
cmd.arg(format!("/size:{}", geometry));
cmd.arg("/bpp:32");
cmd.arg("/gfx");
// Security mode — "tls", "nla", "rdp", or "any"
cmd.arg(format!("/sec:{}", security));
// Accept all certificates for lab/internal use
cmd.arg("/cert:ignore");
// Disable features we don't need
cmd.arg("-decorations");
cmd.arg("-wallpaper");
cmd.arg("-aero");
cmd.arg("-themes");
cmd.arg("-sound");
cmd.arg("-microphone");
// Clipboard redirection — we handle it via our own protocol
// TODO: implement clipboard channel via xclip/xsel monitoring
cmd.arg("+clipboard");
// TODO: audio forwarding — could use /sound:sys:pulse with a per-session PulseAudio sink
// TODO: dynamic resolution change — /dynamic-resolution flag + xrandr in the Xvfb
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
info!(
display_str = %display_str,
host = %host,
port = %port,
username = %username,
geometry = %geometry,
"spawning xfreerdp3"
);
let child = cmd.spawn()?;
info!(pid = child.id().unwrap_or(0), "xfreerdp3 process started");
Ok(child)
}
/// Kill an xfreerdp3 child process gracefully, falling back to SIGKILL.
pub async fn kill_xfreerdp(child: &mut Child) {
if let Some(pid) = child.id() {
info!(pid, "terminating xfreerdp3");
// Try SIGTERM first
unsafe {
libc::kill(pid as i32, libc::SIGTERM);
}
// Give it a moment to exit
match tokio::time::timeout(std::time::Duration::from_secs(3), child.wait()).await {
Ok(Ok(status)) => {
info!(pid, ?status, "xfreerdp3 exited after SIGTERM");
}
_ => {
warn!(pid, "xfreerdp3 did not exit after SIGTERM, sending SIGKILL");
let _ = child.kill().await;
}
}
}
}