waveterm/pkg/remote/fileshare/wavefs/wavefs.go
Evan Simkowitz d51ff87c26
Not found paths in prefix fs always treated as dir (#2002)
Gracefully handle prefix paths that don't exist, representing them as
directories so they can be escaped from.

Also removes the ".." file info from the backend, instead only creating
it on the frontend
2025-02-21 16:32:14 -08:00

638 lines
20 KiB
Go

// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wavefs
import (
"context"
"encoding/base64"
"errors"
"fmt"
"io"
"io/fs"
"log"
"os"
"path/filepath"
"strings"
"time"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fspath"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fstype"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes"
"github.com/wavetermdev/waveterm/pkg/util/tarcopy"
"github.com/wavetermdev/waveterm/pkg/util/wavefileutil"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshutil"
)
const (
DirMode os.FileMode = 0755 | os.ModeDir
)
type WaveClient struct{}
var _ fstype.FileShareClient = WaveClient{}
func NewWaveClient() *WaveClient {
return &WaveClient{}
}
func (c WaveClient) ReadStream(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.FileData], 16)
go func() {
defer close(ch)
rtnData, err := c.Read(ctx, conn, data)
if err != nil {
ch <- wshutil.RespErr[wshrpc.FileData](err)
return
}
dataLen := len(rtnData.Data64)
if !rtnData.Info.IsDir {
for i := 0; i < dataLen; i += wshrpc.FileChunkSize {
if ctx.Err() != nil {
ch <- wshutil.RespErr[wshrpc.FileData](context.Cause(ctx))
return
}
dataEnd := min(i+wshrpc.FileChunkSize, dataLen)
ch <- wshrpc.RespOrErrorUnion[wshrpc.FileData]{Response: wshrpc.FileData{Data64: rtnData.Data64[i:dataEnd], Info: rtnData.Info, At: &wshrpc.FileDataAt{Offset: int64(i), Size: dataEnd - i}}}
}
} else {
for i := 0; i < len(rtnData.Entries); i += wshrpc.DirChunkSize {
if ctx.Err() != nil {
ch <- wshutil.RespErr[wshrpc.FileData](context.Cause(ctx))
return
}
ch <- wshrpc.RespOrErrorUnion[wshrpc.FileData]{Response: wshrpc.FileData{Entries: rtnData.Entries[i:min(i+wshrpc.DirChunkSize, len(rtnData.Entries))], Info: rtnData.Info}}
}
}
}()
return ch
}
func (c WaveClient) Read(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) (*wshrpc.FileData, error) {
zoneId := conn.Host
if zoneId == "" {
return nil, fmt.Errorf("zoneid not found in connection")
}
fileName, err := cleanPath(conn.Path)
if err != nil {
return nil, fmt.Errorf("error cleaning path: %w", err)
}
if data.At != nil {
_, dataBuf, err := filestore.WFS.ReadAt(ctx, zoneId, fileName, data.At.Offset, int64(data.At.Size))
if err == nil {
return &wshrpc.FileData{Info: data.Info, Data64: base64.StdEncoding.EncodeToString(dataBuf)}, nil
} else if errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("NOTFOUND: %w", err)
} else {
return nil, fmt.Errorf("error reading blockfile: %w", err)
}
} else {
_, dataBuf, err := filestore.WFS.ReadFile(ctx, zoneId, fileName)
if err == nil {
return &wshrpc.FileData{Info: data.Info, Data64: base64.StdEncoding.EncodeToString(dataBuf)}, nil
} else if !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("error reading blockfile: %w", err)
}
}
list, err := c.ListEntries(ctx, conn, nil)
if err != nil {
return nil, fmt.Errorf("error listing blockfiles: %w", err)
}
if len(list) == 0 {
return &wshrpc.FileData{
Info: &wshrpc.FileInfo{
Name: fspath.Base(fileName),
Path: fileName,
Dir: fspath.Dir(fileName),
NotFound: true,
IsDir: true,
}}, nil
}
return &wshrpc.FileData{Info: data.Info, Entries: list}, nil
}
func (c WaveClient) ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet] {
log.Printf("ReadTarStream: conn: %v, opts: %v\n", conn, opts)
path := conn.Path
srcHasSlash := strings.HasSuffix(path, "/")
cleanedPath, err := cleanPath(path)
if err != nil {
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error cleaning path: %w", err))
}
finfo, err := c.Stat(ctx, conn)
exists := err == nil && !finfo.NotFound
if err != nil {
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error getting file info: %w", err))
}
if !exists {
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("file not found: %s", conn.GetFullURI()))
}
singleFile := finfo != nil && !finfo.IsDir
var pathPrefix string
if !singleFile && srcHasSlash {
pathPrefix = cleanedPath
} else {
pathPrefix = filepath.Dir(cleanedPath)
}
schemeAndHost := conn.GetSchemeAndHost() + "/"
var entries []*wshrpc.FileInfo
if singleFile {
entries = []*wshrpc.FileInfo{finfo}
} else {
entries, err = c.ListEntries(ctx, conn, nil)
if err != nil {
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf("error listing blockfiles: %w", err))
}
}
timeout := fstype.DefaultTimeout
if opts.Timeout > 0 {
timeout = time.Duration(opts.Timeout) * time.Millisecond
}
readerCtx, cancel := context.WithTimeout(context.Background(), timeout)
rtn, writeHeader, fileWriter, tarClose := tarcopy.TarCopySrc(readerCtx, pathPrefix)
go func() {
defer func() {
tarClose()
cancel()
}()
for _, file := range entries {
if readerCtx.Err() != nil {
rtn <- wshutil.RespErr[iochantypes.Packet](context.Cause(readerCtx))
return
}
file.Mode = 0644
if err = writeHeader(fileutil.ToFsFileInfo(file), file.Path, singleFile); err != nil {
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error writing tar header: %w", err))
return
}
if file.IsDir {
continue
}
log.Printf("ReadTarStream: reading file: %s\n", file.Path)
internalPath := strings.TrimPrefix(file.Path, schemeAndHost)
_, dataBuf, err := filestore.WFS.ReadFile(ctx, conn.Host, internalPath)
if err != nil {
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error reading blockfile: %w", err))
return
}
if _, err = fileWriter.Write(dataBuf); err != nil {
rtn <- wshutil.RespErr[iochantypes.Packet](fmt.Errorf("error writing tar data: %w", err))
return
}
}
}()
return rtn
}
func (c WaveClient) ListEntriesStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileListOpts) <-chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteListEntriesRtnData] {
ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteListEntriesRtnData], 16)
go func() {
defer close(ch)
list, err := c.ListEntries(ctx, conn, opts)
if err != nil {
ch <- wshutil.RespErr[wshrpc.CommandRemoteListEntriesRtnData](err)
return
}
for i := 0; i < len(list); i += wshrpc.DirChunkSize {
ch <- wshrpc.RespOrErrorUnion[wshrpc.CommandRemoteListEntriesRtnData]{Response: wshrpc.CommandRemoteListEntriesRtnData{FileInfo: list[i:min(i+wshrpc.DirChunkSize, len(list))]}}
}
}()
return ch
}
func (c WaveClient) ListEntries(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileListOpts) ([]*wshrpc.FileInfo, error) {
log.Printf("ListEntries: conn: %v, opts: %v\n", conn, opts)
zoneId := conn.Host
if zoneId == "" {
return nil, fmt.Errorf("zoneid not found in connection")
}
if opts == nil {
opts = &wshrpc.FileListOpts{}
}
prefix, err := cleanPath(conn.Path)
if err != nil {
return nil, fmt.Errorf("error cleaning path: %w", err)
}
prefix += fspath.Separator
var fileList []*wshrpc.FileInfo
dirMap := make(map[string]*wshrpc.FileInfo)
if err := listFilesPrefix(ctx, zoneId, prefix, func(wf *filestore.WaveFile) error {
if !opts.All {
name, isDir := fspath.FirstLevelDir(strings.TrimPrefix(wf.Name, prefix))
if isDir {
path := fspath.Join(conn.GetPathWithHost(), name)
if _, ok := dirMap[path]; ok {
if dirMap[path].ModTime < wf.ModTs {
dirMap[path].ModTime = wf.ModTs
}
return nil
}
dirMap[path] = &wshrpc.FileInfo{
Path: path,
Name: name,
Dir: fspath.Dir(path),
Size: 0,
IsDir: true,
SupportsMkdir: false,
Mode: DirMode,
}
fileList = append(fileList, dirMap[path])
return nil
}
}
fileList = append(fileList, wavefileutil.WaveFileToFileInfo(wf))
return nil
}); err != nil {
return nil, fmt.Errorf("error listing entries: %w", err)
}
if opts.Offset > 0 {
if opts.Offset >= len(fileList) {
fileList = nil
} else {
fileList = fileList[opts.Offset:]
}
}
if opts.Limit > 0 {
if opts.Limit < len(fileList) {
fileList = fileList[:opts.Limit]
}
}
return fileList, nil
}
func (c WaveClient) Stat(ctx context.Context, conn *connparse.Connection) (*wshrpc.FileInfo, error) {
zoneId := conn.Host
if zoneId == "" {
return nil, fmt.Errorf("zoneid not found in connection")
}
fileName, err := fsutil.CleanPathPrefix(conn.Path)
if err != nil {
return nil, fmt.Errorf("error cleaning path: %w", err)
}
fileInfo, err := filestore.WFS.Stat(ctx, zoneId, fileName)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
// attempt to list the directory
entries, err := c.ListEntries(ctx, conn, nil)
if err != nil {
return nil, fmt.Errorf("error listing entries: %w", err)
}
if len(entries) > 0 {
return &wshrpc.FileInfo{
Path: conn.GetPathWithHost(),
Name: fileName,
Dir: fsutil.GetParentPathString(fileName),
Size: 0,
IsDir: true,
Mode: DirMode,
}, nil
} else {
return &wshrpc.FileInfo{
Path: conn.GetPathWithHost(),
Name: fileName,
Dir: fsutil.GetParentPathString(fileName),
NotFound: true}, nil
}
}
return nil, fmt.Errorf("error getting file info: %w", err)
}
return wavefileutil.WaveFileToFileInfo(fileInfo), nil
}
func (c WaveClient) PutFile(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) error {
dataBuf, err := base64.StdEncoding.DecodeString(data.Data64)
if err != nil {
return fmt.Errorf("error decoding data64: %w", err)
}
zoneId := conn.Host
if zoneId == "" {
return fmt.Errorf("zoneid not found in connection")
}
fileName, err := cleanPath(conn.Path)
if err != nil {
return fmt.Errorf("error cleaning path: %w", err)
}
if _, err := filestore.WFS.Stat(ctx, zoneId, fileName); err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("error getting blockfile info: %w", err)
}
var opts wshrpc.FileOpts
var meta wshrpc.FileMeta
if data.Info != nil {
if data.Info.Opts != nil {
opts = *data.Info.Opts
}
if data.Info.Meta != nil {
meta = *data.Info.Meta
}
}
if err := filestore.WFS.MakeFile(ctx, zoneId, fileName, meta, opts); err != nil {
return fmt.Errorf("error making blockfile: %w", err)
}
}
if data.At != nil && data.At.Offset >= 0 {
if err := filestore.WFS.WriteAt(ctx, zoneId, fileName, data.At.Offset, dataBuf); errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("NOTFOUND: %w", err)
} else if err != nil {
return fmt.Errorf("error writing to blockfile: %w", err)
}
} else {
if err := filestore.WFS.WriteFile(ctx, zoneId, fileName, dataBuf); errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("NOTFOUND: %w", err)
} else if err != nil {
return fmt.Errorf("error writing to blockfile: %w", err)
}
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, zoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: zoneId,
FileName: fileName,
FileOp: wps.FileOp_Invalidate,
},
})
return nil
}
func (c WaveClient) AppendFile(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) error {
dataBuf, err := base64.StdEncoding.DecodeString(data.Data64)
if err != nil {
return fmt.Errorf("error decoding data64: %w", err)
}
zoneId := conn.Host
if zoneId == "" {
return fmt.Errorf("zoneid not found in connection")
}
fileName, err := cleanPath(conn.Path)
if err != nil {
return fmt.Errorf("error cleaning path: %w", err)
}
_, err = filestore.WFS.Stat(ctx, zoneId, fileName)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("error getting blockfile info: %w", err)
}
var opts wshrpc.FileOpts
var meta wshrpc.FileMeta
if data.Info != nil {
if data.Info.Opts != nil {
opts = *data.Info.Opts
}
if data.Info.Meta != nil {
meta = *data.Info.Meta
}
}
if err := filestore.WFS.MakeFile(ctx, zoneId, fileName, meta, opts); err != nil {
return fmt.Errorf("error making blockfile: %w", err)
}
}
err = filestore.WFS.AppendData(ctx, zoneId, fileName, dataBuf)
if errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("NOTFOUND: %w", err)
}
if err != nil {
return fmt.Errorf("error writing to blockfile: %w", err)
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, zoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: zoneId,
FileName: fileName,
FileOp: wps.FileOp_Invalidate,
},
})
return nil
}
// WaveFile does not support directories, only prefix-based listing
func (c WaveClient) Mkdir(ctx context.Context, conn *connparse.Connection) error {
return errors.ErrUnsupported
}
func (c WaveClient) MoveInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error {
if srcConn.Host != destConn.Host {
return fmt.Errorf("move internal, src and dest hosts do not match")
}
isDir, err := c.CopyInternal(ctx, srcConn, destConn, opts)
if err != nil {
return fmt.Errorf("error copying blockfile: %w", err)
}
recursive := opts != nil && opts.Recursive && isDir
if err := c.Delete(ctx, srcConn, recursive); err != nil {
return fmt.Errorf("error deleting blockfile: %w", err)
}
return nil
}
func (c WaveClient) CopyInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) (bool, error) {
return fsutil.PrefixCopyInternal(ctx, srcConn, destConn, c, opts, func(ctx context.Context, zoneId, prefix string) ([]string, error) {
entryList := make([]string, 0)
if err := listFilesPrefix(ctx, zoneId, prefix, func(wf *filestore.WaveFile) error {
entryList = append(entryList, wf.Name)
return nil
}); err != nil {
return nil, err
}
return entryList, nil
}, func(ctx context.Context, srcPath, destPath string) error {
srcHost := srcConn.Host
srcFileName := strings.TrimPrefix(srcPath, srcHost+fspath.Separator)
destHost := destConn.Host
destFileName := strings.TrimPrefix(destPath, destHost+fspath.Separator)
_, dataBuf, err := filestore.WFS.ReadFile(ctx, srcHost, srcFileName)
if err != nil {
return fmt.Errorf("error reading source blockfile: %w", err)
}
if err := filestore.WFS.WriteFile(ctx, destHost, destFileName, dataBuf); err != nil {
return fmt.Errorf("error writing to destination blockfile: %w", err)
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, destHost).String()},
Data: &wps.WSFileEventData{
ZoneId: destHost,
FileName: destFileName,
FileOp: wps.FileOp_Invalidate,
},
})
return nil
})
}
func (c WaveClient) CopyRemote(ctx context.Context, srcConn, destConn *connparse.Connection, srcClient fstype.FileShareClient, opts *wshrpc.FileCopyOpts) (bool, error) {
if srcConn.Scheme == connparse.ConnectionTypeWave && destConn.Scheme == connparse.ConnectionTypeWave {
return c.CopyInternal(ctx, srcConn, destConn, opts)
}
zoneId := destConn.Host
if zoneId == "" {
return false, fmt.Errorf("zoneid not found in connection")
}
return fsutil.PrefixCopyRemote(ctx, srcConn, destConn, srcClient, c, func(zoneId, path string, size int64, reader io.Reader) error {
dataBuf := make([]byte, size)
if _, err := reader.Read(dataBuf); err != nil {
if !errors.Is(err, io.EOF) {
return fmt.Errorf("error reading tar data: %w", err)
}
}
if _, err := filestore.WFS.Stat(ctx, zoneId, path); err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("error getting blockfile info: %w", err)
} else {
if err := filestore.WFS.MakeFile(ctx, zoneId, path, wshrpc.FileMeta{}, wshrpc.FileOpts{}); err != nil {
return fmt.Errorf("error making blockfile: %w", err)
}
}
}
if err := filestore.WFS.WriteFile(ctx, zoneId, path, dataBuf); err != nil {
return fmt.Errorf("error writing to blockfile: %w", err)
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, zoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: zoneId,
FileName: path,
FileOp: wps.FileOp_Invalidate,
},
})
return nil
}, opts)
}
func (c WaveClient) Delete(ctx context.Context, conn *connparse.Connection, recursive bool) error {
zoneId := conn.Host
if zoneId == "" {
return fmt.Errorf("zoneid not found in connection")
}
prefix := conn.Path
finfo, err := c.Stat(ctx, conn)
exists := err == nil && !finfo.NotFound
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("error getting file info: %w", err)
}
if !exists {
return nil
}
pathsToDelete := make([]string, 0)
if finfo.IsDir {
if !recursive {
return fmt.Errorf("%v is not empty, use recursive flag to delete", prefix)
}
if !strings.HasSuffix(prefix, fspath.Separator) {
prefix += fspath.Separator
}
if err := listFilesPrefix(ctx, zoneId, prefix, func(wf *filestore.WaveFile) error {
pathsToDelete = append(pathsToDelete, wf.Name)
return nil
}); err != nil {
return fmt.Errorf("error listing blockfiles: %w", err)
}
} else {
pathsToDelete = append(pathsToDelete, prefix)
}
if len(pathsToDelete) > 0 {
errs := make([]error, 0)
for _, entry := range pathsToDelete {
if err := filestore.WFS.DeleteFile(ctx, zoneId, entry); err != nil {
errs = append(errs, fmt.Errorf("error deleting blockfile %s/%s: %w", zoneId, entry, err))
continue
}
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_BlockFile,
Scopes: []string{waveobj.MakeORef(waveobj.OType_Block, zoneId).String()},
Data: &wps.WSFileEventData{
ZoneId: zoneId,
FileName: entry,
FileOp: wps.FileOp_Delete,
},
})
}
if len(errs) > 0 {
return fmt.Errorf("error deleting blockfiles: %v", errs)
}
}
return nil
}
func listFilesPrefix(ctx context.Context, zoneId, prefix string, entryCallback func(*filestore.WaveFile) error) error {
if zoneId == "" {
return fmt.Errorf("zoneid not found in connection")
}
fileListOrig, err := filestore.WFS.ListFiles(ctx, zoneId)
if err != nil {
return fmt.Errorf("error listing blockfiles: %w", err)
}
for _, wf := range fileListOrig {
if prefix == "" || strings.HasPrefix(wf.Name, prefix) {
entryCallback(wf)
}
}
return nil
}
func (c WaveClient) Join(ctx context.Context, conn *connparse.Connection, parts ...string) (*wshrpc.FileInfo, error) {
newPath := fspath.Join(append([]string{conn.Path}, parts...)...)
newPath, err := cleanPath(newPath)
if err != nil {
return nil, fmt.Errorf("error cleaning path: %w", err)
}
conn.Path = newPath
return c.Stat(ctx, conn)
}
func (c WaveClient) GetCapability() wshrpc.FileShareCapability {
return wshrpc.FileShareCapability{
CanAppend: true,
CanMkdir: false,
}
}
func cleanPath(path string) (string, error) {
if path == "" || path == fspath.Separator {
return "", nil
}
if strings.HasPrefix(path, fspath.Separator) {
path = path[1:]
}
if strings.HasPrefix(path, "~") || strings.HasPrefix(path, ".") || strings.HasPrefix(path, "..") {
return "", fmt.Errorf("wavefile path cannot start with ~, ., or ..")
}
var newParts []string
for _, part := range strings.Split(path, fspath.Separator) {
if part == ".." {
if len(newParts) > 0 {
newParts = newParts[:len(newParts)-1]
}
} else if part != "." {
newParts = append(newParts, part)
}
}
return fspath.Join(newParts...), nil
}
func (c WaveClient) GetConnectionType() string {
return connparse.ConnectionTypeWave
}