
Intro
Strimzi로 3-broker KRaft Kafka 클러스터를 운영하고 있다.
Kafka를 사용하다가 어느 날 문득 이런 생각이 들었다.
producer.send(record)하면 메시지가 디스크에 어떻게 저장되는 걸까?consumer.poll()은 어떻게 그 메시지를 정확히 찾아오는 걸까?
설명할 수 없었다. 운영은 하고 있었지만, 내부는 블랙박스였다.
그래서 직접 만들어보기로 했다. Java로, 표준 라이브러리만 써서, Kafka-like 단일 노드 브로커를.
꼭 명심해야 할 점이 있다.
이 프로젝트의 목표는 성능이 아니다. "왜 Kafka가 이렇게 설계됐는지"를 체감하는 것이다.
그래서 Netty, Protobuf, Spring Boot 같은 라이브러리는 의도적으로 쓰지 않는다. 그것들이 대신 해주는 부분이 바로 이 프로젝트에서 직접 구현할 내용이기 때문이다.
목표
메시지를 파일에 이어서 쓰고, 처음부터 읽어서 순서대로 복원한다.
Kafka 브로커 디렉토리를 실제로 열어보면 이런 파일들이 있다:
/kafka-logs/orders-0/
├── 00000000000000000000.log ← 메시지가 바이트로 쌓이는 파일
├── 00000000000000000000.index ← 인덱스 (Stage 2에서 구현)
1편에서는 저 .log 파일을 직접 구현하는 단계다.
단순해 보이지만, 여기서 가장 중요한 결정이 나온다: 바이너리 포맷 설계.
바이너리 포맷 설계
코드보다 설계가 먼저다.
메시지 하나에 뭐가 들어가나
Kafka ConsumerRecord를 떠올려보면:
record.topic() // 토픽명
record.key() // 키
record.value() // 실제 데이터
record.offset() // 순번
record.timestamp() // 시각
topic은 파일명으로 대체 가능하다.
토픽 하나 = 파일 하나 구조라면 파일명이 토픽명이 되기 때문에 파일 안에 따로 저장할 필요가 없다.
나머지 4개(offset, timestamp, key, value)가 파일에 저장될 필드다.
가변 길이 문제
offset과 timestamp는 고정 크기다 (long = 8바이트). 문제는 key와 value — 메시지마다 길이가 다르다.
파일에 그냥 이어서 쓰면:
[offset][timestamp][user-1{}][offset][timestamp][customer-999{...긴JSON}]
읽을 때 user-1이 어디서 끝나고 {}가 어디서 시작하는지 알 수 없다.
구분자(| 같은)를 쓰면 어떨까? 바이너리 데이터 안에는 어떤 바이트값이든 나올 수 있기 때문에 구분자가 데이터와 충돌한다. JSON이면 중괄호 천지고, 이미지 바이너리라면 0x7C가 얼마든지 등장한다.
해결책: length-prefixed encoding
데이터 앞에 길이를 먼저 쓴다. 읽는 쪽에서 길이를 먼저 읽고, 그 길이만큼만 읽으면 된다. Kafka도 같은 방식을 쓴다.
[key 길이: 2바이트][key 데이터: N바이트][value 길이: 2바이트][value 데이터: M바이트]
파일 손상 감지
디스크 오류나 프로세스 강제 종료 시 파일이 중간에 깨질 수 있다. 깨진 부분부터는 쓰레기 바이트인데, 이걸 모르고 value 길이 필드로 읽으면 수GB짜리 배열을 할당하려다 OOM이 난다.
해결책: magic byte
메시지 맨 앞에 고정된 2바이트 0xCAFE를 붙인다. 읽을 때 이 값이 없으면 즉시 파싱을 중단한다.
최종 레이아웃
[magic: 2B][offset: 8B][timestamp: 8B][keyLen: 2B][key: NB][valueLen: 2B][value: MB]
0xCAFE long long short byte[] short byte[]
고정 헤더 합계: 2 + 8 + 8 + 2 + 2 = 22바이트
구현
Record.java — 메시지 하나
불변 객체로 만든다. 메시지는 한 번 쓰면 바뀌지 않아야 한다.
byte[]는 ==로 비교하면 참조 비교가 되기 때문에 Arrays.equals()와 Arrays.hashCode()를 직접 구현했다.
public class Record {
private final long offset;
private final long timestamp;
private final byte[] key;
private final byte[] value;
public Record(long offset, long timestamp, byte[] key, byte[] value) {
this.offset = offset;
this.timestamp = timestamp;
this.key = key;
this.value = value;
}
public long offset() { return offset; }
public long timestamp() { return timestamp; }
public byte[] key() { return key; }
public byte[] value() { return value; }
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (!(o instanceof Record r)) return false;
return offset == r.offset && timestamp == r.timestamp
&& Arrays.equals(key, r.key) && Arrays.equals(value, r.value);
}
@Override
public int hashCode() {
int result = Long.hashCode(offset);
result = 31 * result + Long.hashCode(timestamp);
result = 31 * result + Arrays.hashCode(key);
result = 31 * result + Arrays.hashCode(value);
return result;
}
}
RecordWriter.java — 파일에 append
FileChannel + ByteBuffer를 사용한다.FileOutputStream 대신 NIO를 쓰는 이유는 두 가지다:
- Kafka가 실제로 쓰는 방식이고
- Stage 3에서 구현할 NIO Selector 서버와 개념이 이어진다
null key/value는 길이 필드에 -1을 써서 표현한다. Kafka와 동일한 방식이다.
public long append(Record record) throws IOException {
long position = channel.position(); // 이 메시지의 파일 내 시작 위치
int keyLen = (record.key() == null) ? 0 : record.key().length;
int valueLen = (record.value() == null) ? 0 : record.value().length;
ByteBuffer buffer = ByteBuffer.allocate(22 + keyLen + valueLen);
buffer.putShort((short) 0xCAFE);
buffer.putLong(record.offset());
buffer.putLong(record.timestamp());
if (record.key() == null) {
buffer.putShort((short) -1);
} else {
buffer.putShort((short) record.key().length);
buffer.put(record.key());
}
if (record.value() == null) {
buffer.putShort((short) -1);
} else {
buffer.putShort((short) record.value().length);
buffer.put(record.value());
}
buffer.flip();
channel.write(buffer);
return position;
}
append()가 position을 반환하는 이유는 Stage 2에서 구현할 index 파일에 [offset, position] 쌍을 저장해야 하기 때문이다. 지금 심어두지 않으면 나중에 갈아엎어야 한다.
RecordReader.java — 파일에서 복원
쓴 순서 그대로 읽으면 된다. magic byte 체크가 핵심이다.
public List<Record> readAll() throws IOException {
List<Record> records = new ArrayList<>();
while (channel.position() < channel.size()) {
// 1. magic 확인
ByteBuffer magicBuf = ByteBuffer.allocate(2);
channel.read(magicBuf);
magicBuf.flip();
if (magicBuf.getShort() != (short) 0xCAFE) break; // 파일 손상, 중단
// 2. offset, timestamp
ByteBuffer fixed = ByteBuffer.allocate(16);
channel.read(fixed);
fixed.flip();
long offset = fixed.getLong();
long timestamp = fixed.getLong();
// 3. key
ByteBuffer keyLenBuf = ByteBuffer.allocate(2);
channel.read(keyLenBuf);
keyLenBuf.flip();
short keyLen = keyLenBuf.getShort();
byte[] key = null;
if (keyLen != -1) {
key = new byte[keyLen];
channel.read(ByteBuffer.wrap(key));
}
// 4. value (같은 패턴)
ByteBuffer valueLenBuf = ByteBuffer.allocate(2);
channel.read(valueLenBuf);
valueLenBuf.flip();
short valueLen = valueLenBuf.getShort();
byte[] value = null;
if (valueLen != -1) {
value = new byte[valueLen];
channel.read(ByteBuffer.wrap(value));
}
records.add(new Record(offset, timestamp, key, value));
}
return records;
}
테스트: 100개 round-trip
@Test
void hundredRecordsRoundTrip() throws Exception {
Path logFile = tempDir.resolve("test.log");
// write
try (RecordWriter writer = new RecordWriter(logFile)) {
for (int i = 0; i < 100; i++) {
writer.append(new Record(
i,
System.currentTimeMillis(),
("key-" + i).getBytes(),
("value-" + i).getBytes()
));
}
}
// read & verify
try (RecordReader reader = new RecordReader(logFile)) {
List<Record> records = reader.readAll();
assertEquals(100, records.size());
for (int i = 0; i < 100; i++) {
assertEquals(i, records.get(i).offset());
assertArrayEquals(("key-" + i).getBytes(), records.get(i).key());
assertArrayEquals(("value-" + i).getBytes(), records.get(i).value());
}
}
}
./gradlew test → BUILD SUCCESSFUL ✓
배운 것
직접 설계하면서 Kafka의 선택들이 왜 그런지 체감됐다.
append-only가 왜 빠른가
기존 데이터를 수정하지 않고 뒤에만 붙인다. 랜덤 write 없이 순차 write만 하기 때문에 디스크 성능을 최대한 끌어낼 수 있다. HDD 기준으로 랜덤 write는 순차 write보다 수백 배 느리다.
length-prefix가 구분자보다 나은 이유
구분자는 데이터 안에 같은 바이트가 등장하면 무너진다. length-prefix는 데이터 내용과 무관하게 항상 동작한다.
magic byte의 한계
파일 손상을 감지하긴 하지만, 어느 메시지가 깨졌는지 특정할 수 없다. Stage 2에서 CRC를 추가하면 손상된 메시지를 정확히 집어낼 수 있다.
null key는 -1로 표현한다
처음엔 그냥 빈 배열을 쓰면 되지 않나 생각했는데, 빈 배열(길이=0)과 null의 의미가 다르다. Kafka도 -1로 null을 표현한다. 같은 이유다.
느낀 점
솔직히 말하면, 코드 자체는 어렵지 않다. ByteBuffer에 순서대로 넣고 꺼내는 게 전부다.
어려운 건 왜 이렇게 해야 하는지 이해하는 과정이었다. 처음엔 구분자로 하면 안 되나, JSON으로 직렬화하면 안 되나 생각했다. 직접 설계해보고 나서야 각각의 문제점이 손에 잡혔다.
Kafka가 이런 선택을 한 이유가 "천재 엔지니어의 직관"이 아니라, "이 문제를 해결하다 보면 자연스럽게 나오는 답"이라는 게 느껴졌다.
Next
- sparse index + segment rolling
- 1편에서 만든 로그 파일에 메시지가 100만 개 쌓이면 어떻게 될까?
- 특정 offset 하나를 찾으려면 파일 전체를 처음부터 읽어야 한다.
- 2편에서는 이 문제를 해결하는 sparse index를 구현한다.
- Kafka가
.index파일에 뭘 저장하는지, 왜 모든 메시지가 아닌 N개마다 하나씩만 기록하는지 직접 설계해볼 예정이다.
'Lab > Mini-Broker' 카테고리의 다른 글
| [Mini-Broker] 06. 벤치마크 측정 (0) | 2026.05.29 |
|---|---|
| [Mini-Broker] 05. 파티션과 컨슈머 그룹, Kafka가 병렬 처리를 하는 방법 (0) | 2026.05.26 |
| [Mini-Broker] 04. Consumer 구현, sparse index 사용 (0) | 2026.05.21 |
| [Mini-Broker] 03. NIO Selector로 TCP 서버 직접 만들기 (0) | 2026.05.15 |
| [Mini-Broker] 02. 100만 개 중 하나를 빠르게 찾는 법, sparse index (0) | 2026.05.13 |