AICosmus

Where tech meets the everyday — AI, fintech, swimming, and cars.
gRPC 스트리밍 4가지 패턴 개념 일러스트

gRPC 스트리밍 패턴 4가지 실전 구현 가이드

마이크로서비스 아키텍처가 보편화되면서, 서비스 간 통신의 효율성과 실시간성이 그 어느 때보다 중요해졌습니다. REST API의 요청-응답 모델로는 실시간 데이터 스트림, 대용량 파일 업로드, 양방향 채팅 같은 시나리오를 깔끔하게 처리하기 어렵죠. 바로 이 지점에서 gRPC의 스트리밍 패턴이 빛을 발합니다.

gRPC는 HTTP/2를 기반으로 동작하기 때문에, 하나의 연결 위에서 여러 메시지를 양방향으로 주고받을 수 있는 스트리밍을 네이티브로 지원합니다. 이 글에서는 gRPC가 제공하는 4가지 통신 패턴 — Unary, Server Streaming, Client Streaming, Bidirectional Streaming — 을 하나씩 깊이 파고들면서, 각 패턴이 어떤 상황에 적합한지, 실제 .proto 파일과 서버·클라이언트 코드는 어떻게 작성하는지 꼼꼼히 살펴보겠습니다.

특히 2026년 현재, AI 추론 결과의 실시간 스트리밍이나 IoT 센서 데이터 수집처럼 스트리밍이 핵심인 서비스가 급증하고 있어, gRPC 스트리밍 패턴을 제대로 이해하고 활용하는 것이 개발자에게 필수 역량이 되어가고 있습니다.

gRPC 4가지 통신 패턴 비교 다이어그램

gRPC 통신 패턴 4가지 한눈에 보기

gRPC에서 서비스의 메서드를 정의할 때, 요청(request)과 응답(response) 각각에 stream 키워드를 붙이느냐 마느냐에 따라 총 4가지 통신 패턴이 만들어집니다. 먼저 전체 그림을 간단히 정리해 보겠습니다.

첫 번째는 Unary RPC입니다. 클라이언트가 하나의 요청을 보내고 서버가 하나의 응답을 돌려주는, 가장 익숙한 함수 호출 스타일입니다. REST API의 일반적인 요청-응답과 개념적으로 동일합니다.

두 번째는 Server Streaming RPC입니다. 클라이언트가 하나의 요청을 보내면, 서버가 여러 개의 응답을 스트림으로 연속해서 보내줍니다. 실시간 주식 시세 조회나 검색 결과를 점진적으로 전달하는 시나리오에 적합합니다.

세 번째는 Client Streaming RPC입니다. 이번에는 반대로, 클라이언트가 여러 개의 메시지를 스트림으로 보내고 서버가 모든 데이터를 받은 뒤 하나의 응답을 돌려줍니다. 대용량 파일 업로드나 센서 데이터 일괄 전송에 어울립니다.

네 번째는 Bidirectional Streaming RPC입니다. 클라이언트와 서버가 동시에, 독립적으로 메시지 스트림을 주고받습니다. 실시간 채팅, 게임 서버, AI 모델과의 대화형 추론처럼 양쪽 모두가 능동적으로 데이터를 보내야 하는 상황에서 강력합니다.

이 4가지 패턴은 .proto 파일에서 아래와 같이 단순한 문법으로 선언됩니다.

service ExampleService {
  // 1. Unary
  rpc GetUser (UserRequest) returns (UserResponse);

  // 2. Server Streaming
  rpc ListUsers (ListRequest) returns (stream UserResponse);

  // 3. Client Streaming
  rpc UploadLogs (stream LogEntry) returns (UploadSummary);

  // 4. Bidirectional Streaming
  rpc Chat (stream ChatMessage) returns (stream ChatMessage);
}

stream 키워드의 위치 하나로 통신 모델이 완전히 달라진다는 것이 gRPC 서비스 정의의 우아한 점입니다. 이제 각 패턴을 하나씩 깊이 살펴보겠습니다.

Unary RPC — 가장 기본적이지만 가장 많이 쓰이는 패턴

개념과 동작 원리

