Wednesday, July 6, 2011

Circular Buffers

(A followup with updated results is available)

An extremely useful tool in DSP programming is the circular buffer.  Basically, a circular buffer is useful for keeping an array of n elements of a data stream.  Implementing this in Haskell is not straightforward, because the usual implementation relies upon mutating arrays.  As Haskell is a pure language, mutable data can only be used in restricted contexts, which can make a mutable circular buffer somewhat awkward.  It offends my sensibilities if nothing else.

(Code presented in this article is basically the first thing I thought of.  Improvements are welcome, provided they don't detract too much from readability.  All code is placed in the public domain.)

The ideal Haskell implementation would be pure, and for certain cases a high-performance solution is available.  If you only need access to the end of the buffer (i.e. a FIFO queue),  real-time pure functional implementations are well-known (Okasaki).  But what if you want to read from the middle of the buffer?  In an imperative implementation, this is usually handled by either keeping multiple read pointers, or by using an absolute offset to the start of the buffer.  It's not clear how to apply either of these techniques to Okasaki's queue implementation though, and the obvious approach of dequeueing as many times as required has O(n) complexity.

An obvious approach is to write an implementation using mutable data.  The implementation would no longer be pure unfortunately.  Here's a simple implementation which runs in IO, based upon mutable vectors:

module Data.RingBuffer.Mutable (
  RingBuffer
 ,newInit
 ,push
 ,length
 ,(!)
)

where

import qualified Data.Vector.Unboxed.Mutable as V

import Foreign.ForeignPtr
import Foreign.Storable
import Control.Applicative
import GHC.Prim

data RingBuffer a = RB (V.MVector RealWorld a) (ForeignPtr Int)

newFp :: Storable a => a -> IO (ForeignPtr a)
newFp a = do
  fp <- mallocForeignPtr
  withForeignPtr fp $ \p -> poke p a
  return fp

readFp :: Storable a => ForeignPtr a -> IO a
readFp fp = withForeignPtr fp $ \p -> peek p

writeFp fp a = withForeignPtr fp $ \p -> poke p a

newInit :: (V.Unbox a, Num a) => Int -> IO (RingBuffer a)
newInit sz = RB <$> (V.replicate sz 0) <*> newFp 0
{-# INLINE newInit #-}

(!) :: (V.Unbox a) => RingBuffer a -> Int -> IO a
(!) (RB vec ref) ix = do
  off <- readFp ref
  let len = V.length vec
  V.unsafeRead vec ((ix+off) `rem` len)
{-# INLINE (!) #-}

push :: V.Unbox a => RingBuffer a -> a -> IO ()
push (RB vec ref) el = do
  off <- readFp ref
  let len = V.length vec
  let off' = (off+1) `rem` len
  writeFp ref off'
  V.unsafeWrite vec off el
{-# INLINE push #-}

So what would an efficient pure implementation look like?

Probably the fanciest pure data structure is the finger tree, most commonly known by the implementation in Data.Sequence.Seq.  Finger trees have many interesting attributes, including O(1) access to both ends.  They do not have O(1) access to interior elements, but better than O(n).  It's pretty simple to implement a similar interface with a Seq:

module Data.RingBuffer (
  RingBuffer
 ,new
 ,newInit
 ,push
 ,length
 ,(!)
)

where

import           Prelude hiding (length)

import qualified Data.Sequence as S

newtype RingBuffer a = RB (S.Seq a) deriving (Eq, Ord, Show)

-- | Create a new RingBuffer, initialized to all 0's, of the given size
new :: (Num a) => Int -> RingBuffer a
new = newInit 0
{-# INLINE new #-}

-- | Create a new RingBuffer from a given initial value
newInit :: a -> Int -> RingBuffer a
newInit i sz | sz <= 0 = error "can't make empty ringbuffer"
newInit i sz           = RB (S.replicate sz i)
{-# INLINE newInit #-}

-- | Get the total size of a RingBuffer.
length :: RingBuffer a -> Int
length (RB vec) = S.length vec
{-# INLINE length #-}

-- | Look up a value in a RingBuffer.
(!) :: RingBuffer a -> Int -> a
(!) (RB vec) = S.index vec
{-# INLINE (!) #-}

-- | Push a new value into a RingBuffer.  The following will hold:
--     NewRingBuffer ! 0 === added element
--     NewRingBuffer ! 1 === OldRingBuffer ! 0
push :: RingBuffer a -> a -> RingBuffer a
push (RB vec) el   = case S.viewr vec of
  v' S.:> _ -> RB $ el S.<| v'
  _         -> error "internal error"
{-# INLINE push #-}
Ok, so it's not quite the same, but close enough.

So how does this compare, performance-wise?  I whipped up some criterion code to check it out.


module Main where

import Criterion.Main
import System.Random
import Control.Monad

import qualified Data.RingBuffer.Mutable as RM
import qualified Data.RingBuffer as R
import Data.Vector.Unboxed (Unbox)
import System.IO.Unsafe

rndSeed = 49872

-- test performance as a queue
queueMut :: (Unbox a, Num a) => Int -> Int -> a -> IO ()
queueMut reps sz a = do
  buf <- RM.newInit sz
  replicateM_ reps (buf RM.! (sz-1) >> RM.push buf a)

-- | test performance of multiple reads
multireadMut :: (Unbox a, Num a) => Int -> Int -> a -> IO ()
multireadMut nReads sz a = do
  let rs = take nReads $ randomRs (0,sz-1) $ mkStdGen rndSeed
  buf <- RM.newInit sz
  replicateM_ iters $ do
    mapM_ (\ix -> buf RM.! ix) rs
    RM.push buf a

queueP :: Int -> Int -> a -> a
queueP reps sz a = fst $ (iterate go (a, R.newInit a sz)) !! reps
 where
  go (a', buf) = a' `seq` (buf R.! (sz-1), R.push buf a)

multireadP :: Int -> Int -> a -> [a]
multireadP nReads sz a = fst $ (iterate go (replicate nReads a, R.newInit a sz)) !! iters
 where
  go (last, buf) = length last `seq` (map (buf R.!) rs, R.push buf a)
  rs = take nReads $ randomRs (0,sz-1) $ mkStdGen rndSeed
  

numReps = 100
numReads = 50

iters = 10000

main = defaultMain
  [
   bgroup "Mutable/Int" [
     bgroup "Queue" [
       bench "8"      $ queueMut numReps 8     (10::Int)
      ,bench "128"    $ queueMut numReps 128    (10::Int)
      ,bench "1024"   $ queueMut numReps 1024   (10::Int)
      ,bench "10240"  $ queueMut numReps 10240  (10::Int)
      ,bench "102400" $ queueMut numReps 102400 (10::Int)
     ]
     ,bgroup "Multiread" [
       bench "8"     $ multireadMut numReads 8     (10::Int)
      ,bench "128"    $ multireadMut numReads 128    (10::Int)
      ,bench "1024"   $ multireadMut numReads 1024   (10::Int)
      ,bench "10240"  $ multireadMut numReads 10240  (10::Int)
      ,bench "102400" $ multireadMut numReads 102400 (10::Int)
      ]
   ]
  ,bgroup "Mutable/Double" [
     bgroup "Queue" [
       bench "8"     $ queueMut numReps 8     (10::Double)
      ,bench "128"    $ queueMut numReps 128    (10::Double)
      ,bench "1024"   $ queueMut numReps 1024   (10::Double)
      ,bench "10240"  $ queueMut numReps 10240  (10::Double)
      ,bench "102400" $ queueMut numReps 102400 (10::Double)
     ]
     ,bgroup "Multiread" [
       bench "8"     $ multireadMut numReads 8     (10::Double)
      ,bench "128"    $ multireadMut numReads 128    (10::Double)
      ,bench "1024"   $ multireadMut numReads 1024   (10::Double)
      ,bench "10240"  $ multireadMut numReads 10240  (10::Double)
      ,bench "102400" $ multireadMut numReads 102400 (10::Double)
     ]
   ]
  ,bgroup "Pure/Int" [
     bgroup "Queue" [
       bench "8"     $ whnf (queueP numReps 8)     (10::Int)
      ,bench "128"    $ whnf (queueP numReps 128)    (10::Int)
      ,bench "1024"   $ whnf (queueP numReps 1024)   (10::Int)
      ,bench "10240"  $ whnf (queueP numReps 10240)  (10::Int)
      ,bench "102400" $ whnf (queueP numReps 102400) (10::Int)
     ]
     ,bgroup "Multiread" [
       bench "8"     $ nf (multireadP numReads 8)     (10::Int)
      ,bench "128"    $ nf (multireadP numReads 128)    (10::Int)
      ,bench "1024"   $ nf (multireadP numReads 1024)   (10::Int)
      ,bench "10240"  $ nf (multireadP numReads 10240)  (10::Int)
      ,bench "102400" $ nf (multireadP numReads 102400) (10::Int)
     ]
   ]
  ,bgroup "Pure/Double" [
     bgroup "Queue" [
       bench "8"     $ whnf (queueP numReps 8)     (10::Double)
      ,bench "128"    $ whnf (queueP numReps 128)    (10::Double)
      ,bench "1024"   $ whnf (queueP numReps 1024)   (10::Double)
      ,bench "10240"  $ whnf (queueP numReps 10240)  (10::Double)
      ,bench "102400" $ whnf (queueP numReps 102400) (10::Double)
     ]
     ,bgroup "Multiread" [
       bench "8"     $ nf (multireadP numReads 8)     (10::Double)
      ,bench "128"    $ nf (multireadP numReads 128)    (10::Double)
      ,bench "1024"   $ nf (multireadP numReads 1024)   (10::Double)
      ,bench "10240"  $ nf (multireadP numReads 10240)  (10::Double)
      ,bench "102400" $ nf (multireadP numReads 102400) (10::Double)
     ]
   ]
  ]

A few points about the benchmarks.  First, both queue and multiread perform multiple cycles in each benchmark.  Secondly, the pure benchmarks both seq outputs at each cycle to ensure that work is actually performed.  So how do these implementations compare?
It looks like not only is the Seq-based implementation pure, it outperforms my mutable-vector implementation and scales better too.  Completely unexpected.

Unless somebody points out a major flaw in my testing methodology or implementations, I'll be using Data.Sequence.Seq for my circular buffer needs from now on.

Benchmark data

Edit: thanks to Henning Thielemann for suggested improvements.  I've updated the benchmark code with his suggestions regarding mkStdGen.  I also have updated the mutable buffer to use a ForeignPtr to hold the offset rather than an IORef.  The performance is better, but not drastically so. I haven't put up new images yet, although I have uploaded the new benchmark data.