package log import ( "io" "io/ioutil" "os" "path" "sort" "strconv" "strings" "sync" api "github.com/AYM1607/proglog/api/v1" ) type Log struct { mu sync.RWMutex Dir string Config Config activeSegment *segment segments []*segment } func NewLog(dir string, c Config) (*Log, error) { // Deal with default values for max index and store sizes. if c.Segment.MaxIndexBytes == 0 { c.Segment.MaxIndexBytes = 1024 } if c.Segment.MaxStoreBytes == 0 { c.Segment.MaxStoreBytes = 1024 } l := &Log{ Dir: dir, Config: c, } return l, l.setup() } func (l *Log) setup() error { files, err := ioutil.ReadDir(l.Dir) if err != nil { return err } // Get all the base offsets for the existing segments. This is posible because // the .index and .store files have their base offset as their name. var baseOffsets []uint64 for _, file := range files { offStr := strings.TrimSuffix( file.Name(), path.Ext(file.Name()), ) off, _ := strconv.ParseUint(offStr, 10, 0) baseOffsets = append(baseOffsets, off) } // Sort ascending. sort.Slice(baseOffsets, func(i, j int) bool { return baseOffsets[i] < baseOffsets[j] }) // Create a segment for each of the base offsets. for i := 0; i < len(baseOffsets); i++ { if err = l.newSegment(baseOffsets[i]); err != nil { return nil } // baseOffsets contains 2 entries for each base offset, one for the index and one for the store. // dedup. i++ } // nil is the zero value for a slice, check if the log is new (no segments) if l.segments == nil { if err = l.newSegment( l.Config.Segment.InitialOffset, ); err != nil { return err } } return nil } func (l *Log) Append(record *api.Record) (uint64, error) { l.mu.Lock() defer l.mu.Unlock() off, err := l.activeSegment.Append(record) if err != nil { return 0, err } if l.activeSegment.IsMaxed() { // I don't know if it's the best thing to return this error, because // the core operation (appending) is correctly performed at this point. // The new segment creation failed but the record was written and is now part of the active segment. err = l.newSegment(off + 1) } return off, err } func (l *Log) Read(off uint64) (*api.Record, error) { l.mu.RLock() defer l.mu.RUnlock() var s *segment for _, segsegment := range l.segments { if segsegment.baseOffset <= off && off < segsegment.nextOffset { s = segsegment break } } // The condition after the of shouldn't matter because we check for this in the for. // The code comes from the book. if s == nil || s.nextOffset <= off { return nil, api.ErrOffsetOutOfRange{Offset: off} } return s.Read(off) } func (l *Log) Close() error { l.mu.Lock() defer l.mu.Unlock() for _, segment := range l.segments { if err := segment.Close(); err != nil { return err } } return nil } func (l *Log) Remove() error { if err := l.Close(); err != nil { return err } return os.RemoveAll(l.Dir) } func (l *Log) Replace() error { if err := l.Remove(); err != nil { return err } return l.setup() } func (l *Log) LowestOffset() (uint64, error) { l.mu.RLock() defer l.mu.RUnlock() return l.segments[0].baseOffset, nil } func (l *Log) HighestOffset() (uint64, error) { l.mu.RLock() defer l.mu.RUnlock() off := l.segments[len(l.segments)-1].nextOffset if off == 0 { return 0, nil } return off - 1, nil } func (l *Log) Truncate(lowest uint64) error { l.mu.Lock() defer l.mu.Unlock() var segments []*segment for _, s := range l.segments { // Remove any segments whose highest offset is smaller than the lowest to truncate. if s.nextOffset <= lowest { if err := s.Remove(); err != nil { return err } continue } segments = append(segments, s) } l.segments = segments return nil } // Reader returns a reader that allows to read all the log records subsequently even though they're in different segments. func (l *Log) Reader() io.Reader { l.mu.RLock() defer l.mu.RUnlock() readers := make([]io.Reader, len(l.segments)) for i, segment := range l.segments { readers[i] = &originReader{segment.store, 0} } return io.MultiReader(readers...) } type originReader struct { *store off int64 } func (o *originReader) Read(p []byte) (int, error) { n, err := o.ReadAt(p, o.off) o.off += int64(n) return n, err } func (l *Log) newSegment(off uint64) error { s, err := newSegment(l.Dir, off, l.Config) if err != nil { return err } l.segments = append(l.segments, s) l.activeSegment = s return nil }