Unary RPC는 gRPC의 가장 단순한 형태입니다. 클라이언트가 하나의 요청 메시지를 서버에 보내고, 서버가 처리 후 하나의 응답 메시지를 돌려줍니다. 일반적인 HTTP REST API의 GET이나 POST 요청과 동작 방식이 비슷해서, gRPC를 처음 접하는 개발자도 자연스럽게 이해할 수 있습니다.

하지만 내부적으로는 REST와 상당히 다릅니다. gRPC의 Unary 호출도 HTTP/2 프레임 위에서 동작하기 때문에, 같은 TCP 연결을 여러 Unary 호출이 다중화(multiplexing)하여 공유할 수 있습니다. 또한 Protocol Buffers로 직렬화된 바이너리 페이로드는 JSON에 비해 크기가 작고 파싱 속도가 빠릅니다.

.proto 정의와 코드 예시

사용자 정보를 조회하는 간단한 Unary RPC를 정의해 보겠습니다.

syntax = "proto3";

package user;

service UserService {
  rpc GetUser (GetUserRequest) returns (GetUserResponse);
}

message GetUserRequest {
  string user_id = 1;
}

message GetUserResponse {
  string user_id = 1;
  string name = 2;
  string email = 3;
  int64 created_at = 4;
}

Python으로 서버를 구현하면 이렇게 됩니다.

import grpc
from concurrent import futures
import user_pb2
import user_pb2_grpc

