From 525abb53fd8c40a0e0aea44cf62466c5a184c5cd Mon Sep 17 00:00:00 2001 From: AYM1607 Date: Tue, 3 Aug 2021 22:56:25 -0500 Subject: [PATCH] Finished index with its tests. --- internal/log/config.go | 9 ++++ internal/log/index.go | 100 +++++++++++++++++++++++++++++++++++++ internal/log/index_test.go | 57 +++++++++++++++++++++ 3 files changed, 166 insertions(+) create mode 100644 internal/log/config.go create mode 100644 internal/log/index.go create mode 100644 internal/log/index_test.go diff --git a/internal/log/config.go b/internal/log/config.go new file mode 100644 index 0000000..2df654d --- /dev/null +++ b/internal/log/config.go @@ -0,0 +1,9 @@ +package log + +type Config struct { + Segment struct { + MaxStoreBytes uint64 + MaxIndexBytes uint64 + InitialOffset uint64 + } +} diff --git a/internal/log/index.go b/internal/log/index.go new file mode 100644 index 0000000..b88d1d8 --- /dev/null +++ b/internal/log/index.go @@ -0,0 +1,100 @@ +package log + +import ( + "io" + "os" + + "github.com/tysontate/gommap" +) + +var ( + offWidth uint64 = 4 + posWidth uint64 = 8 + entWidth = offWidth + posWidth +) + +type index struct { + file *os.File + mmap gommap.MMap + size uint64 +} + +func newIndex(f *os.File, c Config) (*index, error) { + idx := &index{ + file: f, + } + fi, err := os.Stat(f.Name()) + if err != nil { + return nil, err + } + + // Get and store the file size. + idx.size = uint64(fi.Size()) + // Grow the file to its max size so it can be memory mapped as a whole. + if err = os.Truncate( + f.Name(), int64(c.Segment.MaxIndexBytes), + ); err != nil { + return nil, err + } + // Map the grown file to memory. + if idx.mmap, err = gommap.Map( + idx.file.Fd(), + gommap.PROT_READ|gommap.PROT_WRITE, + gommap.MAP_SHARED, + ); err != nil { + return nil, err + } + + return idx, nil +} + +func (i *index) Read(in int64) (out uint32, pos uint64, err error) { + if i.size == 0 { + return 0, 0, io.EOF + } + if in == -1 { + // The last entity was requested + out = uint32((i.size / entWidth) - 1) + } else { + out = uint32(in) + } + + pos = uint64(out) * entWidth + + // Check that the index actually contains the record with the given offset. + if i.size < pos+entWidth { + return 0, 0, io.EOF + } + + out = enc.Uint32(i.mmap[pos : pos+offWidth]) + pos = enc.Uint64(i.mmap[pos+offWidth : pos+entWidth]) + return out, pos, nil +} + +func (i *index) Write(off uint32, pos uint64) error { + if uint64(len(i.mmap)) < i.size+entWidth { + return io.EOF + } + enc.PutUint32(i.mmap[i.size:i.size+offWidth], off) + enc.PutUint64(i.mmap[i.size+offWidth:i.size+entWidth], pos) + i.size += entWidth + return nil +} + +func (i *index) Name() string { + return i.file.Name() +} + +func (i *index) Close() error { + if err := i.mmap.Sync(gommap.MS_SYNC); err != nil { + return err + } + if err := i.file.Sync(); err != nil { + return err + } + if err := i.file.Truncate(int64(i.size)); err != nil { + return err + } + + return i.file.Close() +} diff --git a/internal/log/index_test.go b/internal/log/index_test.go new file mode 100644 index 0000000..ac24f5f --- /dev/null +++ b/internal/log/index_test.go @@ -0,0 +1,57 @@ +package log + +import ( + "io" + "io/ioutil" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestIndex(t *testing.T) { + // Create a temp file. + f, err := ioutil.TempFile(os.TempDir(), "index_tes") + require.NoError(t, err) + defer os.Remove(f.Name()) + + // Use a configuration that allows enough bytes for the test. + c := Config{} + c.Segment.MaxIndexBytes = 1024 + idx, err := newIndex(f, c) + require.NoError(t, err) + + _, _, err = idx.Read(-1) + require.Error(t, err, "Read should return an error on empty index.") + + require.Equal(t, f.Name(), idx.Name(), "Name should return the same name as the underlying file.") + + entries := []struct { + Off uint32 + Pos uint64 + }{ + {Off: 0, Pos: 0}, + {Off: 1, Pos: 10}, + } + + for _, want := range entries { + err = idx.Write(want.Off, want.Pos) + require.NoError(t, err, "No error when writing to an index with enough space.") + + _, pos, err := idx.Read(int64(want.Off)) + require.NoError(t, err, "No error when reading an existing record.") + require.Equal(t, want.Pos, pos, "Read pos should be the same as the read one.") + } + + _, _, err = idx.Read(int64(len(entries))) + require.Equal(t, io.EOF, err, "Read should error with EOF when reading past the index records.") + _ = idx.Close() + + f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600) + idx, err = newIndex(f, c) + require.NoError(t, err, "No error when creating index from an existing file.") + off, pos, err := idx.Read(-1) + require.NoError(t, err, "No error when reading the last record of a non empty index.") + require.Equal(t, uint32(1), off) + require.Equal(t, entries[1].Pos, pos) +}