AICosmus

Where tech meets the everyday — AI, fintech, swimming, and cars.
gRPC Interceptor 계층 구조 일러스트

gRPC Interceptor 완벽 가이드: 인증부터 로깅까지

마이크로서비스를 gRPC로 구축하다 보면 이상하게 익숙한 패턴이 반복됩니다. 모든 RPC 핸들러 앞에 토큰 검증 코드를 붙이고, 요청마다 로깅을 찍고, 예외가 터지면 gRPC 상태 코드로 변환하는 코드를 복사해 붙입니다. 서비스가 3개, 5개, 10개로 늘어나면 이 반복 코드는 걷잡을 수 없이 불어나죠.

웹 프레임워크에서는 미들웨어(Middleware)로 이 문제를 해결합니다. Express.js의 미들웨어, Django의 Middleware, Spring의 Filter/Interceptor처럼요. gRPC에도 정확히 같은 역할을 하는 메커니즘이 있습니다. 바로 Interceptor입니다.

이 글에서는 gRPC Interceptor의 동작 원리를 처음부터 짚고, 실무에서 가장 많이 쓰이는 인증·로깅·에러 처리·재시도 패턴을 코드와 함께 살펴봅니다. Python과 Go 예시를 중심으로 진행하되, 다른 언어에서도 개념은 동일하게 적용됩니다.

gRPC Interceptor 요청 응답 흐름도

gRPC Interceptor란 무엇인가

Interceptor는 gRPC 호출이 실제 핸들러(서버 측) 또는 네트워크 전송(클라이언트 측)에 도달하기 전후에 끼어드는 함수입니다. HTTP 미들웨어와 개념이 같지만, gRPC의 호출 모델에 맞춰 설계된 점이 다릅니다.

Unary Interceptor와 Stream Interceptor

gRPC에는 네 가지 호출 패턴이 있습니다. 단일 요청-응답인 Unary, 서버가 여러 응답을 보내는 Server Streaming, 클라이언트가 여러 요청을 보내는 Client Streaming, 양쪽 모두 스트리밍하는 Bidirectional Streaming입니다. Interceptor도 이에 맞춰 두 종류로 나뉩니다.

Unary Interceptor는 단일 요청-응답 패턴에 적용됩니다. 요청 하나가 들어오고 응답 하나가 나가는 가장 단순한 형태라서, 대부분의 공통 로직은 여기서 처리합니다. Stream Interceptor는 스트리밍 RPC에 적용됩니다. 스트림이 열리는 시점, 메시지가 오가는 시점, 스트림이 닫히는 시점 각각에 로직을 넣을 수 있어 더 세밀한 제어가 가능합니다.

실무에서는 Unary Interceptor만으로도 80% 이상의 공통 관심사를 처리할 수 있습니다. 스트리밍을 쓰는 서비스라면 Stream Interceptor도 함께 구현하면 됩니다.

서버 Interceptor와 클라이언트 Interceptor

Interceptor는 적용 위치에 따라서도 구분됩니다. 서버 Interceptor는 요청이 서버에 도달했을 때 핸들러 호출 전후에 실행됩니다. 인증 검증, 요청 로깅, 에러 변환 같은 서버 측 공통 로직을 담습니다. 클라이언트 Interceptor는 클라이언트가 RPC를 호출할 때 네트워크 전송 전후에 실행됩니다. 인증 토큰 주입, 자동 재시도, 타임아웃 설정 같은 클라이언트 측 공통 로직을 담습니다.

하나의 gRPC 호출이 완료되기까지 클라이언트 Interceptor → 네트워크 → 서버 Interceptor → 핸들러 → 서버 Interceptor → 네트워크 → 클라이언트 Interceptor 순서로 거칩니다. 양쪽 모두에 Interceptor를 두면 호출의 전체 생애주기를 제어할 수 있습니다.

Interceptor 체이닝