class UserServiceServicer(user_pb2_grpc.UserServiceServicer):
    def GetUser(self, request, context):
        # 실제로는 DB 조회 등의 로직
        return user_pb2.GetUserResponse(
            user_id=request.user_id,
            name="홍길동",
            email="[email protected]",
            created_at=1718600000
        )

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    user_pb2_grpc.add_UserServiceServicer_to_server(
        UserServiceServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

if __name__ == '__main__':
    serve()

클라이언트 측은 더 간단합니다.

import grpc
import user_pb2
import user_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = user_pb2_grpc.UserServiceStub(channel)
        response = stub.GetUser(
            user_pb2.GetUserRequest(user_id="user-123")
        )
        print(f"이름: {response.name}, 이메일: {response.email}")

if __name__ == '__main__':
    run()

Unary RPC를 선택해야 하는 경우

Unary RPC는 다음과 같은 상황에서 최적의 선택입니다.

  • CRUD 작업: 사용자 조회, 주문 생성, 설정 업데이트 등 단건 처리가 자연스러운 작업
  • 인증/인가: 로그인 토큰 발급, 권한 검증 등 한 번의 요청으로 결과가 결정되는 작업
  • 계산 요청: 특정 입력에 대해 서버가 한 번 계산하고 결과를 돌려주는 작업

전체 gRPC 서비스의 70~80%는 Unary RPC로 충분히 커버됩니다. 스트리밍이 필요하지 않은 곳에 스트리밍을 억지로 적용하면 오히려 코드 복잡도만 높아지니, Unary로 충분한 경우에는 Unary를 사용하는 것이 바람직합니다.

Server Streaming RPC — 서버가 지속적으로 데이터를 밀어주는 패턴

개념과 동작 원리

Server Streaming RPC에서 클라이언트는 하나의 요청을 보내지만, 서버는 여러 개의 응답을 스트림으로 차례차례 보내줍니다. 클라이언트는 서버가 스트림을 닫을 때까지, 또는 원하는 만큼 데이터를 계속 수신할 수 있습니다.

Server Streaming RPC 시퀀스 다이어그램

이 패턴은 HTTP/2의 핵심 능력인 서버 푸시를 활용합니다. 하나의 HTTP/2 스트림 위에서 서버가 여러 DATA 프레임을 연속으로 보내는 방식으로, 각 응답 메시지가 개별 프레임으로 전달됩니다. 클라이언트는 별도의 폴링이나 재연결 없이 데이터가 도착하는 대로 처리하면 됩니다.

실전 예시: 실시간 주식 시세 스트리밍

실시간 시세 데이터를 서버에서 푸시하는 서비스를 구현해 보겠습니다.

syntax = "proto3";

package stock;

service StockService {
  // 한 종목을 구독하면 실시간 시세를 스트림으로 전송
  rpc SubscribePrice (SubscribeRequest) returns (stream PriceUpdate);
}

message SubscribeRequest {
  string symbol = 1;  // 예: "AAPL", "005930"
}

message PriceUpdate {
  string symbol = 1;
  double price = 2;
  double change_percent = 3;
  int64 timestamp = 4;
}

서버 구현에서 핵심은 yield를 사용하여 응답 메시지를 하나씩 스트림으로 전달하는 것입니다.

import grpc
import time
import random
from concurrent import futures
import stock_pb2
import stock_pb2_grpc

class StockServiceServicer(stock_pb2_grpc.StockServiceServicer):
    def SubscribePrice(self, request, context):
        symbol = request.symbol
        base_price = 150.0  # 시뮬레이션용 기준가

        while context.is_active():  # 클라이언트가 연결을 유지하는 동안
            fluctuation = random.uniform(-2.0, 2.0)
            current_price = base_price + fluctuation
            change_pct = (fluctuation / base_price) * 100

            yield stock_pb2.PriceUpdate(
                symbol=symbol,
                price=round(current_price, 2),
                change_percent=round(change_pct, 4),
                timestamp=int(time.time())
            )
            time.sleep(1)  # 1초 간격으로 시세 전송

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    stock_pb2_grpc.add_StockServiceServicer_to_server(
        StockServiceServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    server.start()
    server.wait_for_termination()

클라이언트는 서버가 보내는 스트림을 이터레이터처럼 순회하며 처리합니다.

import grpc
import stock_pb2
import stock_pb2_grpc

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = stock_pb2_grpc.StockServiceStub(channel)

        # 서버 스트림을 수신
        responses = stub.SubscribePrice(
            stock_pb2.SubscribeRequest(symbol="005930")
        )

        for update in responses:
            print(f"[{update.symbol}] 가격: {update.price:,.2f}원 "
                  f"({update.change_percent:+.2f}%)")

if __name__ == '__main__':
    run()

Server Streaming의 핵심 포인트

Server Streaming을 실무에서 사용할 때 반드시 신경 써야 할 몇 가지 포인트가 있습니다.

  • context.is_active() 체크: 클라이언트가 연결을 끊었는데도 서버가 데이터를 계속 생성하면 리소스 낭비입니다. 반복문 안에서 항상 컨텍스트 활성 상태를 확인하세요.
  • 백프레셔(Backpressure) 관리: 서버가 클라이언트의 처리 속도보다 빠르게 데이터를 밀어넣으면 문제가 생깁니다. gRPC는 HTTP/2의 흐름 제어(flow control)를 통해 기본적인 백프레셔를 제공하지만, 서버 측에서도 적절한 전송 간격을 설정하는 것이 좋습니다.
  • Deadline 설정: 무한히 스트림이 열려 있으면 리소스 누수의 원인이 됩니다. 클라이언트 측에서 적절한 deadline을 설정하거나, 서버에서 일정 시간 후 스트림을 종료하는 로직을 넣어주세요.
  • 에러 처리: 스트림 중간에 네트워크 오류가 발생할 수 있습니다. 클라이언트는 재연결 로직을 갖추고, 서버는 스트림 중단 시 리소스를 정리하는 정리(cleanup) 코드를 작성해야 합니다.

Server Streaming이 빛나는 시나리오

Server Streaming은 다음과 같은 시나리오에서 특히 강력합니다.

  • 실시간 피드: 주식 시세, 뉴스 피드, 스포츠 경기 실시간 중계
  • 대용량 데이터 조회: 수만 건의 검색 결과를 페이지 단위가 아니라 점진적으로 전달
  • AI 추론 스트리밍: LLM이 토큰을 생성하는 대로 클라이언트에 전달 (2026년 현재 가장 뜨거운 활용 사례 중 하나)
  • 로그/이벤트 테일링: 서버 로그를 실시간으로 모니터링하는 대시보드

특히 AI 추론 스트리밍에서 Server Streaming은 핵심적인 역할을 합니다. 사용자가 질문을 하나 보내면(Unary 요청), 모델이 답변을 생성하면서 토큰 단위로 스트리밍해 주는 구조가 바로 Server Streaming입니다. 여러분이 ChatGPT나 Claude에서 경험하는 “글자가 타이핑되듯 나타나는” 효과가 기술적으로는 이 패턴과 같은 원리입니다.

Client Streaming RPC — 클라이언트가 데이터를 쏟아붓는 패턴

개념과 동작 원리

Client Streaming RPC는 Server Streaming의 거울상입니다. 클라이언트가 여러 개의 메시지를 스트림으로 보내고, 서버는 이 스트림이 완료된 후(또는 중간 중간) 하나의 응답을 돌려줍니다.

이 패턴은 “많은 데이터를 모아서 한꺼번에 처리한 뒤 결과를 알려주는” 시나리오에 적합합니다. 클라이언트가 모든 데이터를 한 번의 거대한 메시지로 보내는 것보다, 작은 단위로 나누어 스트리밍하는 것이 메모리 효율성과 네트워크 활용 면에서 유리합니다.

실전 예시: IoT 센서 데이터 수집 서비스

여름철 서버실 온도 모니터링 시스템을 예로 들어보겠습니다. 여러 센서가 온도·습도 데이터를 지속적으로 서버에 전송하고, 서버는 일정 기간 데이터를 수집한 뒤 통계 요약을 반환합니다.

syntax = "proto3";

package sensor;

service SensorService {
  // 센서 데이터를 스트림으로 수집 → 요약 통계 반환
  rpc CollectReadings (stream SensorReading) returns (CollectionSummary);
}

message SensorReading {
  string sensor_id = 1;
  double temperature = 2;
  double humidity = 3;
  int64 timestamp = 4;
}

message CollectionSummary {
  int32 total_readings = 1;
  double avg_temperature = 2;
  double max_temperature = 3;
  double min_temperature = 4;
  double avg_humidity = 5;
  bool overheat_detected = 6;
}

서버 구현에서 핵심은, 클라이언트로부터 스트림을 수신하면서 데이터를 누적하고, 스트림이 끝나면 집계 결과를 반환하는 것입니다.

class SensorServiceServicer(sensor_pb2_grpc.SensorServiceServicer):
    def CollectReadings(self, request_iterator, context):
        readings = []
        overheat = False

        for reading in request_iterator:
            readings.append(reading)
            if reading.temperature > 40.0:  # 40도 초과 시 과열 경고
                overheat = True

        if not readings:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("수신된 센서 데이터가 없습니다.")
            return sensor_pb2.CollectionSummary()

        temps = [r.temperature for r in readings]
        humids = [r.humidity for r in readings]

        return sensor_pb2.CollectionSummary(
            total_readings=len(readings),
            avg_temperature=sum(temps) / len(temps),
            max_temperature=max(temps),
            min_temperature=min(temps),
            avg_humidity=sum(humids) / len(humids),
            overheat_detected=overheat
        )

클라이언트는 제너레이터 함수를 만들어 스트림으로 전송합니다.

import grpc
import time
import random
import sensor_pb2
import sensor_pb2_grpc

def generate_readings(sensor_id, count=100):
    """센서 데이터를 생성하는 제너레이터"""
    for _ in range(count):
        yield sensor_pb2.SensorReading(
            sensor_id=sensor_id,
            temperature=random.uniform(20.0, 45.0),
            humidity=random.uniform(30.0, 80.0),
            timestamp=int(time.time())
        )
        time.sleep(0.1)  # 100ms 간격으로 전송

def run():
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = sensor_pb2_grpc.SensorServiceStub(channel)

        # 제너레이터를 직접 전달
        summary = stub.CollectReadings(
            generate_readings("sensor-rack-A1", count=50)
        )

        print(f"수집 건수: {summary.total_readings}")
        print(f"평균 온도: {summary.avg_temperature:.1f}°C")
        print(f"최고 온도: {summary.max_temperature:.1f}°C")
        print(f"과열 감지: {'예' if summary.overheat_detected else '아니오'}")

Client Streaming 설계 시 주의사항

  • 메모리 관리: 위 예시처럼 모든 수신 데이터를 리스트에 쌓으면 메모리 문제가 생길 수 있습니다. 대용량 데이터를 다룰 때는 온라인 알고리즘(예: 이동 평균)이나 배치 단위 DB 저장을 적용하세요.
  • 중간 응답 불가: Client Streaming에서 서버는 스트림이 완료된 후에만 응답을 보낼 수 있습니다. 중간 피드백이 필요하면 Bidirectional Streaming을 고려하세요.
  • 타임아웃 설정: 클라이언트 스트림이 예상보다 오래 걸리면 서버 리소스가 묶입니다. 서버 측에서 최대 수신 시간이나 최대 메시지 수 제한을 두는 것이 좋습니다.

Client Streaming이 빛나는 시나리오

  • 파일 업로드: 대용량 파일을 청크 단위로 분할하여 스트리밍 전송. 전체 파일을 메모리에 올릴 필요 없음
  • 배치 데이터 수집: IoT 센서, 모바일 앱의 이벤트 로그, 텔레메트리 데이터 일괄 전송
  • 음성/영상 데이터 전송: 음성 인식 서비스에 오디오 스트림을 실시간으로 전달
  • 데이터 마이그레이션: 대량의 레코드를 소스 시스템에서 읽으며 스트림으로 대상 시스템에 전달

Bidirectional Streaming RPC — 양방향 실시간 통신의 꽃

양방향 스트리밍 채팅 서비스 아키텍처

개념과 동작 원리

Bidirectional Streaming은 gRPC 스트리밍의 최종 형태입니다. 클라이언트와 서버가 독립적으로, 동시에 메시지 스트림을 주고받습니다. 양쪽의 스트림은 완전히 독립적이어서, 서버가 클라이언트의 메시지를 기다리지 않고 먼저 보낼 수도 있고, 클라이언트의 각 메시지에 즉시 응답할 수도 있습니다.

HTTP/2의 양방향 스트리밍 능력 덕분에 이 모든 것이 하나의 TCP 연결, 하나의 HTTP/2 스트림 위에서 이루어집니다. WebSocket과 비교하면, gRPC Bidirectional Streaming은 Protocol Buffers 기반의 강타입 메시지, 자동 코드 생성, 내장 흐름 제어, 메타데이터 전파 등의 장점을 추가로 제공합니다.

실전 예시: 실시간 채팅 서비스

Bidirectional Streaming의 가장 직관적인 예시로, 간단한 실시간 채팅 서비스를 구현해 보겠습니다.

syntax = "proto3";

package chat;

service ChatService {
  // 양방향 스트리밍 채팅
  rpc JoinChat (stream ChatMessage) returns (stream ChatMessage);
}

message ChatMessage {
  string sender = 1;
  string content = 2;
  string room_id = 3;
  int64 timestamp = 4;
  MessageType type = 5;
}

enum MessageType {
  TEXT = 0;
  JOIN = 1;
  LEAVE = 2;
  SYSTEM = 3;
}

서버는 여러 클라이언트의 스트림을 관리하면서 메시지를 브로드캐스트해야 합니다. 아래는 스레드 안전한 채팅 서버의 구현 예시입니다.

import grpc
import threading
import time
from concurrent import futures
import chat_pb2
import chat_pb2_grpc


class ChatRoom:
    """채팅방: 연결된 클라이언트들에게 메시지를 브로드캐스트"""
    def __init__(self):
        self._clients = {}  # sender -> queue
        self._lock = threading.Lock()

    def join(self, sender):
        import queue
        q = queue.Queue(maxsize=100)
        with self._lock:
            self._clients[sender] = q
        return q

    def leave(self, sender):
        with self._lock:
            self._clients.pop(sender, None)

    def broadcast(self, message, exclude_sender=None):
        with self._lock:
            for sender, q in self._clients.items():
                if sender != exclude_sender:
                    try:
                        q.put_nowait(message)
                    except Exception:
                        pass  # 큐가 가득 찬 클라이언트는 건너뜀


# 전역 채팅방 (실제로는 room_id별 관리)
chat_rooms = {}
rooms_lock = threading.Lock()


def get_or_create_room(room_id):
    with rooms_lock:
        if room_id not in chat_rooms:
            chat_rooms[room_id] = ChatRoom()
        return chat_rooms[room_id]


class ChatServiceServicer(chat_pb2_grpc.ChatServiceServicer):
    def JoinChat(self, request_iterator, context):
        sender = None
        room = None
        msg_queue = None

        def read_incoming():
            """클라이언트 메시지를 읽는 스레드"""
            nonlocal sender, room, msg_queue
            try:
                for msg in request_iterator:
                    if sender is None:
                        sender = msg.sender
                        room = get_or_create_room(msg.room_id)
                        msg_queue = room.join(sender)

                        # 입장 알림
                        join_msg = chat_pb2.ChatMessage(
                            sender="SYSTEM",
                            content=f"{sender}님이 입장했습니다.",
                            room_id=msg.room_id,
                            timestamp=int(time.time()),
                            type=chat_pb2.SYSTEM
                        )
                        room.broadcast(join_msg)

                    # 일반 메시지 브로드캐스트
                    room.broadcast(msg, exclude_sender=sender)
            except Exception:
                pass
            finally:
                if room and sender:
                    leave_msg = chat_pb2.ChatMessage(
                        sender="SYSTEM",
                        content=f"{sender}님이 퇴장했습니다.",
                        room_id="",
                        timestamp=int(time.time()),
                        type=chat_pb2.SYSTEM
                    )
                    room.broadcast(leave_msg)
                    room.leave(sender)

        # 수신 스레드 시작
        reader_thread = threading.Thread(target=read_incoming, daemon=True)
        reader_thread.start()

        # 초기화 대기
        while msg_queue is None and context.is_active():
            time.sleep(0.05)

        if msg_queue is None:
            return

        # 큐에서 메시지를 꺼내 클라이언트로 전송
        while context.is_active():
            try:
                message = msg_queue.get(timeout=1.0)
                yield message
            except Exception:
                continue

이 예시에서 핵심적인 부분은 수신(reading)과 송신(writing)이 별도의 스레드에서 독립적으로 동작한다는 것입니다. 클라이언트로부터 메시지를 받는 작업과 다른 클라이언트의 메시지를 전달하는 작업이 동시에 일어나므로, 실시간 채팅의 반응성이 보장됩니다.

Python asyncio 기반 Bidirectional Streaming

위의 스레드 기반 구현도 동작하지만, Python의 grpcio-async 라이브러리를 사용하면 asyncio 기반으로 더 깔끔하게 작성할 수 있습니다.

import grpc.aio
import asyncio
import chat_pb2
import chat_pb2_grpc


class AsyncChatServiceServicer(chat_pb2_grpc.ChatServiceServicer):
    def __init__(self):
        self._rooms = {}  # room_id -> set of asyncio.Queue

    async def JoinChat(self, request_iterator, context):
        my_queue = asyncio.Queue(maxsize=100)
        room_id = None
        sender = None

        async def reader():
            nonlocal room_id, sender
            async for msg in request_iterator:
                if room_id is None:
                    room_id = msg.room_id
                    sender = msg.sender
                    self._rooms.setdefault(room_id, set()).add(my_queue)

                # 다른 참가자에게 브로드캐스트
                for q in self._rooms.get(room_id, set()):
                    if q is not my_queue:
                        await q.put(msg)

        read_task = asyncio.create_task(reader())

        try:
            while not context.cancelled():
                try:
                    message = await asyncio.wait_for(
                        my_queue.get(), timeout=1.0
                    )
                    yield message
                except asyncio.TimeoutError:
                    continue
        finally:
            read_task.cancel()
            if room_id and my_queue in self._rooms.get(room_id, set()):
                self._rooms[room_id].discard(my_queue)


async def serve():
    server = grpc.aio.server()
    chat_pb2_grpc.add_ChatServiceServicer_to_server(
        AsyncChatServiceServicer(), server
    )
    server.add_insecure_port('[::]:50051')
    await server.start()
    await server.wait_for_termination()

if __name__ == '__main__':
    asyncio.run(serve())

asyncio 기반 구현은 스레드 전환 오버헤드가 없어 대량의 동시 접속을 더 효율적으로 처리합니다. 특히 Python에서 GIL(Global Interpreter Lock)의 영향을 덜 받기 때문에, I/O 바운드 작업이 주를 이루는 채팅 서비스에 더 적합합니다.

Bidirectional Streaming이 빛나는 시나리오

  • 실시간 채팅/메시징: 양방향으로 메시지를 주고받아야 하는 전형적인 사례
  • 협업 도구: Google Docs 같은 실시간 공동 편집, 화이트보드 공유
  • 게임 서버: 플레이어 입력 전송과 게임 상태 업데이트의 동시 스트리밍
  • 대화형 AI: 사용자가 말하는 도중에 AI가 실시간으로 텍스트를 분석하며 응답을 준비하는 인터리브드 패턴
  • 양방향 데이터 동기화: 모바일 앱의 오프라인 데이터를 서버와 동기화하면서 서버의 변경 사항도 동시에 수신
gRPC 4가지 패턴 특성 비교 인포그래픽

4가지 패턴 비교와 선택 가이드

패턴별 특성 정리

지금까지 살펴본 4가지 패턴의 핵심 특성을 정리해 보겠습니다.

Unary RPC는 구현이 가장 간단하고, 디버깅이 쉽고, 대부분의 도구가 기본 지원합니다. 단건 CRUD, 인증, 간단한 조회에 사용합니다. 단, 대용량 데이터를 한 번에 보내야 하면 메모리 부담이 생기고, 실시간 업데이트에는 폴링이 필요합니다.

Server Streaming은 서버가 능동적으로 데이터를 밀어줍니다. 클라이언트의 폴링이 불필요하고, 대량 결과를 점진적으로 전달할 수 있습니다. 실시간 피드, AI 추론 스트리밍, 대량 검색 결과 전송에 적합합니다. 다만, 클라이언트에서 서버로의 중간 메시지 전송은 불가합니다.

Client Streaming은 클라이언트가 데이터를 분할 전송하므로 메모리 효율적입니다. 파일 업로드, 센서 데이터 수집, 배치 처리에 강합니다. 하지만 서버의 중간 피드백이 제한적입니다.

Bidirectional Streaming은 가장 유연합니다. 양쪽이 독립적으로 비동기 통신하며, 실시간 상호작용에 최적화되어 있습니다. 채팅, 게임, 협업 도구에 적합하지만, 구현 복잡도가 높고 디버깅이 어렵습니다.

현실적인 선택 흐름도

패턴을 선택할 때 아래와 같은 질문 흐름을 따라가면 도움이 됩니다.

먼저, “클라이언트가 보내는 데이터가 여러 개인가?”를 확인합니다. 아니라면 요청은 단건입니다. 그 다음 “서버의 응답이 여러 개인가?”를 확인합니다. 응답도 단건이면 Unary, 응답이 여러 개면 Server Streaming입니다.

클라이언트 데이터가 여러 개라면, “서버도 여러 응답을 보내야 하는가?”를 확인합니다. 서버 응답이 하나면 Client Streaming, 서버도 여러 응답을 보내야 하면 Bidirectional Streaming입니다.

추가로, 가장 단순한 패턴부터 시작하세요. Unary로 충분한 곳에 스트리밍을 적용하면 코드가 불필요하게 복잡해집니다. 성능 병목이나 실시간성 요구가 명확할 때 스트리밍으로 전환해도 늦지 않습니다. .proto 파일에 stream 키워드 하나만 추가하면 되니까요.

스트리밍 운영 시 실전 팁

1. Deadline과 Timeout을 반드시 설정하세요

스트리밍 RPC는 연결이 오래 유지되므로, timeout 없이 방치하면 서버 리소스가 고갈될 수 있습니다. 클라이언트에서는 deadline을 설정하고, 서버에서는 최대 스트리밍 시간이나 최대 메시지 수 제한을 두세요.

# 클라이언트: 60초 후 자동 종료
responses = stub.SubscribePrice(
    stock_pb2.SubscribeRequest(symbol="005930"),
    timeout=60  # 초 단위 deadline
)

2. 연결 끊김에 대비한 재연결 로직

네트워크가 불안정한 환경(모바일, Wi-Fi)에서는 스트리밍 연결이 자주 끊어질 수 있습니다. 지수 백오프(exponential backoff) 기반의 재연결 로직을 클라이언트에 반드시 구현하세요.

import time
import grpc

def subscribe_with_retry(stub, request, max_retries=5):
    retry_count = 0
    base_delay = 1.0

    while retry_count < max_retries:
        try:
            responses = stub.SubscribePrice(request)
            for update in responses:
                retry_count = 0  # 성공 시 카운터 초기화
                process_update(update)
        except grpc.RpcError as e:
            if e.code() == grpc.StatusCode.UNAVAILABLE:
                delay = base_delay * (2 ** retry_count)
                print(f"연결 끊김. {delay}초 후 재시도...")
                time.sleep(delay)
                retry_count += 1
            else:
                raise  # 복구 불가능한 에러는 상위로 전파

3. 메시지 크기 제한 조정

gRPC의 기본 메시지 크기 제한은 4MB입니다. 스트리밍에서 각 개별 메시지가 이 크기를 넘으면 에러가 발생합니다. 필요시 채널 옵션으로 조정할 수 있지만, 가능하면 메시지를 작게 유지하는 것이 스트리밍의 철학에 더 부합합니다.

# 메시지 크기 제한 상향 (필요시)
channel = grpc.insecure_channel(
    'localhost:50051',
    options=[
        ('grpc.max_receive_message_length', 16 * 1024 * 1024),  # 16MB
        ('grpc.max_send_message_length', 16 * 1024 * 1024),
    ]
)

4. Keepalive 설정으로 좀비 연결 방지

장시간 스트리밍에서 중간에 네트워크 장비(로드밸런서, 프록시)가 유휴 연결을 끊어버리는 경우가 있습니다. gRPC의 keepalive 설정으로 주기적인 핑을 보내 연결을 유지하세요.

# 서버 keepalive 설정
server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=10),
    options=[
        ('grpc.keepalive_time_ms', 30000),           # 30초마다 ping
        ('grpc.keepalive_timeout_ms', 10000),         # 10초 내 응답 없으면 끊기
        ('grpc.keepalive_permit_without_calls', True), # 활성 RPC 없어도 ping
        ('grpc.http2.max_pings_without_data', 0),     # 제한 없음
    ]
)

