Producer vs Consumer - Code Munchie

Modeling the KrustyKrab in Go

Posted by     Charlie on Thursday, April 4, 2019

TOC

The Basics

If you ever watched Nickelodeon in the mid 90’s you might remember the Help Wanted Sponge Bob episode, in this episode the main protagonist saves the KrustyKrab after thousands of anchovies come clamoring for food and start destroying the restaurant, Spongebob proceeds to cook hamburger batches and feeding them to the fish so they can leave satisfied.

Help Wanted

In computing we’ve got a similar problem called the Producer-Consumer or Bounded-Buffer problem, where we have a producer (Spongebob) and a consumer (Anchovies) but instead of a trashed KrustyKrab we could end with a deadlock and a frozen app if we failed to feed the computer correctly.

A comprehensive explanation of this computing problem would be:

  • Two processes, the producer and the consumer, who share a common, fixed-size buffer used as a queue. The producer’s job is to generate data, put it into the buffer, and start again. At the same time, the consumer is consuming the data (i.e., removing it from the buffer), one piece at a time. - Wikipedia

Note: This definition poses the problem with two processes, one producer and one consumer, but the problem can be expanded to multiple producer and consumer processes.

What this basically means is that we have 2 types of processes, one is supposed to produce something (let’s say hamburgers) and store it in a fixed sized location (like a restaurant) while the other is meant to consume said resource, simple enough right? Well…not really, unlike Nickelodeon’s universe your computer has limitations and that means a couple of things could go wrong…

Memory Limitations

overflow

Let’s be honest here, hamburgers are not infinite as much as we would like that to be the case. In real life our computer has limited memory and thus limited resources, that means that our buffer has to have a maximum capacity and the producer can’t and shouldn’t produce more than the buffer can handle, it also means our consumer can’t eat when there’s nothing to eat.

Race Conditions

impact

Since we don’t live on a quantum level our hamburgers can’t be cooked and eaten at the same time, for that reason we need to let the producer create resources and wait until it finishes before consuming said resources.

If a consumer and a producer tried to access and modify shared data at the same time then a race condition would occur and a bug could arise.

Deadlocks

too much politeness

When all members of a process group are waiting for each other to change state or take action, such as releasing a lock, then a deadlock occurs and the program halts.

Diving into the Code

This little article will present to you the Mutex approach to solving this problem in Go, the other possible approach would be to use Go’s standard channels, however I’ll just stick to the Mutex solution as I feel is a bit more streamlined for someone who’s trying to understand and find a quick solution to the standard producer-consumer problem. If you’d like to know when to use Mutex and when to use channels you might find these resources useful: this article and this git page.

If I have some free time I might write another small munchie of this problem using a Channel

Mutex Buffer Structure

In this case, our program will have two basic processes, the producer and the consumer. The producer needs to lock the buffer and check if it is full and if there’s enough space to insert more data, in which case he will either insert data or pass to finally unlock the buffer. The consumer process needs to lock the buffer and make sure it’s not empty, in which case it will proceed to consume a limited amount of resources.

  • By using a Mutex lock we make sure we don’t have any race conditions.
  • By checking the queue size and free space we make sure we don’t exceed its capacity or try to read from an empty buffer.

The Buffer Struct

type Buffer struct {
  mux sync.Mutex // Mutex Lock
  MaxSize int
  Queue *list.List
}

I wrote a buffer initializer function to easily create a new buffer with a specific size:

func NewBuffer(size int) *Buffer {
  b := new(Buffer)
  b.MaxSize = size
  b.Queue = list.New()
  return b
}

The function IsFull() method returns true if the buffer is full and the number of available space considering the buffer’s maximum size.

func (b Buffer) IsFull() (bool, int) {
  return b.Queue.Len() >= b.MaxSize, b.MaxSize - b.Queue.Len()
}

And the IsEmpty() method returns true if the buffer is empty and the number of available products.

func (b Buffer) IsEmpty() (bool, int) {
  return b.Queue.Len() == 0, b.Queue.Len()
}

The Producer & Consumer Entities

Since both producer and consumer entities have a fixed consumption and production rate we can simplify our code and just have one structure for both.

This struct has a SleepTime which pauses the writing/reading to the buffer to emulate data processing, and an Active property to tell the entity when to stop.

type Entity struct {
  Active bool
  BatchSize int
  SleepTime int
}

Producing

As established earlier, the producer process must do the following:

  • Loop while the entity is active.
  • Lock our shared resource (the buffer).
  • Check if the buffer is not full and that Queueing won’t end up overflowing the buffer.
  • Queue product
  • Unlock our shared resource
  • Pause the process (Optional)
func (p Entity) Produce(buffer *Buffer) {
  for p.Active {
    buffer.mux.Lock()
    full, freeSpace := buffer.IsFull()
    if !full && freeSpace >= p.BatchSize {
      fmt.Printf("Producer Queueing %d -> ", p.BatchSize) // Optional
      for i := p.BatchSize; i > 0; i-- {
        buffer.Queue.PushBack(1) // Queue a 1 instead of a hamburger
      }
      fmt.Println(buffer) // Optional
    }
    buffer.mux.Unlock()
    time.Sleep(time.Duration(p.SleepTime) * time.Millisecond) // Optional
  }
}

Consuming

Consuming then becomes analogue to producing, the only differences being in the if conditional and dequeueing process.

func (c Entity) Consume(buffer *Buffer) {
  for c.Active {
    buffer.mux.Lock()
    empty, itemCount := buffer.IsEmpty()
    if !empty && itemCount >= c.BatchSize {
      fmt.Printf("Consumer Dequeueing %d -> ", c.BatchSize) // Optional
      for i := c.BatchSize; i > 0; i-- {
        e := buffer.Queue.Front()
        buffer.Queue.Remove(e)
      }
      fmt.Println(buffer) // Optional
    }
    buffer.mux.Unlock()
    time.Sleep(time.Duration(c.SleepTime) * time.Millisecond) // Optional
  }
}

End Result & Pitfalls

Once we have defined our structures and methods correctly we can proceed to write the main process, here we create a buffer of size 10, an active producer entity which will output 3 products every 5 seconds, and an active consumer which will consume 2 products every second.

func main() {
  buffer := NewBuffer(10)
  p := Entity{true, 3, 5000}
  go p.Produce(buffer)
  c := Entity{true, 2, 1000}
  go c.Consume(buffer)
  fmt.Scanln() // Check for any input to end the main proc
  p.Active, c.Active = false, false
}

You can tweak these parameters any way you please and the output will change accordingly.

Output

main-prod

What if we consumed in twice the time we produced?

duble-prod

Awesome isn’t it?! But what if we had multiple consumers? or multiple producers?

Well then we would have an issue because Mutex is mainly useful to save state and it locks the whole buffer when an entity is either producing or consuming from it, that isn’t very efficient because it makes other free consumers and producers unable to make use of the buffer until unlocked making our solution impractical.

I will explore what can be done to improve this model, GoLang channels maybe…Anyway that’s all the time I’ve got today, I really wan’t to simplify the code next week to reach an optimal solution, that’s all for now folks, cheers!

Disclaimer: This is just an example of me exploring an interesting problem/subject as my first entry to the Code-Munchies section and it must not be taken as the best or most efficient solution.