Interceptor는 여러 개를 연결해서 쓸 수 있습니다. 이것을 체이닝(chaining)이라고 합니다. 등록 순서대로 실행되며, 각 Interceptor는 다음 Interceptor를 호출하거나 호출을 중단할 수 있습니다. 예를 들어 인증 Interceptor에서 토큰 검증에 실패하면 다음 Interceptor나 핸들러를 호출하지 않고 즉시 UNAUTHENTICATED 에러를 반환할 수 있습니다.

체이닝 순서는 중요합니다. 보통 로깅 → 인증 → 검증 → 비즈니스 핸들러 순서로 배치합니다. 로깅을 가장 바깥에 두면 인증 실패 요청도 로그에 남길 수 있기 때문입니다. 이 순서에 대해서는 뒤에서 더 자세히 다루겠습니다.

서버 Interceptor 실전 구현

이론은 여기까지 하고, 이제 실제 코드로 들어가 봅시다. 가장 많이 쓰이는 세 가지 서버 Interceptor를 Python과 Go로 구현해 보겠습니다.

1. 인증 Interceptor

거의 모든 gRPC 서비스에 필요한 것이 인증입니다. 클라이언트가 메타데이터(HTTP 헤더에 해당)에 실어 보낸 토큰을 검증하고, 유효하지 않으면 호출을 거부합니다.

Python (grpcio) 구현:

import grpc
from grpc import ServerInterceptor

class AuthInterceptor(ServerInterceptor):
    def __init__(self, token_verifier):
        self._verifier = token_verifier
        # 인증을 건너뛸 메서드 목록
        self._public_methods = {
            '/health.HealthService/Check',
            '/grpc.reflection.v1alpha.ServerReflection/ServerReflectionInfo',
        }

    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        
        # 공개 엔드포인트는 인증 없이 통과
        if method in self._public_methods:
            return continuation(handler_call_details)
        
        # 메타데이터에서 토큰 추출
        metadata = dict(handler_call_details.invocation_metadata)
        auth_header = metadata.get('authorization', '')
        
        if not auth_header.startswith('Bearer '):
            return self._abort(grpc.StatusCode.UNAUTHENTICATED,
                             'Missing or invalid authorization header')
        
        token = auth_header[7:]  # 'Bearer ' 제거
        
        try:
            user_info = self._verifier.verify(token)
        except TokenExpiredError:
            return self._abort(grpc.StatusCode.UNAUTHENTICATED,
                             'Token has expired')
        except InvalidTokenError:
            return self._abort(grpc.StatusCode.UNAUTHENTICATED,
                             'Invalid token')
        
        # 검증 성공 시 다음 Interceptor 또는 핸들러로 진행
        return continuation(handler_call_details)

    def _abort(self, code, message):
        def _unary_abort(request, context):
            context.abort(code, message)
        # Unary/Stream 모든 핸들러 유형에 대한 abort 래퍼 반환
        return grpc.unary_unary_rpc_method_handler(_unary_abort)

핵심 포인트를 짚겠습니다. 첫째, 공개 메서드 화이트리스트를 두었습니다. 헬스체크나 gRPC 리플렉션 같은 엔드포인트는 인증 없이 접근 가능해야 합니다. 둘째, 토큰 검증 로직을 token_verifier로 분리했습니다. JWT, OAuth2, 커스텀 토큰 등 검증 방식이 바뀌어도 Interceptor 코드는 수정할 필요가 없습니다.

Go 구현:

func AuthUnaryInterceptor(verifier TokenVerifier) grpc.UnaryServerInterceptor {
    publicMethods := map[string]bool{
        "/health.HealthService/Check": true,
    }
    
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // 공개 메서드는 통과
        if publicMethods[info.FullMethod] {
            return handler(ctx, req)
        }
        
        // 메타데이터 추출
        md, ok := metadata.FromIncomingContext(ctx)
        if !ok {
            return nil, status.Error(codes.Unauthenticated, "missing metadata")
        }
        
        tokens := md.Get("authorization")
        if len(tokens) == 0 {
            return nil, status.Error(codes.Unauthenticated, "missing token")
        }
        
        token := strings.TrimPrefix(tokens[0], "Bearer ")
        userInfo, err := verifier.Verify(ctx, token)
        if err != nil {
            return nil, status.Error(codes.Unauthenticated, "invalid token")
        }
        
        // 검증된 사용자 정보를 context에 주입
        ctx = context.WithValue(ctx, userInfoKey, userInfo)
        return handler(ctx, req)
    }
}

