diff --git a/CHANGELOG.md b/CHANGELOG.md index 30aca87..6f12345 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased * Remove buffering from the pipe chunked APIs. +* Add a `toBytesWith` function to modify process configuration ## 0.3.1 (Dec 2023) diff --git a/src/Streamly/Internal/System/Process.hs b/src/Streamly/Internal/System/Process.hs index 1c53706..7f52567 100644 --- a/src/Streamly/Internal/System/Process.hs +++ b/src/Streamly/Internal/System/Process.hs @@ -77,6 +77,13 @@ module Streamly.Internal.System.Process , inheritStdin , inheritStdout , pipeStdErr + , pipeStdin + , pipeStdout + + -- ** CleanupConfig options + , terminateWithSigInt + , terminateProcessGroup + , waitOnTermination -- * Exceptions , ProcessFailure (..) @@ -84,6 +91,7 @@ module Streamly.Internal.System.Process -- * Generation -- | stdout of the process is redirected to output stream. , toBytes + , toBytesWith , toChunks , toChunksWith , toChars @@ -156,8 +164,12 @@ import System.Process , waitForProcess , CmdSpec(..) , terminateProcess + , interruptProcessGroupOf , withCreateProcess ) +import System.Process.Internals (ProcessHandle__(..), withProcessHandle) +import System.Posix.Signals (signalProcess, signalProcessGroup, sigINT, sigTERM) +import System.Posix.Process (getProcessGroupIDOf) #endif import qualified Streamly.Data.Array as Array @@ -226,6 +238,12 @@ mkConfig _ _ = Config False pipeStdErr :: Config -> Config pipeStdErr (Config _) = Config True +pipeStdin :: Config -> Config +pipeStdin (Config _) = Config True + +pipeStdout :: Config -> Config +pipeStdout (Config _) = Config True + inheritStdin :: Config -> Config inheritStdin (Config _) = Config True @@ -234,13 +252,22 @@ inheritStdout (Config _) = Config True #else -newtype Config = Config CreateProcess +data CleanupConfig = CleanupConfig + { deliverSigInt :: Bool + , deliverToProcessGroup :: Bool + , blockingWait :: Bool + } + +defaultCleanupConfig :: CleanupConfig +defaultCleanupConfig = CleanupConfig False False False + +data Config = Config CleanupConfig CreateProcess -- | Create a default process configuration from an executable file path and -- an argument list. -- mkConfig :: FilePath -> [String] -> Config -mkConfig path args = Config $ CreateProcess +mkConfig path args = Config defaultCleanupConfig $ CreateProcess { cmdspec = RawCommand path args , cwd = Nothing -- inherit , env = Nothing -- inherit @@ -278,14 +305,14 @@ mkConfig path args = Config $ CreateProcess -- -- Default is 'Nothing' - inherited from the parent process. setCwd :: Maybe FilePath -> Config -> Config -setCwd path (Config cfg) = Config $ cfg { cwd = path } +setCwd path (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { cwd = path } -- | Set the environment variables for the new process. When 'Nothing', the -- environment is inherited from the parent process. -- -- Default is 'Nothing' - inherited from the parent process. setEnv :: Maybe [(String, String)] -> Config -> Config -setEnv e (Config cfg) = Config $ cfg { env = e } +setEnv e (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { env = e } {- -- XXX We should allow setting only those stdio streams which are not used for @@ -315,15 +342,15 @@ toStdStream x = -- | What to do with the @stdin@ stream of the process. setStdin :: Stdio -> Config -> Config -setStdin x (Config cfg) = Config $ cfg { std_in = toStdStream x } +setStdin x (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_in = toStdStream x } -- | What to do with the @stdout@ stream of the process. setStdout :: Stdio -> Config -> Config -setStdout x (Config cfg) = Config $ cfg { std_out = toStdStream x } +setStdout x (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_out = toStdStream x } -- | What to do with the @stderr@ stream of the process. setStderr :: Stdio -> Config -> Config -setStderr x (Config cfg) = Config $ cfg { std_err = toStdStream x } +setStderr x (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_err = toStdStream x } -} -- | Close all open file descriptors inherited from the parent process. Note, @@ -335,20 +362,23 @@ setStderr x (Config cfg) = Config $ cfg { std_err = toStdStream x } -- Note: if the number of open descriptors is large, it may take a while -- closing them. closeFiles :: Bool -> Config -> Config -closeFiles x (Config cfg) = Config $ cfg { close_fds = x } +closeFiles x (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { close_fds = x } -- XXX Do these details apply to Windows as well? -- | If 'True' the new process starts a new process group, becomes a process -- group leader, its pid becoming the process group id. -- +-- If a new process group is created, termination signals are also delivered to the +-- process group +-- -- See the POSIX -- -- man page. -- -- Default is 'False', the new process belongs to the parent's process group. newProcessGroup :: Bool -> Config -> Config -newProcessGroup x (Config cfg) = Config $ cfg { create_group = x } +newProcessGroup x (Config cleanupCfg cfg) = Config cleanupCfg{ deliverToProcessGroup = x } $ cfg { create_group = x } -- | 'InheritSession' makes the new process inherit the terminal session from the -- parent process. This is the default. @@ -381,8 +411,8 @@ data Session = -- -- Default is 'InheritSession'. setSession :: Session -> Config -> Config -setSession x (Config cfg) = - Config $ +setSession x (Config cleanupCfg cfg) = + Config cleanupCfg $ case x of InheritSession -> cfg NewSession -> cfg { new_session = True} @@ -401,11 +431,11 @@ setSession x (Config cfg) = -- Default is 'Nothing' - inherit from the parent. setUserId :: Maybe Word32 -> Config -> Config #if defined(mingw32_HOST_OS) -setUserId _ (Config cfg) = - Config cfg +setUserId _ (Config cleanupCfg cfg) = + Config cleanupCfg cfg #else -setUserId x (Config cfg) = - Config $ cfg { child_user = CUid <$> x } +setUserId x (Config cleanupCfg cfg) = + Config cleanupCfg $ cfg { child_user = CUid <$> x } #endif -- | Use the POSIX @@ -421,13 +451,22 @@ setUserId x (Config cfg) = -- Default is 'Nothing' - inherit from the parent. setGroupId :: Maybe Word32 -> Config -> Config #if defined(mingw32_HOST_OS) -setGroupId _ (Config cfg) = - Config cfg +setGroupId _ (Config cleanupCfg cfg) = + Config cleanupCfg cfg #else -setGroupId x (Config cfg) = - Config $ cfg { child_group = CGid <$> x } +setGroupId x (Config cleanupCfg cfg) = + Config cleanupCfg $ cfg { child_group = CGid <$> x } #endif +terminateWithSigInt :: Bool -> Config -> Config +terminateWithSigInt b (Config cleanupCfg cfg) = Config cleanupCfg{ deliverSigInt = b } cfg + +terminateProcessGroup :: Config -> Config +terminateProcessGroup (Config cleanupCfg cfg) = Config cleanupCfg{ deliverToProcessGroup = True } cfg + +waitOnTermination :: Bool -> Config -> Config +waitOnTermination b (Config cleanupCfg cfg) = Config cleanupCfg{ blockingWait = b } cfg + -- See https://www.cons.org/cracauer/sigint.html for more details on signal -- handling by interactive processes. @@ -445,7 +484,7 @@ setGroupId x (Config cfg) = -- -- POSIX only. Default is 'False'. interruptChildOnly :: Bool -> Config -> Config -interruptChildOnly x (Config cfg) = Config $ cfg { delegate_ctlc = x } +interruptChildOnly x (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { delegate_ctlc = x } {-# DEPRECATED parentIgnoresInterrupt "Use interruptChildOnly instead." #-} parentIgnoresInterrupt :: Bool -> Config -> Config @@ -456,20 +495,26 @@ parentIgnoresInterrupt = interruptChildOnly -- -- Default is 'True'. waitForDescendants :: Bool -> Config -> Config -waitForDescendants x (Config cfg) = Config $ cfg { use_process_jobs = x } +waitForDescendants x (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { use_process_jobs = x } {-# DEPRECATED waitForChildTree "Use waitForDescendants instead." #-} waitForChildTree :: Bool -> Config -> Config waitForChildTree = waitForDescendants pipeStdErr :: Config -> Config -pipeStdErr (Config cfg) = Config $ cfg { std_err = CreatePipe } +pipeStdErr (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_err = CreatePipe } + +pipeStdin :: Config -> Config +pipeStdin (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_in = CreatePipe } + +pipeStdout :: Config -> Config +pipeStdout (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_out = CreatePipe } inheritStdin :: Config -> Config -inheritStdin (Config cfg) = Config $ cfg { std_in = Inherit } +inheritStdin (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_in = Inherit } inheritStdout :: Config -> Config -inheritStdout (Config cfg) = Config $ cfg { std_out = Inherit } +inheritStdout (Config cleanupCfg cfg) = Config cleanupCfg $ cfg { std_out = Inherit } #endif ------------------------------------------------------------------------------- @@ -529,27 +574,40 @@ cleanupNormal (_, _, _, procHandle) = do -- possibly use a timer and send a SIGKILL after the timeout if the process is -- still hanging around. cleanupException :: - (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO () -cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = do + CleanupConfig + -> (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) -> IO () +cleanupException cleanupCfg (stdinMaybe, stdoutMaybe, stderrMaybe, ph) = do -- Send a SIGTERM to the process #ifdef USE_NATIVE - terminate ph + if deliverSigInt cleanupCfg + then interruptProcess ph + else terminate ph #else - terminateProcess ph + if deliverSigInt cleanupCfg + then if deliverToProcessGroup cleanupCfg + then interruptProcessGroupOf ph + else interruptProcess ph + else if deliverToProcessGroup cleanupCfg + then terminateProcessGroupOf ph + else terminateProcess ph #endif -- Ideally we should be closing the handle without flushing the buffers so -- that we cannot get a SIGPIPE. But there seems to be no way to do that as -- of now so we just ignore the SIGPIPE. - hClose stdinH `catch` eatSIGPIPE - hClose stdoutH + whenJust (\stdinH -> hClose stdinH `catch` eatSIGPIPE) stdinMaybe + whenJust hClose stdoutMaybe whenJust hClose stderrMaybe -- Non-blocking wait for the process to go away #ifdef USE_NATIVE - void $ forkIO (void $ wait ph) + if blockingWait cleanupCfg + then void $ wait ph + else void $ forkIO (void $ wait ph) #else - void $ forkIO (void $ waitForProcess ph) + if blockingWait cleanupCfg + then void $ waitForProcess ph + else void $ forkIO (void $ waitForProcess ph) #endif where @@ -565,7 +623,6 @@ cleanupException (Just stdinH, Just stdoutH, stderrMaybe, ph) = do _ -> False eatSIGPIPE e = unless (isSIGPIPE e) $ throwIO e -cleanupException _ = error "cleanupProcess: Not reachable" -- | Creates a system process from an executable path and arguments. For the -- default attributes used to create the process see 'mkConfig'. @@ -574,7 +631,7 @@ createProc' :: (Config -> Config) -- ^ Process attribute modifier -> FilePath -- ^ Executable path -> [String] -- ^ Arguments - -> IO (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle) + -> IO (CleanupConfig, (Maybe Handle, Maybe Handle, Maybe Handle, ProcessHandle)) -- ^ (Input Handle, Output Handle, Error Handle, Process Handle) createProc' modCfg path args = do #ifdef USE_NATIVE @@ -589,18 +646,18 @@ createProc' modCfg path args = do hSetBuffering inp NoBuffering hSetBuffering out NoBuffering hSetBuffering err NoBuffering - return (Just inp, Just out, err, proc) + return (cleanupCfg, (Just inp, Just out, err, proc)) #else r@(inp, out, err, _) <- createProcess cfg mapM_ (`hSetBuffering` NoBuffering) inp mapM_ (`hSetBuffering` NoBuffering) out mapM_ (`hSetBuffering` NoBuffering) err - return r + return (cleanupCfg, r) #endif where - Config cfg = modCfg $ mkConfig path args + Config cleanupCfg cfg = modCfg $ mkConfig path args {-# INLINE putChunksClose #-} putChunksClose :: MonadIO m => @@ -624,12 +681,17 @@ pipeChunksWithAction :: -> Stream m a -- ^ Output stream pipeChunksWithAction run modCfg path args = Stream.bracketIO3 - alloc cleanupNormal cleanupException cleanupException run + alloc (cleanupNormal . snd) (uncurry cleanupException) (uncurry cleanupException) (run . snd) where alloc = createProc' modCfg path args +-- Note: It is allowed to inheritStdout or inheritStderr but not both as that +-- would not generate a stream for further processing. +-- inheritStdin has no affect, we always pipe the input stream to the process's +-- stdin + -- | Like 'pipeChunksEither' but use the specified configuration to run the -- process. {-# INLINE pipeChunksEitherWith #-} @@ -641,10 +703,12 @@ pipeChunksEitherWith :: -> Stream m (Array Word8) -- ^ Input stream -> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output stream pipeChunksEitherWith modifier path args input = - pipeChunksWithAction run (modifier . pipeStdErr) path args + pipeChunksWithAction run (pipeStdin . modifier . pipeStdErr) path args where + run (_, Nothing, Nothing, _) = + error "pipeChunksEitherWith: only one of stdout or stderr can be inherited" run (Just stdinH, Just stdoutH, Just stderrH, _) = putChunksClose stdinH input `parallel` fmap Left (toChunksClose stderrH) @@ -704,6 +768,10 @@ pipeBytesEither path args input = rightRdr = fmap Right Array.reader in UNFOLD_EACH (Unfold.either leftRdr rightRdr) output +-- Note: inheritStdin, inheritStdout have no affect, we always pipe +-- the input stream to the process's stdin and pipe the stdout to the +-- resulting stream + -- | Like 'pipeChunks' but use the specified configuration to run the process. {-# INLINE pipeChunksWith #-} pipeChunksWith :: @@ -714,13 +782,14 @@ pipeChunksWith :: -> Stream m (Array Word8) -- ^ Input stream -> Stream m (Array Word8) -- ^ Output stream pipeChunksWith modifier path args input = - pipeChunksWithAction run modifier path args + pipeChunksWithAction run (pipeStdout . pipeStdin . modifier) path args where run (Just stdinH, Just stdoutH, _, _) = - putChunksClose stdinH input `parallel` toChunksClose stdoutH - run _ = error "pipeChunksWith: Not reachable" + putChunksClose stdinH input + `parallel` toChunksClose stdoutH + run _ = error "pipeChunksWith: unreachable" -- | @pipeChunks file args input@ runs the executable @file@ specified by -- its name or path using @args@ as arguments and @input@ stream as its @@ -810,6 +879,9 @@ processBytes :: -> Stream m Word8 -- ^ Output Stream processBytes = pipeBytes +whenJustS :: Applicative m => (a -> Stream m b) -> Maybe a -> Stream m b +whenJustS action mb = maybe Stream.nil action mb + -- | Like 'pipeChunks' except that its input and output is stream of chars -- instead of a stream of chunks. The input to the pipe is buffered with a -- buffer size of 'defaultChunkSize'. @@ -853,6 +925,10 @@ pipeChars path args input = -- Generation ------------------------------------------------------------------------------- +-- Note: It is allowed to inheritStdout or inheritStderr but not both as that +-- would not generate a stream for further processing and would result in unintuitive +-- behaviour + -- | Like 'toChunksEither' but use the specified configuration to run the -- process. {-# INLINE toChunksEitherWith #-} @@ -863,14 +939,18 @@ toChunksEitherWith :: -> [String] -- ^ Arguments -> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output stream toChunksEitherWith modifier path args = - pipeChunksWithAction run (modifier . inheritStdin . pipeStdErr) path args + pipeChunksWithAction run (modifier . inheritStdin . pipeStdErr . pipeStdout) path args where - run (_, Just stdoutH, Just stderrH, _) = - fmap Left (toChunksClose stderrH) - `parallel` fmap Right (toChunksClose stdoutH) - run _ = error "toChunksEitherWith: Not reachable" + run (_, Nothing, Nothing, _) = + error "toChunksEitherWith: only one of stdout or stderr can be inherited" + run (_, stdoutMaybe, stderrMaybe, _) = + fmap Left (whenJustS toChunksClose stderrMaybe) + `parallel` fmap Right (whenJustS toChunksClose stdoutMaybe) + +-- Note: inheritStdout has no affect, we always pipe stdout to the resulting +-- stream -- | Like 'toChunks' but use the specified configuration to run the process. {-# INLINE toChunksWith #-} @@ -881,12 +961,12 @@ toChunksWith :: -> [String] -- ^ Arguments -> Stream m (Array Word8) -- ^ Output stream toChunksWith modifier path args = - pipeChunksWithAction run (modifier . inheritStdin) path args + pipeChunksWithAction run (pipeStdout . modifier . inheritStdin) path args where run (_, Just stdoutH, _, _) = toChunksClose stdoutH - run _ = error "toChunksWith: Not reachable" + run _ = error "toChunksWith: Not reachable" -- | @toBytesEither path args@ runs the executable at @path@ using @args@ as -- arguments and returns the output of the process as a stream of 'Either' @@ -944,6 +1024,20 @@ toBytes path args = let output = toChunks path args in UNFOLD_EACH Array.reader output +-- | Like 'toBytes' but use the specified configuration to run the process. +-- +-- @since 0.3.2 +{-# INLINE toBytesWith #-} +toBytesWith :: + (MonadAsync m, MonadCatch m) + => (Config -> Config) + -> FilePath -- ^ Executable name or path + -> [String] -- ^ Arguments + -> Stream m Word8 -- ^ Output Stream +toBytesWith modCfg path args = + let output = toChunksWith modCfg path args + in UNFOLD_EACH Array.reader output + -- | Like 'toBytesEither' but generates a stream of @Array Word8@ instead of a stream -- of @Word8@. -- @@ -1128,7 +1222,7 @@ standalone wait (close_stdin, close_stdout, close_stderr) modCfg path args = else return $ Right procHandle cfg = - let Config c = modCfg $ mkConfig path args + let Config _ c = modCfg $ mkConfig path args s_in = if close_stdin then NoStream else Inherit s_out = if close_stdout then NoStream else Inherit s_err = if close_stderr then NoStream else Inherit @@ -1193,3 +1287,30 @@ daemon modCfg path args = (setSession NewSession . modCfg) path args in fmap (either undefined id) r + + +#if defined(mingw32_HOST_OS) +#else +interruptProcess + :: ProcessHandle -- ^ A process in the process group + -> IO () +interruptProcess ph = do + withProcessHandle ph $ \p_ -> do + case p_ of + OpenExtHandle{} -> return () + ClosedHandle _ -> return () + OpenHandle h -> do + signalProcess sigINT h + +terminateProcessGroupOf + :: ProcessHandle -- ^ A process in the process group + -> IO () +terminateProcessGroupOf ph = do + withProcessHandle ph $ \p_ -> + case p_ of + ClosedHandle _ -> return () + OpenExtHandle{} -> error "terminateProcessGroupOf with OpenExtHandle should not happen on POSIX." + OpenHandle h -> do + pgid <- getProcessGroupIDOf h + signalProcessGroup sigTERM pgid +#endif diff --git a/src/Streamly/System/Process.hs b/src/Streamly/System/Process.hs index 8e0e2bb..5b155b2 100644 --- a/src/Streamly/System/Process.hs +++ b/src/Streamly/System/Process.hs @@ -128,10 +128,16 @@ module Streamly.System.Process -- | These options have no effect on Posix. , waitForDescendants + -- ** CleanupConfig options + , terminateWithSigInt + , terminateProcessGroup + , waitOnTermination + -- * Generation , toChunks , toChunksWith , toBytes + , toBytesWith , toChars , toLines diff --git a/streamly-process.cabal b/streamly-process.cabal index 0cd0f0c..f864ac9 100644 --- a/streamly-process.cabal +++ b/streamly-process.cabal @@ -100,11 +100,11 @@ library , streamly >= 0.9 && < 0.12 , streamly-core >= 0.1 && < 0.4 if !flag(use-native) - build-depends: process >= 1.0 && < 1.7 - else - if !os(windows) - build-depends: - unix >= 2.5 && < 2.8 + build-depends: + process >= 1.0 && < 1.7 + if !os(windows) + build-depends: + unix >= 2.5 && < 2.8 ------------------------------------------------------------------------------- -- Benchmarks