86 Base.table.select(List(Base.executable, Base.compressed, Base.body), |
86 Base.table.select(List(Base.executable, Base.compressed, Base.body), |
87 sql = private_data.where_equal(entry_name.session, entry_name.theory, entry_name.name)), |
87 sql = private_data.where_equal(entry_name.session, entry_name.theory, entry_name.name)), |
88 { res => |
88 { res => |
89 val executable = res.bool(Base.executable) |
89 val executable = res.bool(Base.executable) |
90 val compressed = res.bool(Base.compressed) |
90 val compressed = res.bool(Base.compressed) |
91 val bytes = res.bytes(Base.body) |
91 val body = res.bytes(Base.body) |
92 val body = Future.value(compressed, bytes) |
92 Entry(entry_name, executable, compressed, body, cache) |
93 Entry(entry_name, executable, body, cache) |
|
94 } |
93 } |
95 ) |
94 ) |
96 |
95 |
97 def write_entries(db: SQL.Database, entries: List[Entry]): Unit = |
96 def write_entries(db: SQL.Database, entries: List[Entry]): Unit = |
98 db.execute_batch_statement(Base.table.insert(), batch = |
97 db.execute_batch_statement(Base.table.insert(), batch = |
99 for (entry <- entries) yield { (stmt: SQL.Statement) => |
98 for (entry <- entries) yield { (stmt: SQL.Statement) => |
100 val (compressed, bs) = entry.body.join |
|
101 stmt.string(1) = entry.session_name |
99 stmt.string(1) = entry.session_name |
102 stmt.string(2) = entry.theory_name |
100 stmt.string(2) = entry.theory_name |
103 stmt.string(3) = entry.name |
101 stmt.string(3) = entry.name |
104 stmt.bool(4) = entry.executable |
102 stmt.bool(4) = entry.executable |
105 stmt.bool(5) = compressed |
103 stmt.bool(5) = entry.compressed |
106 stmt.bytes(6) = bs |
104 stmt.bytes(6) = entry.body |
107 }) |
105 }) |
108 |
106 |
109 def read_theory_names(db: SQL.Database, session_name: String): List[String] = |
107 def read_theory_names(db: SQL.Database, session_name: String): List[String] = |
110 db.execute_query_statement( |
108 db.execute_query_statement( |
111 Base.table.select(List(Base.theory_name), distinct = true, |
109 Base.table.select(List(Base.theory_name), distinct = true, |
145 |
143 |
146 object Entry { |
144 object Entry { |
147 def apply( |
145 def apply( |
148 entry_name: Entry_Name, |
146 entry_name: Entry_Name, |
149 executable: Boolean, |
147 executable: Boolean, |
150 body: Future[(Boolean, Bytes)], |
148 compressed: Boolean, |
|
149 body: Bytes, |
151 cache: XML.Cache |
150 cache: XML.Cache |
152 ): Entry = new Entry(entry_name, executable, body, cache) |
151 ): Entry = new Entry(entry_name, executable, compressed, body, cache) |
153 |
152 |
154 def empty(theory_name: String, name: String): Entry = |
153 def empty(theory_name: String, name: String): Entry = |
155 Entry(Entry_Name(theory = theory_name, name = name), |
154 Entry(Entry_Name(theory = theory_name, name = name), |
156 false, Future.value(false, Bytes.empty), XML.Cache.none) |
155 false, false, Bytes.empty, XML.Cache.none) |
157 |
156 |
158 def make( |
157 def make( |
159 session_name: String, |
158 session_name: String, |
160 args: Protocol.Export.Args, |
159 args: Protocol.Export.Args, |
161 bytes: Bytes, |
160 bytes: Bytes, |
162 cache: XML.Cache |
161 cache: XML.Cache |
163 ): Entry = { |
162 ): Entry = { |
164 val body = |
163 val (compressed, body) = |
165 if (args.compress) Future.fork(bytes.maybe_compress(cache = cache.compress)) |
164 if (args.compress) bytes.maybe_compress(cache = cache.compress) |
166 else Future.value((false, bytes)) |
165 else (false, bytes) |
167 val entry_name = |
166 val entry_name = |
168 Entry_Name(session = session_name, theory = args.theory_name, name = args.name) |
167 Entry_Name(session = session_name, theory = args.theory_name, name = args.name) |
169 Entry(entry_name, args.executable, body, cache) |
168 Entry(entry_name, args.executable, compressed, body, cache) |
170 } |
169 } |
171 } |
170 } |
172 |
171 |
173 final class Entry private( |
172 final class Entry private( |
174 val entry_name: Entry_Name, |
173 val entry_name: Entry_Name, |
175 val executable: Boolean, |
174 val executable: Boolean, |
176 val body: Future[(Boolean, Bytes)], |
175 val compressed: Boolean, |
|
176 val body: Bytes, |
177 val cache: XML.Cache |
177 val cache: XML.Cache |
178 ) { |
178 ) { |
179 def session_name: String = entry_name.session |
179 def session_name: String = entry_name.session |
180 def theory_name: String = entry_name.theory |
180 def theory_name: String = entry_name.theory |
181 def name: String = entry_name.name |
181 def name: String = entry_name.name |
182 override def toString: String = name |
182 override def toString: String = name |
183 |
183 |
184 def is_finished: Boolean = body.is_finished |
|
185 def cancel(): Unit = body.cancel() |
|
186 |
|
187 def compound_name: String = entry_name.compound_name |
184 def compound_name: String = entry_name.compound_name |
188 |
185 |
189 def name_has_prefix(s: String): Boolean = name.startsWith(s) |
186 def name_has_prefix(s: String): Boolean = name.startsWith(s) |
190 val name_elems: List[String] = explode_name(name) |
187 val name_elems: List[String] = explode_name(name) |
191 |
188 |
192 def name_extends(elems: List[String]): Boolean = |
189 def name_extends(elems: List[String]): Boolean = |
193 name_elems.startsWith(elems) && name_elems != elems |
190 name_elems.startsWith(elems) && name_elems != elems |
194 |
191 |
195 def bytes: Bytes = { |
192 def bytes: Bytes = |
196 val (compressed, bs) = body.join |
193 if (compressed) body.uncompress(cache = cache.compress) else body |
197 if (compressed) bs.uncompress(cache = cache.compress) else bs |
|
198 } |
|
199 |
194 |
200 def text: String = bytes.text |
195 def text: String = bytes.text |
201 |
196 |
202 def yxml: XML.Body = YXML.parse_body(UTF8.decode_permissive(bytes), cache = cache) |
197 def yxml: XML.Body = YXML.parse_body(UTF8.decode_permissive(bytes), cache = cache) |
203 } |
198 } |
252 |
247 |
253 class Consumer private[Export](db: SQL.Database, cache: XML.Cache, progress: Progress) { |
248 class Consumer private[Export](db: SQL.Database, cache: XML.Cache, progress: Progress) { |
254 private val errors = Synchronized[List[String]](Nil) |
249 private val errors = Synchronized[List[String]](Nil) |
255 |
250 |
256 private def consume(args: List[(Entry, Boolean)]): List[Exn.Result[Unit]] = { |
251 private def consume(args: List[(Entry, Boolean)]): List[Exn.Result[Unit]] = { |
257 for ((entry, _) <- args) { |
|
258 if (progress.stopped) entry.cancel() else entry.body.join |
|
259 } |
|
260 private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { |
252 private_data.transaction_lock(db, label = "Export.consumer(" + args.length + ")") { |
261 var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) |
253 var known = private_data.known_entries(db, args.map(p => p._1.entry_name)) |
262 val buffer = new mutable.ListBuffer[Option[Entry]] |
254 val buffer = new mutable.ListBuffer[Option[Entry]] |
263 |
255 |
264 for ((entry, strict) <- args) { |
256 for ((entry, strict) <- args) { |
289 } |
281 } |
290 } |
282 } |
291 |
283 |
292 private val consumer = |
284 private val consumer = |
293 Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")( |
285 Consumer_Thread.fork_bulk[(Entry, Boolean)](name = "export")( |
294 bulk = { case (entry, _) => entry.is_finished }, |
286 bulk = _ => true, |
295 consume = args => (args.grouped(20).toList.flatMap(consume), true)) |
287 consume = args => (args.grouped(20).toList.flatMap(consume), true)) |
296 |
288 |
297 def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = { |
289 def make_entry(session_name: String, args: Protocol.Export.Args, body: Bytes): Unit = { |
298 if (!progress.stopped && !body.is_empty) { |
290 if (!progress.stopped && !body.is_empty) { |
299 consumer.send(Entry.make(session_name, args, body, cache) -> args.strict) |
291 consumer.send(Entry.make(session_name, args, body, cache) -> args.strict) |