lsp_server/
socket.rs

1use std::{
2    io::{self, BufReader},
3    net::TcpStream,
4    thread,
5};
6
7use crossbeam_channel::{Receiver, Sender, bounded};
8
9use crate::{
10    Message,
11    stdio::{IoThreads, make_io_threads},
12};
13
14pub(crate) fn socket_transport(
15    stream: TcpStream,
16) -> (Sender<Message>, Receiver<Message>, IoThreads) {
17    let (reader_receiver, reader) = make_reader(stream.try_clone().unwrap());
18    let (writer_sender, writer, messages_to_drop) = make_write(stream);
19    let dropper = std::thread::spawn(move || {
20        messages_to_drop.into_iter().for_each(drop);
21    });
22    let io_threads = make_io_threads(reader, writer, dropper);
23    (writer_sender, reader_receiver, io_threads)
24}
25
26fn make_reader(stream: TcpStream) -> (Receiver<Message>, thread::JoinHandle<io::Result<()>>) {
27    let (reader_sender, reader_receiver) = bounded::<Message>(0);
28    let reader = thread::spawn(move || {
29        let mut buf_read = BufReader::new(stream);
30        while let Some(msg) = Message::read(&mut buf_read).unwrap() {
31            let is_exit = matches!(&msg, Message::Notification(n) if n.is_exit());
32            reader_sender.send(msg).unwrap();
33            if is_exit {
34                break;
35            }
36        }
37        Ok(())
38    });
39    (reader_receiver, reader)
40}
41
42fn make_write(
43    mut stream: TcpStream,
44) -> (Sender<Message>, thread::JoinHandle<io::Result<()>>, Receiver<Message>) {
45    let (writer_sender, writer_receiver) = bounded::<Message>(0);
46    let (drop_sender, drop_receiver) = bounded::<Message>(0);
47    let writer = thread::spawn(move || {
48        writer_receiver
49            .into_iter()
50            .try_for_each(|it| {
51                let result = it.write(&mut stream);
52                let _ = drop_sender.send(it);
53                result
54            })
55            .unwrap();
56        Ok(())
57    });
58    (writer_sender, writer, drop_receiver)
59}