Let's say no to “Transformation”, or Kafka analysis

Have you ever wondered what bugs might be hidden in the source code of projects used by large companies around the world? Don't miss the chance to get acquainted with interesting errors that the PVS-Studio static analyzer found in the source code of the Open Source Apache Kafka project.

Introduction

Apache Kafka is a popular Open Source project written primarily in Java. It is a message broker, that is, a data bus for various system components. It was developed by LinkedIn in 2011 and is currently one of the most popular solutions of its kind.

Well, are you ready to look under the hood?

PS

Just in case, I'll leave a note here about the title: it is a reference to Franz Kafka's work "The Metamorphosis", in which the main character turns into a giant insect. Our static analyzer fights to keep your projects from turning into a huge, scary beetle, that is, from becoming one big bug, so we say no to "Transformation".

So, mistakes

A great joke and pain always go side by side

This quote, alas, is not mine; it belongs to John Lennon. But what does it have to do with anything? The first thing I would like to show you from the analysis is a genuinely funny, humorous mistake. However, after spending a long time trying to understand why the program does not work as expected, it is quite painful to stumble upon the kind of thing shown in the example below.

@Override
public KeyValueIterator<Windowed<K>, V> backwardFetch(
  K keyFrom,
  K keyTo,
  Instant timeFrom,
  Instant timeTo) {
  ....
  if (keyFrom == null && keyFrom == null) {   // <=
    kvSubMap = kvMap;
  } else if (keyFrom == null) {
    kvSubMap = kvMap.headMap(keyTo, true);
  } else if (keyTo == null) {
    kvSubMap = kvMap.tailMap(keyFrom, true);
  } else {
    // keyFrom != null and KeyTo != null 
    kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true);
  } 
  ....
}

In this fragment, you can see something that happens to all of us during development: a banal typo. Obviously, the very first condition was meant to be the following logical expression:

keyFrom == null && keyTo == null

The analyzer issued two warnings for this section of code:

V6001 There are identical sub-expressions 'keyFrom == null' to the left and to the right of the '&&' operator. ReadOnlyWindowStoreStub.java 327, ReadOnlyWindowStoreStub.java 327

V6007 Expression 'keyFrom == null' is always false. ReadOnlyWindowStoreStub.java 329

And both are fair. Such funny typos happen to every developer, but you can spend a lot of time hunting them down, and, unfortunately, that time will not be remembered fondly.

Within the same class, another method contains exactly the same error. We can say with confidence that this is a copy-paste.

@Override
public KeyValueIterator<Windowed<K>, V> fetch(
  K keyFrom,
  K keyTo,
  Instant timeFrom,
  Instant timeTo) {
  ....
  NavigableMap<K, V> kvMap = data.get(now);
  if (kvMap != null) {
    NavigableMap<K, V> kvSubMap;
    if (keyFrom == null && keyFrom == null) {      // <=
      kvSubMap = kvMap;
    } else if (keyFrom == null) {
      kvSubMap = kvMap.headMap(keyTo, true);
    } else if (keyTo == null) {
      kvSubMap = kvMap.tailMap(keyFrom, true);
    } else {
      // keyFrom != null and KeyTo != null
      kvSubMap = kvMap.subMap(keyFrom, true, keyTo, true);
    }
  }
  ....
}

And the same triggers:

V6007 Expression 'keyFrom == null' is always false. ReadOnlyWindowStoreStub.java 273

V6001 There are identical sub-expressions 'keyFrom == null' to the left and to the right of the '&&' operator. ReadOnlyWindowStoreStub.java 271, ReadOnlyWindowStoreStub.java 271

But don’t be afraid, we won’t have to run our eyes over hundreds of lines of code even once. PVS-Studio copes with such simple things with a bang. How about something more challenging?

A synchronized variable

What is the synchronized keyword used for in Java? For the purposes of this bug, I will only talk about it in the context of synchronized methods, not blocks. According to the Oracle documentation, the synchronized keyword provides thread-safe interaction with an object by declaring its methods synchronized. If a thread calls a synchronized method on an object, other threads that try to call synchronized methods on the same object are blocked (that is, their execution is suspended) until the method invoked by the first thread finishes executing. This is necessary when an object is visible to more than one thread: write and read operations on such an object should be performed only through synchronized methods.
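To illustrate (with a hypothetical Counter class that is not from the Kafka codebase), consistent access through synchronized methods looks like this: every read and write of the shared field goes through a method declared synchronized, so all threads take the same monitor.

// A minimal sketch, not from Kafka: all reads and writes of 'value'
// go through synchronized methods, so every thread acquires the same
// monitor and never observes a partially applied update.
public class Counter {
  private int value = 0;

  public synchronized void increment() {
    value++;            // read-modify-write happens under the object's monitor
  }

  public synchronized int get() {
    return value;       // reading under the same monitor guarantees visibility
  }
}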

In the Sensor class, a simplified fragment of which is given below, this rule is violated. Write and read operations on an object field are performed both through synchronized methods and through unsynchronized ones. Such interaction with the object may result in a race condition, and the outcome will be unpredictable.

private final Map<MetricName, KafkaMetric> metrics;

public void checkQuotas(long timeMs) {                  // <=
  for (KafkaMetric metric : this.metrics.values()) {
    MetricConfig config = metric.config();
    if (config != null) {
      ....
    }
  }
  ....
}  

public synchronized boolean add(CompoundStat stat,      // <=
                                MetricConfig config) {       
  ....
  if (!metrics.containsKey(metric.metricName())) {         
    metrics.put(metric.metricName(), metric);            
  }  
  ....
}  

public synchronized boolean add(MetricName metricName,  // <=
                                MeasurableStat stat, 
                                MetricConfig config) {  
  if (hasExpired()) {
    return false;
  } else if (metrics.containsKey(metricName)) {
    return true;
  } else {
    ....
    metrics.put(metric.metricName(), metric);
    return true;
  }
}

Here is the analyzer warning for this section of code:

V6102 Inconsistent synchronization of the 'metrics' field. Consider synchronizing the field on all usages. Sensor.java 49, Sensor.java 254

If the state of an object can be changed by several threads at once, the methods through which that happens must be synchronized. If the program does not expect multiple threads to interact with the object, making its methods synchronized is pointless at best, and at worst it can adversely affect the performance of your program.
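For the Sensor fragment above, one way to make the access consistent (just a sketch of the idea, not necessarily how the Kafka developers would fix it) is to declare the reading method synchronized as well:

// Sketch only: if 'metrics' is shared between threads, the reading method
// should take the same monitor as the synchronized add(...) methods.
public synchronized void checkQuotas(long timeMs) {
  for (KafkaMetric metric : this.metrics.values()) {
    MetricConfig config = metric.config();
    if (config != null) {
      ....
    }
  }
  ....
}

An alternative would be to switch the field to a thread-safe collection such as ConcurrentHashMap, but that changes more of the surrounding code than simply synchronizing the method.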

This is not the only error in the program. Below is an example where the analyzer responded to exactly the same case:

private final PrefixKeyFormatter prefixKeyFormatter; 

@Override
public synchronized void destroy() {                // <=
  ....
  Bytes keyPrefix = prefixKeyFormatter.getPrefix();
  ....
}

@Override
public void addToBatch(....) {                      // <=
  physicalStore.addToBatch(
    new KeyValue<>(
    prefixKeyFormatter.addPrefix(record.key),
    record.value
    ), batch
  );
} 

@Override
public synchronized void deleteRange(....) {        // <=
  physicalStore.deleteRange(
    prefixKeyFormatter.addPrefix(keyFrom),
    prefixKeyFormatter.addPrefix(keyTo)
  );
}

@Override
public synchronized void put(....) {                // <=
  physicalStore.put(
    prefixKeyFormatter.addPrefix(key),
    value
  );
}

Analyzer message:

V6102 Inconsistent synchronization of the 'prefixKeyFormatter' field. Consider synchronizing the field on all usages. LogicalKeyValueSegment.java 60, LogicalKeyValueSegment.java 247

Iterator, iterator and more iterator…

In this example, there are two rather unpleasant errors within a single line. I will explain the nature of each of them in this part of the article. But first, of course, the code itself:

private final Map<String, Uuid> topicIds = new HashMap<>(); 

private Map<String, KafkaFuture<Void>> handleDeleteTopicsUsingNames(....) { 
  ....
  Collection<String> topicNames = new ArrayList<>(topicNameCollection);

  for (final String topicName : topicNames) {
    KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();

    if (allTopics.remove(topicName) == null) {
      ....
    } else {
      topicNames.remove(topicIds.remove(topicName));      // <=
      future.complete(null);
    }
    ....
  }
}

Here's what the analyzer tells us about this:

V6066 The type of object passed as argument is incompatible with the type of collection: String, Uuid. MockAdminClient.java 569

V6053 The 'topicNames' collection of 'ArrayList' type is modified while iteration is in progress. ConcurrentModificationException may occur. MockAdminClient.java 569

Now that's a dilemma! What is happening here, and what are we going to do about it?!

First, let's talk about collections and generics. Using generic types in conjunction with collections allows us to protect ourselves and others from ClassCastExceptions, as well as from cumbersome constructions involving type casting.

If we specify, when initializing a collection, what data type it will work with and then try, for example, to add an element of an incompatible type to it, the compiler simply will not compile such code.

Example:

import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class Test {
  public static void main(String[] args) {
    Set<String> set = new HashSet<>();
    set.add("str");
    set.add(UUID.randomUUID()); // error: java.util.UUID cannot be converted
                                // to java.lang.String
  }
}

However, if we try to remove an element of an incompatible type from our Set, no exception occurs. The remove method simply returns false.

Example:

import java.util.HashSet;
import java.util.Set;

public class Test {
  public static void main(String[] args) {
    Set<String> set = new HashSet<>();
    set.add("abc");
    set.add("def");
    System.out.println(set.remove(Integer.valueOf(13))); // false
  }
}

Such an action is meaningless. Most likely, if you encounter something like this in your code, you are dealing with an error. I suggest you go back to the code at the beginning of this section and find the place where exactly this situation occurs.

Secondly, let's talk about Iterator. Iterating over collections is a topic that could be expanded very extensively. However, to avoid boring you and going astray myself, I will touch only on the key points needed to understand the essence of this warning.

So how do we iterate over the collection here? The for loop in the fragment looks like this:

for (Type collectionElem : collection) {
  ....
}

At its core, such a for-each loop is "syntactic sugar". The construct is equivalent to the following:

for (Iterator<Type> iter = collection.iterator(); iter.hasNext();) {
  Type collectionElem = iter.next();
  ....
}

That is, with this notation we implicitly work with the collection's iterator. Great, that's sorted out! Now let's talk a little about ConcurrentModificationException.

ConcurrentModificationException is an exception that covers a certain range of situations, both in single-threaded and in multi-threaded programs. Here we are only interested in the single-threaded part, and the explanation is close at hand. I suggest consulting the Oracle documentation: this exception may be thrown by a method that has detected concurrent modification of an object when such modification is not permissible. In our case, we remove objects from the collection while the iterator is working over it, which may cause the iterator to throw ConcurrentModificationException.

How does an iterator know when to throw this exception? If you look into the implementation of the ArrayList collection, you will see that its ancestor, AbstractList, has a modCount field that stores the number of modifications made to the collection:

protected transient int modCount = 0;

Examples of how the modCount counter is used in the ArrayList class:

public boolean add(E e) {
  modCount++;
  add(e, elementData, size);
  return true;
}

private void fastRemove(Object[] es, int i) {
  modCount++;
  final int newSize;
  if ((newSize = size - 1) > i)
    System.arraycopy(es, i + 1, es, i, newSize - i);
  es[size = newSize] = null;
}

That is, with each modification of the collection, the counter is incremented.

The fastRemove method, by the way, is exactly what is called by the remove method that we use inside the loop.

Here's a small fragment of the internals of the ArrayList iterator:

private class Itr implements Iterator<E> {
  ....
  int expectedModCount = modCount;            

  final void checkForComodification() {
    if (modCount != expectedModCount)             // <=
      throw new ConcurrentModificationException();
  }

  public E next() {
    checkForComodification();              
    ....
  }
    
  public void remove() {
    ....
    checkForComodification();             

    try {
      ArrayList.this.remove(lastRet);   
      ....
      expectedModCount = modCount;
    } catch (IndexOutOfBoundsException ex) {
      throw new ConcurrentModificationException();
    }
  }
  ....
  public void add(E e) {
    checkForComodification();            
    try {
      ....
      ArrayList.this.add(i, e);        
      ....
      expectedModCount = modCount;     
    } catch (IndexOutOfBoundsException ex) {
      throw new ConcurrentModificationException();
    }
  }
}

Explanation of the last fragment: if the number of modifications made to the collection does not match the expected number (the expected number being the count at the moment the iterator was created plus whatever the iterator itself has done), a ConcurrentModificationException is thrown. And that can only happen if, while iterating the collection with an iterator, we modify it directly through the collection's own methods, in parallel with the iterator. That's all for the second warning.
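To make this concrete, here is a tiny standalone example (not from Kafka, with a hypothetical CmeDemo class) where removing an element through the collection itself, while the for-each loop's implicit iterator is active, trips exactly the check shown above:

import java.util.ArrayList;
import java.util.List;

public class CmeDemo {
  public static void main(String[] args) {
    List<String> names = new ArrayList<>(List.of("a", "b", "c"));

    // Removing through the collection while its implicit iterator is active:
    // modCount is incremented, expectedModCount is not, so the next call to
    // next() throws ConcurrentModificationException.
    for (String name : names) {
      if ("a".equals(name)) {
        names.remove(name);
      }
    }
  }
}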

So, I told you about the analyzer messages themselves. Now let's put everything together:

In parallel with the iterator, we try to remove an element from the collection:

topicNames.remove(topicIds.remove(topicName)); 
// topicNames – Collection<String>
// topicIds – Map<String, Uuid>

However, since an element of an incompatible type is passed to the ArrayList removal (topicIds.remove returns a Uuid object, not a String), the modification counter does not increase, and nothing is actually removed from the list. Simply put, at the moment, that line of code does nothing.
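Here's a small standalone illustration (again, not from Kafka, using a hypothetical NoOpRemoveDemo class) of why that line is a no-op: remove called with an argument of an incompatible type finds no equal element, returns false, and never touches modCount, so the iterator has nothing to complain about:

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class NoOpRemoveDemo {
  public static void main(String[] args) {
    List<String> topics = new ArrayList<>(List.of("orders", "payments"));

    // The argument is a UUID while the list holds Strings: equals() never
    // matches, remove() returns false, and modCount stays unchanged.
    boolean removed = topics.remove(UUID.randomUUID());

    System.out.println(removed);        // false
    System.out.println(topics.size());  // 2
  }
}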

I would venture to suggest that the developer's intent is clear to me. In that case, a fix for the two warnings could look, for example, like this:

Collection<String> topicNames = new ArrayList<>(topicNameCollection);

List<String> removableItems = new ArrayList<>();

for (final String topicName : topicNames) {
  KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();

  if (allTopics.remove(topicName) == null) {
    ....
  } else {
    topicIds.remove(topicName);
    removableItems.add(topicName);
    future.complete(null);
  }
  ....
}
topicNames.removeAll(removableItems);
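Another way to make the removal safe (a sketch under the same assumptions about the surrounding code, and assuming java.util.Iterator is imported) is to remove through the iterator itself, which keeps expectedModCount in sync with modCount:

// Sketch only: Iterator.remove() updates the iterator's expectedModCount,
// so no ConcurrentModificationException can occur.
for (Iterator<String> iter = topicNames.iterator(); iter.hasNext(); ) {
  String topicName = iter.next();
  KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();

  if (allTopics.remove(topicName) == null) {
    ....
  } else {
    topicIds.remove(topicName);
    iter.remove();
    future.complete(null);
  }
  ....
}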

My sweet emptiness

And of course, where would we be without everyone's favorite null and the potential problems that come with it. Here is a section of code to which the analyzer reacted as follows:

V6008 Potential null dereference of 'oldMember' in function 'removeStaticMember'. ConsumerGroup.java 311, ConsumerGroup.java 323

@Override
public void removeMember(String memberId) {
  ConsumerGroupMember oldMember = members.remove(memberId);
  ....
  removeStaticMember(oldMember);
  ....
}

private void removeStaticMember(ConsumerGroupMember oldMember) {
  if (oldMember.instanceId() != null) {
    staticMembers.remove(oldMember.instanceId());
  }
}

If members does not contain an object with the key memberId, oldMember will be null. That is fraught with a NullPointerException in the removeStaticMember method.

Whoosh, and the parameter is checked for null:

if (oldMember != null && oldMember.instanceId() != null) {

The next error in this article will be the last, but I want to end on a pleasant note. The code below (like the code at the beginning of the article) contains a familiar, funny typo of the kind we all make. It is, however, quite capable of leading to unpleasant consequences.

So, I suggest you take a look:

protected SchemaAndValue roundTrip(...., SchemaAndValue input) {
  String serialized = Values.convertToString(input.schema(),
                                             input.value());

  if (input != null && input.value() != null) {   
    ....
  }
  ....
}

Yes, you read that right. In this method, we first access the input object, and only then check whether it is null.

V6060 The 'input' reference was used before it was verified against null. ValuesTest.java 1212, ValuesTest.java 1213
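If the check was indeed meant to guard the call (a sketch of a possible reordering, not the fix the developers actually chose), it could look like this:

// Sketch: check 'input' for null before dereferencing it.
protected SchemaAndValue roundTrip(...., SchemaAndValue input) {
  if (input != null && input.value() != null) {
    String serialized = Values.convertToString(input.schema(),
                                               input.value());
    ....
  }
  ....
}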

Again, I will mention that such typos are normal. However, they can lead to very unpleasant consequences, and looking for them in code by eye is very difficult and, most importantly, very unproductive.

Conclusion

In conclusion, I would like to return once more to an earlier thought. Going through the code yourself and hunting for all the errors I have shown is an expensive and not particularly pleasant task. Things like these can sit in the code for a very long time (for example, the last error was committed back in 2018). This is why it is both possible and necessary to use static analysis. For more detailed information about PVS-Studio, which was used to find all the errors described above, see the product's website.

That's all for me, I'll say goodbye to you. “And in case I don't see you again, good afternoon, good evening and good night.”

Oh yes, I almost forgot! Here is the link to information about free licensing for Open Source projects.

If you want to share this article with an English-speaking audience, please use the translation link: Vladislav Bogdanov. Belay the Metamorphosis: analyzing Kafka project.