Go에서는 검증된 사용자 정보를 context.WithValue로 주입하는 패턴이 표준입니다. 핸들러에서 ctx.Value(userInfoKey)로 꺼내 쓸 수 있습니다. Python에서도 비슷하게 context 객체에 값을 설정하는 방식을 쓸 수 있습니다.

서버 Interceptor 인증 로깅 에러 처리 구조

2. 로깅 Interceptor

모든 RPC 호출의 메서드명, 소요 시간, 상태 코드를 기록하는 Interceptor입니다. 운영 환경에서 디버깅과 모니터링의 기본이 됩니다.

Python 구현:

import time
import logging
import grpc

logger = logging.getLogger('grpc.access')

class LoggingInterceptor(ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        start = time.monotonic()
        
        # 원래 핸들러 가져오기
        handler = continuation(handler_call_details)
        if handler is None:
            return handler
        
        # 원래 핸들러를 래핑하는 새 핸들러 생성
        if handler.unary_unary:
            original = handler.unary_unary
            def logged_unary(request, context):
                try:
                    response = original(request, context)
                    elapsed = (time.monotonic() - start) * 1000
                    logger.info(
                        'method=%s status=OK duration_ms=%.1f',
                        method, elapsed
                    )
                    return response
                except Exception as e:
                    elapsed = (time.monotonic() - start) * 1000
                    code = self._extract_code(context, e)
                    logger.warning(
                        'method=%s status=%s duration_ms=%.1f error=%s',
                        method, code, elapsed, str(e)
                    )
                    raise
            
            return grpc.unary_unary_rpc_method_handler(
                logged_unary,
                request_deserializer=handler.request_deserializer,
                response_serializer=handler.response_serializer,
            )
        
        return handler

    def _extract_code(self, context, error):
        if hasattr(context, 'code') and context.code():
            return context.code().name
        return 'UNKNOWN'

로깅 Interceptor에서 주의할 점이 있습니다. 요청/응답 본문은 기본적으로 로깅하지 않습니다. Protocol Buffers 메시지에는 개인정보나 민감한 데이터가 포함될 수 있기 때문입니다. 디버그 모드에서만 선택적으로 본문을 남기되, 그때도 마스킹 처리를 거치는 것이 안전합니다.

Go 구현:

func LoggingUnaryInterceptor() grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        start := time.Now()
        
        resp, err := handler(ctx, req)
        
        elapsed := time.Since(start)
        code := status.Code(err)
        
        log.Printf("method=%s status=%s duration=%s",
            info.FullMethod, code, elapsed)
        
        return resp, err
    }
}

Go 버전이 훨씬 간결합니다. Go의 gRPC Interceptor는 함수 시그니처가 명확해서 핸들러 래핑이 깔끔합니다. status.Code(err)로 에러에서 gRPC 상태 코드를 추출하는 것도 표준 패턴입니다.

3. 에러 변환 Interceptor

비즈니스 로직에서 발생하는 도메인 예외를 적절한 gRPC 상태 코드로 변환하는 Interceptor입니다. 핸들러마다 try-except를 쓰는 대신 Interceptor에서 일관된 에러 매핑을 처리합니다.

# Python 예시
class ErrorMappingInterceptor(ServerInterceptor):
    """도메인 예외를 gRPC 상태 코드로 매핑"""
    
    ERROR_MAP = {
        NotFoundError: grpc.StatusCode.NOT_FOUND,
        PermissionDeniedError: grpc.StatusCode.PERMISSION_DENIED,
        ValidationError: grpc.StatusCode.INVALID_ARGUMENT,
        ConflictError: grpc.StatusCode.ALREADY_EXISTS,
        RateLimitError: grpc.StatusCode.RESOURCE_EXHAUSTED,
    }
    
    def intercept_service(self, continuation, handler_call_details):
        handler = continuation(handler_call_details)
        if handler is None or not handler.unary_unary:
            return handler
        
        original = handler.unary_unary
        
        def error_mapped(request, context):
            try:
                return original(request, context)
            except Exception as e:
                for exc_type, grpc_code in self.ERROR_MAP.items():
                    if isinstance(e, exc_type):
                        context.abort(grpc_code, str(e))
                # 매핑되지 않은 예외는 INTERNAL로
                context.abort(
                    grpc.StatusCode.INTERNAL,
                    'Internal server error'
                )
        
        return grpc.unary_unary_rpc_method_handler(
            error_mapped,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )

이 패턴의 가장 큰 장점은 핸들러가 도메인 로직에만 집중할 수 있다는 것입니다. 핸들러에서는 그냥 raise NotFoundError("User not found")를 던지면 되고, gRPC 상태 코드에 대해서는 전혀 알 필요가 없습니다. 에러 매핑 정책이 바뀌면 Interceptor 한 곳만 수정하면 모든 서비스에 일괄 적용됩니다.

매핑되지 않은 예외는 INTERNAL로 처리하면서 원본 에러 메시지는 숨깁니다. 운영 환경에서 내부 구현 세부사항이 클라이언트에 노출되면 보안 취약점이 될 수 있기 때문입니다. 원본 에러는 로깅 Interceptor가 기록하도록 역할을 분리합니다.

클라이언트 Interceptor 활용법

서버 쪽만큼이나 클라이언트 Interceptor도 실용적입니다. 모든 RPC 호출에 공통으로 필요한 로직을 한 곳에서 관리할 수 있기 때문입니다.

클라이언트 Interceptor 메타데이터 재시도 패턴

1. 메타데이터 자동 주입 Interceptor

모든 RPC 호출에 인증 토큰, 요청 ID, 추적 헤더를 자동으로 붙여 주는 Interceptor입니다.

# Python 클라이언트 Interceptor
import uuid
import grpc

class MetadataInjectorInterceptor(
    grpc.UnaryUnaryClientInterceptor
):
    def __init__(self, token_provider):
        self._token_provider = token_provider
    
    def intercept_unary_unary(
        self, continuation, client_call_details, request
    ):
        # 기존 메타데이터 복사
        metadata = list(client_call_details.metadata or [])
        
        # 인증 토큰 추가
        token = self._token_provider.get_token()
        metadata.append(('authorization', f'Bearer {token}'))
        
        # 요청 추적 ID 추가
        request_id = str(uuid.uuid4())
        metadata.append(('x-request-id', request_id))
        
        # 새 메타데이터로 호출 정보 갱신
        new_details = _ClientCallDetails(
            client_call_details.method,
            client_call_details.timeout,
            metadata,
            client_call_details.credentials,
            client_call_details.wait_for_ready,
            client_call_details.compression,
        )
        
        return continuation(new_details, request)

이 패턴을 쓰면 서비스 코드에서 매번 메타데이터를 수동으로 설정하지 않아도 됩니다. 토큰 갱신 로직까지 token_provider에 캡슐화하면 토큰 만료 시 자동 갱신까지 투명하게 처리됩니다.

2. 자동 재시도 Interceptor

일시적인 네트워크 장애나 서버 과부하에 대응하는 재시도 Interceptor입니다. gRPC에는 기본 재시도 정책(retry policy)이 있지만, 세밀한 제어가 필요할 때는 Interceptor로 직접 구현하는 것이 더 유연합니다.

