Level: Intermediate
Joseph Hartal (saffi@myrealbox.com), Software developer, GlobaLoop LTD
Ze'ev Bubis (zeevb@myrealbox.com), Software development team leader, GlobaLoop LTD
01 Feb 2002
The producer-consumer scenario is one of the most-used constructs in multithreaded application development -- and herein lies the rub. Because the producer-consumer behavior can be duplicated many times throughout a single application, so can the code. Software developers Ze'ev Bubis and Saffi Hartal have created the Consumer class, which works around this problem by facilitating code reuse and simplifying code debugging and maintenance in some multithreaded applications.
Multithreaded applications often make use of the producer-consumer programming scenario, wherein repetitive jobs are created by a producer thread, passed to a job queue, and processed by a consumer thread. While this programming method is very useful, it often results in duplicate code, which can be a real problem to debug and maintain. To resolve this problem and facilitate code reuse, we created the Consumer class.
The Consumer class contains all the code for the job queue and the consumer thread, as well as the logical glue that holds them together. This frees us to focus on the business logic -- the specifics of how the jobs should be processed -- rather than on writing lines and lines of redundant code. It also makes the task of debugging a multithreaded application much easier.
In this article, we'll offer a simple observation on common thread usage in multithreaded application development, explain the producer-consumer programming scenario, and examine a real-world example to show you how the Consumer class works. Note that this article does not offer an in-depth introduction to multithreaded application development or the producer-consumer scenario; see Resources for a listing of articles on those subjects.
Multithreading basics
Multithreading is a programming technique that enables an application to handle more than one operation at the same time. Threads are commonly employed for two different types of multithreaded operation:
- Timely events, when a job must be scheduled to take place at a specific time or at specific intervals
- Background processing, when background events must be handled or executed in parallel to the current flow of execution
Examples of timely events include program reminders, timeout events, and repeated operations such as polling and refreshes. Examples of background processing include packets waiting to be sent or received messages waiting to be processed.
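
For contrast with the background-processing case the rest of the article deals with, here is a minimal sketch of a "timely event" using the java.util.Timer API listed under Resources. The class name TimelyEventSketch, the method runRepeated, and the "refresh-timer" thread name are made up for illustration.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: a repeated "refresh" scheduled at a fixed period.
class TimelyEventSketch {

    /**
     * Schedules a task every periodMillis and waits until it has run
     * at least `times` times. Returns true if that happened within 5 seconds.
     */
    static boolean runRepeated(int times, long periodMillis) {
        final CountDownLatch remaining = new CountDownLatch(times);
        Timer timer = new Timer("refresh-timer", true); // daemon timer thread
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                // The timer thread, not the caller, executes the job.
                remaining.countDown();
            }
        }, 0L, periodMillis);
        try {
            return remaining.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel(); // stop further executions
        }
    }

    public static void main(String[] args) {
        System.out.println("all refreshes ran: " + runRepeated(3, 50L));
    }
}
```

The timer decides when work happens. In the producer-consumer case, by comparison, the arrival of a job decides it.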
The producer-consumer relationship
The producer-consumer scenario is well suited to cases that fall into the category of background processing. These situations generally revolve around a job "producer" party and a job "consumer" party. Running jobs in parallel raises a further consideration: in most cases, jobs that use the same resource should be processed sequentially and in a first-come, first-served manner. A single-threaded consumer accomplishes this easily, because we deal with a single thread accessing a single resource instead of multiple threads accessing it.
To enable the standard consumer, a job queue is created to store all the jobs as they come in. The producer thread delivers a new object to be processed by adding it to the consumer's queue. The consumer thread then pulls each object from the queue and processes it in turn. When the queue is empty, the consumer goes to sleep. When a new object is added to the empty queue, the consumer awakes and processes the object. Because most applications favor sequential processing, the consumer is usually single-threaded.
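
The relationship described above can be sketched in a few lines. This is a bare-bones illustration, not the Consumer class itself: SingleConsumerSketch and its method names are hypothetical, and the "resource" is simply an unsynchronized list that only the consumer thread touches.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration of one producer, one queue, one consumer thread.
class SingleConsumerSketch {
    private final LinkedList<String> queue = new LinkedList<>(); // guarded by its own lock
    private final List<String> log = new ArrayList<>();          // the resource: consumer thread only
    private boolean done;                                         // guarded by queue

    /** Producer side: enqueue a job and wake the consumer if it is sleeping. */
    void produce(String job) {
        synchronized (queue) {
            queue.addLast(job);
            queue.notify();
        }
    }

    /** Ask the consumer to exit once the queue has drained. */
    void finish() {
        synchronized (queue) {
            done = true;
            queue.notify();
        }
    }

    /** Consumer side: take jobs in arrival order; sleep while the queue is empty. */
    void consumeLoop() {
        while (true) {
            String job;
            synchronized (queue) {
                while (queue.isEmpty() && !done) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return; // done and drained
                }
                job = queue.removeFirst();
            }
            log.add(job); // no lock needed: only this thread uses the resource
        }
    }

    /** Runs the calling thread as producer against one consumer thread. */
    static List<String> demo(int jobs) {
        SingleConsumerSketch s = new SingleConsumerSketch();
        Thread consumer = new Thread(s::consumeLoop, "sketch-consumer");
        consumer.start();
        for (int i = 0; i < jobs; i++) {
            s.produce("job-" + i);
        }
        s.finish();
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return s.log;
    }

    public static void main(String[] args) {
        System.out.println(demo(3)); // prints [job-0, job-1, job-2]
    }
}
```

Every line except log.add(job) is plumbing that would be the same in any other producer-consumer pair.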
The problem: Code duplication
Because the producer-consumer scenario is very common, it may appear several times when building an application, which results in code duplication. We realized this presented a concern during an application development process in which we employed the producer-consumer scenario multiple times. The first time we required the producer-consumer behavior, we implemented it by writing a class that employed a thread and a queue. The second time we needed this behavior, we started to implement it from scratch, but then realized we had done this before. We copied the code and modified the way the objects were processed. By the third time we implemented the producer-consumer behavior in the application, it was clear that we were duplicating far too much code. We decided that we needed a generic Consumer class that would handle all our producer-consumer scenarios.
Our solution: The Consumer class
Our objective in creating the Consumer class was to do away with the code duplication entailed in writing a new job queue and consumer thread for every producer-consumer instance in our applications. With the Consumer class in place, we only have to write the code that is specific to job processing (the business logic). This makes our code cleaner, easier to maintain, and more flexible to change. Our requirements for the Consumer class were as follows:
- Reuse: We wanted a class that had it all: a thread, a queue, and all the logic to hold them together. This would free us to write only the code for consuming the specific jobs in the queue. (So, for example, a method called onConsume(Object jobToBeConsumed) would be overridden by the programmer using the Consumer class.)
- Queue choice: We wanted to be able to set the queue implementation to be used by the Consumer object. This means, however, that we must either ensure that the queue is thread-safe or use a single-threaded producer that won't collide with the consume operation. Either way, the queue has to be designed to allow for the different threads accessing its methods.
- Consumer thread priority: We wanted to be able to set the priority at which the Consumer's thread would run.
- Consumer thread naming: It is convenient for the thread to have a meaningful name, and it helps with debugging. For example, if you send a signal to the Java virtual machine, it will generate a full thread dump: a snapshot of all the threads and their corresponding stack traces. To generate this thread dump on a Windows platform, you must enter the key sequence <ctrl><break> in the window where the Java program is running, or click the Close button on the window. For more information on how to diagnose Java software problems using the full thread dump, see Resources.
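
One way the four requirements above could fit together is sketched below. ReusableConsumerSketch, start(), and stopAndWait() are hypothetical names chosen for illustration; they are not the API of the downloadable Consumer class, and the sketch guards the caller-supplied queue with a lock rather than assuming it is thread-safe.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical skeleton: thread + queue + glue, with one method left to the user.
abstract class ReusableConsumerSketch {
    private final Queue<Object> queue; // caller-chosen implementation; every access locks it
    private final Thread thread;
    private boolean stopping;          // guarded by queue

    ReusableConsumerSketch(Queue<Object> queue, String threadName, int priority) {
        this.queue = queue;
        this.thread = new Thread(this::loop, threadName); // the name shows up in thread dumps
        this.thread.setPriority(priority);
    }

    /** The only code a user of the class has to write. */
    protected abstract void onConsume(Object job);

    void start() {
        thread.start();
    }

    void add(Object job) {
        synchronized (queue) {
            queue.add(job);
            queue.notify();
        }
    }

    /** Lets the queue drain, then waits for the thread to exit. */
    void stopAndWait() {
        synchronized (queue) {
            stopping = true;
            queue.notify();
        }
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        while (true) {
            Object job;
            synchronized (queue) {
                while (queue.isEmpty() && !stopping) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return;
                }
                job = queue.remove();
            }
            onConsume(job);
        }
    }

    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        ReusableConsumerSketch c = new ReusableConsumerSketch(
                new LinkedList<Object>(), "MessagesProcessor", Thread.NORM_PRIORITY) {
            @Override
            protected void onConsume(Object job) {
                seen.add(Thread.currentThread().getName() + ": " + job);
            }
        };
        c.start();
        c.add("hello");
        c.add("world");
        c.stopAndWait();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [MessagesProcessor: hello, MessagesProcessor: world]
    }
}
```

Passing the queue, name, and priority through the constructor is only one design option; chained setters such as the setName(...) call used later in the article would serve the same purpose.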
The class code
We used "lazy creation" to create the Consumer's thread in the method getThread(), as shown in Listing 1:
    /**
     * Lazy creation of the Consumer's thread.
     *
     * @return the Consumer's thread
     */
    private Thread getThread()
    {
        if (_thread == null)
        {
            // Delegate the thread body to the enclosing Consumer's run() method.
            _thread = new Thread()
            {
                public void run()
                {
                    Consumer.this.run();
                }
            };
        }
        return _thread;
    }
The thread's run() method runs the Consumer's run() method, which is the main consumer loop, as shown in Listing 2:
    /**
     * Main Consumer's thread method.
     */
    private void run()
    {
        while (!_isTerminated)
        {
            // job handling loop: drain everything currently in the queue
            while (true)
            {
                Object o;
                synchronized (_queue)
                {
                    if (_queue.isEmpty())
                        break;
                    o = _queue.remove();
                }
                if (o == null)
                    break;
                onConsume(o);
            }
            // if we are not terminated and the queue is still empty
            // then wait until new jobs arrive.
            synchronized (_waitForJobsMonitor)
            {
                if (_isTerminated)
                    break;
                if (_queue.isEmpty())
                {
                    try
                    {
                        _waitForJobsMonitor.wait();
                    }
                    catch (InterruptedException ex)
                    {
                        // Interrupted while waiting: fall through and let the
                        // outer loop re-check the termination flag and the queue.
                    }
                }
            }
        }
    } // run()
Basically, the Consumer's thread runs until there are no more jobs waiting in the queue. It then goes to sleep, only to be awakened by the next call to add(Object), which adds a new job to the queue and "kicks" the thread awake. The actual consumer work is handled by the onConsume(Object) method. The "sleeping" and "kicking" are done using the wait() and notify() mechanism, as shown in Listing 3:
    /**
     * Add an object to the Consumer.
     * This is the entry point for the producer.
     * After the item is added, the Consumer's thread
     * will be notified.
     *
     * @param o the object to be 'consumed' by this consumer
     */
    public void add(Object o)
    {
        _queue.add(o);
        kickThread();
    }

    /**
     * Wake up the thread (without adding new stuff to consume)
     */
    public void kickThread()
    {
        if (!this._thread.isInterrupted())
        {
            // notify() must be called while holding the monitor's lock
            synchronized (_waitForJobsMonitor)
            {
                _waitForJobsMonitor.notify();
            }
        }
    }
An example: MessagesProcessor
To show you how the Consumer class works, we'll employ a simple example. The MessagesProcessor class handles incoming messages asynchronously (that is, without interfering with the calling thread). Its job is to print each message as it comes in. MessagesProcessor comes with an internal Consumer that deals with incoming message jobs. When a new job enters the empty queue, the Consumer calls the method processMessage(String) to process it, as shown in Listing 4:
    class MessagesProcessor
    {
        String _name;

        // anonymous inner class that supplies the consumer
        // capabilities for the MessagesProcessor
        private Consumer _consumer = new Consumer()
        {
            // this method is called on each event retrieved
            protected void onConsume(Object o)
            {
                if (!(o instanceof String))
                {
                    System.out.println("illegal use, ignoring");
                    return;
                }
                MessagesProcessor.this.processMessage((String) o);
            }
        }.setName("MessagesProcessor").init();

        public void gotMessageEvent(String s)
        {
            _consumer.add(s);
        }

        private void processMessage(String s)
        {
            System.out.println(_name + " processed message: " + s);
        }

        private void terminate()
        {
            // wait for the consumer thread to exit before clearing state
            _consumer.terminateWait();
            _name = null;
        }

        MessagesProcessor()
        {
            _name = "Example Consumer";
        }
    }
As you can see from the above code, customizing the Consumer is fairly simple. We employ an anonymous inner class to extend the Consumer class and override the abstract method onConsume(). In our example, the override simply calls processMessage.
Advanced features of the Consumer class
In addition to the basic requirements we started with, we've provided the Consumer class with some advanced features that we found useful.
Event notification
- onThreadTerminate(): This method is called just before the Consumer is terminated. We have overridden this method for debug purposes.
- goingToRest(): This method is called just before the Consumer thread goes to sleep (that is, just before the call to _waitForJobsMonitor.wait()). This notification may be needed in complex cases where a consumer is required to handle a batch of processed work just before it goes to sleep.
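
To make the batching use of goingToRest() concrete, here is a sketch under stated assumptions: BatchingConsumerSketch is a hypothetical stand-in with its own tiny loop, not a subclass of the article's Consumer, and it calls the hook while still holding the queue lock, which may differ from the real class.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration: collect cheap per-job results, hand them off when idle.
class BatchingConsumerSketch {
    private final LinkedList<String> queue = new LinkedList<>();
    private final List<String> batch = new ArrayList<>();         // consumer thread only
    private final List<List<String>> flushed = new ArrayList<>(); // read by others only after join()
    private final CountDownLatch firstFlush = new CountDownLatch(1);
    private boolean stopping;                                      // guarded by queue

    void add(String job) {
        synchronized (queue) {
            queue.addLast(job);
            queue.notify();
        }
    }

    /** Per-job work: just collect the item. */
    protected void onConsume(String job) {
        batch.add(job);
    }

    /** Hook invoked just before the thread sleeps: flush what has piled up. */
    protected void goingToRest() {
        if (!batch.isEmpty()) {
            flushed.add(new ArrayList<>(batch));
            batch.clear();
            firstFlush.countDown();
        }
    }

    void loop() {
        while (true) {
            String job;
            synchronized (queue) {
                while (queue.isEmpty() && !stopping) {
                    goingToRest(); // runs with the queue lock held in this sketch
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return;
                }
                job = queue.removeFirst();
            }
            onConsume(job);
        }
    }

    static List<List<String>> demo() {
        BatchingConsumerSketch c = new BatchingConsumerSketch();
        c.add("a");
        c.add("b");
        c.add("c"); // all three are queued before the consumer thread starts
        Thread t = new Thread(c::loop, "batcher");
        t.setDaemon(true);
        t.start();
        try {
            c.firstFlush.await(); // the consumer has drained the queue and flushed
            synchronized (c.queue) {
                c.stopping = true;
                c.queue.notify();
            }
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return c.flushed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [[a, b, c]]
    }
}
```

Because the hook fires only when the queue has run dry, a burst of jobs is flushed once rather than once per job.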
Termination
- terminate(): Asynchronous termination of the Consumer thread.
- terminateWait(): Sets the calling thread to wait until the consumer thread actually terminates.
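In our example, if we used terminate() instead of terminateWait(), there would be a problem because the method onConsume() might be called after _name was set to null. processMessage would then run against an object that has already been torn down: as written, it would print "null" in place of the name, and any code that dereferenced _name (for example, _name.length()) would throw a NullPointerException.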
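
The difference between the two termination calls can be sketched as a flag plus Thread.join(). TerminationSketch below is a hypothetical stand-in, not the article's Consumer; it assumes that pending jobs are drained before the thread exits.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Hypothetical illustration of asynchronous vs. synchronous termination.
class TerminationSketch {
    private final LinkedList<String> queue = new LinkedList<>();
    private final List<String> output = new ArrayList<>(); // written by the consumer thread only
    private boolean terminated;                             // guarded by queue
    private String name = "Example Consumer";
    private final Thread thread = new Thread(this::loop, "termination-sketch");

    void start() {
        thread.start();
    }

    void add(String msg) {
        synchronized (queue) {
            queue.addLast(msg);
            queue.notify();
        }
    }

    /** Asynchronous: request termination and return at once. */
    void terminate() {
        synchronized (queue) {
            terminated = true;
            queue.notify();
        }
    }

    /** Synchronous: request termination, then block until the thread has exited. */
    void terminateWait() {
        terminate();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void loop() {
        while (true) {
            String msg;
            synchronized (queue) {
                while (queue.isEmpty() && !terminated) {
                    try {
                        queue.wait();
                    } catch (InterruptedException e) {
                        return;
                    }
                }
                if (queue.isEmpty()) {
                    return; // terminated and drained
                }
                msg = queue.removeFirst();
            }
            output.add(name + " processed message: " + msg);
        }
    }

    static List<String> demo() {
        TerminationSketch s = new TerminationSketch();
        s.start();
        s.add("m1");
        s.add("m2");
        s.terminateWait(); // once this returns, no job can still be running
        s.name = null;     // safe only because the consumer thread has exited
        return s.output;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Replacing terminateWait() with terminate() in demo() would let the assignment to name race with the consumer thread, which is the hazard described above.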
Conclusion: Benefits of the Consumer class
The source for the Consumer class is available for download below. Please feel free to use the source and extend it as you need to. We've found the benefits of using this class for multithreaded application development to be as follows:
- Code reuse/elimination of duplicate code: If you have the Consumer class, you don't have to write a new consumer for every instance in your application. Given how often the producer-consumer scenario comes up in application development, this is a great timesaver. Also remember that duplicate code is fertile ground for bugs and makes basic code maintenance more difficult.
- Fewer bugs: Using proven code is a good practice to prevent bugs, especially when dealing with multithreaded applications. Because the Consumer class has already been debugged, it is safer. The consumer also prevents thread-related bugs by acting as a safe mediator between threads and resources. The consumer sequentially accesses resources on behalf of the other threads.
- Nice, clean code: Using the Consumer class helps us to write simpler code, which is easier to understand and maintain. If we don't use the Consumer class, we have to write code to handle two different functions: the consuming logic (queue and thread management, synchronizing, and so on) and the code specifying the use or function of the consumer.
Many thanks to Jonathan Lifton at Allot Communications and to Dov Trietsch for their assistance with this article.
Download
| Description | Name | Size | Download method |
|---|---|---|---|
| Sample code | j-prodcon.zip | 5 KB | HTTP |
Resources
- Alex Roetter's "Writing multithreaded Java applications" (developerWorks, February 2001) is a good introduction to multithreaded application development with the Java Thread API.
- Brian Goetz's three-part series on multithreading (developerWorks, July 2001 through September 2001) offers practical solutions for some of the common problems that arise in multithreaded application development.
- If Allen Holub were king, the Java language would undergo a number of significant changes to its support for multithreading. Read all about these changes in his proposal for fixing the Java language's threading model (developerWorks, October 2000).
- Eric Allen offers tips for debugging multithreaded code in his column, "Diagnosing Java Code: The Orphaned Thread bug pattern" (developerWorks, August 2001). Note that Eric employs the producer-consumer scenario in his example.
- The Java Developer Connection offers a worthwhile trail for those learning about thread synchronization. One of its key examples employs the producer-consumer scenario.
- How do we support scheduled events in Java applications? To facilitate this, Sun Microsystems introduced the new Timer API with JDK 1.3.
- The Java Developer Connection article, "An Introduction to Java Stack Traces," shows you how to recognize and collect the clues in a stack trace to solve your Java software problems.
- The IBM Developer Kit for Linux, Java Technology Edition, Version 1.3 offers complete support for multithreaded application development.
- You'll find hundreds of articles about every aspect of Java programming in the developerWorks Java technology zone.
About the authors
Joseph (Saffi) Hartal is a software developer at GlobaLoop LTD. Saffi holds a BSC degree in computer science and mathematics and an MBA from Tel-Aviv University. He has been a software developer for the last 10 years, during which time he has written both real-time embedded code in C++ and Java client and server applications. Most of Saffi's time is devoted to writing infrastructure code and solving riddles. He can be reached at saffi@myrealbox.com.
Ze'ev Bubis is a software development team leader at GlobaLoop LTD. Ze'ev holds a BSC degree in computer science and mathematics from Tel-Aviv University. He has been a software developer for the last 10 years, during which time he has written software applications for a wide variety of platforms and languages. In the last three years, Ze'ev has focused on developing both client and server applications in Java. He can be reached at zeevb@myrealbox.com.