본문 바로가기
Lab/Mini-Broker

[Mini-Broker] 01. Kafka의 심장부를 직접 만들어보자

by junseokoo 2026. 5. 10.

Intro

Strimzi로 3-broker KRaft Kafka 클러스터를 운영하고 있다.

Kafka를 사용하다가 어느 날 문득 이런 생각이 들었다.

 

 

producer.send(record) 하면 메시지가 디스크에 어떻게 저장되는 걸까?
consumer.poll() 은 어떻게 그 메시지를 정확히 찾아오는 걸까?



설명할 수 없었다. 운영은 하고 있었지만, 내부는 블랙박스였다.

그래서 직접 만들어보기로 했다. Java로, 표준 라이브러리만 써서, Kafka-like 단일 노드 브로커를.

꼭 명심해야 할 점이 있다.
이 프로젝트의 목표는 성능이 아니다. "왜 Kafka가 이렇게 설계됐는지"를 체감하는 것이다.
그래서 Netty, Protobuf, Spring Boot 같은 라이브러리는 의도적으로 쓰지 않는다. 그것들이 대신 해주는 부분이 바로 이 프로젝트에서 직접 구현할 내용이기 때문이다.


목표

메시지를 파일에 이어서 쓰고, 처음부터 읽어서 순서대로 복원한다.

Kafka 브로커 디렉토리를 실제로 열어보면 이런 파일들이 있다:

/kafka-logs/orders-0/
├── 00000000000000000000.log    ← 메시지가 바이트로 쌓이는 파일
├── 00000000000000000000.index  ← 인덱스 (Stage 2에서 구현)

1편에서는 저 .log 파일을 직접 구현하는 단계다.
단순해 보이지만, 여기서 가장 중요한 결정이 나온다: 바이너리 포맷 설계.


바이너리 포맷 설계

코드보다 설계가 먼저다.

메시지 하나에 뭐가 들어가나

Kafka ConsumerRecord를 떠올려보면:

record.topic()      // 토픽명
record.key()        // 키
record.value()      // 실제 데이터
record.offset()     // 순번
record.timestamp()  // 시각

topic은 파일명으로 대체 가능하다.
토픽 하나 = 파일 하나 구조라면 파일명이 토픽명이 되기 때문에 파일 안에 따로 저장할 필요가 없다.

나머지 4개(offset, timestamp, key, value)가 파일에 저장될 필드다.

가변 길이 문제

offsettimestamp는 고정 크기다 (long = 8바이트). 문제는 keyvalue — 메시지마다 길이가 다르다.

파일에 그냥 이어서 쓰면:

[offset][timestamp][user-1{}][offset][timestamp][customer-999{...긴JSON}]

읽을 때 user-1이 어디서 끝나고 {}가 어디서 시작하는지 알 수 없다.

구분자(| 같은)를 쓰면 어떨까? 바이너리 데이터 안에는 어떤 바이트값이든 나올 수 있기 때문에 구분자가 데이터와 충돌한다. JSON이면 중괄호 천지고, 이미지 바이너리라면 0x7C가 얼마든지 등장한다.

해결책: length-prefixed encoding

데이터 앞에 길이를 먼저 쓴다. 읽는 쪽에서 길이를 먼저 읽고, 그 길이만큼만 읽으면 된다. Kafka도 같은 방식을 쓴다.

[key 길이: 2바이트][key 데이터: N바이트][value 길이: 2바이트][value 데이터: M바이트]

파일 손상 감지

디스크 오류나 프로세스 강제 종료 시 파일이 중간에 깨질 수 있다. 깨진 부분부터는 쓰레기 바이트인데, 이걸 모르고 value 길이 필드로 읽으면 수GB짜리 배열을 할당하려다 OOM이 난다.

해결책: magic byte

메시지 맨 앞에 고정된 2바이트 0xCAFE를 붙인다. 읽을 때 이 값이 없으면 즉시 파싱을 중단한다.

최종 레이아웃

[magic: 2B][offset: 8B][timestamp: 8B][keyLen: 2B][key: NB][valueLen: 2B][value: MB]
  0xCAFE     long         long           short       byte[]   short         byte[]

고정 헤더 합계: 2 + 8 + 8 + 2 + 2 = 22바이트


구현

Record.java — 메시지 하나

불변 객체로 만든다. 메시지는 한 번 쓰면 바뀌지 않아야 한다.

byte[]==로 비교하면 참조 비교가 되기 때문에 Arrays.equals()Arrays.hashCode()를 직접 구현했다.

public class Record {
    private final long offset;
    private final long timestamp;
    private final byte[] key;
    private final byte[] value;

    public Record(long offset, long timestamp, byte[] key, byte[] value) {
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    public long offset()    { return offset; }
    public long timestamp() { return timestamp; }
    public byte[] key()     { return key; }
    public byte[] value()   { return value; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Record r)) return false;
        return offset == r.offset && timestamp == r.timestamp
            && Arrays.equals(key, r.key) && Arrays.equals(value, r.value);
    }

    @Override
    public int hashCode() {
        int result = Long.hashCode(offset);
        result = 31 * result + Long.hashCode(timestamp);
        result = 31 * result + Arrays.hashCode(key);
        result = 31 * result + Arrays.hashCode(value);
        return result;
    }
}

RecordWriter.java — 파일에 append

FileChannel + ByteBuffer를 사용한다.
FileOutputStream 대신 NIO를 쓰는 이유는 두 가지다:

  • Kafka가 실제로 쓰는 방식이고
  • Stage 3에서 구현할 NIO Selector 서버와 개념이 이어진다

null key/value는 길이 필드에 -1을 써서 표현한다. Kafka와 동일한 방식이다.

public long append(Record record) throws IOException {
    long position = channel.position(); // 이 메시지의 파일 내 시작 위치

    int keyLen   = (record.key()   == null) ? 0 : record.key().length;
    int valueLen = (record.value() == null) ? 0 : record.value().length;

    ByteBuffer buffer = ByteBuffer.allocate(22 + keyLen + valueLen);
    buffer.putShort((short) 0xCAFE);
    buffer.putLong(record.offset());
    buffer.putLong(record.timestamp());

    if (record.key() == null) {
        buffer.putShort((short) -1);
    } else {
        buffer.putShort((short) record.key().length);
        buffer.put(record.key());
    }

    if (record.value() == null) {
        buffer.putShort((short) -1);
    } else {
        buffer.putShort((short) record.value().length);
        buffer.put(record.value());
    }

    buffer.flip();
    channel.write(buffer);
    return position;
}

append()position을 반환하는 이유는 Stage 2에서 구현할 index 파일에 [offset, position] 쌍을 저장해야 하기 때문이다. 지금 심어두지 않으면 나중에 갈아엎어야 한다.

RecordReader.java — 파일에서 복원

쓴 순서 그대로 읽으면 된다. magic byte 체크가 핵심이다.

public List<Record> readAll() throws IOException {
    List<Record> records = new ArrayList<>();

    while (channel.position() < channel.size()) {
        // 1. magic 확인
        ByteBuffer magicBuf = ByteBuffer.allocate(2);
        channel.read(magicBuf);
        magicBuf.flip();
        if (magicBuf.getShort() != (short) 0xCAFE) break; // 파일 손상, 중단

        // 2. offset, timestamp
        ByteBuffer fixed = ByteBuffer.allocate(16);
        channel.read(fixed);
        fixed.flip();
        long offset    = fixed.getLong();
        long timestamp = fixed.getLong();

        // 3. key
        ByteBuffer keyLenBuf = ByteBuffer.allocate(2);
        channel.read(keyLenBuf);
        keyLenBuf.flip();
        short keyLen = keyLenBuf.getShort();
        byte[] key = null;
        if (keyLen != -1) {
            key = new byte[keyLen];
            channel.read(ByteBuffer.wrap(key));
        }

        // 4. value (같은 패턴)
        ByteBuffer valueLenBuf = ByteBuffer.allocate(2);
        channel.read(valueLenBuf);
        valueLenBuf.flip();
        short valueLen = valueLenBuf.getShort();
        byte[] value = null;
        if (valueLen != -1) {
            value = new byte[valueLen];
            channel.read(ByteBuffer.wrap(value));
        }

        records.add(new Record(offset, timestamp, key, value));
    }

    return records;
}

테스트: 100개 round-trip

@Test
void hundredRecordsRoundTrip() throws Exception {
    Path logFile = tempDir.resolve("test.log");

    // write
    try (RecordWriter writer = new RecordWriter(logFile)) {
        for (int i = 0; i < 100; i++) {
            writer.append(new Record(
                i,
                System.currentTimeMillis(),
                ("key-" + i).getBytes(),
                ("value-" + i).getBytes()
            ));
        }
    }

    // read & verify
    try (RecordReader reader = new RecordReader(logFile)) {
        List<Record> records = reader.readAll();

        assertEquals(100, records.size());
        for (int i = 0; i < 100; i++) {
            assertEquals(i, records.get(i).offset());
            assertArrayEquals(("key-"   + i).getBytes(), records.get(i).key());
            assertArrayEquals(("value-" + i).getBytes(), records.get(i).value());
        }
    }
}
./gradlew test → BUILD SUCCESSFUL ✓

배운 것

직접 설계하면서 Kafka의 선택들이 왜 그런지 체감됐다.

append-only가 왜 빠른가
기존 데이터를 수정하지 않고 뒤에만 붙인다. 랜덤 write 없이 순차 write만 하기 때문에 디스크 성능을 최대한 끌어낼 수 있다. HDD 기준으로 랜덤 write는 순차 write보다 수백 배 느리다.

length-prefix가 구분자보다 나은 이유
구분자는 데이터 안에 같은 바이트가 등장하면 무너진다. length-prefix는 데이터 내용과 무관하게 항상 동작한다.

magic byte의 한계
파일 손상을 감지하긴 하지만, 어느 메시지가 깨졌는지 특정할 수 없다. Stage 2에서 CRC를 추가하면 손상된 메시지를 정확히 집어낼 수 있다.

null key는 -1로 표현한다
처음엔 그냥 빈 배열을 쓰면 되지 않나 생각했는데, 빈 배열(길이=0)과 null의 의미가 다르다. Kafka도 -1로 null을 표현한다. 같은 이유다.


느낀 점

솔직히 말하면, 코드 자체는 어렵지 않다. ByteBuffer에 순서대로 넣고 꺼내는 게 전부다.

어려운 건 왜 이렇게 해야 하는지 이해하는 과정이었다. 처음엔 구분자로 하면 안 되나, JSON으로 직렬화하면 안 되나 생각했다. 직접 설계해보고 나서야 각각의 문제점이 손에 잡혔다.

Kafka가 이런 선택을 한 이유가 "천재 엔지니어의 직관"이 아니라, "이 문제를 해결하다 보면 자연스럽게 나오는 답"이라는 게 느껴졌다.


Next

  • sparse index + segment rolling
  • 1편에서 만든 로그 파일에 메시지가 100만 개 쌓이면 어떻게 될까?
    • 특정 offset 하나를 찾으려면 파일 전체를 처음부터 읽어야 한다.
  • 2편에서는 이 문제를 해결하는 sparse index를 구현한다.
  • Kafka가 .index 파일에 뭘 저장하는지, 왜 모든 메시지가 아닌 N개마다 하나씩만 기록하는지 직접 설계해볼 예정이다.