# Go 클라이언트 재시도 Interceptor
func RetryUnaryInterceptor(
    maxRetries int,
    retryableCodes []codes.Code,
) grpc.UnaryClientInterceptor {
    retryable := make(map[codes.Code]bool)
    for _, c := range retryableCodes {
        retryable[c] = true
    }
    
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        var lastErr error
        
        for attempt := 0; attempt <= maxRetries; attempt++ {
            lastErr = invoker(ctx, method, req, reply, cc, opts...)
            
            if lastErr == nil {
                return nil // 성공
            }
            
            // 재시도 가능한 상태 코드인지 확인
            if !retryable[status.Code(lastErr)] {
                return lastErr // 재시도 불가능한 에러
            }
            
            // 컨텍스트 취소 확인
            if ctx.Err() != nil {
                return lastErr
            }
            
            // 지수 백오프 대기
            backoff := time.Duration(1<

재시도 Interceptor의 핵심 원칙 세 가지를 기억하세요. 첫째, 멱등한(idempotent) 요청만 재시도합니다. 결제나 주문 생성 같은 비멱등 RPC를 무조건 재시도하면 중복 처리가 발생합니다. 둘째, 재시도 가능한 에러 코드를 제한합니다. UNAVAILABLE(서버 일시 불가), RESOURCE_EXHAUSTED(레이트 리밋), DEADLINE_EXCEEDED(타임아웃)는 재시도해 볼 만하지만, INVALID_ARGUMENTNOT_FOUND는 재시도해도 결과가 같습니다. 셋째, 지수 백오프를 적용합니다. 즉시 재시도하면 이미 과부하인 서버에 부담을 더합니다.

3. Deadline 전파 Interceptor

마이크로서비스 환경에서 서비스 A가 서비스 B를 호출하고, B가 다시 C를 호출하는 체인이 흔합니다. 이때 전체 호출 체인에 일관된 데드라인을 전파하는 것이 중요합니다.

# Python: Deadline 전파 Interceptor
from datetime import timedelta

class DeadlinePropagationInterceptor(
    grpc.UnaryUnaryClientInterceptor
):
    def __init__(self, default_timeout: timedelta):
        self._default_timeout = default_timeout
    
    def intercept_unary_unary(
        self, continuation, client_call_details, request
    ):
        timeout = client_call_details.timeout
        
        # 타임아웃이 설정되지 않았으면 기본값 사용
        if timeout is None:
            timeout = self._default_timeout.total_seconds()
        
        new_details = _ClientCallDetails(
            client_call_details.method,
            timeout,
            client_call_details.metadata,
            client_call_details.credentials,
            client_call_details.wait_for_ready,
            client_call_details.compression,
        )
        
        return continuation(new_details, request)

Deadline 전파가 없으면 호출 체인의 마지막 서비스가 이미 의미 없는 요청을 처리하느라 리소스를 낭비하게 됩니다. 예를 들어 클라이언트가 5초 타임아웃을 걸었는데 서비스 A에서 4초를 썼다면, 서비스 B에는 1초만 남았으므로 그에 맞게 데드라인을 줄여서 전달해야 합니다. gRPC는 이 전파를 context를 통해 자동으로 해 주지만, 기본 타임아웃이 설정되지 않은 호출에 안전망을 씌우는 것이 이 Interceptor의 역할입니다.

Interceptor 체이닝: 순서가 결과를 바꾼다

여러 Interceptor를 함께 쓸 때 등록 순서는 실행 순서에 직접 영향을 줍니다. 잘못된 순서는 예상치 못한 동작을 일으킵니다.

서버 클라이언트 Interceptor 체이닝 권장 순서

권장 서버 Interceptor 순서

서버 측 Interceptor의 권장 등록 순서는 다음과 같습니다.

  • 1순위 - 로깅 Interceptor: 가장 바깥에 배치합니다. 인증 실패, 유효성 검사 실패 등 모든 요청을 빠짐없이 기록합니다. 장애 상황에서 "요청이 아예 안 왔는지" vs "왔는데 인증에서 막혔는지"를 구분하는 데 결정적입니다.
  • 2순위 - 레이트 리밋 Interceptor: 비용이 큰 인증 검증 전에 과도한 요청을 먼저 걸러냅니다.
  • 3순위 - 인증 Interceptor: 신원 확인. 여기서 막히면 이후 처리는 불필요합니다.
  • 4순위 - 에러 변환 Interceptor: 핸들러에 가장 가까운 위치에 둡니다. 핸들러가 던진 도메인 예외를 잡아 gRPC 상태 코드로 변환합니다.

Go에서의 체이닝 구현:

server := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        LoggingUnaryInterceptor(),           // 1순위
        RateLimitUnaryInterceptor(limiter),   // 2순위
        AuthUnaryInterceptor(verifier),       // 3순위
        ErrorMappingUnaryInterceptor(),       // 4순위
    ),
    grpc.ChainStreamInterceptor(
        LoggingStreamInterceptor(),
        AuthStreamInterceptor(verifier),
    ),
)

Go 1.21+ 기준으로 grpc.ChainUnaryInterceptor가 표준 API입니다. 이전에는 grpc_middleware.ChainUnaryServer(go-grpc-middleware 라이브러리)를 썼는데, 이제 공식 API만으로 충분합니다.

Python에서의 체이닝:

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    interceptors=[
        LoggingInterceptor(),
        RateLimitInterceptor(limiter),
        AuthInterceptor(verifier),
        ErrorMappingInterceptor(),
    ],
)

