Done with chapter 3.

This commit is contained in:
Mariano Uvalle 2022-01-19 22:30:55 -08:00
parent 6f03983721
commit 725753866a
17 changed files with 1264 additions and 0 deletions

9
internal/log/config.go Normal file
View file

@ -0,0 +1,9 @@
package log
// Config holds the tunable parameters for the log and its segments.
type Config struct {
	Segment struct {
		MaxStoreBytes uint64 // Max size of a segment's store file in bytes.
		MaxIndexBytes uint64 // Max size of a segment's index file in bytes.
		InitialOffset uint64 // Base offset for the first segment of a new log.
	}
}

102
internal/log/index.go Normal file
View file

@ -0,0 +1,102 @@
package log
import (
"io"
"os"
"github.com/tysonmote/gommap"
)
var (
	offWidth uint64 = 4                   // Bytes used to store a record's relative offset.
	posWidth uint64 = 8                   // Bytes used to store a record's position in the store file.
	entWidth        = offWidth + posWidth // Total bytes occupied by one index entry.
)
// index maps relative record offsets to byte positions in the store file,
// backed by a memory-mapped file for fast reads and writes.
type index struct {
	file *os.File
	mmap gommap.MMap // Memory-mapped view of file.
	size uint64      // Bytes currently in use, i.e. where the next entry is written.
}
// newIndex memory-maps f and returns an index ready for reads and writes.
// The file is grown to c.Segment.MaxIndexBytes before mapping because the
// mapping cannot be resized afterwards.
func newIndex(f *os.File, c Config) (*index, error) {
	idx := &index{
		file: f,
	}
	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}
	// Capture the pre-truncate size: it marks where the next entry goes
	// and what the file is shrunk back to on Close.
	idx.size = uint64(fi.Size())
	// File needs to be expanded to its max size as re-sizing is not possible after memory map.
	err = os.Truncate(
		f.Name(),
		int64(c.Segment.MaxIndexBytes),
	)
	if err != nil {
		return nil, err
	}
	if idx.mmap, err = gommap.Map(
		idx.file.Fd(),
		gommap.PROT_READ|gommap.PROT_WRITE,
		gommap.MAP_SHARED,
	); err != nil {
		return nil, err
	}
	return idx, nil
}
// Read takes a relative entry number and returns that entry's offset and
// its position in the store. Passing -1 returns the last entry. io.EOF is
// returned when the index is empty or the entry is out of bounds.
func (i *index) Read(in int64) (off uint32, pos uint64, err error) {
	if i.size == 0 {
		return 0, 0, io.EOF
	}
	entry := uint32(in)
	if in == -1 {
		entry = uint32(i.size/entWidth - 1)
	}
	start := uint64(entry) * entWidth
	if start >= i.size {
		return 0, 0, io.EOF
	}
	off = enc.Uint32(i.mmap[start : start+offWidth])
	pos = enc.Uint64(i.mmap[start+offWidth : start+entWidth])
	return off, pos, nil
}
// Write appends an entry mapping the relative offset off to the store
// position pos. Returns io.EOF when the mapped region is full.
func (i *index) Write(off uint32, pos uint64) error {
	if uint64(len(i.mmap)) < i.size+entWidth {
		return io.EOF
	}
	start := i.size
	enc.PutUint32(i.mmap[start:start+offWidth], off)
	enc.PutUint64(i.mmap[start+offWidth:start+entWidth], pos)
	i.size += entWidth
	return nil
}
// Name returns the path of the file backing this index.
func (i *index) Name() string {
	return i.file.Name()
}
// Close flushes the mapped memory and the file to stable storage, shrinks
// the file back to its real contents, and closes it. The order matters:
// the mmap must be synced before the file sync and truncate.
func (i *index) Close() error {
	if err := i.mmap.Sync(gommap.MS_SYNC); err != nil {
		return err
	}
	if err := i.file.Sync(); err != nil {
		return err
	}
	// Truncate back to real file size after memory mapped data is synced.
	if err := i.file.Truncate(int64(i.size)); err != nil {
		return err
	}
	return i.file.Close()
}

View file

@ -0,0 +1,59 @@
package log
import (
"io"
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
)
// TestIndex exercises writes, reads, bounds checks, and re-opening an
// index from a pre-existing file.
// TODO: Missing test for out of bounds writes.
// TODO: Improve creation and usage of test data set (entries).
func TestIndex(t *testing.T) {
	f, err := ioutil.TempFile("", "index_test")
	require.NoError(t, err)
	defer os.Remove(f.Name())
	c := Config{}
	c.Segment.MaxIndexBytes = 1024
	idx, err := newIndex(f, c)
	require.NoError(t, err)
	_, _, err = idx.Read(-1)
	require.Error(t, err, "Index read fails if empty")
	entries := []struct {
		Off uint32
		Pos uint64
	}{
		{Off: 0, Pos: 0},
		{Off: 1, Pos: 10},
	}
	for _, want := range entries {
		err = idx.Write(want.Off, want.Pos)
		require.NoError(t, err)
		_, pos, err := idx.Read(int64(want.Off))
		require.NoError(t, err)
		require.Equal(t, want.Pos, pos)
	}
	_, _, err = idx.Read(
		int64(len(entries)),
	)
	require.Equal(t, io.EOF, err, "Read fails when requesting out of bounds offset")
	err = idx.Close()
	require.NoError(t, err, "Closes successfully")
	// BUG FIX: the re-open error was previously discarded with `_`,
	// which would surface as a confusing nil-file failure in newIndex.
	f, err = os.OpenFile(f.Name(), os.O_RDWR, 0600)
	require.NoError(t, err)
	idx, err = newIndex(f, c)
	require.NoError(t, err, "Opens from a pre-existing index file")
	off, pos, err := idx.Read(-1)
	require.NoError(t, err)
	require.Equal(t, uint32(1), off)
	require.Equal(t, entries[1].Pos, pos)
}

203
internal/log/log.go Normal file
View file

@ -0,0 +1,203 @@
package log
import (
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
api "github.com/AYM1607/proglog/api/v1"
)
// Log is an ordered collection of segments; appends always go to the
// active (newest) segment.
type Log struct {
	mu sync.RWMutex // Guards segments and activeSegment.
	Dir string // Directory holding every segment's store and index files.
	Config Config
	activeSegment *segment   // Segment that receives appends.
	segments      []*segment // All segments, ordered oldest to newest.
}
// NewLog creates a log rooted at dir, applying a 1 KiB default for any
// segment size limit left at zero, and loads any existing segments.
func NewLog(dir string, c Config) (*Log, error) {
	const defaultMaxBytes = 1024
	if c.Segment.MaxStoreBytes == 0 {
		c.Segment.MaxStoreBytes = defaultMaxBytes
	}
	if c.Segment.MaxIndexBytes == 0 {
		c.Segment.MaxIndexBytes = defaultMaxBytes
	}
	l := &Log{Dir: dir, Config: c}
	return l, l.setup()
}
// setup hydrates the log with segments that already exist in the directory.
func (l *Log) setup() error {
	files, err := ioutil.ReadDir(l.Dir)
	if err != nil {
		return err
	}
	var baseOffsets []uint64
	for _, file := range files {
		// Segment files are named "<baseOffset>.store" / "<baseOffset>.index".
		offStr := strings.TrimSuffix(
			file.Name(),
			path.Ext(file.Name()),
		)
		// NOTE(review): the parse error is ignored, so a stray file in the
		// directory becomes offset 0 — confirm the dir holds only log files.
		off, _ := strconv.ParseUint(offStr, 10, 0)
		baseOffsets = append(baseOffsets, off)
	}
	sort.Slice(baseOffsets, func(i, j int) bool {
		return baseOffsets[i] < baseOffsets[j]
	})
	for i := 0; i < len(baseOffsets); i++ {
		if err := l.newSegment(baseOffsets[i]); err != nil {
			return err
		}
		// There's a baseOffset for the store file and one for the index.
		// We only need to create the segment once per baseOffset.
		i++
	}
	if len(l.segments) == 0 {
		// Brand-new log: start the first segment at the configured offset.
		if err := l.newSegment(
			l.Config.Segment.InitialOffset,
		); err != nil {
			return err
		}
	}
	return nil
}
// Append writes record to the active segment and returns its absolute
// offset, rolling over to a new segment first if the active one is full.
func (l *Log) Append(record *api.Record) (uint64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.activeSegment.IsMaxed() {
		if err := l.newSegment(l.activeSegment.nextOffset); err != nil {
			return 0, err
		}
	}
	return l.activeSegment.Append(record)
}
// Read returns the record stored at absolute offset off, or an error if
// no segment covers that offset.
func (l *Log) Read(off uint64) (*api.Record, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	var s *segment
	// Find the segment where the record is located.
	for _, segment := range l.segments {
		if segment.baseOffset <= off && off < segment.nextOffset {
			s = segment
			break
		}
	}
	if s == nil {
		// BUG FIX: message previously read "offset out range".
		return nil, fmt.Errorf("offset out of range: %d", off)
	}
	return s.Read(off)
}
// Close closes every segment, stopping at the first failure.
func (l *Log) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, seg := range l.segments {
		if err := seg.Close(); err != nil {
			return err
		}
	}
	return nil
}
// Remove closes the log and deletes its directory with all data in it.
func (l *Log) Remove() error {
	if err := l.Close(); err != nil {
		return err
	}
	return os.RemoveAll(l.Dir)
}
// Reset wipes the log from disk and re-creates it empty.
func (l *Log) Reset() error {
	if err := l.Remove(); err != nil {
		return err
	}
	return l.setup()
}
// LowestOffset returns the base offset of the oldest segment.
func (l *Log) LowestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.segments[0].baseOffset, nil
}
// HighestOffset returns the offset of the newest record, or 0 for an
// empty log whose first segment starts at offset 0.
func (l *Log) HighestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	next := l.segments[len(l.segments)-1].nextOffset
	if next == 0 {
		return 0, nil
	}
	return next - 1, nil
}
// Truncate deletes all segments whose highest offset is lower than lowest.
// Guarantees that the record with offset "lowest" is kept in the log.
func (l *Log) Truncate(lowest uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	var segments []*segment
	for _, s := range l.segments {
		// A segment's highest record offset is nextOffset - 1.
		if (s.nextOffset - 1) < lowest {
			if err := s.Remove(); err != nil {
				return err
			}
			continue
		}
		segments = append(segments, s)
	}
	// What happens if we delete the whole log?
	// We'd end up with no segments and thus no active segment and subsequent
	// writes would fail.
	// NOTE(review): activeSegment is not updated here; if the active segment
	// were removed it would dangle — confirm callers never truncate past it.
	l.segments = segments
	return nil
}
// Reader returns an io.Reader that streams the raw store contents of
// every segment in order, e.g. for snapshots or replication.
func (l *Log) Reader() io.Reader {
	// FIX: this method only reads shared state, so a read lock suffices
	// (the write lock needlessly blocked concurrent reads and appends).
	l.mu.RLock()
	defer l.mu.RUnlock()
	readers := make([]io.Reader, len(l.segments))
	for i, segment := range l.segments {
		// Each segment's store is read from its beginning.
		readers[i] = &originReader{segment.store, 0}
	}
	return io.MultiReader(readers...)
}
// originReader adapts a store to io.Reader by tracking a read cursor, so
// segments can be chained with io.MultiReader.
type originReader struct {
	*store
	off int64 // Offset relative to the start of the store.
}
// Read fills p from the store starting at the current cursor and advances
// the cursor by however many bytes were read.
func (o *originReader) Read(p []byte) (int, error) {
	n, err := o.ReadAt(p, o.off)
	o.off += int64(n)
	return n, err
}
// newSegment appends a segment starting at off and makes it the active one.
// Callers must hold l.mu.
func (l *Log) newSegment(off uint64) error {
	seg, err := newSegment(l.Dir, off, l.Config)
	if err != nil {
		return err
	}
	l.segments = append(l.segments, seg)
	l.activeSegment = seg
	return nil
}

121
internal/log/log_test.go Normal file
View file

@ -0,0 +1,121 @@
package log
import (
"io/ioutil"
"os"
"testing"
api "github.com/AYM1607/proglog/api/v1"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"
)
// TestLog runs every log scenario against a fresh log configured so that
// each segment holds exactly one record.
func TestLog(t *testing.T) {
	for scenario, fn := range map[string]func(
		t *testing.T, log *Log,
	){
		// FIX: corrected "suceeds" typo in the scenario name.
		"append and read a record succeeds": testAppendRead,
		"offset out of range error":         testOutOfRangeErr,
		"init with existing segments":       testInitExisting,
		"reader":                            testReader,
		"truncate":                          testTruncate,
	} {
		t.Run(scenario, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "store-test")
			require.NoError(t, err)
			defer os.RemoveAll(dir)
			c := Config{}
			// Guarantee that each segment will only have one record.
			c.Segment.MaxIndexBytes = entWidth
			log, err := NewLog(dir, c)
			require.NoError(t, err)
			fn(t, log)
		})
	}
}
// testAppendRead verifies a record round-trips through Append and Read.
func testAppendRead(t *testing.T, log *Log) {
	want := &api.Record{
		Value: []byte("hello world"),
	}
	off, err := log.Append(want)
	require.NoError(t, err, "Record is appended successfully")
	require.Equal(t, uint64(0), off, "First written record has offset 0")
	got, err := log.Read(off)
	require.NoError(t, err)
	require.Equal(t, want.Value, got.Value)
	require.Equal(t, off, got.Offset)
}
// testOutOfRangeErr checks that reading past the end of the log fails.
func testOutOfRangeErr(t *testing.T, log *Log) {
	record, err := log.Read(10)
	require.Error(t, err)
	require.Nil(t, record)
}
// testInitExisting checks that a new Log recovers the offset range of a
// previously written and closed log in the same directory.
func testInitExisting(t *testing.T, o *Log) {
	record := &api.Record{
		Value: []byte("hello world"),
	}
	for i := 0; i < 3; i++ {
		_, err := o.Append(record)
		require.NoError(t, err)
	}
	require.NoError(t, o.Close())
	// Both the original and the re-opened log must report offsets [0, 2].
	assertRange := func(l *Log) {
		lo, err := l.LowestOffset()
		require.NoError(t, err)
		require.Equal(t, uint64(0), lo)
		hi, err := l.HighestOffset()
		require.NoError(t, err)
		require.Equal(t, uint64(2), hi)
	}
	assertRange(o)
	n, err := NewLog(o.Dir, o.Config)
	require.NoError(t, err)
	assertRange(n)
}
// testReader checks that Reader streams the raw store bytes of the log.
func testReader(t *testing.T, log *Log) {
	want := &api.Record{
		Value: []byte("hello world"),
	}
	off, err := log.Append(want)
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)
	raw, err := ioutil.ReadAll(log.Reader())
	require.NoError(t, err)
	got := &api.Record{}
	// Skip the length prefix that precedes every stored record.
	err = proto.Unmarshal(raw[lenWidth:], got)
	require.NoError(t, err)
	require.Equal(t, want.Value, got.Value)
}
// testTruncate verifies Truncate removes records below the given offset
// while records at and above it remain readable.
func testTruncate(t *testing.T, log *Log) {
	record := &api.Record{
		Value: []byte("hello world"),
	}
	for i := 0; i < 3; i++ {
		_, err := log.Append(record)
		require.NoError(t, err)
	}
	err := log.Truncate(1)
	require.NoError(t, err)
	_, err = log.Read(0)
	require.Error(t, err, "Record 0 is gone after truncating to 1")
	// IMPROVEMENT: also assert the surviving records are still readable,
	// pinning Truncate's documented guarantee.
	_, err = log.Read(1)
	require.NoError(t, err)
	_, err = log.Read(2)
	require.NoError(t, err)
}

127
internal/log/segment.go Normal file
View file

@ -0,0 +1,127 @@
package log
import (
"fmt"
"os"
"path"
api "github.com/AYM1607/proglog/api/v1"
"google.golang.org/protobuf/proto"
)
// segment ties together a store file and its index and tracks the range
// of absolute offsets it owns.
type segment struct {
	store *store
	index *index
	baseOffset uint64 // Absolute offset.
	nextOffset uint64 // Absolute offset.
	config Config
}
// newSegment opens (creating if needed) the store and index files for
// baseOffset in dir and positions nextOffset just past the last indexed
// record, so both fresh and pre-existing segments resume correctly.
func newSegment(dir string, baseOffset uint64, c Config) (*segment, error) {
	s := &segment{
		baseOffset: baseOffset,
		config:     c,
	}
	// The store is append-only, hence O_APPEND.
	storeF, err := os.OpenFile(
		path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".store")),
		os.O_RDWR|os.O_CREATE|os.O_APPEND,
		0644,
	)
	if err != nil {
		return nil, err
	}
	if s.store, err = newStore(storeF); err != nil {
		return nil, err
	}
	// The index is written through a memory map, so no O_APPEND here.
	indexF, err := os.OpenFile(
		path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".index")),
		os.O_RDWR|os.O_CREATE,
		0644,
	)
	if err != nil {
		return nil, err
	}
	if s.index, err = newIndex(indexF, c); err != nil {
		return nil, err
	}
	// Determine the next offset by getting the last element in the index.
	// The `Read` call will return an error if the index is empty.
	off, _, err := s.index.Read(-1)
	if err != nil {
		s.nextOffset = s.baseOffset
	} else {
		s.nextOffset = s.baseOffset + uint64(off) + 1
	}
	return s, nil
}
// Append writes record to the segment's store, indexes its position, and
// returns the record's absolute offset.
func (s *segment) Append(record *api.Record) (offset uint64, err error) {
	recordOff := s.nextOffset
	// Write record to the store.
	record.Offset = recordOff
	p, err := proto.Marshal(record)
	if err != nil {
		return 0, err
	}
	_, pos, err := s.store.Append(p)
	if err != nil {
		// BUG FIX: previously `return 0, nil`, which silently swallowed
		// store append failures and reported success with offset 0.
		return 0, err
	}
	// Write record's position to the index.
	if err = s.index.Write(
		// Index's offsets are relative.
		uint32(recordOff-s.baseOffset),
		pos,
	); err != nil {
		return 0, err
	}
	s.nextOffset++
	return recordOff, nil
}
// Read returns the record stored at absolute offset off.
func (s *segment) Read(off uint64) (*api.Record, error) {
	// The index stores offsets relative to the segment's base offset.
	_, pos, err := s.index.Read(int64(off - s.baseOffset))
	if err != nil {
		return nil, err
	}
	raw, err := s.store.Read(pos)
	if err != nil {
		return nil, err
	}
	record := &api.Record{}
	err = proto.Unmarshal(raw, record)
	return record, err
}
// IsMaxed reports whether either the store or the index has reached its
// configured size limit.
func (s *segment) IsMaxed() bool {
	storeFull := s.store.size >= s.config.Segment.MaxStoreBytes
	indexFull := s.index.size >= s.config.Segment.MaxIndexBytes
	return storeFull || indexFull
}
// Close closes the index first (flushing its memory map) and then the store.
func (s *segment) Close() error {
	if err := s.index.Close(); err != nil {
		return err
	}
	return s.store.Close()
}
// Remove closes the segment and deletes its store and index files.
func (s *segment) Remove() error {
	if err := s.Close(); err != nil {
		return err
	}
	for _, name := range []string{s.store.Name(), s.index.Name()} {
		if err := os.Remove(name); err != nil {
			return err
		}
	}
	return nil
}

View file

@ -0,0 +1,68 @@
package log
import (
"io"
"io/ioutil"
"os"
"testing"
api "github.com/AYM1607/proglog/api/v1"
"github.com/stretchr/testify/require"
)
// baseOff is a non-zero base offset so the tests exercise the
// relative-offset arithmetic between segment and index.
const (
	baseOff uint64 = 16
)
// TestSegment covers appending until the index fills, re-opening from
// existing files, and removing/re-creating a segment.
func TestSegment(t *testing.T) {
	dir, err := ioutil.TempDir("", "segmet_test")
	require.NoError(t, err)
	defer os.RemoveAll(dir)
	want := &api.Record{Value: []byte("hello world!")}
	// Index-limited config.
	c := Config{}
	c.Segment.MaxStoreBytes = 1024
	c.Segment.MaxIndexBytes = entWidth * 3
	s, err := newSegment(dir, baseOff, c)
	require.NoError(t, err)
	require.Equal(t, baseOff, s.nextOffset, "next offset is the base offset for an empty segment")
	require.False(t, s.IsMaxed())
	for i := uint64(0); i < 3; i++ {
		off, err := s.Append(want)
		require.NoError(t, err)
		require.Equal(t, baseOff+i, off)
		got, err := s.Read(off)
		require.NoError(t, err)
		require.Equal(t, want.Value, got.Value)
	}
	_, err = s.Append(want)
	require.True(t, s.IsMaxed())
	require.Equal(t, io.EOF, err, "Append fails when the index is full")
	// Store-limited config.
	// This is not really accurate. The marshalled record with the added bytes
	// for the length will be longer than just the length of the value in bytes.
	// If more fields are added to the record, 2 could cause the store to fill up
	// and this test would fail.
	c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3)
	c.Segment.MaxIndexBytes = 1024
	// Create from the existing files.
	s, err = newSegment(dir, baseOff, c)
	require.NoError(t, err)
	require.True(t, s.IsMaxed())
	err = s.Remove()
	require.NoError(t, err)
	// Re-create files.
	s, err = newSegment(dir, baseOff, c)
	require.NoError(t, err)
	require.False(t, s.IsMaxed())
}

96
internal/log/store.go Normal file
View file

@ -0,0 +1,96 @@
package log
import (
"bufio"
"encoding/binary"
"os"
"sync"
)
// enc is the byte order used to persist record lengths and index entries.
var enc = binary.BigEndian

const lenWidth = 8 // Bytes used to store the length of a record.
// store is an append-only file of length-prefixed records, with a
// buffered writer to batch small appends.
type store struct {
	*os.File
	mu sync.Mutex
	buf *bufio.Writer
	size uint64 // Size of the file in bytes, including still-buffered data.
}
// newStore wraps an already-open file with buffered writes and size
// tracking; the current file size is captured so appends can resume on a
// pre-existing file.
func newStore(f *os.File) (*store, error) {
	// FIX: stat through the handle rather than os.Stat(f.Name()) — it
	// avoids a second path lookup and a race if the path changes.
	fi, err := f.Stat()
	if err != nil {
		return nil, err
	}
	return &store{
		File: f,
		size: uint64(fi.Size()),
		buf:  bufio.NewWriter(f),
	}, nil
}
// Append writes p as a length-prefixed record at the end of the store.
// It returns the total bytes written (prefix included) and the position
// where the record starts, which callers persist in the index.
func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	pos = s.size // Writing the record starting at the end of the file.
	// Write the length of the record.
	if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
		return 0, 0, err
	}
	w, err := s.buf.Write(p)
	// There's a potential problem here, the call to write could return an error,
	// but still have written a partial set of the data to the buffer. This could
	// introduce inconsistencies at read time.
	if err != nil {
		return 0, 0, err
	}
	// Account for the length prefix in the returned count and the size.
	w += lenWidth
	s.size += uint64(w)
	return uint64(w), pos, nil
}
// Read returns the record that was appended at position pos.
func (s *store) Read(pos uint64) ([]byte, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Flush pending writes so the read observes every prior append.
	if err := s.buf.Flush(); err != nil {
		return nil, err
	}
	lenBuf := make([]byte, lenWidth)
	if _, err := s.File.ReadAt(lenBuf, int64(pos)); err != nil {
		return nil, err
	}
	record := make([]byte, enc.Uint64(lenBuf))
	if _, err := s.File.ReadAt(record, int64(pos+lenWidth)); err != nil {
		return nil, err
	}
	return record, nil
}
// ReadAt reads len(p) bytes starting at file offset off, flushing the
// write buffer first so pending appends are visible. Implements io.ReaderAt.
func (s *store) ReadAt(p []byte, off int64) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.buf.Flush(); err != nil {
		return 0, err
	}
	return s.File.ReadAt(p, off)
}
// Close flushes any buffered writes and closes the underlying file.
func (s *store) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := s.buf.Flush(); err != nil {
		return err
	}
	return s.File.Close()
}

114
internal/log/store_test.go Normal file
View file

@ -0,0 +1,114 @@
package log
import (
"io/ioutil"
"os"
"testing"
"github.com/stretchr/testify/require"
)
var (
	write = []byte("hello world")         // Payload appended by every store test.
	width = uint64(len(write)) + lenWidth // Bytes one record occupies on disk.
)

// rCnt is the number of records the tests append and read back.
const rCnt = 3
// TestStoreAppendRead covers append/read round-trips and re-opening a
// store from an existing file.
func TestStoreAppendRead(t *testing.T) {
	f, err := ioutil.TempFile("", "store_append_read_test")
	require.NoError(t, err)
	defer os.Remove(f.Name())
	s, err := newStore(f)
	require.NoError(t, err)
	// Test basic operations on the store.
	testAppend(t, s)
	testRead(t, s)
	testReadAt(t, s)
	// Test that the store can be reconstructed from an existing file.
	reopened, err := newStore(f)
	require.NoError(t, err)
	testRead(t, reopened)
}
// testAppend writes `rCnt` copies of `write` and checks reported sizes.
func testAppend(t *testing.T, s *store) {
	t.Helper()
	for i := uint64(0); i < rCnt; i++ {
		n, pos, err := s.Append(write)
		require.NoError(t, err)
		require.Equal(t, width*(i+1), pos+n, "Bytes written to the store file must be the length of the message + `lenWidth` bytes for length")
	}
}
// testRead ensures `rCnt` records were written to the store and the
// content of each matches `write`.
func testRead(t *testing.T, s *store) {
	t.Helper()
	for i := uint64(0); i < rCnt; i++ {
		got, err := s.Read(i * width)
		require.NoError(t, err)
		require.Equal(t, write, got, "Record value should match the written one.")
	}
}
// testReadAt walks the file with raw offsets: length prefix, then body.
func testReadAt(t *testing.T, s *store) {
	t.Helper()
	var off int64
	for i := uint64(0); i < rCnt; i++ {
		// Read record size.
		prefix := make([]byte, lenWidth)
		n, err := s.ReadAt(prefix, off)
		require.NoError(t, err)
		require.Equal(t, lenWidth, n)
		off += int64(n)
		// Read record content.
		body := make([]byte, enc.Uint64(prefix))
		n, err = s.ReadAt(body, off)
		require.NoError(t, err)
		require.Equal(t, write, body)
		off += int64(n)
	}
}
// TestClose verifies that Close flushes buffered data to the file by
// comparing the file size before and after closing.
func TestClose(t *testing.T) {
	f, err := ioutil.TempFile("", "store_close_test")
	require.NoError(t, err)
	defer os.Remove(f.Name())
	s, err := newStore(f)
	require.NoError(t, err)
	f, beforeSize, err := openFile(f.Name())
	require.NoError(t, err)
	_, _, err = s.Append(write)
	require.NoError(t, err)
	require.NoError(t, s.Close())
	_, afterSize, err := openFile(f.Name())
	require.NoError(t, err)
	require.Greater(t, afterSize, beforeSize, "Close must flush the buffered append")
}
// openFile opens fn for appending reads/writes, creating it if needed,
// and reports its current size.
func openFile(fn string) (file *os.File, size int64, err error) {
	flags := os.O_RDWR | os.O_CREATE | os.O_APPEND
	f, err := os.OpenFile(fn, flags, 0644)
	if err != nil {
		return nil, 0, err
	}
	info, err := f.Stat()
	if err != nil {
		return nil, 0, err
	}
	return f, info.Size(), nil
}