; /usr/share/scheme48-1.9/big/pipe.scm is in scheme48 1.9-5.
; This file is owned by root:root, with mode 0o644.
; The actual contents of the file can be viewed below.
; Part of Scheme 48 1.9. See file COPYING for notices and license.
; Authors: Richard Kelsey, Jonathan Rees, Robert Ransom
; Pipes.
;
; This would be easy except that we have to deal with threads (and who else
; would be using pipes?).
;
; Pipes either have a fixed set of buffers which are continually recycled or
; create new buffers as needed. Having a fixed buffer supply keeps readers
; and writers more-or-less synchronised, while creating buffers as needed allows
; the writers to get arbitrarily far ahead of the readers.
;
; A fixed-buffer pipe has two buffers. At any point one is acting as the
; input buffer and the other as the output buffer. When the input buffer is
; empty the two are swapped.
;
; The complexity of the code below comes from having to deal with the two
; blocking situations:
; - a read is done when all buffers are empty
; - a write is done when the output buffer is full and the input buffer
; is non-empty (and we aren't allowed to make more buffers)
;
; If a read occurs when all buffers are empty we swap a zero-length buffer in
; for the output-buffer and block on a condition variable. The zero-length
; buffer guarantees that the reading thread will be awakened when the next
; write occurs. When a write occurs with a zero-length buffer we swap in the
; real buffer, do the write, and then set the input condition variable.
;
; When a write occurs with the write buffer full for a pipe without a fixed
; set of buffers the full buffer is added to a queue.
;
; For a pipe with only two buffers, if a write occurs with the write buffer
; full and the read buffer non-empty, we set the read-limit to be one shorter
; than its real value and block on a condition variable. The bogus read-limit
; means that the writing thread will be woken when a read empties the buffer
; and not have to wait until the following read. When a read reaches the
; read-limit we check to see if there are waiting output threads. If so, we
; restore the real read-limit, read the one remaining character, and then
; wake the sleepers.
;
; If this were a little more integrated with the threads package pipes
; could use queues instead of making new condition variables all the time.
; Per-pipe state.  MAKE-PIPE stores one of these as the port's data, and the
; rest of this file reaches it through PORT-DATA.
(define-record-type pipe-data :pipe-data
(make-pipe-data lock in-condvar out-condvar queue out-buffer)
pipe-data?
(lock pipe-lock) ; a lock for keeping this pipe single threaded
(in-condvar ; readers waiting for a non-empty buffer block on this; #f when no reader is waiting
pipe-in-condvar
set-pipe-in-condvar!)
(out-condvar ; writers waiting for an empty buffer block on this; #f when no writer is waiting
pipe-out-condvar
set-pipe-out-condvar!)
(queue ; queue of full buffers, or #f for a pipe with a fixed buffer set
pipe-buffer-queue)
(out-buffer ; the real output buffer, stashed while a zero-length one is swapped in
pipe-out-buffer
set-pipe-out-buffer!))
; Obtain the lock kept in PIPE's pipe-data record.
(define (lock pipe)
  (let ((data (port-data pipe)))
    (obtain-lock (pipe-lock data))))
; Release the lock kept in PIPE's pipe-data record.
(define (unlock pipe)
  (let ((data (port-data pipe)))
    (release-lock (pipe-lock data))))
; Exchange the input and output buffers.  Everything written so far (the
; first PORT-OUT-INDEX elements of the old output buffer) becomes readable
; from position 0, and the old input buffer becomes an empty output buffer.
(define (swap-buffers! port)
  (let ((old-in (port-in-buffer port))
        (old-out (port-out-buffer port))
        (written (port-out-index port)))
    ;; the reader now sees what the writer produced
    (set-port-in-buffer! port old-out)
    (set-port-in-limit! port written)
    (set-port-in-index! port 0)
    ;; the writer starts over in the buffer the reader was using
    (set-port-out-buffer! port old-in)
    (set-port-out-index! port 0)))
; Install the oldest queued buffer as the input buffer.  Returns #t if a
; buffer was installed and #f if the pipe has no queue or the queue is empty.
; The whole buffer is made readable (the limit is set to its length).
(define (use-buffer-from-queue port)
  (let ((full-buffers (pipe-buffer-queue (port-data port))))
    (cond ((not full-buffers)               ; fixed-buffer pipe
           #f)
          ((queue-empty? full-buffers)
           #f)
          (else
           (let ((next (dequeue! full-buffers)))
             (set-port-in-buffer! port next)
             (set-port-in-index! port 0)
             (set-port-in-limit! port (code-vector-length next))
             #t)))))
;----------------------------------------------------------------
; Input buffers
; Get a non-empty input buffer, if possible.  Returns #t once the input
; buffer has something to read and #f at end of input.  The clauses below
; are tried in this order:
; 1. The current buffer isn't empty.
; 2. The current buffer looks empty but really isn't; the limit was
; decremented by a writer who wants an empty buffer.
; 3. There is a queue and it has a full buffer.
; 4. The output buffer isn't empty.
; 5. The output port is still open and may produce characters in the future.
; If there are no characters and the output port is closed we lose.
; The handler methods in this file call this with the pipe lock held;
; WAIT-FOR-INPUT releases and re-obtains that lock while blocked.
(define (get-in-buffer port)
(cond ((> (port-in-limit port)
(port-in-index port))
#t)
; 2. An out-condvar exists only after WAIT-FOR-OUTPUT lowered the limit by
; one, so put the limit back to expose the last element.
((pipe-out-condvar (port-data port))
(set-port-in-limit! port (+ 1 (port-in-limit port)))
#t)
; 3. USE-BUFFER-FROM-QUEUE installs the buffer itself when it succeeds.
((use-buffer-from-queue port)
#t)
; 4. Something has been written; make it readable.
((< 0 (port-out-index port))
(swap-buffers! port)
#t)
; 5. Block until a writer (or CLOSE-PIPE) wakes us, then re-run every check.
((open-output-port? port)
(wait-for-input port)
(get-in-buffer port))
(else
#f)))
; Block the calling reader on the pipe's input condition variable.  If no
; reader is waiting yet, a new condition variable is created, the real
; output buffer is stashed in the pipe data and a zero-length buffer is put
; in its place, so that the next write has to go through GET-OUT-BUFFER.
; The pipe lock is released while waiting and obtained again before
; returning.
(define (wait-for-input port)
  (let* ((data (port-data port))
         (mutex (pipe-lock data))
         (cv (or (pipe-in-condvar data)
                 (let ((fresh (make-condvar)))
                   (set-pipe-out-buffer! data (port-out-buffer port))
                   (set-port-out-buffer! port (make-code-vector 0 0))
                   (set-pipe-in-condvar! data fresh)
                   fresh))))
    (release-lock mutex)
    (condvar-ref cv)
    (obtain-lock mutex)))
; Wake the readers blocked in WAIT-FOR-INPUT, if any, provided there is
; something for them to see: either the output buffer holds data or the
; output side has been closed.  The condition variable is removed from the
; pipe data before it is set.
(define (wake-any-input-waiters port)
  (let* ((data (port-data port))
         (cv (pipe-in-condvar data)))
    (if cv
        (if (or (not (open-output-port? port))
                (> (port-out-index port) 0))
            (begin
              (set-pipe-in-condvar! data #f)
              (condvar-set! cv (unspecific)))))))
;----------------------------------------------------------------
; Output buffers
; Get a non-full output buffer, if possible.  Returns #t once the output
; buffer has room and #f when the pipe is broken.  The clauses below are
; tried in this order:
; 1. The current buffer has room.
; 2. The current buffer looks full but really isn't; it is a zero-length
; buffer swapped in by a reader who wants characters.
; 3. There is a queue for full buffers.
; 4. The input buffer is empty.
; 5. The input port is still open and may empty its buffer later on.
; If there are no empty buffers and the input port is closed we lose.
; The handler methods in this file call this with the pipe lock held;
; WAIT-FOR-OUTPUT releases and re-obtains that lock while blocked.
(define (get-out-buffer port)
(let ((len (code-vector-length (port-out-buffer port))))
(cond ((< (port-out-index port) len)
#t)
; 2. Put back the real buffer that WAIT-FOR-INPUT stashed in the pipe data.
((= 0 len)
(set-port-out-buffer! port (pipe-out-buffer (port-data port)))
#t)
; 3. Queue the full buffer and continue in a new one.
((pipe-buffer-queue (port-data port))
(make-new-out-buffer port)
#t)
; 4. The reader has consumed everything it can see; hand it the full buffer.
((= (port-in-index port) (port-in-limit port))
(swap-buffers! port)
#t)
; 5. Block until a reader (or CLOSE-PIPE) wakes us, then re-run every check.
((open-input-port? port)
(wait-for-output port)
(get-out-buffer port))
(else
#f))))
; Put the current (full) output buffer on the pipe's queue and continue
; writing into a new zero-filled buffer of the same length.  Only meaningful
; for pipes that have a queue.
(define (make-new-out-buffer port)
  (let ((full (port-out-buffer port)))
    (enqueue! (pipe-buffer-queue (port-data port)) full)
    (set-port-out-buffer! port
                          (make-code-vector (code-vector-length full) 0))
    (set-port-out-index! port 0)))
; The writer's counterpart of WAIT-FOR-INPUT: block on the pipe's output
; condition variable.  If no writer is waiting yet, a new condition variable
; is created and the read limit is lowered by one, so the reader runs into
; the limit (and hence into the handler) one element early.  The pipe lock is
; released while waiting and obtained again before returning.
(define (wait-for-output port)
  (let* ((data (port-data port))
         (mutex (pipe-lock data))
         (cv (or (pipe-out-condvar data)
                 (let ((fresh (make-condvar)))
                   (set-port-in-limit! port (- (port-in-limit port) 1))
                   (set-pipe-out-condvar! data fresh)
                   fresh))))
    (release-lock mutex)
    (condvar-ref cv)
    (obtain-lock mutex)))
; Wake the writers blocked in WAIT-FOR-OUTPUT, if any, when the reader has
; reached the read limit or the output side has been closed.  The condition
; variable is removed from the pipe data before it is set.
;
; NOTE(review): the closed-port test looks at the *output* side, as in
; WAKE-ANY-INPUT-WAITERS, whereas GET-OUT-BUFFER gives up based on the
; *input* side.  CLOSE-PIPE always closes the output side, so the test is
; true after any close -- confirm that this is the intent.
(define (wake-any-output-waiters port)
  (let* ((data (port-data port))
         (cv (pipe-out-condvar data)))
    (if cv
        (if (or (not (open-output-port? port))
                (= (port-in-index port) (port-in-limit port)))
            (begin
              (set-pipe-out-condvar! data #f)
              (condvar-set! cv (unspecific)))))))
; Used by the peek-char method.  GET-IN-BUFFER may have raised the read
; limit by one on behalf of a waiting writer; peeking consumes nothing, so
; lower the limit again while a writer is still waiting.
(define (do-not-disturb-output-waiters port)
  (let ((writer-waiting? (pipe-out-condvar (port-data port))))
    (if writer-waiting?
        (set-port-in-limit! port (- (port-in-limit port) 1)))))
; Close the pipe and wake up any sleepers.  The output side is always
; closed; the input side is closed as well when CLOSE-INPUT? is true.
; Everything happens with the pipe lock held.
(define (close-pipe port close-input?)
  (let ((mutex (pipe-lock (port-data port))))
    (obtain-lock mutex)
    (cond (close-input?
           (make-input-port-closed! port)))
    (make-output-port-closed! port)
    (wake-any-input-waiters port)
    (wake-any-output-waiters port)
    (release-lock mutex)))
;----------------------------------------------------------------
; The actual handler
; The port handler shared by all pipes.  The methods are given to
; MAKE-PORT-HANDLER positionally, in the order labelled below.  Every method
; that touches the buffers takes the pipe lock first and releases it on
; every exit path.
(define pipe-handler
  (make-port-handler

   ;; discloser
   (lambda (port)
     (list 'pipe))

   ;; input port methods --------------------------

   ;; close-input-port - closes the output side as well (see CLOSE-PIPE)
   (lambda (port)
     (close-pipe port #t))

   ;; The next three methods are called when the input buffer is empty

   ;; read-char
   (lambda (port)
     (lock port)
     (cond ((get-in-buffer port)
            ;; GET-IN-BUFFER made at least one character visible, so this
            ;; nested READ-CHAR finds it in the buffer.
            (let ((c (read-char port)))
              (wake-any-output-waiters port)
              (unlock port)
              c))
           (else
            (unlock port)
            (eof-object))))

   ;; peek-char
   (lambda (port)
     (lock port)
     (cond ((get-in-buffer port)
            (let ((c (peek-char port)))
              ;; nothing was consumed, so hide the last character again if
              ;; a writer is waiting for the buffer to drain
              (do-not-disturb-output-waiters port)
              (unlock port)
              c))
           (else
            (unlock port)
            (eof-object))))

   ;; char-ready?
   ;; NOTE(review): reads the output index without taking the pipe lock and
   ;; reports #f on a closed, empty pipe -- confirm this is acceptable.
   (lambda (port)
     (> (port-out-index port) 0))

   ;; read-block - the buffer has fewer than COUNT characters
   ;;
   ;; Copies COUNT elements into THING starting at START, refilling the
   ;; input buffer as often as necessary.  Returns an unspecific value once
   ;; COUNT elements have been copied and the eof object if the pipe runs
   ;; dry with the output side closed.
   ;;
   ;; The refill cascade of GET-IN-BUFFER is spelled out here instead of
   ;; being called.  When a writer is blocked in WAIT-FOR-OUTPUT the read
   ;; limit is one short of the real data.  Waking the writer as soon as the
   ;; index reaches that lowered limit clears the out-condvar, after which
   ;; nothing restores the limit and the next SWAP-BUFFERS! throws the hidden
   ;; element away.  So writers are woken only immediately after the hidden
   ;; element has itself been consumed.
   (lambda (thing start count port)
     (lock port)
     (let loop ((start start) (count count))
       (let ((index (port-in-index port))
             (limit (port-in-limit port)))
         (cond ((<= count 0)
                (unlock port))
               ;; visible data: take as much of it as we still need
               ((< index limit)
                (let ((have (min (- limit index) count)))
                  (copy! (port-in-buffer port) index thing start have)
                  (set-port-in-index! port (+ index have))
                  (loop (+ start have) (- count have))))
               ;; a writer is waiting, so exactly one element is hidden just
               ;; past the limit: expose it, consume it, then wake the writer
               ((pipe-out-condvar (port-data port))
                (set-port-in-limit! port (+ limit 1))
                (copy! (port-in-buffer port) index thing start 1)
                (set-port-in-index! port (+ index 1))
                (wake-any-output-waiters port)
                (loop (+ start 1) (- count 1)))
               ;; a queued full buffer
               ((use-buffer-from-queue port)
                (loop start count))
               ;; data sitting in the output buffer
               ((< 0 (port-out-index port))
                (swap-buffers! port)
                (loop start count))
               ;; nothing yet, but a writer may still produce something;
               ;; WAIT-FOR-INPUT returns with the lock held again
               ((open-output-port? port)
                (wait-for-input port)
                (loop start count))
               (else
                (unlock port)
                (eof-object))))))

   ;; output port methods -------------------------

   ;; close-output-port - leaves the input side open
   (lambda (port)
     (close-pipe port #f))

   ;; write-char got a full buffer
   (lambda (char port)
     (lock port)
     (cond ((get-out-buffer port)
            ;; there is room now, so this nested WRITE-CHAR can store CHAR
            (write-char char port)
            (wake-any-input-waiters port)
            (unlock port))
           (else
            (unlock port)
            (assertion-violation 'write-char "writing to a broken pipe"))))

   ;; write-block couldn't fit COUNT characters into the buffer
   (lambda (thing start count port)
     (lock port)
     (let loop ((start start) (count count))
       (cond ((get-out-buffer port)
              (let* ((buffer (port-out-buffer port))
                     (index (port-out-index port))
                     (have (min (- (code-vector-length buffer) index)
                                count)))
                (cond ((> have 0)
                       (copy! thing start buffer index have)
                       (set-port-out-index! port (+ index have))))
                (wake-any-input-waiters port)
                (if (= have count)
                    (unlock port)
                    (loop (+ start have) (- count have)))))
             (else
              (unlock port)
              (assertion-violation 'write-block "writing to a broken pipe")))))

   ;; force-output - nothing to do
   (lambda (port)
     (values))))
; Buffer length used by MAKE-PIPE when the caller gives no size, or gives #f
; to ask for a pipe with a queue.
(define pipe-buffer-size 1024)
; Make a new pipe: a single port, open for both input and output, that uses
; PIPE-HANDLER.  Takes an optional size to use for the buffers.  A size of
; #f indicates that buffers should be made as needed (we really need omega).
(define (make-pipe . maybe-buffer-size)
  (call-with-values
    (lambda ()
      (parse-make-pipe-args maybe-buffer-size))
    (lambda (size queue)
      (let ((status (bitwise-ior open-input-port-status
                                 open-output-port-status))
            (data (make-pipe-data (make-lock) ; the lock
                                  #f          ; input condition variable
                                  #f          ; output condition variable
                                  queue       ; full buffer queue, or #f
                                  #f))        ; stashed output buffer
            (in-buffer (make-code-vector size 0))
            (out-buffer (make-code-vector size 0)))
        (make-port pipe-handler
                   status
                   data
                   in-buffer
                   0             ; input index
                   0             ; input limit
                   out-buffer
                   0)))))        ; presumably the output index -- TODO confirm
; Decode MAKE-PIPE's optional argument list.  Returns two values: the buffer
; size, and either a new queue (for a pipe that makes buffers as needed) or
; #f (for a pipe with a fixed buffer set).
;   no argument          -> PIPE-BUFFER-SIZE, fixed buffers
;   #f                   -> PIPE-BUFFER-SIZE, with a queue
;   exact integer > 0    -> that size, fixed buffers
; Anything else is an assertion violation.
(define (parse-make-pipe-args maybe-buffer-size)
  (cond ((null? maybe-buffer-size)
         (values pipe-buffer-size #f))
        ((not (car maybe-buffer-size))
         (values pipe-buffer-size (make-queue)))
        (else
         (let ((size (car maybe-buffer-size)))
           (if (and (integer? size)
                    (exact? size)
                    (> size 0))
               (values size #f)
               (assertion-violation 'make-pipe "invalid pipe buffer size" size))))))
; These should probably be moved to I/O
; True if every bit of OPEN-INPUT-PORT-STATUS is set in PORT's status.
(define (open-input-port? port)
  (let ((mask open-input-port-status))
    (= mask
       (bitwise-and mask (port-status port)))))
; True if every bit of OPEN-OUTPUT-PORT-STATUS is set in PORT's status.
(define (open-output-port? port)
  (let ((mask open-output-port-status))
    (= mask
       (bitwise-and mask (port-status port)))))
; Copy COUNT elements from FROM (starting at I) to TO (starting at J),
; picking the copier by argument type.  A FROM that is not a code vector is
; treated as a string and TO is then taken to be a code vector, so this
; won't do string->string copies.
(define (copy! from i to j count)
  (cond ((not (code-vector? from))
         (copy-chars->bytes! from i to j count))
        ((code-vector? to)
         (copy-bytes! from i to j count))
        (else
         (copy-bytes->chars! from i to j count))))
; Copied from more-port.scm.
; Copy COUNT elements of the code vector FROM, starting at index I, into the
; code vector TO, starting at index J.
(define (copy-bytes! from i to j count)
  (let ((limit (+ i count)))
    (let loop ((src i) (dst j))
      (if (not (= src limit))
          (begin
            (code-vector-set! to dst (code-vector-ref from src))
            (loop (+ src 1) (+ dst 1)))))))
; Copy COUNT characters of the string FROM, starting at index I, into the
; code vector TO, starting at index J, converting each with CHAR->ASCII.
(define (copy-chars->bytes! from i to j count)
  (let ((limit (+ i count)))
    (let loop ((src i) (dst j))
      (if (not (= src limit))
          (begin
            (code-vector-set! to dst (char->ascii (string-ref from src)))
            (loop (+ src 1) (+ dst 1)))))))
; Copy COUNT elements of the code vector FROM, starting at index I, into the
; string TO, starting at index J, converting each with ASCII->CHAR.
(define (copy-bytes->chars! from i to j count)
  (let ((limit (+ i count)))
    (let loop ((src i) (dst j))
      (if (not (= src limit))
          (begin
            (string-set! to dst (ascii->char (code-vector-ref from src)))
            (loop (+ src 1) (+ dst 1)))))))
|