more scalable compression, using Bytes.Builder.Stream;
authorwenzelm
Sat, 15 Jun 2024 21:59:31 +0200
changeset 80377 28dd9b91dfe5
parent 80376 201af0b45e57
child 80378 ab4badc7db7f
more scalable compression, using Bytes.Builder.Stream;
src/Pure/General/bytes.scala
--- a/src/Pure/General/bytes.scala	Sat Jun 15 21:52:14 2024 +0200
+++ b/src/Pure/General/bytes.scala	Sat Jun 15 21:59:31 2024 +0200
@@ -151,6 +151,16 @@
       body(builder)
       builder.done()
     }
+
+    class Stream(hint: Long = 0L) extends OutputStream {
+      val builder = new Builder(hint)
+
+      override def write(b: Int): Unit =
+        { builder += b.toByte }
+
+      override def write(array: Array[Byte], offset: Int, length: Int): Unit =
+        { builder += (array, offset, length) }
+    }
   }
 
   final class Builder private[Bytes](hint: Long) {
@@ -167,6 +177,12 @@
         buffer = new ByteArrayOutputStream
       }
 
+    def += (b: Byte): Unit = synchronized {
+      size += 1
+      buffer.write(b)
+      buffer_check()
+    }
+
     def += (s: CharSequence): Unit = {
       val n = s.length
       if (n > 0) {
@@ -210,7 +226,7 @@
 
     def += (a: Subarray): Unit = { this += (a.array, a.offset, a.length) }
 
-    private def done(): Bytes = synchronized {
+    private[Bytes] def done(): Bytes = synchronized {
       val cs = chunks.toArray
       val b = buffer.toByteArray
       chunks = null
@@ -485,16 +501,16 @@
     options: Compress.Options = Compress.Options(),
     cache: Compress.Cache = Compress.Cache.none
   ): Bytes = {
-    options match {
-      case options_xz: Compress.Options_XZ =>
-        val out = new ByteArrayOutputStream((size min Bytes.array_size).toInt)
-        using(new xz.XZOutputStream(out, options_xz.make, cache.for_xz))(write_stream)
-        Bytes(out.toByteArray)
-      case options_zstd: Compress.Options_Zstd =>
-        Zstd.init()
-        val inp = if (chunks.isEmpty && !is_sliced) chunk0 else make_array
-        Bytes(zstd.Zstd.compress(inp, options_zstd.level))
-    }
+    val bytes = new Bytes.Builder.Stream(hint = size)
+    using(
+      options match {
+        case options_xz: Compress.Options_XZ =>
+          new xz.XZOutputStream(bytes, options_xz.make, cache.for_xz)
+        case options_zstd: Compress.Options_Zstd =>
+          new zstd.ZstdOutputStream(bytes, cache.for_zstd, options_zstd.level)
+      }
+    ) { s => for (a <- subarray_iterator) s.write(a.array, a.offset, a.length) }
+    bytes.builder.done()
   }
 
   def maybe_compress(