waveterm/pkg/suggestion/filewalk.go
2025-02-18 18:52:32 -08:00

261 lines
5.5 KiB
Go

// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package suggestion
import (
"container/list"
"context"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"golang.org/x/sync/singleflight"
)
const ListDirChanSize = 50
// cache settings
const (
maxCacheEntries = 20
cacheTTL = 60 * time.Second
)
type cacheEntry struct {
key string
value []DirEntryResult
expiration time.Time
lruElement *list.Element
}
var (
cache = make(map[string]*cacheEntry)
cacheLRU = list.New()
cacheMu sync.Mutex
// group ensures only one listing per key is executed concurrently.
group singleflight.Group
)
func init() {
go func() {
ticker := time.NewTicker(60 * time.Second)
defer ticker.Stop()
for range ticker.C {
cleanCache()
}
}()
}
func cleanCache() {
cacheMu.Lock()
defer cacheMu.Unlock()
now := time.Now()
for key, entry := range cache {
if now.After(entry.expiration) {
cacheLRU.Remove(entry.lruElement)
delete(cache, key)
}
}
}
func getCache(key string) ([]DirEntryResult, bool) {
cacheMu.Lock()
defer cacheMu.Unlock()
entry, ok := cache[key]
if !ok {
return nil, false
}
if time.Now().After(entry.expiration) {
// expired
cacheLRU.Remove(entry.lruElement)
delete(cache, key)
return nil, false
}
// update LRU order
cacheLRU.MoveToFront(entry.lruElement)
return entry.value, true
}
func setCache(key string, value []DirEntryResult) {
cacheMu.Lock()
defer cacheMu.Unlock()
// if already exists, update it
if entry, ok := cache[key]; ok {
entry.value = value
entry.expiration = time.Now().Add(cacheTTL)
cacheLRU.MoveToFront(entry.lruElement)
return
}
// evict if at capacity
if cacheLRU.Len() >= maxCacheEntries {
oldest := cacheLRU.Back()
if oldest != nil {
oldestKey := oldest.Value.(string)
if oldEntry, ok := cache[oldestKey]; ok {
cacheLRU.Remove(oldEntry.lruElement)
delete(cache, oldestKey)
}
}
}
// add new entry
elem := cacheLRU.PushFront(key)
cache[key] = &cacheEntry{
key: key,
value: value,
expiration: time.Now().Add(cacheTTL),
lruElement: elem,
}
}
// cacheDispose clears all cache entries for the provided widgetId.
func cacheDispose(widgetId string) {
cacheMu.Lock()
defer cacheMu.Unlock()
prefix := widgetId + "|"
for key, entry := range cache {
if strings.HasPrefix(key, prefix) {
cacheLRU.Remove(entry.lruElement)
delete(cache, key)
}
}
}
type DirEntryResult struct {
Entry fs.DirEntry
Err error
}
func listS3Directory(ctx context.Context, widgetId string, conn string, dir string, maxFiles int) (<-chan DirEntryResult, error) {
if !strings.HasPrefix(conn, "aws:") {
return nil, fmt.Errorf("invalid S3 connection: %s", conn)
}
key := widgetId + "|" + dir
if cached, ok := getCache(key); ok {
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range cached {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
// Ensure only one operation populates the cache for this key.
value, err, _ := group.Do(key, func() (interface{}, error) {
path := conn + ":s3://" + dir
entries, err := wshclient.FileListCommand(wshclient.GetBareRpcClient(), wshrpc.FileListData{Path: path, Opts: &wshrpc.FileListOpts{Limit: maxFiles}}, nil)
if err != nil {
return nil, err
}
var results []DirEntryResult
for _, entry := range entries {
mockEntry := &MockDirEntry{
NameStr: entry.Name,
IsDirVal: entry.IsDir,
FileMode: entry.Mode,
}
results = append(results, DirEntryResult{Entry: mockEntry})
}
return results, nil
})
if err != nil {
return nil, err
}
results := value.([]DirEntryResult)
setCache(key, results)
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range results {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
func listDirectory(ctx context.Context, widgetId string, dir string, maxFiles int) (<-chan DirEntryResult, error) {
key := widgetId + "|" + dir
if cached, ok := getCache(key); ok {
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range cached {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}
// Use singleflight to ensure only one listing operation occurs per key.
value, err, _ := group.Do(key, func() (interface{}, error) {
f, err := os.Open(dir)
if err != nil {
return nil, err
}
defer f.Close()
fi, err := f.Stat()
if err != nil {
return nil, err
}
if !fi.IsDir() {
return nil, fmt.Errorf("%s is not a directory", dir)
}
entries, err := f.ReadDir(maxFiles)
if err != nil {
return nil, err
}
var results []DirEntryResult
for _, entry := range entries {
results = append(results, DirEntryResult{Entry: entry})
}
// Add parent directory (“..”) entry if not at the filesystem root.
if filepath.Dir(dir) != dir {
mockDir := &MockDirEntry{
NameStr: "..",
IsDirVal: true,
FileMode: fs.ModeDir | 0755,
}
results = append(results, DirEntryResult{Entry: mockDir})
}
return results, nil
})
if err != nil {
return nil, err
}
results := value.([]DirEntryResult)
setCache(key, results)
ch := make(chan DirEntryResult, ListDirChanSize)
go func() {
defer close(ch)
for _, r := range results {
select {
case ch <- r:
case <-ctx.Done():
return
}
}
}()
return ch, nil
}