Delete all files to re-write the code.
This commit is contained in:
parent fb58fc61fa
commit 6f03983721

30 changed files with 0 additions and 2834 deletions
@@ -1,34 +0,0 @@
package auth

import (
    "fmt"

    "github.com/casbin/casbin"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func New(model, policy string) *Authorizer {
    enforcer := casbin.NewEnforcer(model, policy)
    return &Authorizer{
        enforcer: enforcer,
    }
}

type Authorizer struct {
    enforcer *casbin.Enforcer
}

func (a *Authorizer) Authorize(subject, object, action string) error {
    if !a.enforcer.Enforce(subject, object, action) {
        msg := fmt.Sprintf(
            "%s not permitted to %s to %s",
            subject,
            action,
            object,
        )
        st := status.New(codes.PermissionDenied, msg)
        return st.Err()
    }
    return nil
}
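The model.conf and policy.csv files this Authorizer loads are referenced by the config package below but are not part of this commit. A minimal sketch of the kind of Casbin ACL pair it expects — the exact matcher and the root/nobody subjects are assumptions based on the tests further down, not the repository's actual files:

# model.conf — minimal ACL model with (sub, obj, act) requests
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = r.sub == p.sub && r.obj == p.obj && r.act == p.act

# policy.csv — root may produce and consume on any object; nobody has no rules
p, root, *, produce
p, root, *, consume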
@@ -1,30 +0,0 @@
package config

import (
    "os"
    "path/filepath"
)

var (
    CAFile               = configFile("ca.pem")
    ServerCertFile       = configFile("server.pem")
    ServerKeyFile        = configFile("server-key.pem")
    RootClientCertFile   = configFile("root-client.pem")
    RootClientKeyFile    = configFile("root-client-key.pem")
    NobodyClientCertFile = configFile("nobody-client.pem")
    NobodyClientKeyFile  = configFile("nobody-client-key.pem")
    ACLModelFile         = configFile("model.conf")
    ACLPolicyFile        = configFile("policy.csv")
)

func configFile(filename string) string {
    if dir := os.Getenv("CONFIG_DIR"); dir != "" {
        return filepath.Join(dir, filename)
    }

    homeDir, err := os.UserHomeDir()
    if err != nil {
        panic(err)
    }
    return filepath.Join(homeDir, ".proglog", filename)
}
@@ -1,59 +0,0 @@
package config

import (
    "crypto/tls"
    "crypto/x509"
    "fmt"
    "io/ioutil"
)

func SetupTLSConfig(cfg TLSConfig) (*tls.Config, error) {
    var err error
    tlsConfig := &tls.Config{}
    // Load the key pair for the server.
    if cfg.CertFile != "" && cfg.KeyFile != "" {
        tlsConfig.Certificates = make([]tls.Certificate, 1)
        tlsConfig.Certificates[0], err = tls.LoadX509KeyPair(
            cfg.CertFile,
            cfg.KeyFile,
        )
        if err != nil {
            return nil, err
        }
    }

    if cfg.CAFile != "" {
        b, err := ioutil.ReadFile(cfg.CAFile)
        if err != nil {
            return nil, err
        }
        ca := x509.NewCertPool()
        ok := ca.AppendCertsFromPEM(b)
        if !ok {
            return nil, fmt.Errorf(
                "failed to parse root certificate: %q",
                cfg.CAFile,
            )
        }
        if cfg.Server {
            tlsConfig.ClientCAs = ca
            tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
        } else {
            tlsConfig.RootCAs = ca
        }
        tlsConfig.ServerName = cfg.ServerAddress
    }

    return tlsConfig, nil
}

type TLSConfig struct {
    CertFile      string
    KeyFile       string
    CAFile        string
    ServerAddress string
    // If this is true, the server validates the authenticity of client
    // certificates, meaning we can use mutual TLS authentication.
    Server bool
}
@@ -1,9 +0,0 @@
package log

type Config struct {
    Segment struct {
        MaxStoreBytes uint64
        MaxIndexBytes uint64
        InitialOffset uint64
    }
}
@@ -1,100 +0,0 @@
package log

import (
    "io"
    "os"

    "github.com/tysontate/gommap"
)

var (
    offWidth uint64 = 4
    posWidth uint64 = 8
    entWidth        = offWidth + posWidth
)

type index struct {
    file *os.File
    mmap gommap.MMap
    size uint64
}

func newIndex(f *os.File, c Config) (*index, error) {
    idx := &index{
        file: f,
    }
    fi, err := os.Stat(f.Name())
    if err != nil {
        return nil, err
    }

    // Get and store the file size.
    idx.size = uint64(fi.Size())
    // Grow the file to its max size so it can be memory-mapped as a whole.
    if err = os.Truncate(
        f.Name(), int64(c.Segment.MaxIndexBytes),
    ); err != nil {
        return nil, err
    }
    // Map the grown file to memory.
    if idx.mmap, err = gommap.Map(
        idx.file.Fd(),
        gommap.PROT_READ|gommap.PROT_WRITE,
        gommap.MAP_SHARED,
    ); err != nil {
        return nil, err
    }

    return idx, nil
}

func (i *index) Read(in int64) (out uint32, pos uint64, err error) {
    if i.size == 0 {
        return 0, 0, io.EOF
    }
    if in == -1 {
        // The last entry was requested.
        out = uint32((i.size / entWidth) - 1)
    } else {
        out = uint32(in)
    }

    pos = uint64(out) * entWidth

    // Check that the index actually contains the record with the given offset.
    if i.size < pos+entWidth {
        return 0, 0, io.EOF
    }

    out = enc.Uint32(i.mmap[pos : pos+offWidth])
    pos = enc.Uint64(i.mmap[pos+offWidth : pos+entWidth])
    return out, pos, nil
}

func (i *index) Write(off uint32, pos uint64) error {
    if uint64(len(i.mmap)) < i.size+entWidth {
        return io.EOF
    }
    enc.PutUint32(i.mmap[i.size:i.size+offWidth], off)
    enc.PutUint64(i.mmap[i.size+offWidth:i.size+entWidth], pos)
    i.size += entWidth
    return nil
}

func (i *index) Name() string {
    return i.file.Name()
}

func (i *index) Close() error {
    if err := i.mmap.Sync(gommap.MS_SYNC); err != nil {
        return err
    }
    if err := i.file.Sync(); err != nil {
        return err
    }
    if err := i.file.Truncate(int64(i.size)); err != nil {
        return err
    }

    return i.file.Close()
}
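A note on the layout encoded by offWidth, posWidth, and entWidth above: each index entry is 12 bytes — a 4-byte record offset relative to the segment's base offset, followed by an 8-byte position into the store file — so entry n always starts at byte n*entWidth. A standalone sketch of that arithmetic (the constants mirror the file above; the loop bound is arbitrary):

package main

import "fmt"

// Mirrors the index constants above: a 4-byte relative offset
// followed by an 8-byte store position per entry.
const (
    offWidth uint64 = 4
    posWidth uint64 = 8
    entWidth        = offWidth + posWidth
)

func main() {
    // Entry n occupies bytes [n*entWidth, (n+1)*entWidth).
    for n := uint64(0); n < 3; n++ {
        fmt.Printf("entry %d -> offset bytes [%d, %d), position bytes [%d, %d)\n",
            n, n*entWidth, n*entWidth+offWidth, n*entWidth+offWidth, (n+1)*entWidth)
    }
}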
@@ -1,57 +0,0 @@
package log

import (
    "io"
    "io/ioutil"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
)

func TestIndex(t *testing.T) {
    // Create a temp file.
    f, err := ioutil.TempFile("", "index_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    // Use a configuration that allows enough bytes for the test.
    c := Config{}
    c.Segment.MaxIndexBytes = 1024
    idx, err := newIndex(f, c)
    require.NoError(t, err)

    _, _, err = idx.Read(-1)
    require.Error(t, err, "Read should return an error on an empty index.")

    require.Equal(t, f.Name(), idx.Name(), "Name should return the same name as the underlying file.")

    entries := []struct {
        Off uint32
        Pos uint64
    }{
        {Off: 0, Pos: 0},
        {Off: 1, Pos: 10},
    }

    for _, want := range entries {
        err = idx.Write(want.Off, want.Pos)
        require.NoError(t, err, "No error when writing to an index with enough space.")

        _, pos, err := idx.Read(int64(want.Off))
        require.NoError(t, err, "No error when reading an existing record.")
        require.Equal(t, want.Pos, pos, "Read pos should be the same as the written one.")
    }

    _, _, err = idx.Read(int64(len(entries)))
    require.Equal(t, io.EOF, err, "Read should error with EOF when reading past the index records.")
    _ = idx.Close()

    f, _ = os.OpenFile(f.Name(), os.O_RDWR, 0600)
    idx, err = newIndex(f, c)
    require.NoError(t, err, "No error when creating an index from an existing file.")
    off, pos, err := idx.Read(-1)
    require.NoError(t, err, "No error when reading the last record of a non-empty index.")
    require.Equal(t, uint32(1), off)
    require.Equal(t, entries[1].Pos, pos)
}
@@ -1,217 +0,0 @@
package log

import (
    "io"
    "io/ioutil"
    "os"
    "path"
    "sort"
    "strconv"
    "strings"
    "sync"

    api "github.com/AYM1607/proglog/api/v1"
)

type Log struct {
    mu sync.RWMutex

    Dir    string
    Config Config

    activeSegment *segment
    segments      []*segment
}

func NewLog(dir string, c Config) (*Log, error) {
    // Deal with default values for max index and store sizes.
    if c.Segment.MaxIndexBytes == 0 {
        c.Segment.MaxIndexBytes = 1024
    }
    if c.Segment.MaxStoreBytes == 0 {
        c.Segment.MaxStoreBytes = 1024
    }

    l := &Log{
        Dir:    dir,
        Config: c,
    }

    return l, l.setup()
}

func (l *Log) setup() error {
    files, err := ioutil.ReadDir(l.Dir)
    if err != nil {
        return err
    }

    // Get all the base offsets for the existing segments. This is possible
    // because the .index and .store files have their base offset as their name.
    var baseOffsets []uint64
    for _, file := range files {
        offStr := strings.TrimSuffix(
            file.Name(),
            path.Ext(file.Name()),
        )
        off, _ := strconv.ParseUint(offStr, 10, 0)
        baseOffsets = append(baseOffsets, off)
    }

    // Sort ascending.
    sort.Slice(baseOffsets, func(i, j int) bool {
        return baseOffsets[i] < baseOffsets[j]
    })

    // Create a segment for each of the base offsets.
    for i := 0; i < len(baseOffsets); i++ {
        if err = l.newSegment(baseOffsets[i]); err != nil {
            return err
        }
        // baseOffsets contains 2 entries for each base offset, one for the
        // index and one for the store, so skip the duplicate.
        i++
    }

    // nil is the zero value for a slice; check if the log is new (no segments).
    if l.segments == nil {
        if err = l.newSegment(
            l.Config.Segment.InitialOffset,
        ); err != nil {
            return err
        }
    }

    return nil
}

func (l *Log) Append(record *api.Record) (uint64, error) {
    l.mu.Lock()
    defer l.mu.Unlock()

    off, err := l.activeSegment.Append(record)
    if err != nil {
        return 0, err
    }

    if l.activeSegment.IsMaxed() {
        // It's debatable whether returning this error is best: the core
        // operation (appending) was performed correctly at this point. Only
        // the new segment creation failed; the record was written and is now
        // part of the active segment.
        err = l.newSegment(off + 1)
    }

    return off, err
}

func (l *Log) Read(off uint64) (*api.Record, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    var s *segment
    for _, segment := range l.segments {
        if segment.baseOffset <= off && off < segment.nextOffset {
            s = segment
            break
        }
    }
    // The second condition (after the ||) shouldn't matter because the loop
    // above already checks for it. The code comes from the book.
    if s == nil || s.nextOffset <= off {
        return nil, api.ErrOffsetOutOfRange{Offset: off}
    }
    return s.Read(off)
}

func (l *Log) Close() error {
    l.mu.Lock()
    defer l.mu.Unlock()

    for _, segment := range l.segments {
        if err := segment.Close(); err != nil {
            return err
        }
    }
    return nil
}

func (l *Log) Remove() error {
    if err := l.Close(); err != nil {
        return err
    }
    return os.RemoveAll(l.Dir)
}

func (l *Log) Replace() error {
    if err := l.Remove(); err != nil {
        return err
    }
    return l.setup()
}

func (l *Log) LowestOffset() (uint64, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    return l.segments[0].baseOffset, nil
}

func (l *Log) HighestOffset() (uint64, error) {
    l.mu.RLock()
    defer l.mu.RUnlock()
    off := l.segments[len(l.segments)-1].nextOffset
    if off == 0 {
        return 0, nil
    }
    return off - 1, nil
}

func (l *Log) Truncate(lowest uint64) error {
    l.mu.Lock()
    defer l.mu.Unlock()

    var segments []*segment
    for _, s := range l.segments {
        // Remove any segments whose highest offset is smaller than the lowest to truncate.
        if s.nextOffset <= lowest {
            if err := s.Remove(); err != nil {
                return err
            }
            continue
        }
        segments = append(segments, s)
    }
    l.segments = segments
    return nil
}

// Reader returns a reader that reads all the log's records sequentially,
// even though they're stored in different segments.
func (l *Log) Reader() io.Reader {
    l.mu.RLock()
    defer l.mu.RUnlock()

    readers := make([]io.Reader, len(l.segments))
    for i, segment := range l.segments {
        readers[i] = &originReader{segment.store, 0}
    }
    return io.MultiReader(readers...)
}

type originReader struct {
    *store
    off int64
}

func (o *originReader) Read(p []byte) (int, error) {
    n, err := o.ReadAt(p, o.off)
    o.off += int64(n)
    return n, err
}

func (l *Log) newSegment(off uint64) error {
    s, err := newSegment(l.Dir, off, l.Config)
    if err != nil {
        return err
    }
    l.segments = append(l.segments, s)
    l.activeSegment = s
    return nil
}
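The file-name parsing and the i++ dedup in setup() above are easier to see in isolation. A standalone sketch with hypothetical file names (the parsing calls are the same ones setup() uses):

package main

import (
    "fmt"
    "path"
    "sort"
    "strconv"
    "strings"
)

func main() {
    // Hypothetical segment files: each base offset appears twice,
    // once for the .index file and once for the .store file.
    files := []string{"16.index", "0.store", "0.index", "16.store"}

    var baseOffsets []uint64
    for _, name := range files {
        offStr := strings.TrimSuffix(name, path.Ext(name))
        off, _ := strconv.ParseUint(offStr, 10, 0)
        baseOffsets = append(baseOffsets, off)
    }
    sort.Slice(baseOffsets, func(i, j int) bool {
        return baseOffsets[i] < baseOffsets[j]
    })
    // Prints [0 0 16 16]; setup() skips every other entry to dedup.
    fmt.Println(baseOffsets)
}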
@@ -1,127 +0,0 @@
package log

import (
    "io"
    "io/ioutil"
    "os"
    "testing"

    api "github.com/AYM1607/proglog/api/v1"
    "github.com/stretchr/testify/require"
    "google.golang.org/protobuf/proto"
)

func TestLog(t *testing.T) {
    for scenario, fn := range map[string]func(
        t *testing.T, log *Log,
    ){
        "append and read a record succeeds": testAppendRead,
        "offset out of range error":         testOutOfRangeErr,
        "init with existing segments":       testInitExisting,
        "reader":                            testReader,
        "truncate":                          testTruncate,
    } {
        t.Run(scenario, func(t *testing.T) {
            dir, err := ioutil.TempDir("", "log-test")
            require.NoError(t, err)
            defer os.RemoveAll(dir)

            c := Config{}
            // This ensures that each segment can only hold one record.
            c.Segment.MaxIndexBytes = entWidth
            log, err := NewLog(dir, c)
            require.NoError(t, err)

            fn(t, log)
        })
    }
}

func testAppendRead(t *testing.T, log *Log) {
    apnd := &api.Record{
        Value: []byte("hello world"),
    }
    off, err := log.Append(apnd)
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)

    read, err := log.Read(off)
    require.NoError(t, err)
    require.Equal(t, apnd.Value, read.Value)
}

func testOutOfRangeErr(t *testing.T, log *Log) {
    read, err := log.Read(1)
    require.Nil(t, read)
    apiErr := err.(api.ErrOffsetOutOfRange)
    require.Equal(t, uint64(1), apiErr.Offset)
}

func testInitExisting(t *testing.T, o *Log) {
    apnd := &api.Record{
        Value: []byte("hello world"),
    }

    for i := 0; i < 3; i++ {
        _, err := o.Append(apnd)
        require.NoError(t, err)
    }
    require.NoError(t, o.Close())

    off, err := o.LowestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)
    off, err = o.HighestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(2), off)

    // Create a new log from the directory and config of the old one.
    n, err := NewLog(o.Dir, o.Config)
    require.NoError(t, err)

    off, err = n.LowestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)
    off, err = n.HighestOffset()
    require.NoError(t, err)
    require.Equal(t, uint64(2), off)
}

func testReader(t *testing.T, log *Log) {
    apnd := &api.Record{
        Value: []byte("hello world"),
    }

    off, err := log.Append(apnd)
    require.NoError(t, err)
    require.Equal(t, uint64(0), off)

    reader := log.Reader()
    b, err := io.ReadAll(reader)
    require.NoError(t, err)

    read := &api.Record{}
    // The store writes the length as a prefix to the binary content, so we
    // have to skip it.
    err = proto.Unmarshal(b[lenWidth:], read)
    require.NoError(t, err)
    require.Equal(t, apnd.Value, read.Value)
}

func testTruncate(t *testing.T, log *Log) {
    apnd := &api.Record{
        Value: []byte("hello world"),
    }

    // Because of the configured index limit, each segment should only
    // contain a single record.
    for i := 0; i < 3; i++ {
        _, err := log.Append(apnd)
        require.NoError(t, err)
    }

    err := log.Truncate(1)
    require.NoError(t, err)

    _, err = log.Read(0)
    require.Error(t, err)
}
@@ -1,146 +0,0 @@
package log

import (
    "fmt"
    "os"
    "path"

    api "github.com/AYM1607/proglog/api/v1"
    "google.golang.org/protobuf/proto"
)

type segment struct {
    store                  *store
    index                  *index
    baseOffset, nextOffset uint64
    config                 Config
}

func newSegment(dir string, baseOffset uint64, c Config) (*segment, error) {
    s := &segment{
        baseOffset: baseOffset,
        config:     c,
    }

    // err is declared up front so the checks below can assign to the struct
    // fields (s.store, s.index) with =; := doesn't allow a field on its
    // left-hand side.
    var err error

    // Store creation.
    storeFile, err := os.OpenFile(
        path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".store")),
        os.O_RDWR|os.O_CREATE|os.O_APPEND,
        0644,
    )
    if err != nil {
        return nil, err
    }
    if s.store, err = newStore(storeFile); err != nil {
        return nil, err
    }

    // Index creation.
    indexFile, err := os.OpenFile(
        path.Join(dir, fmt.Sprintf("%d%s", baseOffset, ".index")),
        os.O_RDWR|os.O_CREATE,
        0644,
    )
    if err != nil {
        return nil, err
    }
    if s.index, err = newIndex(indexFile, c); err != nil {
        return nil, err
    }

    // If Read(-1) returns an error, it means that the index's underlying file,
    // and thus the index itself, is empty.
    if off, _, err := s.index.Read(-1); err != nil {
        s.nextOffset = baseOffset
    } else {
        // Add one because the relative offsets from the index start at 0.
        s.nextOffset = baseOffset + uint64(off) + 1
    }

    return s, nil
}

func (s *segment) Append(record *api.Record) (offset uint64, err error) {
    cur := s.nextOffset
    record.Offset = cur

    p, err := proto.Marshal(record)
    if err != nil {
        return 0, err
    }

    _, pos, err := s.store.Append(p)
    if err != nil {
        return 0, err
    }

    if err = s.index.Write(
        // Index offsets are relative to baseOffset.
        uint32(s.nextOffset-s.baseOffset),
        pos,
    ); err != nil {
        return 0, err
    }
    s.nextOffset++
    return cur, nil
}

func (s *segment) Read(off uint64) (*api.Record, error) {
    _, pos, err := s.index.Read(int64(off - s.baseOffset))
    if err != nil {
        return nil, err
    }

    p, err := s.store.Read(pos)
    if err != nil {
        return nil, err
    }

    record := &api.Record{}
    err = proto.Unmarshal(p, record)
    return record, err
}

func (s *segment) IsMaxed() bool {
    return s.store.size >= s.config.Segment.MaxStoreBytes ||
        s.index.size >= s.config.Segment.MaxIndexBytes
}

func (s *segment) Close() error {
    if err := s.index.Close(); err != nil {
        return err
    }
    if err := s.store.Close(); err != nil {
        return err
    }
    return nil
}

func (s *segment) Remove() error {
    if err := s.Close(); err != nil {
        return err
    }

    if err := os.Remove(s.index.Name()); err != nil {
        return err
    }

    if err := os.Remove(s.store.Name()); err != nil {
        return err
    }

    return nil
}

// nearestMultiple returns the nearest multiple of k lesser than or equal to j.
// Example: nearestMultiple(9, 4) = 8.
func nearestMultiple(j, k uint64) uint64 {
    // This check comes from the book; it is always true for uint64 values,
    // so only the first branch can ever run.
    if j >= 0 {
        return (j / k) * k
    }
    return ((j - k + 1) / k) * k
}
@@ -1,60 +0,0 @@
package log

import (
    "io"
    "io/ioutil"
    "os"
    "testing"

    api "github.com/AYM1607/proglog/api/v1"
    "github.com/stretchr/testify/require"
)

func TestSegment(t *testing.T) {
    dir, _ := ioutil.TempDir("", "segment-test")
    defer os.RemoveAll(dir)

    want := &api.Record{Value: []byte("hello world")}

    c := Config{}
    c.Segment.MaxIndexBytes = entWidth * 3
    c.Segment.MaxStoreBytes = 1024

    s, err := newSegment(dir, 16, c)
    require.NoError(t, err, "The segment should be created successfully.")
    require.Equal(t, uint64(16), s.nextOffset, "Next offset should be equal to the base offset for new segments.")

    for i := uint64(0); i < 3; i++ {
        off, err := s.Append(want)
        require.NoError(t, err, "Record should be appended successfully.")
        require.Equal(t, 16+i, off, "Offsets should increase by 1 starting from the base offset.")

        got, err := s.Read(off)
        require.NoError(t, err, "Existing records should be read successfully.")
        require.Equal(t, want.Value, got.Value, "The record's read data should be the same as the written one.")
    }

    _, err = s.Append(want)
    require.Equal(t, io.EOF, err, "Appends should fail if the segment is maxed.")

    require.True(t, s.IsMaxed(), "IsMaxed should return true when the segment's index is maxed.")

    // Create a new segment from the same files with a different configuration.
    c.Segment.MaxIndexBytes = 1024
    // The length of the record's value is not the same as the byte size of the
    // marshalled data, which includes other fields. By setting this as the
    // limit, though, we ensure that the store will be maxed with only 3 records.
    c.Segment.MaxStoreBytes = uint64(len(want.Value) * 3)

    s, err = newSegment(dir, 16, c)
    require.NoError(t, err, "Segment should be created successfully from existing files.")
    require.True(t, s.IsMaxed(), "IsMaxed should return true when the segment's store is maxed.")

    err = s.Remove()
    require.NoError(t, err, "Segment should be able to remove itself.")

    s, err = newSegment(dir, 16, c)
    require.NoError(t, err, "Segment should create the necessary files after being removed.")
    require.False(t, s.IsMaxed(), "Segment should not be maxed if it is new and the config's limits are non-zero.")
}
@@ -1,116 +0,0 @@
package log

import (
    "bufio"
    "encoding/binary"
    "os"
    "sync"
)

var (
    // enc is the endianness used to store records.
    enc = binary.BigEndian
)

const (
    // lenWidth determines how many bytes will be used to store the length of the record.
    lenWidth = 8
)

type store struct {
    // Type embedding of an os.File.
    *os.File

    mu   sync.Mutex
    buf  *bufio.Writer
    size uint64
}

// newStore returns a ready-to-use store given a file.
func newStore(f *os.File) (*store, error) {
    // Get information for the given file.
    fi, err := os.Stat(f.Name())
    if err != nil {
        // Return nil because it's the zero value for a pointer.
        return nil, err
    }

    // This is useful when working with pre-existing files, which could be the
    // case when restarting.
    size := uint64(fi.Size())
    return &store{
        File: f,
        size: size,
        buf:  bufio.NewWriter(f),
    }, nil
}

// Append writes the provided bytes as a record to the end of the store.
// It returns the size of the record and the position of the record within
// the store.
func (s *store) Append(p []byte) (n uint64, pos uint64, err error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    pos = s.size
    // Write the size of the record before the actual record.
    if err := binary.Write(s.buf, enc, uint64(len(p))); err != nil {
        return 0, 0, err
    }

    w, err := s.buf.Write(p)
    // Note: Write can return an error even though it wrote some bytes to the
    // file, so discarding the partial count here is debatable.
    if err != nil {
        return 0, 0, err
    }
    w += lenWidth
    s.size += uint64(w)
    return uint64(w), pos, nil
}

// Read retrieves the record at position pos from the store.
func (s *store) Read(pos uint64) ([]byte, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    // Commit any buffered data to the file.
    if err := s.buf.Flush(); err != nil {
        return nil, err
    }

    // Read the size of the record at pos. `File` could be dropped because of
    // type embedding, but keeping it may be better for clarity.
    size := make([]byte, lenWidth)
    if _, err := s.File.ReadAt(size, int64(pos)); err != nil {
        return nil, err
    }

    // Read the actual record data given its offset and size.
    b := make([]byte, enc.Uint64(size))
    if _, err := s.File.ReadAt(b, int64(pos+lenWidth)); err != nil {
        return nil, err
    }
    return b, nil
}

func (s *store) ReadAt(p []byte, off int64) (int, error) {
    s.mu.Lock()
    defer s.mu.Unlock()

    if err := s.buf.Flush(); err != nil {
        return 0, err
    }

    return s.File.ReadAt(p, off)
}

func (s *store) Close() error {
    s.mu.Lock()
    defer s.mu.Unlock()

    if err := s.buf.Flush(); err != nil {
        return err
    }
    return s.File.Close()
}
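The on-disk framing that Append and Read above agree on is an 8-byte big-endian length prefix followed by the raw record bytes. A standalone sketch of a round trip through that framing (the record value is arbitrary):

package main

import (
    "bytes"
    "encoding/binary"
    "fmt"
)

// Same framing constant as the store above.
const lenWidth = 8

func main() {
    record := []byte("hello world")

    // Append-side framing: length prefix, then the record bytes.
    var buf bytes.Buffer
    binary.Write(&buf, binary.BigEndian, uint64(len(record)))
    buf.Write(record)

    // Read-side: decode the prefix, then slice out the record.
    framed := buf.Bytes()
    size := binary.BigEndian.Uint64(framed[:lenWidth])
    fmt.Printf("%d bytes on disk, record: %q\n",
        len(framed), framed[lenWidth:lenWidth+size])
}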
@@ -1,122 +0,0 @@
package log

import (
    "io/ioutil"
    "os"
    "testing"

    "github.com/stretchr/testify/require"
)

var (
    write = []byte("hello world")
    width = uint64(len(write) + lenWidth)
)

func TestStoreAppendRead(t *testing.T) {
    // Create a temporary file with a random name.
    f, err := ioutil.TempFile("", "store_append_read_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    // Create a new store from the file.
    s, err := newStore(f)
    require.NoError(t, err)

    // Basic operations.
    testAppend(t, s)
    testRead(t, s)
    testReadAt(t, s)

    // A store can be created from an existing non-empty file.
    s, err = newStore(f)
    require.NoError(t, err)
    testRead(t, s)
}

func testAppend(t *testing.T, s *store) {
    t.Helper()
    for i := uint64(1); i < 4; i++ {
        n, pos, err := s.Append(write)
        require.NoError(t, err)
        // Test the returned offsets.
        require.Equal(t, width*i, pos+n)
    }
}

func testRead(t *testing.T, s *store) {
    t.Helper()
    var pos uint64
    for i := uint64(1); i < 4; i++ {
        read, err := s.Read(pos)
        require.NoError(t, err)
        require.Equal(t, write, read)
        pos += width
    }
}

func testReadAt(t *testing.T, s *store) {
    t.Helper()
    for i, off := uint64(1), int64(0); i < 4; i++ {
        // Read the size of the record at the current offset.
        b := make([]byte, lenWidth)
        n, err := s.File.ReadAt(b, off)
        require.NoError(t, err)
        // Bytes read should match the predefined byte size for lengths.
        require.Equal(t, lenWidth, n)
        off += int64(n)

        // Read the actual content of the record.
        size := enc.Uint64(b)
        b = make([]byte, size)
        n, err = s.ReadAt(b, off)
        require.NoError(t, err)
        // The correct number of bytes should be read.
        require.Equal(t, int(size), n)
        // The content read should be correct.
        require.Equal(t, write, b)
        off += int64(n)
    }
}

func TestStoreClose(t *testing.T) {
    f, err := ioutil.TempFile("", "store_close_test")
    require.NoError(t, err)
    defer os.Remove(f.Name())

    s, err := newStore(f)
    require.NoError(t, err)

    // Append a single record.
    _, _, err = s.Append(write)
    require.NoError(t, err)

    _, beforeSize, err := openFile(f.Name())
    require.NoError(t, err)

    err = s.Close()
    require.NoError(t, err)

    _, afterSize, err := openFile(f.Name())
    require.NoError(t, err)

    // A store buffers its writes, so the size of the underlying file should
    // change after close.
    require.Greater(t, afterSize, beforeSize)
}

func openFile(name string) (file *os.File, size int64, err error) {
    f, err := os.OpenFile(
        name,
        os.O_RDWR|os.O_CREATE|os.O_APPEND,
        0644,
    )
    if err != nil {
        return nil, 0, err
    }
    fi, err := f.Stat()
    if err != nil {
        return nil, 0, err
    }
    return f, fi.Size(), nil
}
@@ -1,204 +0,0 @@
package server

import (
    "context"
    "time"

    api "github.com/AYM1607/proglog/api/v1"

    grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
    grpc_auth "github.com/grpc-ecosystem/go-grpc-middleware/auth"
    grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
    grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
    "go.opencensus.io/plugin/ocgrpc"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/trace"
    "go.uber.org/zap"
    "go.uber.org/zap/zapcore"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/peer"
    "google.golang.org/grpc/status"
)

const (
    objectWildCard = "*"
    produceAction  = "produce"
    consumeAction  = "consume"
)

type Config struct {
    CommitLog  CommitLog
    Authorizer Authorizer
}

// Compile-time check that *grpcServer satisfies the api.LogServer interface.
var _ api.LogServer = (*grpcServer)(nil)

func NewGRPCServer(config *Config, opts ...grpc.ServerOption) (*grpc.Server, error) {
    // Logging.
    logger := zap.L().Named("server")
    zapOpts := []grpc_zap.Option{
        grpc_zap.WithDurationField(
            func(duration time.Duration) zapcore.Field {
                return zap.Int64(
                    "grpc.time_ns",
                    duration.Nanoseconds(),
                )
            },
        ),
    }

    // Tracing.
    // Always sampling should not be used in production; a better sampler
    // should be written.
    trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
    err := view.Register(ocgrpc.DefaultServerViews...)
    if err != nil {
        return nil, err
    }
    opts = append(opts,
        // Streaming interceptors.
        grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
            grpc_ctxtags.StreamServerInterceptor(),
            grpc_zap.StreamServerInterceptor(logger, zapOpts...),
            grpc_auth.StreamServerInterceptor(authenticate),
        )),
        // Unary interceptors.
        grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
            grpc_ctxtags.UnaryServerInterceptor(),
            grpc_zap.UnaryServerInterceptor(logger, zapOpts...),
            grpc_auth.UnaryServerInterceptor(authenticate),
        )),
        grpc.StatsHandler(&ocgrpc.ServerHandler{}),
    )
    gsrv := grpc.NewServer(opts...)
    srv, err := newgrpcServer(config)
    if err != nil {
        return nil, err
    }
    api.RegisterLogServer(gsrv, srv)
    return gsrv, nil
}

type grpcServer struct {
    api.UnimplementedLogServer
    *Config
}

func newgrpcServer(config *Config) (srv *grpcServer, err error) {
    srv = &grpcServer{
        Config: config,
    }
    return srv, nil
}

func (s *grpcServer) Produce(ctx context.Context, req *api.ProduceRequest) (
    *api.ProduceResponse, error) {
    if err := s.Authorizer.Authorize(
        subject(ctx),
        objectWildCard,
        produceAction,
    ); err != nil {
        return nil, err
    }
    offset, err := s.CommitLog.Append(req.Record)
    if err != nil {
        return nil, err
    }
    return &api.ProduceResponse{Offset: offset}, nil
}

func (s *grpcServer) Consume(ctx context.Context, req *api.ConsumeRequest) (
    *api.ConsumeResponse, error) {
    if err := s.Authorizer.Authorize(
        subject(ctx),
        objectWildCard,
        consumeAction,
    ); err != nil {
        return nil, err
    }
    record, err := s.CommitLog.Read(req.Offset)
    if err != nil {
        return nil, err
    }
    return &api.ConsumeResponse{Record: record}, nil
}

func (s *grpcServer) ProduceStream(
    stream api.Log_ProduceStreamServer,
) error {
    for {
        req, err := stream.Recv()
        if err != nil {
            return err
        }
        res, err := s.Produce(stream.Context(), req)
        if err != nil {
            return err
        }
        if err = stream.Send(res); err != nil {
            return err
        }
    }
}

func (s *grpcServer) ConsumeStream(
    req *api.ConsumeRequest,
    stream api.Log_ConsumeStreamServer,
) error {
    for {
        select {
        case <-stream.Context().Done():
            return nil
        default:
            res, err := s.Consume(stream.Context(), req)
            switch err.(type) {
            case nil:
            case api.ErrOffsetOutOfRange:
                // This is supposed to hold off until there's more data
                // appended to the log. Note that the code could also return
                // this error for records that have been deleted, and the
                // stream would then be stuck here forever.
                continue
            default:
                return err
            }
            if err = stream.Send(res); err != nil {
                return err
            }
            req.Offset++
        }
    }
}

func authenticate(ctx context.Context) (context.Context, error) {
    peer, ok := peer.FromContext(ctx)
    if !ok {
        return ctx, status.New(
            codes.Unknown,
            "could not find peer info",
        ).Err()
    }
    if peer.AuthInfo == nil {
        return context.WithValue(ctx, subjectContextKey{}, ""), nil
    }
    tlsInfo := peer.AuthInfo.(credentials.TLSInfo)
    subject := tlsInfo.State.VerifiedChains[0][0].Subject.CommonName
    ctx = context.WithValue(ctx, subjectContextKey{}, subject)

    return ctx, nil
}

func subject(ctx context.Context) string {
    return ctx.Value(subjectContextKey{}).(string)
}

type subjectContextKey struct{}

type CommitLog interface {
    Append(*api.Record) (uint64, error)
    Read(uint64) (*api.Record, error)
}

type Authorizer interface {
    Authorize(subject, object, action string) error
}
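For context, wiring this server up outside the tests looks roughly like the sketch below. The directory path and listen address are hypothetical, and error handling is reduced to panics; the test file that follows shows the full version with per-client TLS setup.

package main

import (
    "net"

    "github.com/AYM1607/proglog/internal/auth"
    "github.com/AYM1607/proglog/internal/config"
    "github.com/AYM1607/proglog/internal/log"
    "github.com/AYM1607/proglog/internal/server"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"
)

func main() {
    // Commit log and Casbin-backed authorizer from the packages above.
    clog, err := log.NewLog("/tmp/proglog", log.Config{})
    if err != nil {
        panic(err)
    }
    authorizer := auth.New(config.ACLModelFile, config.ACLPolicyFile)

    // Mutual-TLS credentials for the server side.
    tlsConfig, err := config.SetupTLSConfig(config.TLSConfig{
        CertFile: config.ServerCertFile,
        KeyFile:  config.ServerKeyFile,
        CAFile:   config.CAFile,
        Server:   true,
    })
    if err != nil {
        panic(err)
    }

    srv, err := server.NewGRPCServer(&server.Config{
        CommitLog:  clog,
        Authorizer: authorizer,
    }, grpc.Creds(credentials.NewTLS(tlsConfig)))
    if err != nil {
        panic(err)
    }

    l, err := net.Listen("tcp", "127.0.0.1:8400")
    if err != nil {
        panic(err)
    }
    srv.Serve(l)
}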
@@ -1,306 +0,0 @@
package server

import (
    "context"
    "flag"
    "io/ioutil"
    "net"
    "os"
    "testing"
    "time"

    api "github.com/AYM1607/proglog/api/v1"
    "github.com/AYM1607/proglog/internal/auth"
    "github.com/AYM1607/proglog/internal/config"
    "github.com/AYM1607/proglog/internal/log"
    "github.com/stretchr/testify/require"
    "go.opencensus.io/examples/exporter"
    "go.uber.org/zap"
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/credentials"
    "google.golang.org/grpc/status"
)

var debug = flag.Bool("debug", false, "Enable observability for debugging.")

func TestMain(m *testing.M) {
    flag.Parse()
    if *debug {
        logger, err := zap.NewDevelopment()
        if err != nil {
            panic(err)
        }
        zap.ReplaceGlobals(logger)
    }
    os.Exit(m.Run())
}

func TestServer(t *testing.T) {
    for scenario, fn := range map[string]func(
        t *testing.T,
        rootClient api.LogClient,
        nobodyClient api.LogClient,
        config *Config,
    ){
        "produce/consume a message to/from the log succeeds": testProduceConsume,
        "produce/consume stream succeeds":                     testProduceConsumeStream,
        "consume past a log boundary fails":                   testConsumePastBoundary,
        "unauthorized fails":                                  testUnauthorized,
    } {
        t.Run(scenario, func(t *testing.T) {
            rootClient,
                nobodyClient,
                config,
                teardown := setupTest(t, nil)
            defer teardown()
            fn(t, rootClient, nobodyClient, config)
        })
    }
}

func setupTest(t *testing.T, fn func(*Config)) (
    rootClient api.LogClient,
    nobodyClient api.LogClient,
    cfg *Config,
    teardown func(),
) {
    t.Helper()

    l, err := net.Listen("tcp", "127.0.0.1:0")
    require.NoError(t, err)

    newClient := func(crtPath, keyPath string) (
        *grpc.ClientConn,
        api.LogClient,
        []grpc.DialOption,
    ) {
        tlsConfig, err := config.SetupTLSConfig(config.TLSConfig{
            CertFile: crtPath,
            KeyFile:  keyPath,
            CAFile:   config.CAFile,
            Server:   false,
        })
        require.NoError(t, err)

        tlsCreds := credentials.NewTLS(tlsConfig)
        opts := []grpc.DialOption{grpc.WithTransportCredentials(tlsCreds)}
        conn, err := grpc.Dial(l.Addr().String(), opts...)
        require.NoError(t, err)
        client := api.NewLogClient(conn)
        return conn, client, opts
    }

    // TODO: research the relation between closures and shorthand variable
    // declarations. If the connection isn't created with var, traces don't work.
    var rootConn *grpc.ClientConn
    rootConn, rootClient, _ = newClient(
        config.RootClientCertFile,
        config.RootClientKeyFile,
    )

    var nobodyConn *grpc.ClientConn
    nobodyConn, nobodyClient, _ = newClient(
        config.NobodyClientCertFile,
        config.NobodyClientKeyFile,
    )

    // Server config.
    serverTLSConfig, err := config.SetupTLSConfig(config.TLSConfig{
        CertFile: config.ServerCertFile,
        KeyFile:  config.ServerKeyFile,
        CAFile:   config.CAFile,
        Server:   true,
    })
    require.NoError(t, err)
    serverCreds := credentials.NewTLS(serverTLSConfig)

    dir, err := ioutil.TempDir("", "server-test")
    require.NoError(t, err)
    defer os.RemoveAll(dir)

    clog, err := log.NewLog(dir, log.Config{})
    require.NoError(t, err)

    authorizer := auth.New(config.ACLModelFile, config.ACLPolicyFile)

    var telemetryExporter *exporter.LogExporter
    if *debug {
        metricsLogFile, err := ioutil.TempFile("", "metrics-*.log")
        require.NoError(t, err)
        t.Logf("metrics log file: %s", metricsLogFile.Name())

        tracesLogFile, err := ioutil.TempFile("", "traces-*.log")
        require.NoError(t, err)
        t.Logf("traces log file: %s", tracesLogFile.Name())

        telemetryExporter, err = exporter.NewLogExporter(exporter.Options{
            MetricsLogFile:    metricsLogFile.Name(),
            TracesLogFile:     tracesLogFile.Name(),
            ReportingInterval: time.Second,
        })
        require.NoError(t, err)
        err = telemetryExporter.Start()
        require.NoError(t, err)
    }

    cfg = &Config{
        CommitLog:  clog,
        Authorizer: authorizer,
    }
    if fn != nil {
        fn(cfg)
    }
    server, err := NewGRPCServer(cfg, grpc.Creds(serverCreds))
    require.NoError(t, err)

    go func() {
        server.Serve(l)
    }()

    return rootClient, nobodyClient, cfg, func() {
        server.Stop()
        rootConn.Close()
        nobodyConn.Close()
        l.Close()
        clog.Remove()

        if telemetryExporter != nil {
            time.Sleep(2000 * time.Millisecond)
            telemetryExporter.Stop()
            telemetryExporter.Close()
        }
    }
}

func testProduceConsume(t *testing.T, client, _ api.LogClient, config *Config) {
    ctx := context.Background()

    want := &api.Record{
        Value: []byte("hello world"),
    }

    produce, err := client.Produce(
        ctx,
        &api.ProduceRequest{
            Record: want,
        },
    )
    require.NoError(t, err)

    consume, err := client.Consume(ctx, &api.ConsumeRequest{
        Offset: produce.Offset,
    })

    require.NoError(t, err)
    require.Equal(t, produce.Offset, consume.Record.Offset)
    require.Equal(t, want.Value, consume.Record.Value)
}

func testConsumePastBoundary(
    t *testing.T,
    client, _ api.LogClient,
    config *Config,
) {
    ctx := context.Background()

    produce, err := client.Produce(ctx, &api.ProduceRequest{
        Record: &api.Record{
            Value: []byte("hello world"),
        },
    })
    require.NoError(t, err)

    consume, err := client.Consume(ctx, &api.ConsumeRequest{
        Offset: produce.Offset + 1,
    })
    require.Nil(t, consume, "consume should be nil")
    got := status.Code(err)
    want := status.Code(api.ErrOffsetOutOfRange{}.GRPCStatus().Err())
    require.Equal(t, want, got)
}

func testProduceConsumeStream(
    t *testing.T,
    client, _ api.LogClient,
    config *Config,
) {
    ctx := context.Background()

    records := []*api.Record{{
        Value:  []byte("first message"),
        Offset: 0,
    }, {
        Value:  []byte("second message"),
        Offset: 1,
    }}

    // Test the produce stream.
    // The code from the book adds an extra scope. Is it really needed?
    {
        stream, err := client.ProduceStream(ctx)
        require.NoError(t, err)

        // The log is empty, so the slice index for each record is also its offset.
        for offset, record := range records {
            err = stream.Send(&api.ProduceRequest{
                Record: record,
            })
            require.NoError(t, err)

            res, err := stream.Recv()
            require.NoError(t, err)
            require.Equal(t, uint64(offset), res.Offset)
        }
    }

    // Test the consume stream.
    {
        stream, err := client.ConsumeStream(
            ctx,
            &api.ConsumeRequest{Offset: 0},
        )
        require.NoError(t, err)

        for _, record := range records {
            res, err := stream.Recv()
            require.NoError(t, err)
            // A record literal must be used, otherwise the comparison fails.
            require.Equal(t, &api.Record{
                Value:  record.Value,
                Offset: record.Offset,
            }, res.Record)
        }
    }
}

func testUnauthorized(
    t *testing.T,
    _,
    client api.LogClient,
    config *Config,
) {
    ctx := context.Background()

    produce, err := client.Produce(ctx,
        &api.ProduceRequest{
            Record: &api.Record{
                Value: []byte("hello world"),
            },
        },
    )
    require.Nil(t, produce, "produce response should be nil")
    gotCode, wantCode := status.Code(err), codes.PermissionDenied
    require.Equal(t, wantCode, gotCode,
        "produce error code when client is unauthorized should be permission denied")

    consume, err := client.Consume(ctx, &api.ConsumeRequest{
        Offset: 0,
    })
    require.Nil(t, consume, "consume response should be nil")
    gotCode, wantCode = status.Code(err), codes.PermissionDenied
    require.Equal(t, wantCode, gotCode,
        "consume error code when client is unauthorized should be permission denied")
}