From ce0f4b7bb427af48f2796ec659cd75dbb7b8f48f Mon Sep 17 00:00:00 2001 From: AYM1607 Date: Wed, 4 Aug 2021 12:46:39 -0500 Subject: [PATCH] Finished segment with its tests. --- internal/log/segment.go | 145 +++++++++++++++++++++++++++++++++++ internal/log/segment_test.go | 60 +++++++++++++++ 2 files changed, 205 insertions(+) create mode 100644 internal/log/segment.go create mode 100644 internal/log/segment_test.go diff --git a/internal/log/segment.go b/internal/log/segment.go new file mode 100644 index 0000000..d0a40a1 --- /dev/null +++ b/internal/log/segment.go @@ -0,0 +1,145 @@ +package log + +import ( + "fmt" + "os" + "path" + + api "github.com/AYM1607/proglog/api/v1" + "google.golang.org/protobuf/proto" +) + +type segment struct { + store *store + index *index + baseOffset, nextOffset uint64 + config Config +} + +func newSegment(dir string, baseOffset uint64, c Config) (*segment, error) { + s := &segment{ + baseOffset: baseOffset, + config: c, + } + + // This code comes from the book, why declare the error as a variable? + var err error + + // Store creation. + storeFile, err := os.OpenFile( + path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".store")), + os.O_RDWR|os.O_CREATE|os.O_APPEND, + 0644, + ) + if err != nil { + return nil, err + } + if s.store, err = newStore(storeFile); err != nil { + return nil, err + } + + // Index creation. + indexFile, err := os.OpenFile( + path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".index")), + os.O_RDWR|os.O_CREATE, + 0644, + ) + if err != nil { + return nil, err + } + if s.index, err = newIndex(indexFile, c); err != nil { + return nil, err + } + + // If Read(-1) returns an error it means that the index's underlying file + // and this itself is empty. + if off, _, err := s.index.Read(-1); err != nil { + s.nextOffset = baseOffset + } else { + // Add one because the relative offsets from the index start at 0. + s.nextOffset = baseOffset + uint64(off) + 1 + } + + return s, nil +} + +func (s *segment) Append(record *api.Record) (offset uint64, err error) { + cur := s.nextOffset + record.Offset = cur + + p, err := proto.Marshal(record) + if err != nil { + return 0, err + } + + _, pos, err := s.store.Append(p) + if err != nil { + return 0, err + } + + if err = s.index.Write( + // Index offsets are relative to baseOffset. + uint32(s.nextOffset-uint64(s.baseOffset)), + pos, + ); err != nil { + return 0, err + } + s.nextOffset += 1 + return cur, nil +} + +func (s *segment) Read(off uint64) (*api.Record, error) { + _, pos, err := s.index.Read(int64(off - s.baseOffset)) + if err != nil { + return nil, err + } + + p, err := s.store.Read(pos) + if err != nil { + return nil, err + } + + record := &api.Record{} + err = proto.Unmarshal(p, record) + return record, err +} + +func (s *segment) IsMaxed() bool { + return s.store.size >= s.config.Segment.MaxStoreBytes || + s.index.size >= s.config.Segment.MaxIndexBytes +} + +func (s *segment) Close() error { + if err := s.index.Close(); err != nil { + return err + } + if err := s.store.Close(); err != nil { + return err + } + return nil +} + +func (s *segment) Remove() error { + if err := s.Close(); err != nil { + return err + } + + if err := os.Remove(s.index.Name()); err != nil { + return err + } + + if err := os.Remove(s.store.Name()); err != nil { + return err + } + + return nil +} + +// nearestMultiple returns the nearest and lesser multiple of k in j. +// example: nearestMultiple(9, 4) = 8 +func nearestMultiple(j, k uint64) uint64 { + if j >= 0 { + return (j / k) * k + } + return ((j - k + 1) / k) * k +} diff --git a/internal/log/segment_test.go b/internal/log/segment_test.go new file mode 100644 index 0000000..34c63d8 --- /dev/null +++ b/internal/log/segment_test.go @@ -0,0 +1,60 @@ +package log + +import ( + "io" + "io/ioutil" + "os" + "testing" + + api "github.com/AYM1607/proglog/api/v1" + "github.com/stretchr/testify/require" +) + +func TestSegment(t *testing.T) { + dir, _ := ioutil.TempDir("", "segment-test") + defer os.RemoveAll(dir) + + want := &api.Record{Value: []byte("hello world")} + + c := Config{} + c.Segment.MaxIndexBytes = entWidth * 3 + c.Segment.MaxStoreBytes = 1024 + + s, err := newSegment(dir, 16, c) + require.NoError(t, err, "The segment is created successfully") + require.Equal(t, uint64(16), s.nextOffset, "Next offset should be equal to base offset for new segments.") + + for i := uint64(0); i < 3; i++ { + off, err := s.Append(want) + require.NoError(t, err, "Record should be appended successfully.") + require.Equal(t, off, 16+i, "The offsets should be increase by 1 with respect to the base offset.") + + got, err := s.Read(off) + require.NoError(t, err, "Existing records should be read successfully.") + require.Equal(t, want.Value, got.Value, "Record's read data should be the same as the written one.") + } + + _, err = s.Append(want) + require.Equal(t, io.EOF, err, "Appends should fail if the segment is maxed.") + + require.True(t, s.IsMaxed(), "IsMaxed should return true when the segment's index is maxed.") + + // Create a new segment from the same files with a different configuration. + c.Segment.MaxIndexBytes = 1024 + // The length of the record's value is not the same as the byte size of the + // marshalled data which includes other fields. By setting this as the limit though, + // we assure that the store will be maxed with only 3 records. + c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3) + + s, err = newSegment(dir, 16, c) + require.NoError(t, err, "Segment should be created successfully from existing files.") + require.True(t, s.IsMaxed(), "IsMaxed should return true when the segment's store is maxed.") + + err = s.Remove() + require.NoError(t, err, "Segment should be able to remove itself.") + + s, err = newSegment(dir, 16, c) + require.NoError(t, err, "Segment should create the necessary files after removed.") + require.False(t, s.IsMaxed(), "Segment should not be maxed if new and config's limits are non-zero.") + +}