Stepping on a rake. Experience writing Kubernetes Operator

Recently, together with a colleague, I implemented an operator for Kubernetes – Vector Operator. (Here it is described how we came to the decision that we need our own operator for Logging in Kubernetes)
Within the framework of this article, I will describe various interesting Tasks / Problems that we encountered during the development process and how we solved them.

Controller Runtime Cache

Problem:
When testing the operator in production conditions, we encountered an error that looked like this in the logs:

W1029 16:56:43.443740   81279 reflector.go:324] pkg/mod/k8s.io/client-go@v0.24.2/tools/cache/reflector.go:167: failed to list *v1.Secret: stream error when reading response body, may be caused by closed connection. Please retry. Original error: stream error: stream ID 155; INTERNAL_ERROR; received from peer
I1029 16:56:43.443865   81279 trace.go:205] Trace[1292523411]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.24.2/tools/cache/reflector.go:167 (29-Oct-2022 16:55:43.022) (total time: 60421ms):
Trace[1292523411]: ---"Objects listed" error:stream error when reading response body, may be caused by closed connection. Please retry. Original error: stream error: stream ID 155; INTERNAL_ERROR; received from peer 60421ms (16:56:43.443)
Trace[1292523411]: [1m0.4217365s] [1m0.4217365s] END

Analytics:
When we execute Get of some object (for example – secret):

err := c.Get(ctx, client.ObjectKeyFromObject(desired), existing)
if err != nil {
  return err
}

The controller tries to get a list of ALL secrets, subscribe to the change of each one, and store all this in cache. There are a lot of secrets in the cluster where the tests took place, and the operator simply did not have time to get all the secrets (it does this with one request) and the request fell off by timeout.
After digging into the controller code, we realized that this is its default behavior.

Solution:
When registering a manager, you must specify which resources should NOT be cached. We indicate here all the resources with which we work:

mgr, err := ctrl.NewManager(config, ctrl.Options{
		Scheme:                 scheme,
		...
    ...
		ClientDisableCacheFor: []client.Object{&corev1.Secret{}, &corev1.ConfigMap{}, &corev1.Pod{}, &appsv1.Deployment{},
			&appsv1.StatefulSet{}, &rbacv1.ClusterRole{}, &rbacv1.ClusterRoleBinding{}},
	})

Cancellation Reconcile by Time

Problem:
One of the main tasks of the operator is to ensure that all the objects that he deployed are in a valid state. That is, if someone in the cluster corrects an object controlled by the operator with handles, the operator must catch it and return everything to the “controlled” state. (Perhaps with the exception of some fields, but this is a question about server side applywhich is worthy of a separate article and is not considered here)

Analytics:
By default, we have configured Vector’s Reconcile to fire every 15 seconds. That is, every 15 seconds, the operator starts checking that all the rolled out resources are in the right state.
However, this looks like overkill. More often than not, when Reconcile is launched, it does nothing.
It would be great if we could “subscribe” to all the resources that are created for the work of Vector’a and run Reconcile only if there were any changes in these resources. And we have such an opportunity!
When initializing the Controller-Runtime Manager, we can tell it the following:

func (r *VectorReconciler) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&vectorv1alpha1.Vector{}).
		Owns(&appsv1.DaemonSet{}).
		Owns(&corev1.Service{}).
		Owns(&corev1.Secret{}).
		Owns(&corev1.ServiceAccount{}).
		Complete(r)
}

In short, this means:

  • We subscribe to all changes related to the resource vectorv1alpha1.Vector. This is specified in the method. For

  • We subscribe to all resources that are rolled out by the operator, whose Owners contains vectorv1alpha1.Vector. This is indicated in Owns

However, during testing on production conditions, we catch the error we already know:

W1029 16:56:43.443740   81279 reflector.go:324] pkg/mod/k8s.io/client-go@v0.24.2/tools/cache/reflector.go:167: failed to list *v1.Secret: stream error when reading response body, may be caused by closed connection. Please retry. Original error: stream error: stream ID 155; INTERNAL_ERROR; received from peer
I1029 16:56:43.443865   81279 trace.go:205] Trace[1292523411]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.24.2/tools/cache/reflector.go:167 (29-Oct-2022 16:55:43.022) (total time: 60421ms):
Trace[1292523411]: ---"Objects listed" error:stream error when reading response body, may be caused by closed connection. Please retry. Original error: stream error: stream ID 155; INTERNAL_ERROR; received from peer 60421ms (16:56:43.443)
Trace[1292523411]: [1m0.4217365s] [1m0.4217365s] END

Again she. It occurs for the following reason:
At startup, the Operator reads from the Kubernetes API all objects whose types are listed in Owns. It does this in order to check whether at the time of the start of the Operator there are some objects in the cluster that ALREADY have our Vector set by Owner.
Logical behavior, however, it leads to the fact that on highly loaded (most likely bare-metal) Kubernetes clusters, our operator will constantly spam errors and crash. Yes, and it looks inappropriate when the operator under the hood is trying to pull ALL objects of some type from the cluster.

Solution:
We plan to be able to use the operator on highload and fat Kubernetes clusters, so we had to fall back to running Reconcile every 15 seconds

