Finished core log package.
This commit is contained in:
parent ce0f4b7bb4
commit 60f6040fe3
2 changed files with 344 additions and 0 deletions
218 internal/log/log.go Normal file

@@ -0,0 +1,218 @@
package log

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path"
	"sort"
	"strconv"
	"strings"
	"sync"

	api "github.com/AYM1607/proglog/api/v1"
)

// Log manages an ordered list of segments, the last of which is the active
// one that receives new appends.
type Log struct {
	mu sync.RWMutex

	Dir    string
	Config Config

	activeSegment *segment
	segments      []*segment
}

func NewLog(dir string, c Config) (*Log, error) {
	// Deal with default values for the max index and store sizes.
	if c.Segment.MaxIndexBytes == 0 {
		c.Segment.MaxIndexBytes = 1024
	}
	if c.Segment.MaxStoreBytes == 0 {
		c.Segment.MaxStoreBytes = 1024
	}

	l := &Log{
		Dir:    dir,
		Config: c,
	}

	return l, l.setup()
}

func (l *Log) setup() error {
	files, err := ioutil.ReadDir(l.Dir)
	if err != nil {
		return err
	}

	// Get all the base offsets for the existing segments. This is possible
	// because the .index and .store files have their base offset as their name.
	var baseOffsets []uint64
	for _, file := range files {
		offStr := strings.TrimSuffix(
			file.Name(),
			path.Ext(file.Name()),
		)
		off, _ := strconv.ParseUint(offStr, 10, 0)
		baseOffsets = append(baseOffsets, off)
	}
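
	// Example (hypothetical names): a directory recovered with 0.store,
	// 0.index, 16.store, and 16.index yields baseOffsets [0, 0, 16, 16],
	// two entries per segment, which the creation loop below deduplicates.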

	// Sort ascending.
	sort.Slice(baseOffsets, func(i, j int) bool {
		return baseOffsets[i] < baseOffsets[j]
	})

	// Create a segment for each of the base offsets.
	for i := 0; i < len(baseOffsets); i++ {
		if err = l.newSegment(baseOffsets[i]); err != nil {
			return err
		}
		// baseOffsets contains two entries for each base offset, one for the
		// index file and one for the store file, so skip the duplicate.
		i++
	}

	// nil is the zero value for a slice; check if the log is new (no segments).
	if l.segments == nil {
		if err = l.newSegment(
			l.Config.Segment.InitialOffset,
		); err != nil {
			return err
		}
	}

	return nil
}

func (l *Log) Append(record *api.Record) (uint64, error) {
	l.mu.Lock()
	defer l.mu.Unlock()

	off, err := l.activeSegment.Append(record)
	if err != nil {
		return 0, err
	}

	if l.activeSegment.IsMaxed() {
		// Returning this error is debatable: the core operation (appending)
		// already succeeded at this point, so the record was written and is
		// part of the now-full active segment; only the creation of the next
		// segment failed.
		err = l.newSegment(off + 1)
	}

	return off, err
}

func (l *Log) Read(off uint64) (*api.Record, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	var s *segment
	for _, segment := range l.segments {
		if segment.baseOffset <= off && off < segment.nextOffset {
			s = segment
			break
		}
	}
	// The second half of this condition shouldn't matter because the loop
	// above already checks it; it is kept to match the book.
	if s == nil || s.nextOffset <= off {
		return nil, fmt.Errorf("offset out of range: %d", off)
	}
	return s.Read(off)
}

func (l *Log) Close() error {
	l.mu.Lock()
	defer l.mu.Unlock()

	for _, segment := range l.segments {
		if err := segment.Close(); err != nil {
			return err
		}
	}
	return nil
}

func (l *Log) Remove() error {
	if err := l.Close(); err != nil {
		return err
	}
	return os.RemoveAll(l.Dir)
}

func (l *Log) Replace() error {
	if err := l.Remove(); err != nil {
		return err
	}
	return l.setup()
}

func (l *Log) LowestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.segments[0].baseOffset, nil
}

func (l *Log) HighestOffset() (uint64, error) {
	l.mu.RLock()
	defer l.mu.RUnlock()
	off := l.segments[len(l.segments)-1].nextOffset
	// Note: this returns 0 both for an empty log and for a log whose highest
	// offset is 0; callers cannot tell the two apart.
	if off == 0 {
		return 0, nil
	}
	return off - 1, nil
}

func (l *Log) Truncate(lowest uint64) error {
	l.mu.Lock()
	defer l.mu.Unlock()

	var segments []*segment
	for _, s := range l.segments {
		// Remove any segment whose highest offset is smaller than the lowest
		// offset we want to keep.
		if s.nextOffset <= lowest {
			if err := s.Remove(); err != nil {
				return err
			}
			continue
		}
		segments = append(segments, s)
	}
	l.segments = segments
	return nil
}
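
// Example (hypothetical numbers): with three one-record segments at base
// offsets 0, 1, and 2, Truncate(1) removes only the first segment, since it
// is the only one whose nextOffset (1) is <= 1; offsets 1 and 2 stay readable.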

// Reader returns an io.Reader that reads the whole log sequentially, even
// though the records are spread across multiple segments.
func (l *Log) Reader() io.Reader {
	l.mu.RLock()
	defer l.mu.RUnlock()

	readers := make([]io.Reader, len(l.segments))
	for i, segment := range l.segments {
		readers[i] = &originReader{segment.store, 0}
	}
	return io.MultiReader(readers...)
}

// originReader wraps a store to satisfy io.Reader, tracking its own offset so
// that io.MultiReader reads each store from the beginning.
type originReader struct {
	*store
	off int64
}

func (o *originReader) Read(p []byte) (int, error) {
	n, err := o.ReadAt(p, o.off)
	o.off += int64(n)
	return n, err
}

func (l *Log) newSegment(off uint64) error {
	s, err := newSegment(l.Dir, off, l.Config)
	if err != nil {
		return err
	}
	l.segments = append(l.segments, s)
	l.activeSegment = s
	return nil
}
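
For reference, a minimal sketch of how this API composes end to end (assuming the Config type and the segment/store/index implementation from the earlier commits in this package; names and the temp directory are illustrative):

	package main

	import (
		"fmt"
		"io/ioutil"
		stdlog "log"

		api "github.com/AYM1607/proglog/api/v1"
		intlog "github.com/AYM1607/proglog/internal/log"
	)

	func main() {
		dir, err := ioutil.TempDir("", "log-example")
		if err != nil {
			stdlog.Fatal(err)
		}

		// A zero-valued Config: NewLog fills in the 1024-byte defaults.
		l, err := intlog.NewLog(dir, intlog.Config{})
		if err != nil {
			stdlog.Fatal(err)
		}
		defer l.Remove()

		off, err := l.Append(&api.Record{Value: []byte("hello world")})
		if err != nil {
			stdlog.Fatal(err)
		}

		rec, err := l.Read(off)
		if err != nil {
			stdlog.Fatal(err)
		}
		fmt.Println(string(rec.Value)) // hello world
	}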

126 internal/log/log_test.go Normal file

@@ -0,0 +1,126 @@
package log

import (
	"io"
	"io/ioutil"
	"os"
	"testing"

	api "github.com/AYM1607/proglog/api/v1"
	"github.com/stretchr/testify/require"
	"google.golang.org/protobuf/proto"
)

func TestLog(t *testing.T) {
	for scenario, fn := range map[string]func(
		t *testing.T, log *Log,
	){
		"append and read a record succeeds": testAppendRead,
		"offset out of range error":         testOutOfRangeErr,
		"init with existing segments":       testInitExisting,
		"reader":                            testReader,
		"truncate":                          testTruncate,
	} {
		t.Run(scenario, func(t *testing.T) {
			dir, err := ioutil.TempDir("", "log-test")
			require.NoError(t, err)
			defer os.RemoveAll(dir)

			c := Config{}
			// entWidth is the size of a single index entry, so this ensures
			// that each segment can only hold one record.
			c.Segment.MaxIndexBytes = entWidth
			log, err := NewLog(dir, c)
			require.NoError(t, err)

			fn(t, log)
		})
	}
}

func testAppendRead(t *testing.T, log *Log) {
	apnd := &api.Record{
		Value: []byte("hello world"),
	}
	off, err := log.Append(apnd)
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)

	read, err := log.Read(off)
	require.NoError(t, err)
	require.Equal(t, apnd.Value, read.Value)
}

func testOutOfRangeErr(t *testing.T, log *Log) {
	read, err := log.Read(1)
	require.Nil(t, read)
	require.Error(t, err)
}

func testInitExisting(t *testing.T, o *Log) {
	apnd := &api.Record{
		Value: []byte("hello world"),
	}

	for i := 0; i < 3; i++ {
		_, err := o.Append(apnd)
		require.NoError(t, err)
	}
	require.NoError(t, o.Close())

	off, err := o.LowestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)
	off, err = o.HighestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(2), off)

	// Create a new log from the directory and config of the old one.
	n, err := NewLog(o.Dir, o.Config)
	require.NoError(t, err)

	off, err = n.LowestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)
	off, err = n.HighestOffset()
	require.NoError(t, err)
	require.Equal(t, uint64(2), off)
}

func testReader(t *testing.T, log *Log) {
	apnd := &api.Record{
		Value: []byte("hello world"),
	}

	off, err := log.Append(apnd)
	require.NoError(t, err)
	require.Equal(t, uint64(0), off)

	reader := log.Reader()
	b, err := io.ReadAll(reader)
	require.NoError(t, err)

	read := &api.Record{}
	// The store writes the record's length as a prefix to the binary
	// content, so we have to skip it.
	err = proto.Unmarshal(b[lenWidth:], read)
	require.NoError(t, err)
	require.Equal(t, apnd.Value, read.Value)
}

func testTruncate(t *testing.T, log *Log) {
	apnd := &api.Record{
		Value: []byte("hello world"),
	}

	// Because of the configured index limit, each segment should only
	// contain a single record.
	for i := 0; i < 3; i++ {
		_, err := log.Append(apnd)
		require.NoError(t, err)
	}

	err := log.Truncate(1)
	require.NoError(t, err)

	_, err = log.Read(0)
	require.Error(t, err)
}
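
Assuming a standard Go module layout, the whole suite above can be run from the repository root with:

	go test ./internal/log/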