1use std::{
2 io::{self, BufReader},
3 net::TcpStream,
4 thread,
5};
6
7use crossbeam_channel::{Receiver, Sender, bounded};
8
9use crate::{
10 Message,
11 stdio::{IoThreads, make_io_threads},
12};
13
14pub(crate) fn socket_transport(
15 stream: TcpStream,
16) -> (Sender<Message>, Receiver<Message>, IoThreads) {
17 let (reader_receiver, reader) = make_reader(stream.try_clone().unwrap());
18 let (writer_sender, writer, messages_to_drop) = make_write(stream);
19 let dropper = std::thread::spawn(move || {
20 messages_to_drop.into_iter().for_each(drop);
21 });
22 let io_threads = make_io_threads(reader, writer, dropper);
23 (writer_sender, reader_receiver, io_threads)
24}
25
26fn make_reader(stream: TcpStream) -> (Receiver<Message>, thread::JoinHandle<io::Result<()>>) {
27 let (reader_sender, reader_receiver) = bounded::<Message>(0);
28 let reader = thread::spawn(move || {
29 let mut buf_read = BufReader::new(stream);
30 while let Some(msg) = Message::read(&mut buf_read).unwrap() {
31 let is_exit = matches!(&msg, Message::Notification(n) if n.is_exit());
32 reader_sender.send(msg).unwrap();
33 if is_exit {
34 break;
35 }
36 }
37 Ok(())
38 });
39 (reader_receiver, reader)
40}
41
42fn make_write(
43 mut stream: TcpStream,
44) -> (Sender<Message>, thread::JoinHandle<io::Result<()>>, Receiver<Message>) {
45 let (writer_sender, writer_receiver) = bounded::<Message>(0);
46 let (drop_sender, drop_receiver) = bounded::<Message>(0);
47 let writer = thread::spawn(move || {
48 writer_receiver
49 .into_iter()
50 .try_for_each(|it| {
51 let result = it.write(&mut stream);
52 let _ = drop_sender.send(it);
53 result
54 })
55 .unwrap();
56 Ok(())
57 });
58 (writer_sender, writer, drop_receiver)
59}