5. 모니터링과 메트릭 수집

스트리밍 RPC는 Unary에 비해 모니터링이 까다롭습니다. 다음 메트릭을 추적하는 것이 중요합니다.

  • 활성 스트림 수: 현재 열려 있는 스트리밍 연결의 개수
  • 스트림 지속 시간: 각 스트림이 얼마나 오래 열려 있는지
  • 메시지 처리율: 초당 수신/발신 메시지 수
  • 에러율: 비정상 종료된 스트림의 비율
  • 백프레셔 이벤트: 흐름 제어로 전송이 지연된 횟수

Prometheus와 gRPC의 메트릭 인터셉터를 결합하면 이런 메트릭을 자동으로 수집할 수 있습니다. 앞서 다뤘던 Interceptor 패턴과 스트리밍을 함께 활용하면, 모든 스트리밍 RPC에 대해 일관된 메트릭 수집과 로깅을 적용할 수 있습니다.

마치며 — 적재적소의 스트리밍이 핵심입니다

gRPC의 4가지 통신 패턴 — Unary, Server Streaming, Client Streaming, Bidirectional Streaming — 은 각각 고유한 강점과 적합한 시나리오를 갖고 있습니다. HTTP/2의 다중화와 양방향 스트리밍 능력 위에 Protocol Buffers의 효율적인 직렬화를 결합한 gRPC 스트리밍은, REST + WebSocket 조합으로는 달성하기 어려운 수준의 성능과 타입 안전성을 제공합니다.

2026년 여름, AI 서비스의 실시간 스트리밍 수요가 폭발적으로 늘어나면서, gRPC 스트리밍은 더 이상 선택이 아닌 필수가 되어가고 있습니다. 하지만 가장 중요한 원칙은 변하지 않습니다. 가장 단순한 패턴부터 시작하고, 실제 요구사항이 스트리밍을 필요로 할 때 전환하라는 것입니다.

오늘 살펴본 코드 예시들을 직접 실행해 보면서 각 패턴의 차이를 체감해 보세요. .proto 파일에 stream 키워드 하나가 만들어내는 강력한 차이를 직접 경험하는 것이 최고의 학습법입니다. 다음에는 이 스트리밍 패턴 위에 에러 처리, 흐름 제어, 그리고 gRPC-Web을 통한 브라우저 통합까지 확장하는 방법을 함께 알아보겠습니다.

이미지는 Leonardo AI 로 생성되었습니다.

이미지는 Claude AI 로 생성되었습니다.

답글 남기기

Your email address will not be published. Required fields are marked *.

Warning: Undefined array key "cookies" in /var/www/html/wp-content/themes/personal-cv-resume/class/class-post-related.php on line 212
*
*

최신 댓글