Duplicating the Vector configuration structure

Problem:
Vector has a defined structure for its configuration file. Architecturally, it consists of 4 blocks:

  1. Global – Important settings of Vector itself. (Does it need to enable the API, the directory in which it will store the cache, helthcheck’s parameter settings, etc.)

  2. Source – ANDsources where the data should be drawn from

  3. Transforms – Instructions for data processing

  4. Sink – Where to send data

Vector strictly monitors the structure of the configuration file and if there are any errors in it, it will give an error and will not start.
To be honest, we really didn’t want to duplicate Vector’s work and store the ALL structure of the configuration file somewhere in the operator and validate it when someone creates new instructions for collecting / processing / sending metrics / logs (Within Vector Operator such instructions are specified via the Custom Resource VectorPipeline (or ClusterVectorPipeline)). We did not want this for several reasons:

  • Vector is written in Rust. And we simply could not reuse ready-made structures that are already defined in Vector in our operator, which we implement in Go

  • Copying the entire structure of Vector’s config file into Go’s structures is a chore, a tedious task

  • We can easily make a mistake somewhere and, for example, specify the type int somewhere when a float is needed, as was noted in Logging Operator

  • Operator upgrades to new versions of Vector are becoming a living hell. Don’t forget anything, don’t make a mistake anywhere. And such manual work in the end always leads to errors.

  • If we want to support the ability to install different versions of Vector, we need to store and control the structure for each version

Analytics:

We decided that the user, when creating a CR VectorPipeline, should be able to specify anything in the sources, transforms and sinks blocks, however, the operator should be able to get specific fields from what the user specified in the CR and check them.
For example, if the user in the CR VectorPipeline specifies the field extra_namespace_label_selector – an error should be thrown, because the Custom Resource VectorPipeline should work, and pick up logs, only in the NameSpace where it is defined. Field extra_namespace_label_selector is filled in automatically, otherwise an ordinary user in the cluster with the rights to create a VectorPipeline will be able to set up the collection of logs, for example, from objects in the NameSpace kube-system, which, to put it mildly, is not secure.

Solution:
By default, when creating a Custom Resource, all fields not specified in the CRD are removed, but kubebuilder has a code generator option:
// +kubebuilder:pruning:PreserveUnknownFieldswhich adding to the generated CRD field x-kubernetes-preserve-unknown-fields: trueallows you to pass any number of additional fields.
The RAW data in the go structure that CRD describes is of type runtime.RawExtension.

type VectorPipelineSpec struct {
	// +kubebuilder:pruning:PreserveUnknownFields
	Sources *runtime.RawExtension `json:"sources,omitempty"`
	// +kubebuilder:pruning:PreserveUnknownFields
	Transforms *runtime.RawExtension `json:"transforms,omitempty"`
	// +kubebuilder:pruning:PreserveUnknownFields
	Sinks *runtime.RawExtension `json:"sinks,omitempty"`
}

But then a problem arises: as written above, we want to control part of the config, and send part as is, respectively, after deserialization, we need to push part of the data into the fields that we have defined, and leave part as is.
The package comes to the rescue. mapstructurewhich decodes maps into structures and, with the help of a special remain tag, does what we need.
Total:
– Binary runtime.RawExtension we deserialize into map[string]interface{} – Received map[string]interface{} we use mapstructure to decode the structure into go, and we transfer those keys that match the names of predefined fields to them for further processing and validation, and everything else to the Options field.
The structure of the config itself looks like this:

type VectorConfig struct {
	DataDir    string                  `mapstructure:"data_dir"`
	Api        *vectorv1alpha1.ApiSpec `mapstructure:"api"`
	Sources    []*Source               `mapstructure:"sources"`
	Transforms []*Transform            `mapstructure:"transforms"`
	Sinks      []*Sink                 `mapstructure:"sinks"`
}

type Source struct {
	Name                        string
	Type                        string                 `mapper:"type"`
	ExtraNamespaceLabelSelector string                 `mapper:"extra_namespace_label_selector,omitempty"`
	Options                     map[string]interface{} `mapstructure:",remain"`
}

type Transform struct {
	Name    string
	Type    string                 `mapper:"type"`
	Inputs  []string               `mapper:"inputs"`
	Options map[string]interface{} `mapstructure:",remain"`
}

type Sink struct {
	Name    string
	Type    string                 `mapper:"type"`
	Inputs  []string               `mapper:"inputs"`
	Options map[string]interface{} `mapstructure:",remain"`
}

Thus, we allowed users to create a CR VectorPipeline (or ClusterVectorPipeline) with any structure for sources, transforms and sinks, but did not lose the ability to analyze what exactly the user created.

Automatic check of the generated config

Problem:
Since we have no control over the structure for source, transforms, sink, which the user sets using CR VectorPipeline – K8s will validate absolutely everything that is written there, even if this is absolute rubbish, with which Vector will never start.
For example, here is such a CR VectorPipeline, the Kubernetes API will be quite suitable for itself

