Fix packet compression (fixes FTB Beyond, etc.). Closes #146 (#147)

Previously, the zlib compressor was initialized once, lazily, then reused for each packet by resetting the stream. This didn't properly emit the zlib headers, so packet compression was broken and caused "java.util.zip.DataFormatException: incorrect header check" on the server. Since packets are only compressed above a threshold, this problem manifested itself only on larger servers, such as 1.10.2 FTB Beyond and Skyfactory 3, and 1.12.2 SevTech: Ages, which require sending a large ModList above the compression threshold enabled by the server. Change to instead recreate the zlib compressor, each time it is used as to capture the full header. For symmetry, the decompressor is also recreated each time.

* Removes self.compression_write and self.compression_read instance variables

* Remove self.compression_write, initialize with cursor

* Log compressing/decompressing packets in network debug mode
This commit is contained in:
iceiix 2019-05-12 12:15:02 -07:00 committed by GitHub
parent 2451e780bd
commit 83c848fa6f
1 changed files with 13 additions and 18 deletions

View File

@ -972,8 +972,6 @@ pub struct Conn {
cipher: Option<Aes128Cfb>,
compression_threshold: i32,
compression_read: Option<ZlibDecoder<io::Cursor<Vec<u8>>>>,
compression_write: Option<ZlibEncoder<io::Cursor<Vec<u8>>>>,
}
impl Conn {
@ -1000,8 +998,6 @@ impl Conn {
protocol_version,
cipher: Option::None,
compression_threshold: -1,
compression_read: Option::None,
compression_write: Option::None,
})
}
@ -1016,16 +1012,18 @@ impl Conn {
0
};
if self.compression_threshold >= 0 && buf.len() as i32 > self.compression_threshold {
if self.compression_write.is_none() {
self.compression_write = Some(ZlibEncoder::new(io::Cursor::new(Vec::new()), Compression::default()));
}
extra = 0;
let uncompressed_size = buf.len();
let mut new = Vec::new();
VarInt(uncompressed_size as i32).write_to(&mut new)?;
let write = self.compression_write.as_mut().unwrap();
write.reset(io::Cursor::new(buf));
let mut write = ZlibEncoder::new(io::Cursor::new(buf), Compression::default());
write.read_to_end(&mut new)?;
let network_debug = unsafe { NETWORK_DEBUG };
if network_debug {
println!("Compressed for sending {} bytes to {} since > threshold {}, new={:?}",
uncompressed_size, new.len(), self.compression_threshold,
new);
}
buf = new;
}
@ -1039,6 +1037,7 @@ impl Conn {
}
pub fn read_packet(&mut self) -> Result<packet::Packet, Error> {
let network_debug = unsafe { NETWORK_DEBUG };
let len = VarInt::read_from(self)?.0 as usize;
let mut ibuf = vec![0; len];
self.read_exact(&mut ibuf)?;
@ -1046,17 +1045,17 @@ impl Conn {
let mut buf = io::Cursor::new(ibuf);
if self.compression_threshold >= 0 {
if self.compression_read.is_none() {
self.compression_read = Some(ZlibDecoder::new(io::Cursor::new(Vec::new())));
}
let uncompressed_size = VarInt::read_from(&mut buf)?.0;
if uncompressed_size != 0 {
let mut new = Vec::with_capacity(uncompressed_size as usize);
{
let reader = self.compression_read.as_mut().unwrap();
reader.reset(buf);
let mut reader = ZlibDecoder::new(buf);
reader.read_to_end(&mut new)?;
}
if network_debug {
println!("Decompressed threshold={} len={} uncompressed_size={} to {} bytes",
self.compression_threshold, len, uncompressed_size, new.len());
}
buf = io::Cursor::new(new);
}
}
@ -1067,8 +1066,6 @@ impl Conn {
Direction::Serverbound => Direction::Clientbound,
};
let network_debug = unsafe { NETWORK_DEBUG };
if network_debug {
println!("about to parse id={:x}, dir={:?} state={:?}", id, dir, self.state);
std::fs::File::create("last-packet")?.write_all(buf.get_ref())?;
@ -1274,8 +1271,6 @@ impl Clone for Conn {
protocol_version: self.protocol_version,
cipher: Option::None,
compression_threshold: self.compression_threshold,
compression_read: Option::None,
compression_write: Option::None,
}
}
}