Big / Bug Data: Analyzing Apache Flink Source Code


Big Data applications process huge amounts of information, often in real time. Naturally, such applications must be highly reliable, so that no error in the code interferes with data processing. To achieve that reliability, the code quality of projects developed for this area has to be closely monitored, and that is exactly what the PVS-Studio static analyzer does. Today, the test subject for the analyzer is the Apache Flink project, developed by the Apache Software Foundation, one of the leaders in the Big Data software market.

What is Apache Flink? It is an open-source framework for distributed processing of large amounts of data. It was developed in 2010 at the Technical University of Berlin as an alternative to Hadoop MapReduce. The framework is based on a distributed execution engine for batch and stream data processing applications, written in Java and Scala. Today Apache Flink can be used in projects written in Java, Scala, Python, and even SQL.

Project analysis

Having downloaded the source code of the project, I started the build with the ‘mvn clean package -DskipTests’ command specified in the instructions on GitHub. While the build was in progress, I used the CLOC utility and found out that the project has 10,838 Java files containing about 1.3 million lines of code. Moreover, 3,833 of them are test files, which is more than a third of all Java files. I also noticed that the project uses the FindBugs static code analyzer and the Cobertura utility, which reports code coverage by tests. With all of this in mind, it becomes clear that the Apache Flink developers carefully monitored code quality and test coverage during development.

After a successful build, I opened the project in IntelliJ IDEA and ran the analysis using the PVS-Studio plugin for IDEA and Android Studio. The analyzer warnings were distributed as follows:

  • 183 High;
  • 759 Medium;
  • 545 Low.

About 2/3 of the PVS-Studio warnings were issued for test files. Considering this fact and the size of the project’s codebase, we can say that the Apache Flink developers managed to keep the code quality high.

Having studied the analyzer warnings in more detail, I have chosen the ones I found most interesting. So let’s see what PVS-Studio managed to find in this project!

Just a little carelessness

V6001 There are identical sub-expressions ‘processedData’ to the left and to the right of the ‘==’ operator. CheckpointStatistics.java (229)

@Override
public boolean equals(Object o) 
{
  ....
  CheckpointStatistics that = (CheckpointStatistics) o;
  return id == that.id &&
    savepoint == that.savepoint &&
    triggerTimestamp == that.triggerTimestamp &&
    latestAckTimestamp == that.latestAckTimestamp &&
    stateSize == that.stateSize &&
    duration == that.duration &&
    alignmentBuffered == that.alignmentBuffered &&
    processedData == processedData &&                // <=
    persistedData == that.persistedData &&
    numSubtasks == that.numSubtasks &&
    numAckSubtasks == that.numAckSubtasks &&
    status == that.status &&
    Objects.equals(checkpointType, that.checkpointType) &&
    Objects.equals(
      checkpointStatisticsPerTask, 
      that.checkpointStatisticsPerTask);
}

Against the background of the other expressions in the return statement, this error is not very conspicuous. When overriding the equals method of the CheckpointStatistics class, the programmer made a mistake in the expression processedData == processedData, which makes no sense because it is always true. Like the rest of the expressions in the return statement, the field of the current object this should have been compared with the field of the object that: processedData == that.processedData. This situation is one of the typical error patterns found in comparison functions, which are described in detail in the article “Evil lives in comparison functions”. So it turns out that just “a little inattention” broke the logic of checking the equivalence of CheckpointStatistics objects.
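To see why such a slip is so easy to miss, here is a minimal self-contained illustration with a hypothetical Stats class (not Flink code):

class Stats {
  long processedData;
  Stats(long processedData) { this.processedData = processedData; }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Stats)) return false;
    Stats that = (Stats) o;
    return processedData == processedData;   // bug: compares the field with itself, always true
    // intended: return processedData == that.processedData;
  }
}

With this bug, new Stats(1).equals(new Stats(2)) returns true, so a difference in the field goes unnoticed.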

Expression is always true

V6007 Expression ‘input2.length > 0’ is always true. Operator.java (283)

public static <T> Operator<T> createUnionCascade(Operator<T> input1, 
                                                 Operator<T>... input2) 
{
  if (input2 == null || input2.length == 0) 
  {
    return input1;                                // <=
  } 
  else if (input2.length == 1 && input1 == null) 
  {
    return input2[0];
  }
  ....
  if (input1 != null) 
  {
    ....
  } 
  else if (input2.length > 0 && input2[0] != null) // <=
  {
    ....
  } 
  else 
  {
    ....
  }
}

In this method, the analyzer turned out to be more attentive than a person and reported, in its own peculiar manner, that the expression input2.length > 0 will always be true. The reason is that if the length of the input2 array is 0, the condition input2 == null || input2.length == 0 of the first if in the method will be true, and execution of the method will end before reaching the line with the expression input2.length > 0.
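If the redundant check is merely a refactoring leftover, the branch could be simplified; a sketch of what that might look like (not the actual Flink fix):

  // By the time execution reaches this branch, the first 'if' has already
  // guaranteed that input2 is non-null and non-empty, so this is enough:
  else if (input2[0] != null)
  {
    ....
  }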

All-seeing analyzer

V6007 Expression ‘slotSharingGroup == null’ is always false. StreamGraphGenerator.java (510)

private <T> Collection<Integer> transformFeedback(....)
{
  ....
  String slotSharingGroup = determineSlotSharingGroup(null, allFeedbackIds);
  if (slotSharingGroup == null)
  {
    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
  }
  ....
}

The analyzer reported that the expression slotSharingGroup == null is always false, which suggests that the determineSlotSharingGroup method will never return null. Is the analyzer so smart that it was able to calculate all the values this method can return? Let’s check everything ourselves:

public class StreamGraphGenerator 
{
  ....
  public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
  ....
  private String determineSlotSharingGroup(String specifiedGroup, 
                                           Collection<Integer> inputIds) 
  {
    if (specifiedGroup != null)
    {
      return specifiedGroup; // <= 1
    }
    else
    {
      String inputGroup = null;
      for (int id: inputIds)
      {
        String inputGroupCandidate = streamGraph.getSlotSharingGroup(id);
        if (inputGroup == null)
        {
          inputGroup = inputGroupCandidate;
        }
        else if (!inputGroup.equals(inputGroupCandidate))
        {
          return DEFAULT_SLOT_SHARING_GROUP; // <= 2
        }
      }
      return inputGroup == null 
             ? DEFAULT_SLOT_SHARING_GROUP 
             : inputGroup; // <= 3
    }
  }
  ....
}

Let’s go through all the return statements and see what this method can return:

  • The first return will return the method argument specifiedGroup, but only if it is not null;
  • The return in the for loop will return the value of the static final field DEFAULT_SLOT_SHARING_GROUP, which is initialized with a string literal;
  • The last return in the method will return the value of the inputGroup variable if it is not null; otherwise, the value of the DEFAULT_SLOT_SHARING_GROUP field will be returned.

It turns out that the analyzer really was able to work out that determineSlotSharingGroup cannot return null and warned us about it, pointing out that the check slotSharingGroup == null is pointless. Although this situation is not an error, such additional protection from the analyzer can reveal a bug in some other case, for example, when a method is supposed to return null under certain conditions.

Collect them all

V6007 Expression ‘currentCount <= lastEnd’ is always true. CountSlidingWindowAssigner.java (75)
V6007 Expression ‘lastStart <= currentCount’ is always true. CountSlidingWindowAssigner.java (75)

@Override
public Collection<CountWindow> assignWindows(....) throws IOException 
{
  Long countValue = count.value();
  long currentCount = countValue == null ? 0L : countValue;
  count.update(currentCount + 1);
  long lastId = currentCount / windowSlide;
  long lastStart = lastId * windowSlide;
  long lastEnd = lastStart + windowSize - 1;
  List<CountWindow> windows = new ArrayList<>();
  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    if (lastStart <= currentCount && currentCount <= lastEnd) // <=
    {
      windows.add(new CountWindow(lastId));
    }
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }
  return windows;
}

The analyzer warns that the expressions currentCount <= lastEnd and lastStart <= currentCount are always true. Indeed, if you look at the condition of the while loop, you will find exactly the same expressions. This means that inside the loop these expressions are always true, so every CountWindow object created in the loop will be added to the windows list. There are many ways this meaningless check could have appeared; the first that come to mind are a refactoring artifact or a developer playing it safe. But it could also be a mistake, if the intent was to check something else...
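If the duplicated condition is just a leftover, the loop could be simplified without changing its behavior; a sketch of what that might look like (not the actual Flink code):

  while (lastId >= 0 && 
         lastStart <= currentCount && 
         currentCount <= lastEnd) 
  {
    // The loop condition already guarantees lastStart <= currentCount <= lastEnd,
    // so the inner check adds nothing and every window is added unconditionally.
    windows.add(new CountWindow(lastId));
    lastId--;
    lastStart -= windowSlide;
    lastEnd -= windowSlide;
  }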

Incorrect argument order

V6029 Possible incorrect order of arguments passed to method: ‘hasBufferForReleasedChannel’, ‘hasBufferForRemovedChannel’. NettyMessageClientDecoderDelegateTest.java (165), NettyMessageClientDecoderDelegateTest.java (166)

private void testNettyMessageClientDecoding(
       boolean hasEmptyBuffer,
       boolean hasBufferForReleasedChannel,
       boolean hasBufferForRemovedChannel) throws Exception 
{
  ....
  List<BufferResponse> messages = createMessageList(
    hasEmptyBuffer,
    hasBufferForReleasedChannel,
    hasBufferForRemovedChannel);
  ....
}

Java’s lack of named method parameters sometimes plays a cruel joke on developers. This is exactly what happened in the call to the createMessageList method that the analyzer pointed to. Looking at the definition of this method, it becomes clear that the hasBufferForRemovedChannel parameter must be passed to the method before the hasBufferForReleasedChannel parameter:

private List<BufferResponse> createMessageList(
  boolean hasEmptyBuffer,
  boolean hasBufferForRemovedChannel,
  boolean hasBufferForReleasedChannel) 
{
  ....
  if (hasBufferForReleasedChannel) {
    addBufferResponse(messages, 
                      releasedInputChannelId, 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  if (hasBufferForRemovedChannel) {
    addBufferResponse(messages, 
                      new InputChannelID(), 
                      Buffer.DataType.DATA_BUFFER, 
                      BUFFER_SIZE, 
                      seqNumber++);
  }
  ....
  return messages;
}

However, when calling the method, the developer mixed up the order of these arguments, which breaks the logic of createMessageList whenever the values of the swapped arguments differ.
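Assuming the call site, not the method, carries the bug, the presumable fix is simply to pass the arguments in the order the parameters are declared:

  // Presumable fix: the argument order matches the parameter order
  // (hasEmptyBuffer, hasBufferForRemovedChannel, hasBufferForReleasedChannel)
  List<BufferResponse> messages = createMessageList(
    hasEmptyBuffer,
    hasBufferForRemovedChannel,
    hasBufferForReleasedChannel);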

Oh, this copy-paste

V6032 It is odd that the body of method ‘seekToFirst’ is fully equivalent to the body of another method ‘seekToLast’. RocksIteratorWrapper.java (53), RocksIteratorWrapper.java (59)

public class RocksIteratorWrapper implements RocksIteratorInterface, Closeable {
  ....
  private RocksIterator iterator;
  ....

  @Override
  public void seekToFirst() {
    iterator.seekToFirst(); // <=
    status(); 
  }
  
  @Override
  public void seekToLast() {
    iterator.seekToFirst();  // <=
    status();
  }
  
  ....
}

The bodies of the seekToFirst and seekToLast methods are identical, and both methods are used in the code.

Something is fishy here! Indeed, if you look at the methods that the iterator object has, it becomes clear what error the analyzer helped to find:

public class RocksIterator extends AbstractRocksIterator<RocksDB>
{
  ....
}

public abstract class AbstractRocksIterator<....> extends ....
{
  ....
  public void seekToFirst() // <=
  {
    assert this.isOwningHandle();
    this.seekToFirst0(this.nativeHandle_);
  }
  
  public void seekToLast() // <=
  {
    assert this.isOwningHandle();
    this.seekToLast0(this.nativeHandle_);
  }
  ....
}

It turns out that the seekToLast method of the RocksIteratorWrapper class was created by copy-pasting the seekToFirst method of the same class. However, for some reason, the developer forgot to replace the call to iterator.seekToFirst with iterator.seekToLast.
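The presumable fix is a one-line change in the copied method:

  @Override
  public void seekToLast() {
    iterator.seekToLast(); // delegate to the matching method of the wrapped iterator
    status();
  }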

Confusion with format strings

V6046 Incorrect format. A different number of format items is expected. Arguments not used: 1. UnsignedTypeConversionITCase.java (102)

public static void prepareMariaDB() throws IllegalStateException {
  ....
  if (!initDbSuccess) {
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after {} attempts," + // <=
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));
  }
}

The format strings of String.format and of loggers in Java are different. Unlike the format string of String.format, where argument substitutions are specified with the ‘%’ character, logger format strings use the ‘{}’ character combination. This confusion is what caused the error: the format string passed to String.format was most likely copied from another place where it was used with some logger. As a result, the value of the INITIALIZE_DB_MAX_RETRY field will not be substituted for ‘{}’ in the IllegalStateException message, and the person who catches or logs this exception will never know how many attempts to connect to the database were made.
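A likely fix is to use a String.format conversion specifier instead of the logger placeholder; a sketch, assuming INITIALIZE_DB_MAX_RETRY is an integer constant:

    // '%d' is the String.format placeholder for integers,
    // whereas '{}' is only understood by SLF4J-style loggers
    throw new IllegalStateException(
      String.format(
        "Initialize MySQL database instance failed after %d attempts," +
        " please open an issue.", INITIALIZE_DB_MAX_RETRY));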

Abnormal distribution

V6048 This expression can be simplified. Operand ‘index’ in the operation equals 0. CollectionUtil.java (76)

public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets;                                 // <=
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element); 
  }

  return buckets.values();
}

The partition method splits the elements of the elements collection into several buckets and then returns those buckets. However, due to the error pointed out by the analyzer, no splitting will occur. The expression index % numBuckets, used to determine the bucket number, will always be 0 because index is always 0. Initially, I thought the method’s code had been refactored and the increment of the index variable in the for loop had simply been lost. But after looking at the commit where this method was added, it turned out that this error came along with the method. Corrected version of the code:

public static <T> Collection<List<T>> partition(Collection<T> elements, 
                                                int numBuckets) 
{
  Map<Integer, List<T>> buckets = new HashMap<>(numBuckets);
  
  int initialCapacity = elements.size() / numBuckets;

  int index = 0;
  for (T element : elements) 
  {
    int bucket = index % numBuckets; 
    buckets.computeIfAbsent(bucket, 
                            key -> new ArrayList<>(initialCapacity))
           .add(element);
    index++;
  }

  return buckets.values();
}

Incompatible type

V6066 The type of object passed as argument is incompatible with the type of collection: String, ListStateDescriptor. FlinkKafkaProducer.java (1083)

public interface OperatorStateStore 
{
  Set<String> getRegisteredStateNames();
}
public class FlinkKafkaProducer<IN> extends ....
{
  ....
  private static final 
  ListStateDescriptor<FlinkKafkaProducer.NextTransactionalIdHint>
  NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR = ....;

  @Override
  public void initializeState(FunctionInitializationContext context).... 
  {
    ....
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR))    // <=
    {
       migrateNextTransactionalIdHindState(context);
    }
    ....
  }
}

The expression pointed to by the analyzer will always be false, which means the call to migrateNextTransactionalIdHindState will never happen. How did it happen that someone searches a Set<String> collection for an element of a completely different type, ListStateDescriptor? Without the analyzer’s help, such an error would most likely have lived in the code for a very long time: it does not catch the eye, and it is practically impossible to find without a thorough check of this method.
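A plausible fix, assuming the intent was to look the state up by the descriptor’s name (Flink’s StateDescriptor exposes a getName() method), would be:

    // Sketch of a plausible fix: compare against the descriptor's name,
    // which is a String, just like the elements of the Set
    if (context.getOperatorStateStore()
               .getRegisteredStateNames()
               .contains(NEXT_TRANSACTIONAL_ID_HINT_DESCRIPTOR.getName()))
    {
       migrateNextTransactionalIdHindState(context);
    }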

Non-atomic variable change

V6074 Non-atomic modification of volatile variable. Inspect ‘currentNumAcknowledgedSubtasks’. PendingCheckpointStats.java (131)

boolean reportSubtaskStats(JobVertexID jobVertexId, SubtaskStateStats subtask) {
  TaskStateStats taskStateStats = taskStats.get(jobVertexId);

  if (taskStateStats != null && taskStateStats.reportSubtaskStats(subtask)) {
    currentNumAcknowledgedSubtasks++;                // <=
    latestAcknowledgedSubtask = subtask;

    currentStateSize += subtask.getStateSize();      // <=

    long processedData = subtask.getProcessedData();
    if (processedData > 0) {
      currentProcessedData += processedData;         // <=
    }

    long persistedData = subtask.getPersistedData();
    if (persistedData > 0) {
      currentPersistedData += persistedData;         // <=
    }
    return true;
  } else {
    return false;
  }
}

Plus 3 more analyzer warnings in the same method:

  • V6074 Non-atomic modification of volatile variable. Inspect ‘currentStateSize’. PendingCheckpointStats.java (134)
  • V6074 Non-atomic modification of volatile variable. Inspect ‘currentProcessedData’. PendingCheckpointStats.java (138)
  • V6074 Non-atomic modification of volatile variable. Inspect ‘currentPersistedData’. PendingCheckpointStats.java (143)

The analyzer reported that as many as 4 volatile fields in one method are modified non-atomically. And the analyzer, as always, turns out to be right, because the ++ and += operators are in fact sequences of several read-modify-write operations. As you know, the values of volatile fields are visible to all threads, but if two threads execute such a sequence concurrently, some of the updates may be lost due to a race condition. You can read more detailed information about this in the description of the diagnostic.
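Here is a minimal self-contained sketch (a hypothetical counter class, not the Flink fix) showing why incrementing a volatile field is unsafe and how java.util.concurrent.atomic avoids the problem:

import java.util.concurrent.atomic.AtomicLong;

class StatsCounter {
    // Unsafe: 'volatile' only guarantees visibility; counter++ is still
    // a read-modify-write sequence, so concurrent increments can be lost.
    private volatile long unsafeCount;

    // One safe alternative: AtomicLong makes the whole update atomic.
    private final AtomicLong safeCount = new AtomicLong();

    void record() {
        unsafeCount++;               // two threads may both read the same old value
        safeCount.incrementAndGet(); // never loses an update
    }
}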

Conclusion

In Big Data projects, reliability is one of the key requirements, so code quality must be closely monitored. The Apache Flink developers were assisted in this by several tools, and they also wrote a significant number of tests. Yet even under such conditions, the PVS-Studio analyzer was able to find errors. It is impossible to get rid of errors completely, but regular use of various static code analysis tools will let you get closer to that ideal. Yes, exactly: regularly. Only with regular use does static analysis show its effectiveness, which is described in more detail in this article.

If you want to share this article with an English-speaking audience, please use the translation link: Valery Komarov. Big / Bug Data: Analyzing the Apache Flink Source Code.
