Ease your multithreaded application programming

The Consumer class can simplify implementation of the producer-consumer behavior

Level: Intermediate

Joseph Hartal (saffi@myrealbox.com), Software developer, GlobaLoop LTD
Ze'ev Bubis (zeevb@myrealbox.com), Software development team leader, GlobaLoop LTD

01 Feb 2002

The producer-consumer scenario is one of the most-used constructs in multithreaded application development -- and herein lies the rub. Because the producer-consumer behavior can be duplicated many times throughout a single application, so can the code. Software developers Ze'ev Bubis and Saffi Hartal have created the Consumer class, which works around this problem by facilitating code reuse and simplifying code debugging and maintenance in some multithreaded applications.

Multithreaded applications often make use of the producer-consumer programming scenario, wherein repetitive jobs are created by a producer thread, passed to a job queue, and processed by a consumer thread. While this programming method is very useful, it often results in duplicate code, which can be a real problem to debug and maintain.

To resolve this problem and facilitate code reuse, we created the Consumer class. The Consumer class contains all the code for the job queue and the consumer thread, as well as the logical glue that holds them together. This frees us to focus on the business logic -- the specifics of how the jobs should be processed -- rather than on writing lines and lines of redundant code. It also makes the task of debugging a multithreaded application much easier.

In this article, we'll offer a simple observation on common thread usage in multithreaded application development, explain the producer-consumer programming scenario, and examine a real-world example to show you how the Consumer class works. Note that this article does not offer an in-depth introduction to multithreaded application development or the producer-consumer scenario; see Resources for a listing of articles on those subjects.

Multithreading basics

Multithreading is a programming technique that enables an application to handle more than one operation at the same time. Threads are commonly employed for two different types of multithreaded operation:

  • Timely events, when a job must be scheduled to take place at a specific time or at specific intervals

  • Background processing, when background events must be handled or executed in parallel to the current flow of execution

Examples of timely events include program reminders, timeout events, and repeated operations such as polling and refreshes. Examples of background processing include packets waiting to be sent or received messages waiting to be processed.



The producer-consumer relationship

The producer-consumer scenario is well suited for cases that fall into the category of background processing. These situations generally revolve around a job "producer" and a job "consumer." Running jobs in parallel raises a further consideration: in most cases, jobs that use the same resource should be processed sequentially and in a first-come, first-served manner. A single-threaded consumer accomplishes this easily, because only one thread ever accesses the resource instead of multiple threads competing for it.

To enable the standard consumer, a job queue is created to store all the jobs as they come in. The producer thread delivers a new object to be processed by adding it to the consumer's queue. The consumer thread then pulls each object from the queue and processes it in turn. When the queue is empty, the consumer goes to sleep. When a new object is added to the empty queue, the consumer awakes and processes the object. Because most applications favor sequential processing, the consumer is usually single-threaded.
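The queue-and-sleep behavior described above can be sketched roughly as follows. This is a minimal illustration rather than the Consumer class itself: the class name JobQueueSketch and its methods produce(), close(), consumeLoop(), and demo() are hypothetical, and the sketch assumes a single lock on the queue is acceptable.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical names throughout; a sketch of the pattern, not the authors' code.
class JobQueueSketch {
    private final LinkedList<Integer> queue = new LinkedList<>();
    private final List<Integer> handled = new ArrayList<>();
    private boolean closed = false;

    // Producer side: append a job and wake the consumer if it is sleeping.
    void produce(int job) {
        synchronized (queue) {
            queue.addLast(job);
            queue.notify();
        }
    }

    // Tell the consumer that no more jobs will arrive.
    void close() {
        synchronized (queue) {
            closed = true;
            queue.notify();
        }
    }

    // Consumer side: runs on the single consumer thread.
    void consumeLoop() {
        while (true) {
            int job;
            synchronized (queue) {
                while (queue.isEmpty() && !closed) {
                    try {
                        queue.wait();           // sleep until produce() or close()
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return;                     // closed and fully drained
                }
                job = queue.removeFirst();      // first come, first served
            }
            handled.add(job);                   // "process" the job outside the lock
        }
    }

    // Runs one producer and one consumer; returns the jobs in the order handled.
    static List<Integer> demo() {
        JobQueueSketch q = new JobQueueSketch();
        Thread consumer = new Thread(q::consumeLoop, "consumer");
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 5; i++) {
                q.produce(i);
            }
            q.close();
        }, "producer");
        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return q.handled;
    }
}
```

Because the emptiness check and the wait() happen under the same lock that produce() takes before calling notify(), a wake-up cannot slip in between the check and the sleep.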



The problem: Code duplication

Because the producer-consumer scenario is very common, it may appear several times when building an application, which results in code duplication. We realized this presented a concern during an application development process in which we employed the producer-consumer scenario multiple times.

The first time we required the producer-consumer behavior, we implemented it by writing a class that employed a thread and a queue. The second time we needed this behavior, we started to implement it from scratch, but then realized we had done this before. We copied the code and modified the way the objects were processed. By the third time we implemented the producer-consumer behavior in the application, it was clear that we were duplicating far too much code. We decided that we needed a generic Consumer class that would handle all our producer-consumer scenarios.



Our solution: The Consumer class

Our objective in creating the Consumer class was to do away with the code duplication entailed in writing a new job queue and consumer thread for every producer-consumer instance in our applications. With the Consumer class in place, we only have to write the code that is specific to job processing (the business logic). This makes our code cleaner, easier to maintain, and more flexible to change.

Our requirements for the Consumer class were as follows:

  • Reuse: We wanted a class that had it all: a thread, a queue, and all the logic to hold them together. This would free us to write only the code for consuming the specific jobs in the queue. (So, for example, a method called onConsume(Object jobToBeConsumed) would be overridden by the programmer using the Consumer class.)

  • Queue choice: We wanted to be able to set the queue implementation to be used by the Consumer object. This means, however, that we must either ensure that the queue is thread-safe or use a single-threaded producer that won't collide with the consume operation. Either way, the queue has to be designed to allow for the different threads accessing its methods.

  • Consumer thread priority: We wanted to be able to set the priority at which the Consumer's thread would run.

  • Consumer thread naming: A meaningful thread name is convenient, and it surely helps with debugging. For example, if you send the appropriate signal to the Java virtual machine, it will generate a full thread dump, which is a snapshot of all the threads and their corresponding stack traces. To generate this thread dump on a Windows platform, you must press Ctrl+Break in the window where the Java program is running, or click the Close button on the window. For more information on how to diagnose Java software problems using the full thread dump, see Resources.
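The naming and priority requirements map onto standard java.lang.Thread calls. The sketch below is only an illustration: NamedThreadSketch, newWorker(), and appearsInDump() are hypothetical names, and Thread.getAllStackTraces() is used here as a programmatic stand-in for the signal-triggered thread dump.

```java
// Hypothetical helper; the names are illustrative, not part of the Consumer class.
class NamedThreadSketch {
    // Creates (without starting) a thread with a descriptive name and priority.
    static Thread newWorker(Runnable body, String name, int priority) {
        Thread t = new Thread(body, name);
        t.setPriority(priority);   // Thread.MIN_PRIORITY .. Thread.MAX_PRIORITY
        return t;
    }

    // Returns true if a live thread with the given name is reported by the JVM,
    // which is the same set of threads a full thread dump would list.
    static boolean appearsInDump(String name) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().equals(name)) {
                return true;
            }
        }
        return false;
    }
}
```

With a name such as "MessagesProcessor" in place of the default "Thread-0", the relevant stack trace is much easier to find in a dump.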


The class code

We used "lazy creation" to create the Consumer's thread in the method getThread(), as shown in Listing 1:

      /**
       * Lazy creation of the Consumer's thread.
       *
       * @return  the Consumer's thread
       */
      private Thread getThread()
      {
         if (_thread==null)
         {
            // the thread simply delegates to the Consumer's own run() loop
            _thread = new Thread()
            {
               public void run()
               {
                  Consumer.this.run();
               }
            };
         }
         return _thread;
      }

The thread's run() method runs the Consumer's run() method, which is the main consumer loop, as shown in Listing 2:

     /**
       *  Main Consumer's thread method.
       */
      private void run()
      {
         while (!_isTerminated)
         {
            // job handling loop
            while (true)
            {
               Object o;
               synchronized (_queue)
               {
                  if (_queue.isEmpty())
                     break;
                  o = _queue.remove();
               }
               if (o == null)
                  break;
               onConsume(o);
            }

            // if we are not terminated and the queue is still empty
            // then wait until new jobs arrive.

            synchronized(_waitForJobsMonitor)
            {
               if (_isTerminated)
                  break;
               if(_queue.isEmpty())
               {
                  try
                  {
                     _waitForJobsMonitor.wait();
                  }
                  catch (InterruptedException ex)
                  {
                     // ignored: the outer loop rechecks _isTerminated
                  }
               }
            }
         }
      } // run()

Basically, the Consumer's thread runs until there are no more jobs waiting in the queue. It then goes to sleep, only to be awakened by the first call to add(Object), which adds a new job to the queue and "kicks" the thread awake.

The "sleeping" and "kicking" is done by using the wait() and notify() mechanism. The actual consumer work is handled by the onConsume(Object) method. The producer's entry point, add(Object), and the kickThread() method that wakes the consumer are shown in Listing 3:

     /**
      * Add an object to the Consumer.
      * This is the entry point for the producer.
      * After the item is added, the Consumer's thread
      * will be notified.
      *
      * @param  o  the object to be 'consumed' by this consumer
      */
      public void add(Object o)
      {
         _queue.add(o);
         kickThread();
      }

      /**
       * Wake up the thread (without adding new stuff to consume)
       *
       */
      public void kickThread()
      {
         // use getThread() so a lazily created thread is never null here
         if (!getThread().isInterrupted())
         {
            synchronized(_waitForJobsMonitor)
            {
               _waitForJobsMonitor.notify();
            }
         }
      }



An example: MessagesProcessor

To show you how the Consumer class works, we'll employ a simple example. The MessagesProcessor class handles incoming messages asynchronously (that is, without interfering with the calling thread). Its job is to print each message as it comes in. MessagesProcessor comes with an internal Consumer that deals with incoming message jobs. For each job it takes from the queue, the Consumer calls onConsume(Object), which in turn calls the method processMessage(String) to process it, as shown in Listing 4:

      class MessagesProcessor
      {
         String _name;
         // anonymous inner class that supplies the consumer
         // capabilities for the MessagesProcessor
         private Consumer _consumer = new Consumer()
         {
            // that method is called on each event retrieved
            protected void onConsume(Object o)
            {
               if (!(o instanceof String))
               {
                  System.out.println("illegal use, ignoring");
                  return;
               }
               MessagesProcessor.this.processMessage((String)o);
            }
         }.setName("MessagesProcessor").init();

         public void gotMessageEvent(String s)
         {
            _consumer.add(s);
         }
         private void processMessage(String s)
         {
            System.out.println(_name+" processed message: "+s);
         }

         private void terminate()
         {
           _consumer.terminateWait();
           _name = null;
         }

         MessagesProcessor()
         {
            _name = "Example Consumer";
         }
      }

As you can see from the above code, customizing the Consumer is fairly simple. We employ an anonymous inner class to extend the Consumer class and override the abstract method onConsume(). In our example, onConsume() simply checks the job's type and calls processMessage.
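To make the customization pattern concrete without the downloadable source at hand, here is a self-contained sketch. MiniConsumer is a hypothetical, simplified stand-in for the Consumer class (its start() plays the role that init() appears to play in Listing 4, and its terminateWait() is assumed to drain pending jobs before returning); MiniMessagesProcessor mirrors the example above but records messages instead of printing them.

```java
import java.util.LinkedList;

// Hypothetical, simplified stand-in for the article's Consumer class.
abstract class MiniConsumer {
    private final LinkedList<Object> queue = new LinkedList<>();
    private boolean terminated = false;
    private final Thread thread;

    MiniConsumer(String name) {
        thread = new Thread(this::loop, name);
    }

    // Business logic supplied by the subclass.
    protected abstract void onConsume(Object job);

    // Starts the consumer thread; returns this so calls can be chained.
    MiniConsumer start() {
        thread.start();
        return this;
    }

    // Entry point for producers.
    void add(Object job) {
        synchronized (queue) {
            queue.addLast(job);
            queue.notify();
        }
    }

    // Lets the consumer drain the queue, then blocks until its thread exits.
    void terminateWait() {
        synchronized (queue) {
            terminated = true;
            queue.notify();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        while (true) {
            Object job;
            synchronized (queue) {
                while (queue.isEmpty() && !terminated) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return;
                }
                job = queue.removeFirst();
            }
            onConsume(job);
        }
    }
}

class MiniMessagesProcessor {
    private final StringBuilder log = new StringBuilder();

    // Anonymous subclass: only the job-specific code is written here.
    private final MiniConsumer consumer = new MiniConsumer("MiniMessagesProcessor") {
        @Override
        protected void onConsume(Object job) {
            if (job instanceof String) {
                log.append(job).append('\n');
            }
        }
    }.start();

    void gotMessageEvent(String s) {
        consumer.add(s);
    }

    // Waits for the consumer thread to finish before reading the log.
    String shutdownAndGetLog() {
        consumer.terminateWait();
        return log.toString();
    }
}
```

The thread is started by an explicit start() call rather than in the constructor, so the anonymous subclass is fully constructed before onConsume() can run.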



Advanced features of the Consumer class

In addition to the basic requirements we started with, we've provided the Consumer class with some advanced features that we found useful.

Event notification

  • onThreadTerminate(): This method is called just before the Consumer is terminated. We have overridden this method for debug purposes.

  • goingToRest(): This method is called just before the Consumer thread goes to sleep (that is, just before the call to _waitForJobsMonitor.wait()). This notification may be needed in complex cases where a consumer is required to handle a batch of processed work just before it goes to sleep.

Termination

  • terminate(): Asynchronous termination of the Consumer thread.

  • terminateWait(): Sets the calling thread to wait until the consumer thread actually terminates.

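In our example, if we used terminate() instead of terminateWait(), there would be a problem because the method onConsume() might be called after _name was set to null. The thread executing processMessage would then be working on an object that has already been torn down: as written in Listing 4, it would print "null processed message: ..." and any code that called a method on _name would throw a NullPointerException.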
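The difference between the two termination styles can be sketched as follows. StoppableWorker is a hypothetical class, and implementing terminateWait() with Thread.join() is an assumption about one reasonable approach, not a statement of how the Consumer class does it.

```java
// Hypothetical class; illustrates the two termination styles only.
class StoppableWorker {
    private final Object monitor = new Object();
    private boolean terminated = false;
    private final Thread thread = new Thread(this::run, "stoppable-worker");

    StoppableWorker start() {
        thread.start();
        return this;
    }

    // Asynchronous: request termination and return immediately.
    // The worker thread may still be running when this method returns.
    void terminate() {
        synchronized (monitor) {
            terminated = true;
            monitor.notify();
        }
    }

    // Synchronous: request termination, then block until the thread has exited.
    // Only after this returns is it safe to release state the worker uses.
    void terminateWait() {
        terminate();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    // Worker body: sleep until termination is requested.
    private void run() {
        synchronized (monitor) {
            while (!terminated) {
                try {
                    monitor.wait();
                } catch (InterruptedException e) {
                    return;
                }
            }
        }
    }
}
```

After terminate() alone, isAlive() may still return true for a short time; after terminateWait(), it returns false, which is why the example clears _name only after terminateWait().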



Conclusion: Benefits of the Consumer class

The source for the Consumer class is available for download below. Please feel free to use the source and extend it as you need to. We've found the benefits of using this class for multithreaded application development to be as follows:

  • Code reuse/elimination of duplicate code: If you have the Consumer class, you don't have to write a new consumer for every instance in your application. Given how often the producer-consumer scenario comes up in application development, this is a great timesaver. Also remember that duplicate code is fertile ground for bugs. It also makes basic code maintenance more difficult.

  • Fewer bugs: Using proven code is a good practice to prevent bugs, especially when dealing with multithreaded applications. Because the Consumer class has already been debugged, it is safer. The consumer also prevents thread-related bugs by acting as a safe mediator between threads and resources: it accesses resources sequentially on behalf of the other threads.

  • Nice, clean code: Using the Consumer class helps us to write simpler code, which is easier to understand and maintain. If we don't use the Consumer class, we have to write code to handle two different functions: the consuming logic (queue and thread management, synchronizing, and so on) and the code specifying the use or function of the consumer.

Many thanks to Jonathan Lifton at Allot Communications and to Dov Trietsch for their assistance with this article.




Download

Description    Name             Size    Download method
Sample code    j-prodcon.zip    5 KB    HTTP


Resources



About the authors

Joseph (Saffi) Hartal is a software developer at GlobaLoop LTD. Saffi holds a BSC degree in computer science and mathematics and an MBA from Tel-Aviv University. He has been a software developer for the last 10 years, during which time he has written both real-time embedded code in C++ and Java client and server applications. Most of Saffi's time is devoted to writing infrastructure code and solving riddles. He can be reached at saffi@myrealbox.com.


Ze'ev Bubis is a software development team leader at GlobaLoop LTD. Ze'ev holds a BSC degree in computer science and mathematics from Tel-Aviv University. He has been a software developer for the last 10 years, during which time he has written software applications for a wide variety of platforms and languages. In the last three years, Ze'ev has focused on developing both client and server applications in Java. He can be reached at zeevb@myrealbox.com.



