quinoa/src/main.rs

923 lines
30 KiB
Rust

#[deny(unused_must_use)]
#[macro_use]
extern crate anyhow;
#[macro_use]
extern crate serde;
#[macro_use]
extern crate tracing;
mod pty;
use std::ffi::{CStr, CString};
use std::future::Future;
use std::mem::ManuallyDrop;
use std::net::SocketAddr;
use std::os::fd::FromRawFd;
use std::os::unix::process::CommandExt;
use std::process::Stdio;
use std::sync::Arc;
use std::task::Poll;
use anyhow::{Context, Result};
use pam_client::ConversationHandler;
use quinn::{RecvStream, SendStream, ReadExactError};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
use tracing::Instrument;
#[derive(Debug, Clone, Serialize, Deserialize)]
enum Stream {
Exec,
Shell,
// TODO: port forwarding
}
#[tokio::main]
async fn main() -> Result<()> {
tracing::subscriber::set_global_default(
tracing_subscriber::FmtSubscriber::builder()
.with_writer(std::io::stderr)
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.finish(),
)
.unwrap();
let mut args = std::env::args();
_ = args.next();
let ctrl_c = tokio::signal::ctrl_c();
let fut = run_cmd(args);
tokio::select! {
_ = ctrl_c => {
info!("Aborting");
Ok(())
}
r = fut => r,
}
}
async fn run_cmd(mut args: std::env::Args) -> Result<()> {
let cmd = args.next().expect("COMMAND");
match cmd.as_str() {
"server" => run_server().await,
"client" => run_client(args).await,
_ => Err(anyhow!("Unrecognized command: {}", cmd)),
}
}
const ALPN_QUIC_SHELL: &str = "quic-shell";
struct ServerConfig {
shell: String,
listen: SocketAddr,
}
async fn run_server() -> Result<()> {
let cfg = {
let opt_shell = std::env::var("SHELL")
.context("SHELL not defined")?;
let opt_listen = std::env::var("BIND_ADDR")
.unwrap_or_else(|_| "127.0.0.1:8022".to_owned())
.parse()?;
&*Box::leak(Box::new(ServerConfig {
shell: opt_shell,
listen: opt_listen,
}))
};
let subject_alt_names = vec!["localhost".to_string()];
let (cert, key) = if !std::path::Path::new("cert.der").exists() || !std::path::Path::new("key.der").exists() {
let cert = rcgen::generate_simple_self_signed(subject_alt_names)?;
let key = rustls::PrivateKey(cert.serialize_private_key_der());
let cert = rustls::Certificate(cert.serialize_der()?);
std::fs::write("key.der", &key.0)?;
std::fs::write("cert.der", &cert.0)?;
(cert, key)
} else {
let cert = rustls::Certificate(std::fs::read("cert.der")?);
let key = rustls::PrivateKey(std::fs::read("key.der")?);
(cert, key)
};
let mut server_crypto = rustls::ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_single_cert(vec![cert], key)
.unwrap();
server_crypto.alpn_protocols = vec![ALPN_QUIC_SHELL.as_bytes().to_owned()];
let mut transport = transport_config();
transport.max_concurrent_uni_streams(0_u8.into());
let mut server_config = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));
server_config.transport_config(transport.into());
server_config.use_retry(true);
let endpoint = quinn::Endpoint::server(server_config, cfg.listen)?;
info!("listening on {}", endpoint.local_addr()?);
while let Some(conn) = endpoint.accept().await {
info!("connection incoming");
tokio::spawn(async move {
if let Err(e) = greet_conn(cfg, conn).await {
error!("connection failed: {reason}", reason = e.to_string());
}
});
}
Ok(())
}
fn is_broken_pipe<T>(r: &Result<T, std::io::Error>) -> bool {
if let Err(e) = r {
e.kind() == std::io::ErrorKind::BrokenPipe
} else {
false
}
}
fn transport_config() -> quinn::TransportConfig {
let mut transport = quinn::TransportConfig::default();
transport.stream_receive_window((64u32 * 1024 * 1024).into());
transport.send_window(64 * 1024 * 1024);
transport.receive_window((64u32 * 1024 * 1024).into());
transport
}
#[derive(Debug, Clone, Copy)]
struct FinishedEarly;
impl std::fmt::Display for FinishedEarly {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
ReadExactError::FinishedEarly.fmt(f)
}
}
impl std::error::Error for FinishedEarly {
}
async fn read_msg<T: serde::de::DeserializeOwned>(recv: &mut RecvStream) -> Result<Result<T, FinishedEarly>> {
let mut size = [0u8; 2];
match recv.read_exact(&mut size).await {
Ok(()) => {}
Err(ReadExactError::FinishedEarly) => return Ok(Err(FinishedEarly)),
Err(ReadExactError::ReadError(e)) => return Err(e.into()),
}
let size = u16::from_le_bytes(size);
let mut buf = Vec::with_capacity(size.into());
recv.take(size.into()).read_to_end(&mut buf).await?;
Ok(Ok(rmp_serde::from_slice(&buf).with_context(|| format!("reading a {} byte message", size))?))
}
async fn write_msg<T: serde::Serialize>(send: &mut SendStream, value: &T) -> Result<()> {
let buf = rmp_serde::to_vec(value)?;
send.write_all(&u16::try_from(buf.len())?.to_le_bytes())
.await?;
send.write_all(&buf).await?;
Ok(())
}
async fn run_client(mut args: std::env::Args) -> Result<()> {
info!("running client");
let conn_str = args.next().expect("USERNAME@HOST");
let (username, host) = conn_str.split_once('@').expect("USERNAME@HOST");
let mut roots = rustls::RootCertStore::empty();
match std::fs::read("cert.der") {
Ok(cert) => {
roots.add(&rustls::Certificate(cert))?;
}
Err(ref e) if e.kind() == std::io::ErrorKind::NotFound => {
info!("local server certificate not found");
}
Err(e) => {
error!("failed to open local server certificate: {}", e);
}
}
info!("read roots");
let mut client_crypto = rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(roots)
.with_no_client_auth();
client_crypto.alpn_protocols = vec![ALPN_QUIC_SHELL.as_bytes().to_owned()];
let mut transport = transport_config();
transport.keep_alive_interval(Some(std::time::Duration::from_secs(5)));
let mut client_config = quinn::ClientConfig::new(Arc::new(client_crypto));
client_config.transport_config(transport.into());
//let mut endpoint = quinn::Endpoint::client("[::]:0".parse().unwrap())?;
let mut endpoint = quinn::Endpoint::client("0.0.0.0:0".parse().unwrap())?;
endpoint.set_default_client_config(client_config);
info!("connecting");
let conn = endpoint
.connect(host.parse()?, "localhost")?
.await?;
// authenticating client
{
let (mut send, mut recv) = conn.open_bi().await?;
write_msg(&mut send, &auth::Hello {
username: username.to_owned(),
}).await?;
do_auth_prompt(&conn, &mut send, &mut recv).await?;
}
// authenticated client
let _reset = {
use nix::sys::termios::*;
struct Reset(Termios);
impl Drop for Reset {
fn drop(&mut self) {
_ = tcsetattr(libc::STDIN_FILENO, SetArg::TCSAFLUSH, &self.0);
info!("termios reset!");
}
}
let mut termios = tcgetattr(libc::STDIN_FILENO)?;
termios.local_flags.remove(LocalFlags::ECHO);
termios.local_flags.remove(LocalFlags::ICANON);
termios.local_flags.remove(LocalFlags::ISIG);
termios.local_flags.remove(LocalFlags::IEXTEN);
termios.input_flags.remove(InputFlags::IXON);
termios.input_flags.remove(InputFlags::ICRNL);
termios.output_flags.remove(OutputFlags::OPOST);
let reset = Reset(termios.clone());
tcsetattr(libc::STDIN_FILENO, SetArg::TCSAFLUSH, &termios)?;
reset
};
let (mut send, mut recv) = conn.open_bi().await?;
write_msg(&mut send, &Stream::Shell).await?;
info!("connected");
do_shell(&conn, &mut send, &mut recv).await
}
async fn do_shell(conn: &quinn::Connection, send: &mut SendStream, recv: &mut RecvStream) -> Result<()> {
let mut stdin = unsafe { ManuallyDrop::new(tokio::fs::File::from_raw_fd(libc::STDIN_FILENO)) };
let mut stdout = unsafe { ManuallyDrop::new(tokio::fs::File::from_raw_fd(libc::STDOUT_FILENO)) };
let mut stdin_buf = Vec::with_capacity(4096);
//let mut stdout_buf = Vec::with_capacity(4096);
let mut stdout_buf = vec![bytes::Bytes::new(); 128];
let mut stdin_eof = false;
loop {
tokio::select! {
/*r = tokio::io::copy(&mut stdin, &mut send) => {
r?;
info!("EOF on stdin");
}*/
r = stdin.read_buf(&mut stdin_buf), if !stdin_eof => {
if r? == 0 {
stdin_eof = true;
}
send.write_all(&stdin_buf).await?;
stdin_buf.clear();
//info!("sent stdin");
}
r = recv.read_chunks(&mut stdout_buf) => {
if let Some(n) = r? {
for chunk in &stdout_buf[..n] {
stdout.write_all(&chunk).await?;
}
//info!("recv stdout");
}
}
/*r = recv.read_buf(&mut stdout_buf) => if r? > 0 {
stdout.write_all(&stdout_buf).await?;
stdout_buf.clear();
//info!("recv stdout");
}*/,
r = send.stopped() => {
info!("Remote disconnected");
let code = r?.into_inner();
if code == 0 {
return Ok(());
} else {
return Err(anyhow!("Error code {}", code));
}
}
e = conn.closed() => {
info!("Remote disconnected: {}", e);
return Err(anyhow!("Remote connection closed"));
}
}
}
}
async fn do_auth_prompt(conn: &quinn::Connection, send: &mut SendStream, recv: &mut RecvStream) -> Result<()> {
use auth::*;
let mut stdout = unsafe { ManuallyDrop::new(tokio::fs::File::from_raw_fd(libc::STDOUT_FILENO)) };
loop {
tokio::select! {
r = read_msg::<Question>(recv) => {
match r?? {
Question::LoggedIn => {
return Ok(());
}
Question::Prompt {
prompt,
echo,
} => {
let mut prompt = prompt.into_bytes();
prompt.push(b' ');
stdout.write_all(&prompt).await?;
let answer = rpassword::read_password()?;
let answer = CString::new(answer)?;
write_msg(send, &Answer::Prompt(Ok(answer))).await?;
},
Question::TextInfo(s) => {
stdout.write_all(b"INFO ").await?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
},
Question::ErrorMsg(s) => {
stdout.write_all(b"ERRO ").await?;
stdout.write_all(s.as_bytes()).await?;
stdout.write_all(b"\n").await?;
},
}
}
r = send.stopped() => {
info!("Remote disconnected");
let code = r?.into_inner();
if code == 0 {
return Ok(());
} else {
return Err(anyhow!("Error code {}", code));
}
}
e = conn.closed() => {
info!("Remote disconnected: {}", e);
return Err(anyhow!("Remote connection closed"));
}
}
}
}
async fn greet_conn(cfg: &'static ServerConfig, conn: quinn::Connecting) -> Result<()> {
info!("greeting connection");
let conn = conn.await?;
let span = info_span!(
"connection",
remote = %conn.remote_address(),
protocol = %conn
.handshake_data()
.unwrap()
.downcast::<quinn::crypto::rustls::HandshakeData>().unwrap()
.protocol
.map_or_else(|| "<none>".into(), |x| String::from_utf8_lossy(&x).into_owned())
);
if let Err(e) = authenticate_conn(cfg, &conn)
.instrument(span)
.await {
error!("handler failed: {reason}", reason = e.to_string());
conn.close(1u8.into(), b"handler error");
}
Ok(())
}
mod auth {
use std::ffi::CString;
#[derive(Debug, Serialize, Deserialize)]
pub struct Hello {
pub username: String,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Question {
Prompt {
prompt: CString,
echo: bool,
},
TextInfo(CString),
ErrorMsg(CString),
LoggedIn,
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Answer {
Prompt(Result<CString, pam_client::ErrorCode>),
}
}
async fn authenticate_conn(cfg: &'static ServerConfig, conn: &quinn::Connection) -> Result<()> {
use auth::*;
info!("authenticating connection");
let (mut send, mut recv) = conn.accept_bi().await?;
let hello = read_msg::<Hello>(&mut recv).await??;
let (q_send, q_recv) = flume::bounded(1);
let (a_send, a_recv) = flume::bounded(1);
struct Conversation {
send: flume::Sender<Question>,
recv: flume::Receiver<Answer>,
}
impl Conversation {
const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
fn ask(&self, question: Question) -> Result<(), pam_client::ErrorCode> {
self.send.send_timeout(question, Self::TIMEOUT).map_err(|_| pam_client::ErrorCode::ABORT)
}
fn answer(&self) -> Result<Answer, pam_client::ErrorCode> {
self.recv.recv_timeout(Self::TIMEOUT).map_err(|_| pam_client::ErrorCode::ABORT)
}
}
impl ConversationHandler for Conversation {
fn prompt_echo_on(&mut self, prompt: &CStr) -> std::result::Result<std::ffi::CString, pam_client::ErrorCode> {
self.ask(Question::Prompt {
prompt: prompt.to_owned(),
echo: true,
})?;
match self.answer()? {
Answer::Prompt(r) => r,
}
}
fn prompt_echo_off(&mut self, prompt: &CStr) -> std::result::Result<std::ffi::CString, pam_client::ErrorCode> {
self.ask(Question::Prompt {
prompt: prompt.to_owned(),
echo: false,
})?;
match self.answer()? {
Answer::Prompt(r) => r,
}
}
fn text_info(&mut self, msg: &CStr) {
_ = self.ask(Question::TextInfo(msg.to_owned()));
}
fn error_msg(&mut self, msg: &CStr) {
_ = self.ask(Question::ErrorMsg(msg.to_owned()));
}
}
let hdl = tokio::task::spawn_blocking(move || {
let mut ctx = pam_client::Context::new("sshd", Some(&hello.username), Conversation {
send: q_send,
recv: a_recv,
})?;
info!("created context");
ctx.authenticate(pam_client::Flag::NONE)?;
info!("authenticated user");
ctx.acct_mgmt(pam_client::Flag::NONE)?;
info!("validated user");
let sess = ctx.open_session(pam_client::Flag::NONE)?;
info!("opened session");
let sess = sess.leak();
let conv = ctx.conversation_mut();
conv.send = flume::bounded(0).0;
conv.recv = flume::bounded(0).1;
Result::<_>::Ok((ctx, sess))
});
while let Ok(question) = q_recv.recv_async().await {
debug!("received question: {:?}", question);
write_msg(&mut send, &question).await?;
if matches!(question, Question::Prompt { .. }) {
let answer = read_msg(&mut recv).await??;
trace!("received answer: {:?}", answer);
a_send.send_async(answer).await?;
}
/*match question {
Question::Prompt { prompt, echo } => {
let r = async {
// FIXME: actually disable echo
send.write_all(prompt.as_bytes()).await?;
send.write_all(b" ").await?;
let erase = format!("\x1b[{}G\x1b[K", prompt.as_bytes().len() + 1 + 1);
let mut buf = Vec::new();
'prompt: loop {
let mut i = buf.len();
recv.read_buf(&mut buf).await?;
let mut j = i;
while j < buf.len() {
match buf[j] {
0x7f => {
buf.remove(j);
if j > 0 {
if j == i {
i -= 1;
}
buf.remove(j-1);
// erase in line, move cursor left 1 column
send.write_all(erase.as_bytes()).await?;
send.write_all(&buf).await?;
j -= 1;
}
}
0x3 => {
send.write_all(b"\r\n").await?;
return Err(anyhow!("Aborted by the user"));
}
b'\r' => {
buf.remove(j);
}
b'\n' => {
info!("found \\n");
// remove newline and trailing chars
buf.truncate(j);
break 'prompt;
}
_ => {
j += 1;
}
}
}
let seg = &buf[i..];
if echo {
send.write_all(&seg).await?;
} else {
send.write_all(&vec![b'*'; seg.len()]).await?;
}
info!("{:?} ({:x?})", std::str::from_utf8(&buf), buf);
}
let buf = CString::new(buf)?;
send.write_all(b"\n").await?;
Result::<_>::Ok(buf)
}.await;
a_send.send_async(Answer::Prompt(r.map_err(|e| {
error!("PAM error: {}", e);
pam_client::ErrorCode::ABORT
}))).await?;
}
Question::TextInfo(s) => {
send.write_all(b"INFO ").await?;
send.write_all(s.as_bytes()).await?;
send.write_all(b"\n").await?;
}
Question::ErrorMsg(s) => {
send.write_all(b"ERRO ").await?;
send.write_all(s.as_bytes()).await?;
send.write_all(b"\n").await?;
}
}*/
}
let (mut ctx, sess) = hdl.await??;
let sess = ctx.unleak_session(sess);
info!("logged in: {}", sess.envlist());
write_msg(&mut send, &Question::LoggedIn).await?;
send.finish().await?;
recv.stop(0u8.into())?;
handle_conn(cfg, conn).await
}
async fn handle_conn(cfg: &'static ServerConfig, conn: &quinn::Connection) -> Result<()> {
info!("established");
loop {
let stream = conn.accept_bi().await;
let (send, mut recv) = match stream {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
info!("connection closed");
return Ok(());
}
Err(e) => {
return Err(e.into());
}
Ok(s) => s,
};
let stream = read_msg::<Stream>(&mut recv).await??;
let span = info_span!(
"stream",
r#type = ?stream
);
tokio::task::spawn(
async move {
let r = match stream {
Stream::Exec => handle_stream_exec(cfg, send, recv).await,
Stream::Shell => handle_stream_shell(cfg, send, recv).await,
};
if let Err(e) = r {
error!("Error in stream handler: {e}");
}
}
.instrument(span),
);
}
}
async fn handle_stream_exec(
cfg: &ServerConfig,
mut send: SendStream,
mut recv: RecvStream,
) -> Result<()> {
let mut cmd = std::process::Command::new(&cfg.shell);
if cfg.shell == "bash" || cfg.shell.ends_with("/bash") {
cmd.arg("-i");
}
cmd.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::piped());
#[cfg(target_family = "unix")]
cmd.process_group(0);
info!("Running {:?}", cmd);
let mut sh = Command::from(cmd).kill_on_drop(true).spawn()?;
let mut stdout = sh.stdout.take().unwrap();
let mut stderr = sh.stderr.take().unwrap();
let mut stdin = sh.stdin.take().unwrap();
let mut stdout_buf = Vec::with_capacity(4096);
let mut stderr_buf = Vec::with_capacity(4096);
let mut stdin_buf = Vec::with_capacity(4096);
let mut stdout_eof = false;
let mut stderr_eof = false;
loop {
tokio::select! {
r = sh.wait() => {
let code = r?;
send.finish().await?;
if !code.success() {
info!("Child exit: {}", code);
recv.stop(1u8.into())?;
return Ok(());
} else {
info!("Child exit");
recv.stop(0u8.into())?;
return Ok(());
}
}
r = stdout.read_buf(&mut stdout_buf), if !stdout_eof => {
if is_broken_pipe(&r) || r? == 0 {
stdout_eof = true;
info!("stdout eof");
} else {
send.write_all(&stdout_buf).await?;
info!("sent stdout: {:x?}", stdout_buf);
stdout_buf.clear();
}
},
r = stderr.read_buf(&mut stderr_buf), if !stderr_eof => {
if is_broken_pipe(&r) || r? == 0 {
stderr_eof = true;
info!("stderr eof");
} else {
send.write_all(&stderr_buf).await?;
stderr_buf.clear();
info!("sent stderr: {:x?}", stderr_buf);
}
},
r = recv.read_buf(&mut stdin_buf) => if r? > 0 {
stdin.write_all(&stdin_buf).await?;
stdin_buf.clear();
info!("recv stdin");
},
}
}
}
async fn handle_stream_shell(
cfg: &ServerConfig,
mut send: SendStream,
mut recv: RecvStream,
) -> Result<()> {
let args = if cfg.shell == "bash" || cfg.shell.ends_with("/bash") {
vec![CStr::from_bytes_with_nul(b"-i\0")?]
} else {
vec![]
};
let mut opt_shell_with_nul = Vec::with_capacity(cfg.shell.len() + 1);
opt_shell_with_nul.extend(cfg.shell.as_bytes());
opt_shell_with_nul.push(0);
let opt_shell = CStr::from_bytes_with_nul(&opt_shell_with_nul)?;
let mut sh = pty::create_pty(opt_shell, &args)?;
sh.set_nodelay()?;
let mut pty = sh.pty;
//let mut pty = tokio::io::unix::AsyncFd::with_interest(pty, tokio::io::Interest::READABLE)?;
info!("created pty");
//let mut stdin_buf = Vec::with_capacity(4096);
let mut pty_buf = Vec::with_capacity(4096);
//let mut pty_buf = [0u8];
let (_waker_send, mut waker_recv) = flume::bounded::<()>(1);
/*let fd = pty.as_raw_fd();
std::thread::spawn(move || {
let mut set = [nix::poll::PollFd::new(fd.as_raw_fd(), nix::poll::PollFlags::POLLIN)];
loop {
if let Ok(n) = nix::poll::poll(&mut set, -1) {
if n != 0 {
if waker_send.send(()).is_err() {
break;
}
}
}
}
});*/
loop {
/*if let Some(code) = sh.proc.try_wait()? {
send.finish().await?;
if code != 0 {
info!("Child exit: {}", code);
recv.stop(1u8.into())?;
return Ok(());
} else {
info!("Child exit");
recv.stop(0u8.into())?;
return Ok(());
}
}*/
//let mut redraw = tokio::time::interval(std::time::Duration::from_millis(50));
struct Wait<'a> {
proc: &'a pty::Proc,
}
impl<'a> Wait<'a> {
pub fn new(proc: &'a pty::Proc) -> Self {
Self { proc }
}
}
impl<'a> Future for Wait<'a> {
type Output = std::io::Result<i32>;
fn poll(self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
match self.proc.try_wait() {
Ok(Some(code)) => Poll::Ready(Ok(code)),
Ok(None) => Poll::Pending,
Err(e) => Poll::Ready(Err(e)),
}
}
}
/*pin_project_lite::pin_project! {
struct SelectRead<T: AsRawFd> {
//#[allow(dead_code)]
fd: T,
#[pin]
list: triggered::Listener,
}
}
impl<T: AsRawFd> SelectRead<T> {
pub fn new(fd: T) -> Self {
let (trig, list) = triggered::trigger();
Self { fd, list, set }
}
}
impl<T: AsRawFd> Future for SelectRead<T> {
type Output = nix::Result<()>;
fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Self::Output> {
//let mut this = self;
let this = self.project();
match this.list.poll(cx) {
Poll::Ready(()) => {
let r = nix::poll::poll(this.set, 0);
match r {
Ok(0) => {
cx.waker().wake_by_ref();
Poll::Pending
}
Ok(_) => Poll::Ready(Ok(())),
Err(e) => Poll::Ready(Err(e)),
}
},
Poll::Pending => {
//cx.waker().wake_by_ref();
Poll::Pending
},
}
//let mut set = *this.set;
/*let r = nix::sys::select::select(
set.highest().unwrap() + 1,
&mut set,
None,
None,
&mut nix::sys::time::TimeVal::new(0, 0),
);*/
}
}*/
async fn read_pty(
//pty: &mut tokio::io::unix::AsyncFd<tokio::fs::File>,
pty: &mut tokio::fs::File,
_waker_recv: &mut flume::Receiver<()>,
buf: &mut Vec<u8>,
send: &mut SendStream,
) -> Result<()> {
loop {
//let mut pty = pty.readable_mut().await?;
//let pty = pty.get_inner_mut();
//SelectRead::new(pty.as_fd()).await?;
//_ = waker_recv.recv_async().await;
let r = pty.read_buf(buf).await;
//_ = waker_recv.try_recv();
if let Err(e) = r {
if e.raw_os_error() == Some(35) {
//info!("not ready: {}", e);
//tokio::task::yield_now().await;
//tokio::time::sleep(std::time::Duration::from_millis(1)).await;
} else {
return Err(e.into());
}
} else if buf.len() == 0 {
info!("not ready: empty");
//tokio::task::yield_now().await;
//tokio::time::sleep(std::time::Duration::from_millis(1)).await;
} else {
//return Ok(());
send.write_all(&buf).await?;
buf.clear();
//info!("sent pty");
}
}
}
tokio::select! {
/*_ = redraw.tick() => {
sh.pty.read_buf(&mut pty_buf).await?;
send.write_all(&pty_buf).await?;
pty_buf.clear();
info!("redraw complete");
}*/
r = Wait::new(&sh.proc) => {
let code = r?;
send.finish().await?;
if code != 0 {
info!("Child exit: {}", code);
recv.stop(1u8.into())?;
return Ok(());
} else {
info!("Child exit");
recv.stop(0u8.into())?;
return Ok(());
}
}
/*r = tokio::io::copy(&mut pty, &mut send) => {
r?;
}*/
r = read_pty(&mut pty, &mut waker_recv, &mut pty_buf, &mut send) => {
r?;
}
// FIXME: figure out a maximum chunk size
r = recv.read_chunk(usize::MAX, true) => {
if let Some(chunk) = r? {
pty.write_all(&chunk.bytes).await?;
info!("recv stdin");
}
}
/*r = recv.read_buf(&mut stdin_buf) => if r? > 0 {
//pty.get_mut().write_all(&stdin_buf).await?;
pty.write_all(&stdin_buf).await?;
stdin_buf.clear();
info!("recv stdin");
},*/
}
}
}