Python도 interceptors 리스트의 순서가 곧 실행 순서입니다. 단, Python의 gRPC Interceptor API가 Go보다 좀 더 번거롭습니다. 핸들러 래핑 시 request_deserializerresponse_serializer를 직접 전달해야 하는 부분이 특히 그렇습니다. 이런 보일러플레이트를 줄이고 싶다면 grpc-interceptor 같은 서드파티 라이브러리가 도움이 됩니다.

클라이언트 Interceptor 순서

클라이언트 측은 서버와 반대로 생각하면 됩니다. 가장 바깥의 Interceptor가 가장 먼저 실행되므로, 메타데이터 주입을 먼저 하고 로깅을 그다음에 두면 로그에 주입된 메타데이터가 포함됩니다.

  • 1순위 - 메타데이터 주입: 인증 토큰, 추적 ID 등 부착
  • 2순위 - Deadline 전파: 타임아웃 설정
  • 3순위 - 재시도: 실패 시 재시도 (위 두 Interceptor가 매번 적용되어야 하므로 안쪽에 배치)
  • 4순위 - 로깅: 최종 호출 결과 기록

실무에서 알아두면 좋은 Interceptor 패턴 5가지

기본 구현을 넘어서, 실무에서 자주 마주치는 상황별 패턴을 정리합니다.

패턴 1: 메서드별 선택 적용

모든 RPC에 동일한 Interceptor를 적용하는 것이 기본이지만, 특정 메서드에만 적용하거나 제외하고 싶을 때가 있습니다. 앞서 인증 Interceptor에서 본 화이트리스트 방식을 일반화하면 됩니다.

class SelectiveInterceptor(ServerInterceptor):
    def __init__(self, inner_interceptor, include=None, exclude=None):
        self._inner = inner_interceptor
        self._include = set(include) if include else None
        self._exclude = set(exclude) if exclude else set()
    
    def intercept_service(self, continuation, handler_call_details):
        method = handler_call_details.method
        
        if self._include is not None and method not in self._include:
            return continuation(handler_call_details)
        if method in self._exclude:
            return continuation(handler_call_details)
        
        return self._inner.intercept_service(
            continuation, handler_call_details
        )

이런 래퍼를 만들어 두면 기존 Interceptor를 수정하지 않고 적용 범위를 조절할 수 있습니다.

패턴 2: 분산 추적 (Distributed Tracing) 연동

OpenTelemetry 같은 분산 추적 시스템과 gRPC를 연동할 때 Interceptor가 핵심입니다. 들어오는 요청에서 추적 컨텍스트를 추출하고, 나가는 요청에 전파하는 역할을 합니다. 직접 구현할 수도 있지만, opentelemetry-instrumentation-grpc (Python) 또는 otelgrpc (Go) 같은 공식 계측 라이브러리가 이 Interceptor를 이미 제공합니다.

# Python: OpenTelemetry gRPC 계측
from opentelemetry.instrumentation.grpc import (
    GrpcInstrumentorServer,
    GrpcInstrumentorClient,
)

# 서버 측 자동 계측
GrpcInstrumentorServer().instrument()

# 클라이언트 측 자동 계측
GrpcInstrumentorClient().instrument()

자동 계측을 쓰면 Interceptor를 직접 등록하지 않아도 됩니다. 다만 커스텀 속성을 추가하거나 특정 메서드를 필터링하려면 수동 Interceptor가 필요합니다.

