Previously, the zlib compressor was initialized once, lazily, then reused for each packet by resetting the stream. This didn't properly emit the zlib headers, so packet compression was broken and caused "java.util.zip.DataFormatException: incorrect header check" on the server. Since packets are only compressed above a threshold, this problem manifested itself only on larger servers, such as 1.10.2 FTB Beyond and Skyfactory 3, and 1.12.2 SevTech: Ages, which require sending a large ModList above the compression threshold enabled by the server. Change to instead recreate the zlib compressor, each time it is used as to capture the full header. For symmetry, the decompressor is also recreated each time. * Removes self.compression_write and self.compression_read instance variables * Remove self.compression_write, initialize with cursor * Log compressing/decompressing packets in network debug mode
This commit is contained in:
parent
0cd02fd2da
commit
b98ba3afd2
|
@ -972,8 +972,6 @@ pub struct Conn {
|
||||||
cipher: Option<Aes128Cfb>,
|
cipher: Option<Aes128Cfb>,
|
||||||
|
|
||||||
compression_threshold: i32,
|
compression_threshold: i32,
|
||||||
compression_read: Option<ZlibDecoder<io::Cursor<Vec<u8>>>>,
|
|
||||||
compression_write: Option<ZlibEncoder<io::Cursor<Vec<u8>>>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Conn {
|
impl Conn {
|
||||||
|
@ -1000,8 +998,6 @@ impl Conn {
|
||||||
protocol_version,
|
protocol_version,
|
||||||
cipher: Option::None,
|
cipher: Option::None,
|
||||||
compression_threshold: -1,
|
compression_threshold: -1,
|
||||||
compression_read: Option::None,
|
|
||||||
compression_write: Option::None,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1016,16 +1012,18 @@ impl Conn {
|
||||||
0
|
0
|
||||||
};
|
};
|
||||||
if self.compression_threshold >= 0 && buf.len() as i32 > self.compression_threshold {
|
if self.compression_threshold >= 0 && buf.len() as i32 > self.compression_threshold {
|
||||||
if self.compression_write.is_none() {
|
|
||||||
self.compression_write = Some(ZlibEncoder::new(io::Cursor::new(Vec::new()), Compression::default()));
|
|
||||||
}
|
|
||||||
extra = 0;
|
extra = 0;
|
||||||
let uncompressed_size = buf.len();
|
let uncompressed_size = buf.len();
|
||||||
let mut new = Vec::new();
|
let mut new = Vec::new();
|
||||||
VarInt(uncompressed_size as i32).write_to(&mut new)?;
|
VarInt(uncompressed_size as i32).write_to(&mut new)?;
|
||||||
let write = self.compression_write.as_mut().unwrap();
|
let mut write = ZlibEncoder::new(io::Cursor::new(buf), Compression::default());
|
||||||
write.reset(io::Cursor::new(buf));
|
|
||||||
write.read_to_end(&mut new)?;
|
write.read_to_end(&mut new)?;
|
||||||
|
let network_debug = unsafe { NETWORK_DEBUG };
|
||||||
|
if network_debug {
|
||||||
|
println!("Compressed for sending {} bytes to {} since > threshold {}, new={:?}",
|
||||||
|
uncompressed_size, new.len(), self.compression_threshold,
|
||||||
|
new);
|
||||||
|
}
|
||||||
buf = new;
|
buf = new;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1039,6 +1037,7 @@ impl Conn {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_packet(&mut self) -> Result<packet::Packet, Error> {
|
pub fn read_packet(&mut self) -> Result<packet::Packet, Error> {
|
||||||
|
let network_debug = unsafe { NETWORK_DEBUG };
|
||||||
let len = VarInt::read_from(self)?.0 as usize;
|
let len = VarInt::read_from(self)?.0 as usize;
|
||||||
let mut ibuf = vec![0; len];
|
let mut ibuf = vec![0; len];
|
||||||
self.read_exact(&mut ibuf)?;
|
self.read_exact(&mut ibuf)?;
|
||||||
|
@ -1046,17 +1045,17 @@ impl Conn {
|
||||||
let mut buf = io::Cursor::new(ibuf);
|
let mut buf = io::Cursor::new(ibuf);
|
||||||
|
|
||||||
if self.compression_threshold >= 0 {
|
if self.compression_threshold >= 0 {
|
||||||
if self.compression_read.is_none() {
|
|
||||||
self.compression_read = Some(ZlibDecoder::new(io::Cursor::new(Vec::new())));
|
|
||||||
}
|
|
||||||
let uncompressed_size = VarInt::read_from(&mut buf)?.0;
|
let uncompressed_size = VarInt::read_from(&mut buf)?.0;
|
||||||
if uncompressed_size != 0 {
|
if uncompressed_size != 0 {
|
||||||
let mut new = Vec::with_capacity(uncompressed_size as usize);
|
let mut new = Vec::with_capacity(uncompressed_size as usize);
|
||||||
{
|
{
|
||||||
let reader = self.compression_read.as_mut().unwrap();
|
let mut reader = ZlibDecoder::new(buf);
|
||||||
reader.reset(buf);
|
|
||||||
reader.read_to_end(&mut new)?;
|
reader.read_to_end(&mut new)?;
|
||||||
}
|
}
|
||||||
|
if network_debug {
|
||||||
|
println!("Decompressed threshold={} len={} uncompressed_size={} to {} bytes",
|
||||||
|
self.compression_threshold, len, uncompressed_size, new.len());
|
||||||
|
}
|
||||||
buf = io::Cursor::new(new);
|
buf = io::Cursor::new(new);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1067,8 +1066,6 @@ impl Conn {
|
||||||
Direction::Serverbound => Direction::Clientbound,
|
Direction::Serverbound => Direction::Clientbound,
|
||||||
};
|
};
|
||||||
|
|
||||||
let network_debug = unsafe { NETWORK_DEBUG };
|
|
||||||
|
|
||||||
if network_debug {
|
if network_debug {
|
||||||
println!("about to parse id={:x}, dir={:?} state={:?}", id, dir, self.state);
|
println!("about to parse id={:x}, dir={:?} state={:?}", id, dir, self.state);
|
||||||
std::fs::File::create("last-packet")?.write_all(buf.get_ref())?;
|
std::fs::File::create("last-packet")?.write_all(buf.get_ref())?;
|
||||||
|
@ -1274,8 +1271,6 @@ impl Clone for Conn {
|
||||||
protocol_version: self.protocol_version,
|
protocol_version: self.protocol_version,
|
||||||
cipher: Option::None,
|
cipher: Option::None,
|
||||||
compression_threshold: self.compression_threshold,
|
compression_threshold: self.compression_threshold,
|
||||||
compression_read: Option::None,
|
|
||||||
compression_write: Option::None,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue