From 60f6040fe397b72756448cf2dbc36b7a6bd84a1a Mon Sep 17 00:00:00 2001
From: AYM1607
Date: Thu, 5 Aug 2021 16:18:28 -0500
Subject: [PATCH] Finished core log package.

---
 internal/log/log.go      | 218 +++++++++++++++++++++++++++++++++++++++
 internal/log/log_test.go | 126 ++++++++++++++++++++++
 2 files changed, 344 insertions(+)
 create mode 100644 internal/log/log.go
 create mode 100644 internal/log/log_test.go

diff --git a/internal/log/log.go b/internal/log/log.go
new file mode 100644
index 0000000..361ab53
--- /dev/null
+++ b/internal/log/log.go
@@ -0,0 +1,218 @@
+package log
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path"
+	"sort"
+	"strconv"
+	"strings"
+	"sync"
+
+	api "github.com/AYM1607/proglog/api/v1"
+)
+
+type Log struct {
+	mu sync.RWMutex
+
+	Dir    string
+	Config Config
+
+	activeSegment *segment
+	segments      []*segment
+}
+
+func NewLog(dir string, c Config) (*Log, error) {
+	// Apply default values for the max index and store sizes.
+	if c.Segment.MaxIndexBytes == 0 {
+		c.Segment.MaxIndexBytes = 1024
+	}
+	if c.Segment.MaxStoreBytes == 0 {
+		c.Segment.MaxStoreBytes = 1024
+	}
+
+	l := &Log{
+		Dir:    dir,
+		Config: c,
+	}
+
+	return l, l.setup()
+}
+
+func (l *Log) setup() error {
+	files, err := ioutil.ReadDir(l.Dir)
+	if err != nil {
+		return err
+	}
+
+	// Collect the base offsets of the existing segments. This is possible
+	// because the .index and .store files are named after their base offset.
+	var baseOffsets []uint64
+	for _, file := range files {
+		offStr := strings.TrimSuffix(
+			file.Name(),
+			path.Ext(file.Name()),
+		)
+		off, _ := strconv.ParseUint(offStr, 10, 0)
+		baseOffsets = append(baseOffsets, off)
+	}
+
+	// Sort ascending.
+	sort.Slice(baseOffsets, func(i, j int) bool {
+		return baseOffsets[i] < baseOffsets[j]
+	})
+
+	// Create a segment for each of the base offsets.
+	for i := 0; i < len(baseOffsets); i++ {
+		if err = l.newSegment(baseOffsets[i]); err != nil {
+			return err
+		}
+		// baseOffsets contains two entries per base offset (one for the
+		// .index file and one for the .store file), so skip the duplicate.
+		i++
+	}
+
+	// nil is the zero value for a slice; if there are no segments the log
+	// is new, so bootstrap it with an initial segment.
+	if l.segments == nil {
+		if err = l.newSegment(
+			l.Config.Segment.InitialOffset,
+		); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (l *Log) Append(record *api.Record) (uint64, error) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	off, err := l.activeSegment.Append(record)
+	if err != nil {
+		return 0, err
+	}
+
+	if l.activeSegment.IsMaxed() {
+		// Returning this error is debatable: the append itself succeeded
+		// and the record is already part of the active segment; only the
+		// creation of the next segment failed. Callers should be aware
+		// that a non-nil error here does not mean the record was lost.
+		err = l.newSegment(off + 1)
+	}
+
+	return off, err
+}
+
+func (l *Log) Read(off uint64) (*api.Record, error) {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	var s *segment
+	for _, seg := range l.segments {
+		if seg.baseOffset <= off && off < seg.nextOffset {
+			s = seg
+			break
+		}
+	}
+	// The second condition is redundant (the loop already guarantees
+	// off < s.nextOffset whenever s is non-nil) but is kept to match the
+	// book's code.
+	if s == nil || s.nextOffset <= off {
+		return nil, fmt.Errorf("offset out of range: %d", off)
+	}
+	return s.Read(off)
+}
+
+func (l *Log) Close() error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	for _, segment := range l.segments {
+		if err := segment.Close(); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (l *Log) Remove() error {
+	if err := l.Close(); err != nil {
+		return err
+	}
+	return os.RemoveAll(l.Dir)
+}
+
+func (l *Log) Replace() error {
+	if err := l.Remove(); err != nil {
+		return err
+	}
+	return l.setup()
+}
+
+func (l *Log) LowestOffset() (uint64, error) {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	return l.segments[0].baseOffset, nil
+}
+
+func (l *Log) HighestOffset() (uint64, error) {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+	off := l.segments[len(l.segments)-1].nextOffset
+	if off == 0 {
+		return 0, nil
+	}
+	return off - 1, nil
+}
+
+func (l *Log) Truncate(lowest uint64) error {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+
+	var segments []*segment
+	for _, s := range l.segments {
+		// Remove any segment whose highest offset is smaller than lowest.
+		if s.nextOffset <= lowest {
+			if err := s.Remove(); err != nil {
+				return err
+			}
+			continue
+		}
+		segments = append(segments, s)
+	}
+	l.segments = segments
+	return nil
+}
+
+// Reader returns an io.Reader that reads the whole log as one continuous
+// stream, concatenating every segment's store in order even though the
+// records live in different segments.
+func (l *Log) Reader() io.Reader {
+	l.mu.RLock()
+	defer l.mu.RUnlock()
+
+	readers := make([]io.Reader, len(l.segments))
+	for i, segment := range l.segments {
+		readers[i] = &originReader{segment.store, 0}
+	}
+	return io.MultiReader(readers...)
+}
+
+type originReader struct {
+	*store
+	off int64
+}
+
+func (o *originReader) Read(p []byte) (int, error) {
+	n, err := o.ReadAt(p, o.off)
+	o.off += int64(n)
+	return n, err
+}
+
+func (l *Log) newSegment(off uint64) error {
+	s, err := newSegment(l.Dir, off, l.Config)
+	if err != nil {
+		return err
+	}
+	l.segments = append(l.segments, s)
+	l.activeSegment = s
+	return nil
+}
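+
+// Illustrative usage sketch (not part of the patch's API surface; the
+// directory and record contents are hypothetical):
+//
+//	dir, _ := ioutil.TempDir("", "log-example")
+//	defer os.RemoveAll(dir)
+//
+//	log, err := NewLog(dir, Config{}) // zero Config picks the 1024-byte defaults
+//	if err != nil {
+//		panic(err)
+//	}
+//	defer log.Close()
+//
+//	off, err := log.Append(&api.Record{Value: []byte("hello")})
+//	if err != nil {
+//		panic(err)
+//	}
+//	record, err := log.Read(off) // record.Value == []byte("hello")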
diff --git a/internal/log/log_test.go b/internal/log/log_test.go
new file mode 100644
index 0000000..f2d089f
--- /dev/null
+++ b/internal/log/log_test.go
@@ -0,0 +1,126 @@
+package log
+
+import (
+	"io"
+	"io/ioutil"
+	"os"
+	"testing"
+
+	api "github.com/AYM1607/proglog/api/v1"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
+)
+
+func TestLog(t *testing.T) {
+	for scenario, fn := range map[string]func(
+		t *testing.T, log *Log,
+	){
+		"append and read a record succeeds": testAppendRead,
+		"offset out of range error":         testOutOfRangeErr,
+		"init with existing segments":       testInitExisting,
+		"reader":                            testReader,
+		"truncate":                          testTruncate,
+	} {
+		t.Run(scenario, func(t *testing.T) {
+			dir, err := ioutil.TempDir("", "log-test")
+			require.NoError(t, err)
+			defer os.RemoveAll(dir)
+
+			c := Config{}
+			// Limit the index to a single entry so that each segment can
+			// only hold one record.
+			c.Segment.MaxIndexBytes = entWidth
+			log, err := NewLog(dir, c)
+			require.NoError(t, err)
+
+			fn(t, log)
+		})
+	}
+}
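+
+// A note on the sizing trick above: assuming the index layout from earlier in
+// the book (a 4-byte offset plus an 8-byte position per entry, so entWidth is
+// 12 bytes), MaxIndexBytes = entWidth leaves room for exactly one index entry.
+// The segment reports IsMaxed after a single append, so every record in the
+// tests below lands in its own segment, which is what testInitExisting and
+// testTruncate rely on.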
+
+func testAppendRead(t *testing.T, log *Log) {
+	apnd := &api.Record{
+		Value: []byte("hello world"),
+	}
+	off, err := log.Append(apnd)
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), off)
+
+	read, err := log.Read(off)
+	require.NoError(t, err)
+	require.Equal(t, apnd.Value, read.Value)
+}
+
+func testOutOfRangeErr(t *testing.T, log *Log) {
+	read, err := log.Read(1)
+	require.Nil(t, read)
+	require.Error(t, err)
+}
+
+func testInitExisting(t *testing.T, o *Log) {
+	apnd := &api.Record{
+		Value: []byte("hello world"),
+	}
+
+	for i := 0; i < 3; i++ {
+		_, err := o.Append(apnd)
+		require.NoError(t, err)
+	}
+	require.NoError(t, o.Close())
+
+	off, err := o.LowestOffset()
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), off)
+	off, err = o.HighestOffset()
+	require.NoError(t, err)
+	require.Equal(t, uint64(2), off)
+
+	// Create a new log from the directory and config of the old one.
+	n, err := NewLog(o.Dir, o.Config)
+	require.NoError(t, err)
+
+	off, err = n.LowestOffset()
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), off)
+	off, err = n.HighestOffset()
+	require.NoError(t, err)
+	require.Equal(t, uint64(2), off)
+}
+
+func testReader(t *testing.T, log *Log) {
+	apnd := &api.Record{
+		Value: []byte("hello world"),
+	}
+
+	off, err := log.Append(apnd)
+	require.NoError(t, err)
+	require.Equal(t, uint64(0), off)
+
+	reader := log.Reader()
+	b, err := io.ReadAll(reader)
+	require.NoError(t, err)
+
+	read := &api.Record{}
+	// The store writes each record's length as a prefix before its binary
+	// content, so skip the first lenWidth bytes before unmarshaling.
+	err = proto.Unmarshal(b[lenWidth:], read)
+	require.NoError(t, err)
+	require.Equal(t, apnd.Value, read.Value)
+}
+
+func testTruncate(t *testing.T, log *Log) {
+	apnd := &api.Record{
+		Value: []byte("hello world"),
+	}
+
+	// Because of the index limit configured in TestLog, each segment holds
+	// a single record, so this loop creates three segments.
+	for i := 0; i < 3; i++ {
+		_, err := log.Append(apnd)
+		require.NoError(t, err)
+	}
+
+	err := log.Truncate(1)
+	require.NoError(t, err)
+
+	_, err = log.Read(0)
+	require.Error(t, err)
+}
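+
+// Reader() exists so the whole log can be consumed as a single stream, for
+// example to snapshot it to disk. An illustrative sketch (the destination
+// file name is hypothetical):
+//
+//	f, err := os.Create("/tmp/log-snapshot")
+//	if err != nil {
+//		panic(err)
+//	}
+//	defer f.Close()
+//	if _, err := io.Copy(f, log.Reader()); err != nil {
+//		panic(err)
+//	}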