패턴 3: 요청/응답 유효성 검사

Protocol Buffers는 타입 수준의 검증을 해 주지만, 비즈니스 규칙 수준의 검증은 하지 않습니다. 예를 들어 "이메일 형식이 올바른가", "수량이 0보다 큰가" 같은 검증은 별도로 해야 합니다. protoc-gen-validate(PGV) 또는 후속 프로젝트인 protovalidate를 사용하면 .proto 파일에 검증 규칙을 선언하고, Interceptor에서 자동 검증할 수 있습니다.

// .proto 파일에서 검증 규칙 선언
message CreateUserRequest {
  string email = 1 [(buf.validate.field).string.email = true];
  string name = 2 [(buf.validate.field).string.min_len = 1];
  int32 age = 3 [(buf.validate.field).int32 = {gte: 0, lte: 150}];
}

이렇게 선언하면 Interceptor 한 줄로 모든 RPC 요청의 유효성 검사를 자동화할 수 있습니다.

패턴 4: 메트릭 수집

Prometheus 같은 모니터링 시스템에 RPC 메트릭을 노출하는 Interceptor입니다. 로깅 Interceptor와 비슷하지만, 구조화된 메트릭(히스토그램, 카운터)을 수집한다는 점이 다릅니다.

// Go: Prometheus 메트릭 Interceptor 핵심 로직
func MetricsUnaryInterceptor(
    histogram *prometheus.HistogramVec,
    counter *prometheus.CounterVec,
) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        start := time.Now()
        resp, err := handler(ctx, req)
        
        code := status.Code(err)
        elapsed := time.Since(start).Seconds()
        
        // 메서드별, 상태코드별 히스토그램 기록
        histogram.WithLabelValues(
            info.FullMethod, code.String(),
        ).Observe(elapsed)
        
        // 호출 카운터 증가
        counter.WithLabelValues(
            info.FullMethod, code.String(),
        ).Inc()
        
        return resp, err
    }
}

실무에서는 go-grpc-middlewaregrpc_prometheus 패키지나 py-grpc-prometheus를 쓰는 것이 더 편합니다. 하지만 원리를 알아 두면 커스텀 메트릭을 추가하거나 다른 모니터링 시스템과 연동할 때 유용합니다.

패턴 5: 페이로드 압축 제어

대용량 메시지를 주고받을 때 gzip 압축을 적용하면 네트워크 비용을 줄일 수 있습니다. gRPC는 채널 수준에서 압축을 설정할 수 있지만, 메서드별로 다른 압축 정책을 적용하고 싶을 때 Interceptor가 유용합니다.

// 큰 페이로드 메서드에만 gzip 적용
func CompressionInterceptor(
    largeMethods map[string]bool,
) grpc.UnaryClientInterceptor {
    return func(
        ctx context.Context,
        method string,
        req, reply interface{},
        cc *grpc.ClientConn,
        invoker grpc.UnaryInvoker,
        opts ...grpc.CallOption,
    ) error {
        if largeMethods[method] {
            opts = append(opts,
                grpc.UseCompressor(gzip.Name))
        }
        return invoker(ctx, method, req, reply, cc, opts...)
    }
}

Interceptor 도입 시 주의사항

Interceptor는 강력하지만, 잘못 쓰면 오히려 시스템을 복잡하게 만들 수 있습니다. 실무 경험에서 얻은 주의사항을 정리합니다.

성능 영향을 의식하세요

Interceptor는 모든 RPC 호출에 실행됩니다. Interceptor 안에서 외부 API 호출이나 데이터베이스 조회를 하면 모든 RPC의 지연 시간이 늘어납니다. 특히 인증 Interceptor에서 토큰 검증을 매번 원격 서버에 요청하면 병목이 됩니다. JWT처럼 로컬에서 검증 가능한 방식을 쓰거나, 검증 결과를 TTL 기반으로 캐싱하는 것이 좋습니다.

에러를 삼키지 마세요

Interceptor에서 에러를 catch한 뒤 적절히 다시 던지지 않으면, 원래 에러 정보가 사라집니다. 특히 로깅 Interceptor에서 에러를 기록한 후 반드시 raise하거나 에러를 반환해야 합니다. 에러를 삼키는 Interceptor는 디버깅을 악몽으로 만듭니다.

Stream Interceptor를 빼먹지 마세요

Unary Interceptor만 구현하고 Stream Interceptor를 구현하지 않는 경우가 많습니다. 서비스가 Unary만 쓴다면 문제없지만, 나중에 스트리밍 RPC를 추가했을 때 인증이나 로깅이 빠지는 구멍이 생깁니다. 처음부터 Unary와 Stream 두 종류를 함께 구현하는 습관을 들이세요.

테스트를 잊지 마세요

Interceptor는 독립적으로 테스트하기 어렵지 않습니다. 가짜 핸들러를 만들고, Interceptor를 거쳐 호출한 뒤 기대한 동작(에러 반환, 메타데이터 주입, 로그 기록 등)을 검증하면 됩니다.

# Python: Interceptor 단위 테스트 예시
def test_auth_interceptor_blocks_invalid_token():
    interceptor = AuthInterceptor(FakeTokenVerifier())
    
    # 유효하지 않은 토큰으로 호출
    call_details = FakeCallDetails(
        method='/test.Service/Method',
        metadata=[('authorization', 'Bearer invalid-token')],
    )
    
    handler = interceptor.intercept_service(
        lambda d: FakeHandler(), call_details
    )
    
    context = FakeContext()
    handler.unary_unary(FakeRequest(), context)
    
    assert context.code == grpc.StatusCode.UNAUTHENTICATED

잘 만들어진 오픈소스를 활용하세요

모든 Interceptor를 처음부터 직접 만들 필요는 없습니다. 성숙한 오픈소스 라이브러리를 먼저 검토하세요.

  • Go: grpc-ecosystem/go-grpc-middleware — 로깅, 인증, 재시도, 검증, 메트릭 등 거의 모든 패턴을 제공합니다.
  • Python: grpc-interceptor — Python gRPC의 번거로운 보일러플레이트를 줄여주는 래퍼입니다.
  • Java: grpc-java 자체에 풍부한 Interceptor API가 내장되어 있으며, grpc-spring-boot-starter가 Spring과의 통합을 제공합니다.

직접 구현이 필요한 것은 보통 비즈니스 로직에 밀접한 인증, 에러 매핑, 커스텀 메트릭 정도입니다. 로깅이나 추적 같은 범용 기능은 검증된 라이브러리를 쓰는 것이 안전합니다.

마무리: Interceptor는 gRPC 서비스의 뼈대입니다

Interceptor를 한 줄로 정리하면 "모든 RPC 호출에 공통으로 적용되는 로직을 한 곳에서 관리하는 메커니즘"입니다. HTTP 프레임워크의 미들웨어와 같은 역할이지만, gRPC의 Unary/Stream 호출 모델과 Protocol Buffers 기반 직렬화에 맞춰 설계되어 있습니다.

처음에는 인증과 로깅, 이 두 가지 Interceptor부터 시작하세요. 이것만으로도 코드 중복이 극적으로 줄고, 운영 가시성이 크게 향상됩니다. 서비스가 성장하면 에러 매핑, 메트릭, 재시도 같은 Interceptor를 하나씩 추가하면 됩니다.

gRPC를 쓰면서 매 핸들러마다 반복 코드에 지쳤다면, 지금이 Interceptor를 도입할 때입니다. 체이닝 순서만 잘 잡으면 서비스의 공통 관심사가 깔끔하게 분리되어, 핸들러는 비즈니스 로직에만 집중하는 건강한 구조를 만들 수 있습니다.

이미지는 Leonardo AI 로 생성되었습니다.

이미지는 Claude AI 로 생성되었습니다.

답글 남기기

Your email address will not be published. Required fields are marked *.

Warning: Undefined array key "cookies" in /var/www/html/wp-content/themes/personal-cv-resume/class/class-post-related.php on line 212
*
*

최신 댓글