cat <<EOF | kubectl apply -f -
apiVersion: observability.kaasops.io/v1alpha1
kind: VectorPipeline
metadata:
  name: example
spec:
  sources:
    source-test:
      lol: "kek"
  transforms:
    transform-test:
      kek: "lol"
  sinks:
    sink-test:
      cheburek: "wow"
EOF
vectorpipeline.observability.kaasops.io/example created

However, when the operator tries to generate a configuration file for Vector from this, it will surely fail.

Solution:
The generated config goes through two stages of verification:
Validation is a scope check. For example, at the validation stage, non-clustered vector pipelines that are trying to collect logs from neighboring namespaces or journald will be discarded.
configcheck – checking the validated config by Vector itself.

ConfigCheck solves our problem. Every time the user creates/updates a VectorPipeline the following happens:

  1. A secret is generated, in which Vector’s configuration file is written, which consists of sources, transforms, sinks, which are described in VectorPipeline

  2. The pod is launched, in the container of which the Vector is launched with the flag validate.

  3. If the pod completes successfully (that is, the validation of the configuration specified in the VectorPipeline is correct) in the VectorPipeline status in the field ConfigCheckResult value is set true. If the pod ended with an error (that is, the validation ended with an error) – in the field ConfigCheckResult installed false.

Further, when the reconcile of the Vector itself starts, it will generate for itself a configuration file of only those VectorPipelines that have in the field ConfigCheckResult costs true. (For Vector, ConfigCheck is also run, and if it fails, Vector will not be updated)

Accordingly, in such a simple way, we made validation for CR VectorPipeline. Moreover, the user who creates the VectorPipeline can at any time view the field ConfigCheckResult and make sure its VectorPipeline is valid.

Disclaimer:
We do not believe that this option for checking configs is “ideal”. It would be ideal if all checks were run during apply/create CR VectorPipeline. To do this, we need a competent Validate WebHook. However, now the configcheck takes a “long” time to execute (20 seconds on average). And it turns out that when the user runs the create / apply command, the console “freezes” for ~ 20 seconds while ConfigCheck is being executed under the hood, which looks like ordinary “brakes”.
In the future, we have ideas on how to speed up and improve this process.

We get logs from pods

Problem:
As described above, if the user creates a VectorPipeline with an invalid configuration, then his VectorPipeline has a ConfigCheckResult field that will say false. Excellent! But how can the user find out what exactly is wrong in his configuration?
This information is available in the logs of the pod in which the VectorPipeline configuration validation was launched, but these pods are created in the Namespace, where the Custom Resouce Vector is described and, accordingly, where the Vector itself is deployed. Often, a user in a cluster does not have access to such a pod. And it’s not very nice that in order to find out what exactly is broken in VectorPipeline, you need to know that there is somewhere some kind of pod in which this is written.

Solution:
It was decided to add a Reason field to the Status VectorPipeline that appears when ConfigCheckResult is in the false state.
In this Reason, you need to push the logs from the pod that validated the config, but there is a catch. Controller Runtime, which is used in both KubeBuilder and OpenSDK when registering an operator, does not know how to get logs from pods. (There is an open issue since May 2019 and it is not necessary to expect that it will be resolved in the near future)
Controller Runtime does not know how, and to hell with it, but it is default client-go does an excellent job of this. Register ClientSet:

clientset, err := kubernetes.NewForConfig(config)
if err != nil {
	panic(err)
}

We describe the function in order to get the logs from the Pod:

func GetPodLogs(pod *corev1.Pod, cs *kubernetes.Clientset) (string, error) {
	count := int64(100)
	podLogOptions := corev1.PodLogOptions{
		TailLines: &count,
	}

	req := cs.CoreV1().Pods(pod.Namespace).GetLogs(pod.Name, &podLogOptions)
	podLogs, err := req.Stream(context.TODO())
	if err != nil {
		return "", err
	}
	defer podLogs.Close()

	buf := new(bytes.Buffer)
	_, err = io.Copy(buf, podLogs)
	if err != nil {
		return "", err
	}
	str := buf.String()

	return str, nil
}

We use it for health.
Of the minuses – we now use 2 similar clients to work with Kubernetes:

Not critical)

Skip Extra Reconcile

Problem:
Each time the operator starts, it starts to reconsolidate all existing resources (in particular, VectorPipeline and ClusterVectorPipeline), and, accordingly, run configuration file validations (generate pods / secrets) even if our resources have not changed (the operator cannot in any way determine whether changed resources while it was off, accordingly launches Reconcile for everything)

Solution:
In order not to run a bunch of unnecessary ConfigChecks, we added a new field to the Status VectorPipeline – LastAppliedPipelineHash. This is where the Hash of the Configuration that was checked is written.
Now the Reconcile VectorPipeline looks like this:

  1. A configuration is generated that needs to be tested.

  2. The hash of this configuration is taken and compared with the Hash in the field LastAppliedPipelineHash(If the field is filled). If the hashes match, then this configuration has already been checked and there is no need to restart the ConfigCheck process.

In general, that’s all. If you have something to discuss – I’m waiting for you in the comments

Similar Posts

Leave a Reply

Your email address will not be published. Required fields are marked *