diff --git a/.secrets.baseline b/.secrets.baseline index 0197e0a1f3d..74979b4f91b 100644 --- a/.secrets.baseline +++ b/.secrets.baseline @@ -1156,7 +1156,7 @@ "filename": "infra/feast-operator/internal/controller/services/services.go", "hashed_secret": "36dc326eb15c7bdd8d91a6b87905bcea20b637d1", "is_verified": false, - "line_number": 179 + "line_number": 180 } ], "infra/feast-operator/internal/controller/services/tls_test.go": [ @@ -1539,5 +1539,5 @@ } ] }, - "generated_at": "2026-05-20T07:55:48Z" + "generated_at": "2026-05-22T11:36:48Z" } diff --git a/infra/feast-operator/internal/controller/services/services.go b/infra/feast-operator/internal/controller/services/services.go index 47226d460aa..4acb8504d4e 100644 --- a/infra/feast-operator/internal/controller/services/services.go +++ b/infra/feast-operator/internal/controller/services/services.go @@ -32,6 +32,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/log" @@ -768,6 +769,17 @@ func (feast *FeastServices) setInitContainer(podSpec *corev1.PodSpec, fsYamlB64 } } +// getServiceAppProtocol returns the appProtocol for a Service port. +// The registry gRPC service uses the gRPC protocol, which requires HTTP/2. +// Setting appProtocol allows service meshes (e.g. Istio) and load balancers +// to correctly classify the traffic and avoid downgrading to HTTP/1.1. +func (feast *FeastServices) getServiceAppProtocol(feastType FeastServiceType, isRestService bool) *string { + if feastType == RegistryFeastType && !isRestService && feast.isRegistryGrpcEnabled() { + return ptr.To("grpc") + } + return nil +} + func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServiceType, isRestService bool) error { svc.Labels = feast.getFeastTypeLabels(feastType) if feast.isOpenShiftTls(feastType) { @@ -829,10 +841,11 @@ func (feast *FeastServices) setService(svc *corev1.Service, feastType FeastServi Type: corev1.ServiceTypeClusterIP, Ports: []corev1.ServicePort{ { - Name: scheme, - Port: port, - Protocol: corev1.ProtocolTCP, - TargetPort: intstr.FromInt(int(targetPort)), + Name: scheme, + Port: port, + Protocol: corev1.ProtocolTCP, + TargetPort: intstr.FromInt(int(targetPort)), + AppProtocol: feast.getServiceAppProtocol(feastType, isRestService), }, }, } diff --git a/infra/feast-operator/internal/controller/services/services_test.go b/infra/feast-operator/internal/controller/services/services_test.go index 4b7b4343216..7cabc1d596d 100644 --- a/infra/feast-operator/internal/controller/services/services_test.go +++ b/infra/feast-operator/internal/controller/services/services_test.go @@ -632,6 +632,118 @@ var _ = Describe("Registry Service", func() { }) }) +var _ = Describe("Service AppProtocol Configuration", func() { + var ( + featureStore *feastdevv1.FeatureStore + feast *FeastServices + ctx context.Context + ) + + BeforeEach(func() { + ctx = context.Background() + featureStore = &feastdevv1.FeatureStore{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testfeaturestore-approtocol", + Namespace: "default", + }, + Spec: feastdevv1.FeatureStoreSpec{ + FeastProject: "testproject", + Services: &feastdevv1.FeatureStoreServices{ + Registry: &feastdevv1.Registry{ + Local: &feastdevv1.LocalRegistryConfig{ + Server: &feastdevv1.RegistryServerConfigs{ + ServerConfigs: feastdevv1.ServerConfigs{ + ContainerConfigs: feastdevv1.ContainerConfigs{ + DefaultCtrConfigs: feastdevv1.DefaultCtrConfigs{ + Image: ptr.To("test-image"), + }, + }, + }, + GRPC: ptr.To(true), + RestAPI: ptr.To(false), + }, + }, + }, + }, + }, + } + Expect(k8sClient.Create(ctx, featureStore)).To(Succeed()) + applySpecToStatus(featureStore) + feast = &FeastServices{ + Handler: handler.FeastHandler{ + Client: k8sClient, + Context: ctx, + Scheme: k8sClient.Scheme(), + FeatureStore: featureStore, + }, + } + Expect(feast.ApplyDefaults()).To(Succeed()) + applySpecToStatus(featureStore) + }) + + AfterEach(func() { + Expect(k8sClient.Delete(ctx, featureStore)).To(Succeed()) + }) + + It("should return grpc appProtocol for the registry gRPC service", func() { + Expect(feast.isRegistryGrpcEnabled()).To(BeTrue()) + Expect(feast.getServiceAppProtocol(RegistryFeastType, false)).To(Equal(ptr.To("grpc"))) + }) + + It("should return nil appProtocol for the registry REST service", func() { + featureStore.Spec.Services.Registry.Local.Server.RestAPI = ptr.To(true) + Expect(k8sClient.Update(ctx, featureStore)).To(Succeed()) + Expect(feast.ApplyDefaults()).To(Succeed()) + applySpecToStatus(featureStore) + + Expect(feast.getServiceAppProtocol(RegistryFeastType, true)).To(BeNil()) + }) + + It("should return nil appProtocol for the online store service", func() { + Expect(feast.getServiceAppProtocol(OnlineFeastType, false)).To(BeNil()) + }) + + It("should return nil appProtocol for the offline store service", func() { + Expect(feast.getServiceAppProtocol(OfflineFeastType, false)).To(BeNil()) + }) + + It("should return nil appProtocol when registry gRPC is disabled", func() { + featureStore.Spec.Services.Registry.Local.Server.GRPC = ptr.To(false) + featureStore.Spec.Services.Registry.Local.Server.RestAPI = ptr.To(true) + Expect(k8sClient.Update(ctx, featureStore)).To(Succeed()) + Expect(feast.ApplyDefaults()).To(Succeed()) + applySpecToStatus(featureStore) + + Expect(feast.isRegistryGrpcEnabled()).To(BeFalse()) + Expect(feast.getServiceAppProtocol(RegistryFeastType, false)).To(BeNil()) + }) + + It("should set grpc appProtocol on the registry gRPC Service port", func() { + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + svc := feast.initFeastSvc(RegistryFeastType) + Expect(svc).NotTo(BeNil()) + Expect(feast.setService(svc, RegistryFeastType, false)).To(Succeed()) + + Expect(svc.Spec.Ports).To(HaveLen(1)) + Expect(svc.Spec.Ports[0].AppProtocol).To(Equal(ptr.To("grpc"))) + }) + + It("should not set appProtocol on the registry REST Service port", func() { + featureStore.Spec.Services.Registry.Local.Server.RestAPI = ptr.To(true) + Expect(k8sClient.Update(ctx, featureStore)).To(Succeed()) + Expect(feast.ApplyDefaults()).To(Succeed()) + applySpecToStatus(featureStore) + + Expect(feast.deployFeastServiceByType(RegistryFeastType)).To(Succeed()) + restSvc := feast.initFeastRestSvc(RegistryFeastType) + Expect(restSvc).NotTo(BeNil()) + Expect(feast.setService(restSvc, RegistryFeastType, true)).To(Succeed()) + + Expect(restSvc.Spec.Ports).To(HaveLen(1)) + Expect(restSvc.Spec.Ports[0].AppProtocol).To(BeNil()) + }) +}) + var _ = Describe("Pod Container Failure Messages", func() { It("should detect init container in CrashLoopBackOff", func() { pod := &corev1.Pod{