waveterm/pkg/blockcontroller/shellcontroller.go
Mike Sawka d886b5e246
more terminal integration for wave ai (#2470)
- mark last command / prompts in xterm.js
- split out term model into its own file
- try to detect repl/shells/ssh/tmux etc commands that stop wave ai
integration
- show icons in term headers for whether wave ai integration is
available
- keep integration status / last command client side (sync with server
on reload)
2025-10-25 21:47:16 -07:00

905 lines
30 KiB
Go

// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package blockcontroller
import (
"bytes"
"context"
"fmt"
"io"
"io/fs"
"log"
"os"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/wavetermdev/waveterm/pkg/blocklogger"
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
"github.com/wavetermdev/waveterm/pkg/shellexec"
"github.com/wavetermdev/waveterm/pkg/util/envutil"
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
"github.com/wavetermdev/waveterm/pkg/util/shellutil"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/waveobj"
"github.com/wavetermdev/waveterm/pkg/wconfig"
"github.com/wavetermdev/waveterm/pkg/wps"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
"github.com/wavetermdev/waveterm/pkg/wslconn"
"github.com/wavetermdev/waveterm/pkg/wstore"
)
const (
ConnType_Local = "local"
ConnType_Wsl = "wsl"
ConnType_Ssh = "ssh"
)
type ShellController struct {
Lock *sync.Mutex
// shared fields
ControllerType string
TabId string
BlockId string
BlockDef *waveobj.BlockDef
RunLock *atomic.Bool
ProcStatus string
ProcExitCode int
StatusVersion int
// for shell/cmd
ShellProc *shellexec.ShellProc
ShellInputCh chan *BlockInputUnion
}
// Constructor that returns the Controller interface
func MakeShellController(tabId string, blockId string, controllerType string) Controller {
return &ShellController{
Lock: &sync.Mutex{},
ControllerType: controllerType,
TabId: tabId,
BlockId: blockId,
ProcStatus: Status_Init,
RunLock: &atomic.Bool{},
}
}
// Implement Controller interface methods
func (sc *ShellController) Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error {
// Get the block data
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, sc.BlockId)
if err != nil {
return fmt.Errorf("error getting block: %w", err)
}
// Use the existing run method which handles all the start logic
go sc.run(ctx, blockData, blockData.Meta, rtOpts, force)
return nil
}
func (sc *ShellController) Stop(graceful bool, newStatus string) error {
sc.Lock.Lock()
defer sc.Lock.Unlock()
if sc.ShellProc == nil || sc.ProcStatus == Status_Done || sc.ProcStatus == Status_Init {
if newStatus != sc.ProcStatus {
sc.ProcStatus = newStatus
sc.sendUpdate_nolock()
}
return nil
}
sc.ShellProc.Close()
if graceful {
doneCh := sc.ShellProc.DoneCh
sc.Lock.Unlock() // Unlock before waiting
<-doneCh
sc.Lock.Lock() // Re-lock after waiting
}
// Update status
sc.ProcStatus = newStatus
sc.sendUpdate_nolock()
return nil
}
func (sc *ShellController) getRuntimeStatus_nolock() BlockControllerRuntimeStatus {
var rtn BlockControllerRuntimeStatus
sc.StatusVersion++
rtn.Version = sc.StatusVersion
rtn.BlockId = sc.BlockId
rtn.ShellProcStatus = sc.ProcStatus
if sc.ShellProc != nil {
rtn.ShellProcConnName = sc.ShellProc.ConnName
}
rtn.ShellProcExitCode = sc.ProcExitCode
return rtn
}
func (sc *ShellController) GetRuntimeStatus() *BlockControllerRuntimeStatus {
var rtn BlockControllerRuntimeStatus
sc.WithLock(func() {
rtn = sc.getRuntimeStatus_nolock()
})
return &rtn
}
func (sc *ShellController) SendInput(inputUnion *BlockInputUnion) error {
var shellInputCh chan *BlockInputUnion
sc.WithLock(func() {
shellInputCh = sc.ShellInputCh
})
if shellInputCh == nil {
return fmt.Errorf("no shell input chan")
}
shellInputCh <- inputUnion
return nil
}
func (sc *ShellController) WithLock(f func()) {
sc.Lock.Lock()
defer sc.Lock.Unlock()
f()
}
type RunShellOpts struct {
TermSize waveobj.TermSize `json:"termsize,omitempty"`
}
// only call when holding the lock
func (sc *ShellController) sendUpdate_nolock() {
rtStatus := sc.getRuntimeStatus_nolock()
log.Printf("sending blockcontroller update %#v\n", rtStatus)
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_ControllerStatus,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Tab, sc.TabId).String(),
waveobj.MakeORef(waveobj.OType_Block, sc.BlockId).String(),
},
Data: rtStatus,
})
}
func (sc *ShellController) UpdateControllerAndSendUpdate(updateFn func() bool) {
var sendUpdate bool
sc.WithLock(func() {
sendUpdate = updateFn()
})
if sendUpdate {
rtStatus := sc.GetRuntimeStatus()
log.Printf("sending blockcontroller update %#v\n", rtStatus)
wps.Broker.Publish(wps.WaveEvent{
Event: wps.Event_ControllerStatus,
Scopes: []string{
waveobj.MakeORef(waveobj.OType_Tab, sc.TabId).String(),
waveobj.MakeORef(waveobj.OType_Block, sc.BlockId).String(),
},
Data: rtStatus,
})
}
}
func (sc *ShellController) resetTerminalState(logCtx context.Context) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
wfile, statErr := filestore.WFS.Stat(ctx, sc.BlockId, wavebase.BlockFile_Term)
if statErr == fs.ErrNotExist || wfile.Size == 0 {
return
}
blocklogger.Debugf(logCtx, "[conndebug] resetTerminalState: resetting terminal state\n")
// controller type = "shell"
var buf bytes.Buffer
buf.WriteString("\x1b[0m") // reset attributes
buf.WriteString("\x1b[?25h") // show cursor
buf.WriteString("\x1b[?1000l") // disable mouse tracking
buf.WriteString("\x1b[?1007l") // disable alternate scroll mode
buf.WriteString(shellutil.FormatOSC(16162, "R")) // OSC 16162 "R" - disable alternate screen mode (only if active), reset "shell integration" status.
buf.WriteString("\r\n\r\n")
err := HandleAppendBlockFile(sc.BlockId, wavebase.BlockFile_Term, buf.Bytes())
if err != nil {
log.Printf("error appending to blockfile (terminal reset): %v\n", err)
}
}
// [All the other existing private methods remain exactly the same - I'm not including them all here for brevity, but they would all be copied over with sc. replacing bc. throughout]
func (sc *ShellController) DoRunShellCommand(logCtx context.Context, rc *RunShellOpts, blockMeta waveobj.MetaMapType) error {
blocklogger.Debugf(logCtx, "[conndebug] DoRunShellCommand\n")
shellProc, err := sc.setupAndStartShellProcess(logCtx, rc, blockMeta)
if err != nil {
return err
}
return sc.manageRunningShellProcess(shellProc, rc, blockMeta)
}
// [Continue with all other methods, replacing bc with sc throughout...]
func (sc *ShellController) LockRunLock() bool {
rtn := sc.RunLock.CompareAndSwap(false, true)
if rtn {
log.Printf("block %q run() lock\n", sc.BlockId)
}
return rtn
}
func (sc *ShellController) UnlockRunLock() {
sc.RunLock.Store(false)
log.Printf("block %q run() unlock\n", sc.BlockId)
}
func (sc *ShellController) run(logCtx context.Context, bdata *waveobj.Block, blockMeta map[string]any, rtOpts *waveobj.RuntimeOpts, force bool) {
blocklogger.Debugf(logCtx, "[conndebug] ShellController.run() %q\n", sc.BlockId)
runningShellCommand := false
ok := sc.LockRunLock()
if !ok {
log.Printf("block %q is already executing run()\n", sc.BlockId)
return
}
defer func() {
if !runningShellCommand {
sc.UnlockRunLock()
}
}()
curStatus := sc.GetRuntimeStatus()
controllerName := bdata.Meta.GetString(waveobj.MetaKey_Controller, "")
if controllerName != BlockController_Shell && controllerName != BlockController_Cmd {
log.Printf("unknown controller %q\n", controllerName)
return
}
runOnce := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnce, false)
runOnStart := getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdRunOnStart, true)
if ((runOnStart || runOnce) && curStatus.ShellProcStatus == Status_Init) || force {
if getBoolFromMeta(blockMeta, waveobj.MetaKey_CmdClearOnStart, false) {
err := HandleTruncateBlockFile(sc.BlockId)
if err != nil {
log.Printf("error truncating term blockfile: %v\n", err)
}
}
if runOnce {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
metaUpdate := map[string]any{
waveobj.MetaKey_CmdRunOnce: false,
waveobj.MetaKey_CmdRunOnStart: false,
}
err := wstore.UpdateObjectMeta(ctx, waveobj.MakeORef(waveobj.OType_Block, sc.BlockId), metaUpdate, false)
if err != nil {
log.Printf("error updating block meta (in blockcontroller.run): %v\n", err)
return
}
}
runningShellCommand = true
go func() {
defer func() {
panichandler.PanicHandler("blockcontroller:run-shell-command", recover())
}()
defer sc.UnlockRunLock()
var termSize waveobj.TermSize
if rtOpts != nil {
termSize = rtOpts.TermSize
} else {
termSize = getTermSize(bdata)
}
err := sc.DoRunShellCommand(logCtx, &RunShellOpts{TermSize: termSize}, bdata.Meta)
if err != nil {
debugLog(logCtx, "error running shell: %v\n", err)
}
}()
}
}
// [Include all the remaining private methods with bc replaced by sc]
type ConnUnion struct {
ConnName string
ConnType string
SshConn *conncontroller.SSHConn
WslConn *wslconn.WslConn
WshEnabled bool
ShellPath string
ShellOpts []string
ShellType string
}
func (bc *ShellController) getConnUnion(logCtx context.Context, remoteName string, blockMeta waveobj.MetaMapType) (ConnUnion, error) {
rtn := ConnUnion{ConnName: remoteName}
wshEnabled := !blockMeta.GetBool(waveobj.MetaKey_CmdNoWsh, false)
if strings.HasPrefix(remoteName, "wsl://") {
wslName := strings.TrimPrefix(remoteName, "wsl://")
wslConn := wslconn.GetWslConn(wslName)
if wslConn == nil {
return ConnUnion{}, fmt.Errorf("wsl connection not found: %s", remoteName)
}
connStatus := wslConn.DeriveConnStatus()
if connStatus.Status != conncontroller.Status_Connected {
return ConnUnion{}, fmt.Errorf("wsl connection %s not connected, cannot start shellproc", remoteName)
}
rtn.ConnType = ConnType_Wsl
rtn.WslConn = wslConn
rtn.WshEnabled = wshEnabled && wslConn.WshEnabled.Load()
} else if remoteName != "" {
opts, err := remote.ParseOpts(remoteName)
if err != nil {
return ConnUnion{}, fmt.Errorf("invalid ssh remote name (%s): %w", remoteName, err)
}
conn := conncontroller.GetConn(opts)
if conn == nil {
return ConnUnion{}, fmt.Errorf("ssh connection not found: %s", remoteName)
}
connStatus := conn.DeriveConnStatus()
if connStatus.Status != conncontroller.Status_Connected {
return ConnUnion{}, fmt.Errorf("ssh connection %s not connected, cannot start shellproc", remoteName)
}
rtn.ConnType = ConnType_Ssh
rtn.SshConn = conn
rtn.WshEnabled = wshEnabled && conn.WshEnabled.Load()
} else {
rtn.ConnType = ConnType_Local
rtn.WshEnabled = wshEnabled
}
err := rtn.getRemoteInfoAndShellType(blockMeta)
if err != nil {
return ConnUnion{}, err
}
return rtn, nil
}
func (bc *ShellController) setupAndStartShellProcess(logCtx context.Context, rc *RunShellOpts, blockMeta waveobj.MetaMapType) (*shellexec.ShellProc, error) {
// create a circular blockfile for the output
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
fsErr := filestore.WFS.MakeFile(ctx, bc.BlockId, wavebase.BlockFile_Term, nil, wshrpc.FileOpts{MaxSize: DefaultTermMaxFileSize, Circular: true})
if fsErr != nil && fsErr != fs.ErrExist {
return nil, fmt.Errorf("error creating blockfile: %w", fsErr)
}
if fsErr == fs.ErrExist {
// reset the terminal state
bc.resetTerminalState(logCtx)
}
bcInitStatus := bc.GetRuntimeStatus()
if bcInitStatus.ShellProcStatus == Status_Running {
return nil, nil
}
// TODO better sync here (don't let two starts happen at the same times)
remoteName := blockMeta.GetString(waveobj.MetaKey_Connection, "")
connUnion, err := bc.getConnUnion(logCtx, remoteName, blockMeta)
if err != nil {
return nil, err
}
blocklogger.Infof(logCtx, "[conndebug] remoteName: %q, connType: %s, wshEnabled: %v, shell: %q, shellType: %s\n", remoteName, connUnion.ConnType, connUnion.WshEnabled, connUnion.ShellPath, connUnion.ShellType)
var cmdStr string
var cmdOpts shellexec.CommandOptsType
if bc.ControllerType == BlockController_Shell {
cmdOpts.Interactive = true
cmdOpts.Login = true
cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "")
if cmdOpts.Cwd != "" {
cwdPath, err := wavebase.ExpandHomeDir(cmdOpts.Cwd)
if err != nil {
return nil, err
}
cmdOpts.Cwd = cwdPath
}
} else if bc.ControllerType == BlockController_Cmd {
var cmdOptsPtr *shellexec.CommandOptsType
cmdStr, cmdOptsPtr, err = createCmdStrAndOpts(bc.BlockId, blockMeta, remoteName)
if err != nil {
return nil, err
}
cmdOpts = *cmdOptsPtr
} else {
return nil, fmt.Errorf("unknown controller type %q", bc.ControllerType)
}
var shellProc *shellexec.ShellProc
swapToken := bc.makeSwapToken(ctx, logCtx, blockMeta, remoteName, connUnion.ShellType)
cmdOpts.SwapToken = swapToken
blocklogger.Debugf(logCtx, "[conndebug] created swaptoken: %s\n", swapToken.Token)
if connUnion.ConnType == ConnType_Wsl {
wslConn := connUnion.WslConn
if !connUnion.WshEnabled {
shellProc, err = shellexec.StartWslShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn)
if err != nil {
return nil, err
}
} else {
sockName := wslConn.GetDomainSocketName()
rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: wslConn.GetName()}
jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName)
if err != nil {
return nil, fmt.Errorf("error making jwt token: %w", err)
}
swapToken.SockName = sockName
swapToken.RpcContext = &rpcContext
swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr
shellProc, err = shellexec.StartWslShellProc(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn)
if err != nil {
wslConn.SetWshError(err)
wslConn.WshEnabled.Store(false)
blocklogger.Infof(logCtx, "[conndebug] error starting wsl shell proc with wsh: %v\n", err)
blocklogger.Infof(logCtx, "[conndebug] attempting install without wsh\n")
shellProc, err = shellexec.StartWslShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, wslConn)
if err != nil {
return nil, err
}
}
}
} else if connUnion.ConnType == ConnType_Ssh {
conn := connUnion.SshConn
if !connUnion.WshEnabled {
shellProc, err = shellexec.StartRemoteShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, conn)
if err != nil {
return nil, err
}
} else {
sockName := conn.GetDomainSocketName()
rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId, Conn: conn.Opts.String()}
jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName)
if err != nil {
return nil, fmt.Errorf("error making jwt token: %w", err)
}
swapToken.SockName = sockName
swapToken.RpcContext = &rpcContext
swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr
shellProc, err = shellexec.StartRemoteShellProc(ctx, logCtx, rc.TermSize, cmdStr, cmdOpts, conn)
if err != nil {
conn.SetWshError(err)
conn.WshEnabled.Store(false)
blocklogger.Infof(logCtx, "[conndebug] error starting remote shell proc with wsh: %v\n", err)
blocklogger.Infof(logCtx, "[conndebug] attempting install without wsh\n")
shellProc, err = shellexec.StartRemoteShellProcNoWsh(ctx, rc.TermSize, cmdStr, cmdOpts, conn)
if err != nil {
return nil, err
}
}
}
} else if connUnion.ConnType == ConnType_Local {
if connUnion.WshEnabled {
sockName := wavebase.GetDomainSocketName()
rpcContext := wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId}
jwtStr, err := wshutil.MakeClientJWTToken(rpcContext, sockName)
if err != nil {
return nil, fmt.Errorf("error making jwt token: %w", err)
}
swapToken.SockName = sockName
swapToken.RpcContext = &rpcContext
swapToken.Env[wshutil.WaveJwtTokenVarName] = jwtStr
}
cmdOpts.ShellPath = connUnion.ShellPath
cmdOpts.ShellOpts = getLocalShellOpts(blockMeta)
shellProc, err = shellexec.StartLocalShellProc(logCtx, rc.TermSize, cmdStr, cmdOpts)
if err != nil {
return nil, err
}
} else {
return nil, fmt.Errorf("unknown connection type for conn %q: %s", remoteName, connUnion.ConnType)
}
bc.UpdateControllerAndSendUpdate(func() bool {
bc.ShellProc = shellProc
bc.ProcStatus = Status_Running
return true
})
return shellProc, nil
}
func (bc *ShellController) manageRunningShellProcess(shellProc *shellexec.ShellProc, rc *RunShellOpts, blockMeta waveobj.MetaMapType) error {
shellInputCh := make(chan *BlockInputUnion, 32)
bc.ShellInputCh = shellInputCh
// make esc sequence wshclient wshProxy
// we don't need to authenticate this wshProxy since it is coming direct
wshProxy := wshutil.MakeRpcProxy()
wshProxy.SetRpcContext(&wshrpc.RpcContext{TabId: bc.TabId, BlockId: bc.BlockId})
wshutil.DefaultRouter.RegisterRoute(wshutil.MakeControllerRouteId(bc.BlockId), wshProxy, true)
ptyBuffer := wshutil.MakePtyBuffer(wshutil.WaveOSCPrefix, shellProc.Cmd, wshProxy.FromRemoteCh)
go func() {
// handles regular output from the pty (goes to the blockfile and xterm)
defer func() {
panichandler.PanicHandler("blockcontroller:shellproc-pty-read-loop", recover())
}()
defer func() {
log.Printf("[shellproc] pty-read loop done\n")
shellProc.Close()
bc.WithLock(func() {
// so no other events are sent
bc.ShellInputCh = nil
})
shellProc.Cmd.Wait()
exitCode := shellProc.Cmd.ExitCode()
blockData := bc.getBlockData_noErr()
if blockData != nil && blockData.Meta.GetString(waveobj.MetaKey_Controller, "") == BlockController_Cmd {
termMsg := fmt.Sprintf("\r\nprocess finished with exit code = %d\r\n\r\n", exitCode)
HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, []byte(termMsg))
}
// to stop the inputCh loop
time.Sleep(100 * time.Millisecond)
close(shellInputCh) // don't use bc.ShellInputCh (it's nil)
}()
buf := make([]byte, 4096)
for {
nr, err := ptyBuffer.Read(buf)
if nr > 0 {
err := HandleAppendBlockFile(bc.BlockId, wavebase.BlockFile_Term, buf[:nr])
if err != nil {
log.Printf("error appending to blockfile: %v\n", err)
}
}
if err == io.EOF {
break
}
if err != nil {
log.Printf("error reading from shell: %v\n", err)
break
}
}
}()
go func() {
// handles input from the shellInputCh, sent to pty
// use shellInputCh instead of bc.ShellInputCh (because we want to be attached to *this* ch. bc.ShellInputCh can be updated)
defer func() {
panichandler.PanicHandler("blockcontroller:shellproc-input-loop", recover())
}()
for ic := range shellInputCh {
if len(ic.InputData) > 0 {
shellProc.Cmd.Write(ic.InputData)
}
if ic.TermSize != nil {
updateTermSize(shellProc, bc.BlockId, *ic.TermSize)
}
}
}()
go func() {
defer func() {
panichandler.PanicHandler("blockcontroller:shellproc-output-loop", recover())
}()
// handles outputCh -> shellInputCh
for msg := range wshProxy.ToRemoteCh {
encodedMsg, err := wshutil.EncodeWaveOSCBytes(wshutil.WaveServerOSC, msg)
if err != nil {
log.Printf("error encoding OSC message: %v\n", err)
}
shellInputCh <- &BlockInputUnion{InputData: encodedMsg}
}
}()
go func() {
defer func() {
panichandler.PanicHandler("blockcontroller:shellproc-wait-loop", recover())
}()
// wait for the shell to finish
var exitCode int
defer func() {
wshutil.DefaultRouter.UnregisterRoute(wshutil.MakeControllerRouteId(bc.BlockId))
bc.UpdateControllerAndSendUpdate(func() bool {
if bc.ProcStatus == Status_Running {
bc.ProcStatus = Status_Done
}
bc.ProcExitCode = exitCode
return true
})
log.Printf("[shellproc] shell process wait loop done\n")
}()
waitErr := shellProc.Cmd.Wait()
exitCode = shellProc.Cmd.ExitCode()
shellProc.SetWaitErrorAndSignalDone(waitErr)
go checkCloseOnExit(bc.BlockId, exitCode)
}()
return nil
}
func (union *ConnUnion) getRemoteInfoAndShellType(blockMeta waveobj.MetaMapType) error {
if !union.WshEnabled {
return nil
}
if union.ConnType == ConnType_Ssh || union.ConnType == ConnType_Wsl {
connRoute := wshutil.MakeConnectionRouteId(union.ConnName)
remoteInfo, err := wshclient.RemoteGetInfoCommand(wshclient.GetBareRpcClient(), &wshrpc.RpcOpts{Route: connRoute, Timeout: 2000})
if err != nil {
// weird error, could flip the wshEnabled flag and allow it to go forward, but the connection should have already been vetted
return fmt.Errorf("unable to obtain remote info from connserver: %w", err)
}
// TODO allow overriding remote shell path
union.ShellPath = remoteInfo.Shell
} else {
union.ShellPath = getLocalShellPath(blockMeta)
}
union.ShellType = shellutil.GetShellTypeFromShellPath(union.ShellPath)
return nil
}
func checkCloseOnExit(blockId string, exitCode int) {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
blockData, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
log.Printf("error getting block data: %v\n", err)
return
}
closeOnExit := blockData.Meta.GetBool(waveobj.MetaKey_CmdCloseOnExit, false)
closeOnExitForce := blockData.Meta.GetBool(waveobj.MetaKey_CmdCloseOnExitForce, false)
if !closeOnExitForce && !(closeOnExit && exitCode == 0) {
return
}
delayMs := blockData.Meta.GetFloat(waveobj.MetaKey_CmdCloseOnExitDelay, 2000)
if delayMs < 0 {
delayMs = 0
}
time.Sleep(time.Duration(delayMs) * time.Millisecond)
rpcClient := wshclient.GetBareRpcClient()
err = wshclient.DeleteBlockCommand(rpcClient, wshrpc.CommandDeleteBlockData{BlockId: blockId}, nil)
if err != nil {
log.Printf("error deleting block data (close on exit): %v\n", err)
}
}
func getLocalShellPath(blockMeta waveobj.MetaMapType) string {
shellPath := blockMeta.GetString(waveobj.MetaKey_TermLocalShellPath, "")
if shellPath != "" {
return shellPath
}
settings := wconfig.GetWatcher().GetFullConfig().Settings
if settings.TermLocalShellPath != "" {
return settings.TermLocalShellPath
}
return shellutil.DetectLocalShellPath()
}
func getLocalShellOpts(blockMeta waveobj.MetaMapType) []string {
if blockMeta.HasKey(waveobj.MetaKey_TermLocalShellOpts) {
opts := blockMeta.GetStringList(waveobj.MetaKey_TermLocalShellOpts)
return append([]string{}, opts...)
}
settings := wconfig.GetWatcher().GetFullConfig().Settings
if len(settings.TermLocalShellOpts) > 0 {
return append([]string{}, settings.TermLocalShellOpts...)
}
return nil
}
// for "cmd" type blocks
func createCmdStrAndOpts(blockId string, blockMeta waveobj.MetaMapType, connName string) (string, *shellexec.CommandOptsType, error) {
var cmdStr string
var cmdOpts shellexec.CommandOptsType
cmdStr = blockMeta.GetString(waveobj.MetaKey_Cmd, "")
if cmdStr == "" {
return "", nil, fmt.Errorf("missing cmd in block meta")
}
cmdOpts.Cwd = blockMeta.GetString(waveobj.MetaKey_CmdCwd, "")
if cmdOpts.Cwd != "" {
cwdPath, err := wavebase.ExpandHomeDir(cmdOpts.Cwd)
if err != nil {
return "", nil, err
}
cmdOpts.Cwd = cwdPath
}
useShell := blockMeta.GetBool(waveobj.MetaKey_CmdShell, true)
if !useShell {
if strings.Contains(cmdStr, " ") {
return "", nil, fmt.Errorf("cmd should not have spaces if cmd:shell is false (use cmd:args)")
}
cmdArgs := blockMeta.GetStringList(waveobj.MetaKey_CmdArgs)
// shell escape the args
for _, arg := range cmdArgs {
cmdStr = cmdStr + " " + utilfn.ShellQuote(arg, false, -1)
}
}
cmdOpts.ForceJwt = blockMeta.GetBool(waveobj.MetaKey_CmdJwt, false)
return cmdStr, &cmdOpts, nil
}
func (bc *ShellController) makeSwapToken(ctx context.Context, logCtx context.Context, blockMeta waveobj.MetaMapType, remoteName string, shellType string) *shellutil.TokenSwapEntry {
token := &shellutil.TokenSwapEntry{
Token: uuid.New().String(),
Env: make(map[string]string),
Exp: time.Now().Add(5 * time.Minute),
}
token.Env["TERM_PROGRAM"] = "waveterm"
token.Env["WAVETERM_BLOCKID"] = bc.BlockId
token.Env["WAVETERM_VERSION"] = wavebase.WaveVersion
token.Env["WAVETERM"] = "1"
tabId, err := wstore.DBFindTabForBlockId(ctx, bc.BlockId)
if err != nil {
log.Printf("error finding tab for block: %v\n", err)
} else {
token.Env["WAVETERM_TABID"] = tabId
}
if tabId != "" {
wsId, err := wstore.DBFindWorkspaceForTabId(ctx, tabId)
if err != nil {
log.Printf("error finding workspace for tab: %v\n", err)
} else {
token.Env["WAVETERM_WORKSPACEID"] = wsId
}
}
clientData, err := wstore.DBGetSingleton[*waveobj.Client](ctx)
if err != nil {
log.Printf("error getting client data: %v\n", err)
} else {
token.Env["WAVETERM_CLIENTID"] = clientData.OID
}
token.Env["WAVETERM_CONN"] = remoteName
envMap, err := resolveEnvMap(bc.BlockId, blockMeta, remoteName)
if err != nil {
log.Printf("error resolving env map: %v\n", err)
}
for k, v := range envMap {
token.Env[k] = v
}
token.ScriptText = getCustomInitScript(logCtx, blockMeta, remoteName, shellType)
return token
}
func (bc *ShellController) getBlockData_noErr() *waveobj.Block {
ctx, cancelFn := context.WithTimeout(context.Background(), DefaultTimeout)
defer cancelFn()
blockData, err := wstore.DBGet[*waveobj.Block](ctx, bc.BlockId)
if err != nil {
log.Printf("error getting block data (getBlockData_noErr): %v\n", err)
return nil
}
return blockData
}
func resolveEnvMap(blockId string, blockMeta waveobj.MetaMapType, connName string) (map[string]string, error) {
rtn := make(map[string]string)
config := wconfig.GetWatcher().GetFullConfig()
connKeywords := config.Connections[connName]
ckEnv := connKeywords.CmdEnv
for k, v := range ckEnv {
rtn[k] = v
}
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
_, envFileData, err := filestore.WFS.ReadFile(ctx, blockId, wavebase.BlockFile_Env)
if err == fs.ErrNotExist {
err = nil
}
if err != nil {
return nil, fmt.Errorf("error reading command env file: %w", err)
}
if len(envFileData) > 0 {
envMap := envutil.EnvToMap(string(envFileData))
for k, v := range envMap {
rtn[k] = v
}
}
cmdEnv := blockMeta.GetStringMap(waveobj.MetaKey_CmdEnv, true)
for k, v := range cmdEnv {
if v == waveobj.MetaMap_DeleteSentinel {
delete(rtn, k)
continue
}
rtn[k] = v
}
connEnv := blockMeta.GetConnectionOverride(connName).GetStringMap(waveobj.MetaKey_CmdEnv, true)
for k, v := range connEnv {
if v == waveobj.MetaMap_DeleteSentinel {
delete(rtn, k)
continue
}
rtn[k] = v
}
return rtn, nil
}
func getCustomInitScriptKeyCascade(shellType string) []string {
if shellType == "bash" {
return []string{waveobj.MetaKey_CmdInitScriptBash, waveobj.MetaKey_CmdInitScriptSh, waveobj.MetaKey_CmdInitScript}
}
if shellType == "zsh" {
return []string{waveobj.MetaKey_CmdInitScriptZsh, waveobj.MetaKey_CmdInitScriptSh, waveobj.MetaKey_CmdInitScript}
}
if shellType == "pwsh" {
return []string{waveobj.MetaKey_CmdInitScriptPwsh, waveobj.MetaKey_CmdInitScript}
}
if shellType == "fish" {
return []string{waveobj.MetaKey_CmdInitScriptFish, waveobj.MetaKey_CmdInitScript}
}
return []string{waveobj.MetaKey_CmdInitScript}
}
func getCustomInitScript(logCtx context.Context, meta waveobj.MetaMapType, connName string, shellType string) string {
initScriptVal, metaKeyName := getCustomInitScriptValue(meta, connName, shellType)
if initScriptVal == "" {
return ""
}
if !fileutil.IsInitScriptPath(initScriptVal) {
blocklogger.Infof(logCtx, "[conndebug] inline initScript (size=%d) found in meta key: %s\n", len(initScriptVal), metaKeyName)
return initScriptVal
}
blocklogger.Infof(logCtx, "[conndebug] initScript detected as a file %q from meta key: %s\n", initScriptVal, metaKeyName)
initScriptVal, err := wavebase.ExpandHomeDir(initScriptVal)
if err != nil {
blocklogger.Infof(logCtx, "[conndebug] cannot expand home dir in Wave initscript file: %v\n", err)
return fmt.Sprintf("echo \"cannot expand home dir in Wave initscript file, from key %s\";\n", metaKeyName)
}
fileData, err := os.ReadFile(initScriptVal)
if err != nil {
blocklogger.Infof(logCtx, "[conndebug] cannot open Wave initscript file: %v\n", err)
return fmt.Sprintf("echo \"cannot open Wave initscript file, from key %s\";\n", metaKeyName)
}
if len(fileData) > MaxInitScriptSize {
blocklogger.Infof(logCtx, "[conndebug] initscript file too large, size=%d, max=%d\n", len(fileData), MaxInitScriptSize)
return fmt.Sprintf("echo \"initscript file too large, from key %s\";\n", metaKeyName)
}
if utilfn.HasBinaryData(fileData) {
blocklogger.Infof(logCtx, "[conndebug] initscript file contains binary data\n")
return fmt.Sprintf("echo \"initscript file contains binary data, from key %s\";\n", metaKeyName)
}
blocklogger.Infof(logCtx, "[conndebug] initscript file read successfully, size=%d\n", len(fileData))
return string(fileData)
}
// returns (value, metakey)
func getCustomInitScriptValue(meta waveobj.MetaMapType, connName string, shellType string) (string, string) {
keys := getCustomInitScriptKeyCascade(shellType)
connMeta := meta.GetConnectionOverride(connName)
if connMeta != nil {
for _, key := range keys {
if connMeta.HasKey(key) {
return connMeta.GetString(key, ""), "blockmeta/[" + connName + "]/" + key
}
}
}
for _, key := range keys {
if meta.HasKey(key) {
return meta.GetString(key, ""), "blockmeta/" + key
}
}
fullConfig := wconfig.GetWatcher().GetFullConfig()
connKeywords := fullConfig.Connections[connName]
connKeywordsMap := make(map[string]any)
err := utilfn.ReUnmarshal(&connKeywordsMap, connKeywords)
if err != nil {
log.Printf("error re-unmarshalling connKeywords: %v\n", err)
return "", ""
}
ckMeta := waveobj.MetaMapType(connKeywordsMap)
for _, key := range keys {
if ckMeta.HasKey(key) {
return ckMeta.GetString(key, ""), "connections.json/" + connName + "/" + key
}
}
return "", ""
}
func updateTermSize(shellProc *shellexec.ShellProc, blockId string, termSize waveobj.TermSize) {
err := setTermSizeInDB(blockId, termSize)
if err != nil {
log.Printf("error setting pty size: %v\n", err)
}
err = shellProc.Cmd.SetSize(termSize.Rows, termSize.Cols)
if err != nil {
log.Printf("error setting pty size: %v\n", err)
}
}
func setTermSizeInDB(blockId string, termSize waveobj.TermSize) error {
ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFn()
ctx = waveobj.ContextWithUpdates(ctx)
bdata, err := wstore.DBMustGet[*waveobj.Block](ctx, blockId)
if err != nil {
return fmt.Errorf("error getting block data: %v", err)
}
if bdata.RuntimeOpts == nil {
bdata.RuntimeOpts = &waveobj.RuntimeOpts{}
}
bdata.RuntimeOpts.TermSize = termSize
err = wstore.DBUpdate(ctx, bdata)
if err != nil {
return fmt.Errorf("error updating block data: %v", err)
}
updates := waveobj.ContextGetUpdatesRtn(ctx)
wps.Broker.SendUpdateEvents(updates)
return nil
}