splitjoin   xtlang


Defined in:  https://github.com/digego/extempore/tree/v0.8.9/libs/contrib/stream.xtm

Implementation

;; splitjoin: build a duplicate-split / round-robin-join stage out of
;; `multiplicity` filter instances created by `constructor`.
;;
;; Arguments
;;   style        - DUPLICATE_RR* tag; it is not referenced anywhere in the body
;;                  (presumably only used to select this overload -- TODO confirm).
;;   constructor  - closure of type [[void]*,!a,!b]*: called with an input and an
;;                  output buffer, it returns a zero-argument filter closure.
;;   multiplicity - i64 number of parallel filter branches.
;;
;; Returns a closure with the same shape as `constructor`: given
;; (input output) it yields a zero-argument work closure that performs one
;; split -> run -> join step and returns void.
;;
;; NOTE(review): values are read from other closures by name
;; (`constructor.pop`, `fil.prework`), so the let-bound names below (pop,
;; peek, push, prepush, prework, ...) are presumably read the same way by
;; code outside this definition -- do not rename or remove them without
;; checking callers.
(bind-func splitjoin:[[[void]*,!a,!b]*,DUPLICATE_RR*,[[void]*,!a,!b]*,i64]*
  (lambda (style constructor multiplicity)
    ;; Stream rates. Each one is taken from the constructor closure when
    ;; `refcheck` succeeds for that name and falls back to the default shown.
    (let ((pop:i64 (if (refcheck constructor pop) (constructor.pop) 0))
    (peek:i64 (if (refcheck constructor peek) (constructor.peek) 0))
    ;; lpush: items pushed by ONE branch per step; push: items pushed by all branches
    (lpush:i64 (if (refcheck constructor push) (constructor.push) 0))
    (push:i64 (* multiplicity lpush))
    ;; pre* rates default to the corresponding steady-state rates
    (prepush (if (refcheck constructor prepush) (constructor.prepush) push))
    (prepeek (if (refcheck constructor prepeek) (constructor.prepeek) peek))
    (prepop (if (refcheck constructor prepop) (constructor.prepop) pop))
    ;; pre is true as soon as any of the three pre* rates is non-zero
    (pre:i1 (if (> (+ prepush prepeek prepop) 0) #t #f))
    ;; cnt and lwork are not referenced again inside this definition
    ;; (presumably read or set from outside via the closure -- TODO confirm)
    (cnt 1)
    (lwork #f))
      (lambda (input output)
  ;; One private copy of `output` and of `input` per element of
  ;; (range 0 multiplicity), made with `copy`.
  ;; assumes (range 0 multiplicity) yields `multiplicity` elements -- TODO confirm
  (let ((obufs (map (lambda (idx) (copy output)) (range 0 multiplicity)))
        (ibufs (map (lambda (idx) (copy input)) (range 0 multiplicity)))
        ;; shared index variable for the `dotimes` loops in prework and in the
        ;; returned work closure
        (i 0)
        ;; one filter instance per (ibuf, obuf) pair
        (filter_s (map (lambda (i o) (constructor i o)) ibufs obufs))
        ;; make_lwork: given a repeat factor rr, returns a work closure that
        ;; does the same split/run/join as the closure returned below but with
        ;; every count multiplied by rr. It is not called inside this definition.
        (make_lwork (lambda (rr)
                (lambda ()
                  (let ((ii 0))
              ;; duplicate the first rr*pop input items into every ibuf
              (for-each (lambda (b)
                    (dotimes (ii (* rr pop)) ;; why is this pop and not peek!
                      (qbuf_push b (qbuf_peek input ii))))
                  ibufs)
              (dotimes (ii (* rr pop)) (qbuf_pop input)) ;; shouldn't this be prepop?
              ;; spawn one task per filter, each calling its filter rr times,
              ;; wrapped in `sync` (presumably waits for all tasks -- TODO confirm)
              (sync (for-each (lambda (func)
                    (spawn
                     (lambda ()
                       (doloop (iii rr) (func)))))
                  filter_s))
              ;; round robin merge: rr*lpush rounds, one item from each obuf per round
              (dotimes (ii (* rr lpush))
                (for-each (lambda (b)
                      (qbuf_push output (qbuf_pop b)))
                    obufs))
              void))))
        ;; prework: priming step using the pre* rates. It is not called inside
        ;; this definition.
        (prework (lambda ()
             ;; duplicate the first prepeek input items into every ibuf
             (for-each (lambda (b)
                   (dotimes (i prepeek)
                     (qbuf_push b (qbuf_peek input i))))
                 ibufs)
             ;; NOTE(review): prepeek items were copied above but `pop` (not
             ;; `prepop`) items are removed here; `prepop` is only used to
             ;; compute `pre`. Confirm whether this is intended.
             (dotimes (i pop) (qbuf_pop input))
             ;; run each filter's own prework when `pre` is set, otherwise the
             ;; filter itself; these calls are made directly, one after the
             ;; other (no spawn/sync in this path)
       (for-each (lambda (fil)
             (let ((w:[void]* (if pre (fil.prework) fil)))
               (w)))
           filter_s)
             ;; round robin merge: prepush rounds, one item from each obuf per round
             ;; NOTE(review): with the default prepush = multiplicity * lpush this
             ;; pops that many items from EACH obuf -- confirm intended.
             (dotimes (i prepush)
               (for-each (lambda (b)
                     (qbuf_push output (qbuf_pop b)))
                   obufs))
             void)))
    ;; The steady-state work closure that is actually returned for (input output).
    (lambda ()
      ;; duplicate the first `pop` input items into every ibuf
      (for-each (lambda (ibuffer)
      (dotimes (i pop) ;; why is this pop and not peek!
        (qbuf_push ibuffer (qbuf_peek input i))))
          ibufs)
      (dotimes (i pop) (qbuf_pop input)) ;; remove pop'd items from input
      ;; spawn one task per filter (each runs its filter once), wrapped in `sync`
      (sync (for-each (lambda (f)
                (spawn (lambda () (f))))
              filter_s))
      ;; round robin merge: lpush rounds, one item from each obuf per round
      (dotimes (i lpush)
        (for-each (lambda (obuffer)
        (qbuf_push output (qbuf_pop obuffer)))
      obufs))
      void))))))


Back to Index