Skip to content

Commit 4b1ddc3

Browse files
Do not use buffering in pipe file handles
Allow buffer size specification in byte and char APIs.
1 parent 7b666db commit 4b1ddc3

File tree

4 files changed

+113
-29
lines changed

4 files changed

+113
-29
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
* Remove buffering from the pipe chunked APIs.
6+
37
## 0.3.1 (Dec 2023)
48

59
* Allow streamly-0.10.0 and streamly-core-0.2.0

src/DocTestProcess.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
>>> :set -XFlexibleContexts
44
>>> :set -XScopedTypeVariables
5+
>>> :set -Wno-deprecations
56
>>> import Data.Char (toUpper)
67
>>> import Data.Function ((&))
78
>>> import qualified Streamly.Console.Stdio as Stdio
@@ -13,7 +14,8 @@
1314
1415
For APIs that have not been released yet.
1516
16-
>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks)
17+
>>> import Streamly.Internal.System.IO (defaultChunkSize)
18+
>>> import qualified Streamly.Internal.Console.Stdio as Stdio (putChars, putChunks, readChunks)
1719
>>> import qualified Streamly.Internal.FileSystem.Dir as Dir (readFiles)
1820
>>> import qualified Streamly.Internal.System.Process as Process
1921
>>> import qualified Streamly.Internal.Unicode.Stream as Unicode (lines)

src/Streamly/Internal/System/Process.hs

Lines changed: 94 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
--
99

1010
{-# LANGUAGE CPP #-}
11+
{-# LANGUAGE FlexibleContexts #-}
1112
{-# LANGUAGE ScopedTypeVariables #-}
1213

1314
-- TODO:
@@ -42,7 +43,6 @@
4243
--
4344
-- - Replace FilePath with a typed path.
4445
--
45-
{-# LANGUAGE FlexibleContexts #-}
4646

4747
module Streamly.Internal.System.Process
4848
(
@@ -138,7 +138,7 @@ import Streamly.Data.Array (Array)
138138
import Streamly.Data.Fold (Fold)
139139
import Streamly.Data.Stream.Prelude (MonadAsync, Stream)
140140
import System.Exit (ExitCode(..))
141-
import System.IO (hClose, Handle)
141+
import System.IO (hClose, Handle, hSetBuffering, BufferMode(..))
142142
#if !defined(mingw32_HOST_OS)
143143
import System.Posix.Types (CUid (..), CGid (..))
144144
#endif
@@ -485,7 +485,7 @@ parallel s1 s2 = Stream.parList (Stream.eager True) [s1, s2]
485485
-------------------------------------------------------------------------------
486486
-- Transformation
487487
-------------------------------------------------------------------------------
488-
--
488+
489489
-- | On normal cleanup we do not need to close the pipe handles as they are
490490
-- already guaranteed to be closed (we can assert that) by the time we reach
491491
-- here. We should not kill the process, rather wait for it to terminate
@@ -574,9 +574,16 @@ createProc' modCfg path args = do
574574
-- XXX Read the exception channel and reap the process if it failed before
575575
-- exec.
576576
parent
577+
hSetBuffering inp NoBuffering
578+
hSetBuffering out NoBuffering
579+
hSetBuffering err NoBuffering
577580
return (Just inp, Just out, err, proc)
578581
#else
579-
createProcess cfg
582+
r@(inp, out, err, _) <- createProcess cfg
583+
mapM_ (`hSetBuffering` NoBuffering) inp
584+
mapM_ (`hSetBuffering` NoBuffering) out
585+
mapM_ (`hSetBuffering` NoBuffering) err
586+
return r
580587
#endif
581588

582589
where
@@ -634,6 +641,11 @@ pipeChunksEitherWith modifier path args input =
634641

635642
-- | Like 'pipeChunks' but also includes stderr as 'Left' stream in the
636643
-- 'Either' output.
644+
--
645+
-- Definition:
646+
--
647+
-- >>> pipeChunksEither = pipeChunksEitherWith id
648+
--
637649
{-# INLINE pipeChunksEither #-}
638650
pipeChunksEither ::
639651
(MonadCatch m, MonadAsync m)
@@ -643,10 +655,13 @@ pipeChunksEither ::
643655
-> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output stream
644656
pipeChunksEither = pipeChunksEitherWith id
645657

646-
-- | @pipeBytesEither path args input@ runs the executable at @path@ using @args@
647-
-- as arguments and @input@ stream as its standard input. The error stream of
648-
-- the executable is presented as 'Left' values in the resulting stream and
649-
-- output stream as 'Right' values.
658+
-- | @pipeBytesEither path args input@ runs the executable at @path@ using
659+
-- @args@ as arguments and @input@ stream as its standard input. The error
660+
-- stream of the executable is presented as 'Left' values in the resulting
661+
-- stream and output stream as 'Right' values. The input to the pipe is
662+
-- buffered with a buffer size of 'defaultChunkSize'.
663+
--
664+
-- For control over the buffer use your own chunking and chunk based APIs.
650665
--
651666
-- Raises 'ProcessFailure' exception in case of failure.
652667
--
@@ -697,7 +712,8 @@ pipeChunksWith modifier path args input =
697712

698713
-- | @pipeChunks file args input@ runs the executable @file@ specified by
699714
-- its name or path using @args@ as arguments and @input@ stream as its
700-
-- standard input. Returns the standard output of the executable as a stream.
715+
-- standard input. Returns the standard output of the process as a stream
716+
-- of chunks of bytes (Array Word8).
701717
--
702718
-- If only the name of an executable file is specified instead of its path then
703719
-- the file name is searched in the directories specified by the PATH
@@ -719,6 +735,10 @@ pipeChunksWith modifier path args input =
719735
-- :}
720736
--HELLO WORLD
721737
--
738+
-- Definition:
739+
--
740+
-- >>> pipeChunks = pipeChunksWith id
741+
--
722742
-- /pre-release/
723743
{-# INLINE pipeChunks #-}
724744
pipeChunks ::
@@ -739,8 +759,12 @@ processChunks ::
739759
-> Stream m (Array Word8) -- ^ Output stream
740760
processChunks = pipeChunks
741761

742-
-- | Like 'pipeChunks' except that it works on a stream of bytes instead of
743-
-- a stream of chunks.
762+
-- | Like 'pipeChunks' except that its input and output is stream of bytes
763+
-- instead of a stream of chunks. The input to the pipe is buffered with a
764+
-- buffer size of 'defaultChunkSize'.
765+
--
766+
-- For control over the input buffer use your own chunking and chunk based
767+
-- APIs.
744768
--
745769
-- We can write the example in 'pipeChunks' as follows.
746770
--
@@ -774,8 +798,12 @@ processBytes ::
774798
-> Stream m Word8 -- ^ Output Stream
775799
processBytes = pipeBytes
776800

777-
-- | Like 'pipeChunks' except that it works on a stream of chars instead of
778-
-- a stream of chunks.
801+
-- | Like 'pipeChunks' except that its input and output is stream of chars
802+
-- instead of a stream of chunks. The input to the pipe is buffered with a
803+
-- buffer size of 'defaultChunkSize'.
804+
--
805+
-- For control over the input buffer use your own chunking and chunk based
806+
-- APIs.
779807
--
780808
-- >>> :{
781809
-- Process.toChars "echo" ["hello world"]
@@ -847,8 +875,9 @@ toChunksWith modifier path args =
847875
run _ = error "toChunksWith: Not reachable"
848876

849877
-- | @toBytesEither path args@ runs the executable at @path@ using @args@ as
850-
-- arguments and returns a stream of 'Either' bytes. The 'Left' values are from
851-
-- @stderr@ and the 'Right' values are from @stdout@ of the executable.
878+
-- arguments and returns the output of the process as a stream of 'Either'
879+
-- bytes. The 'Left' values are from @stderr@ and the 'Right' values are from
880+
-- @stdout@ of the executable.
852881
--
853882
-- Raises 'ProcessFailure' exception in case of failure.
854883
--
@@ -877,8 +906,12 @@ toBytesEither path args =
877906
rightRdr = fmap Right Array.reader
878907
in Stream.unfoldMany (Unfold.either leftRdr rightRdr) output
879908

880-
-- | The following code is equivalent to the shell command @echo "hello
881-
-- world"@:
909+
-- | @toBytes path args@ runs the executable specified by @path@ using @args@
910+
-- as arguments and returns the output of the process as a stream of bytes.
911+
--
912+
-- Raises 'ProcessFailure' exception in case of failure.
913+
--
914+
-- The following code is equivalent to the shell command @echo "hello world"@:
882915
--
883916
-- >>> :{
884917
-- Process.toBytes "echo" ["hello world"]
@@ -921,8 +954,13 @@ toChunksEither ::
921954
-> Stream m (Either (Array Word8) (Array Word8)) -- ^ Output Stream
922955
toChunksEither = toChunksEitherWith id
923956

924-
-- | The following code is equivalent to the shell command @echo "hello
925-
-- world"@:
957+
-- | @toChunks path args@ runs the executable specified by @path@ using @args@
958+
-- as arguments and returns the output of the process as a stream of chunks of
959+
-- bytes (Array Word8).
960+
--
961+
-- Raises 'ProcessFailure' exception in case of failure.
962+
--
963+
-- The following code is equivalent to the shell command @echo "hello world"@:
926964
--
927965
-- >>> :{
928966
-- Process.toChunks "echo" ["hello world"]
@@ -941,7 +979,13 @@ toChunks ::
941979
-> Stream m (Array Word8) -- ^ Output Stream
942980
toChunks = toChunksWith id
943981

944-
-- |
982+
-- | @toChars path args@ runs the executable specified by @path@ using @args@
983+
-- as arguments and returns the output of the process as a stream of chars.
984+
--
985+
-- Raises 'ProcessFailure' exception in case of failure.
986+
--
987+
-- Definition:
988+
--
945989
-- >>> toChars path args = toBytes path args & Unicode.decodeUtf8
946990
--
947991
{-# INLINE toChars #-}
@@ -952,8 +996,19 @@ toChars ::
952996
-> Stream m Char -- ^ Output Stream
953997
toChars path args = toBytes path args & Unicode.decodeUtf8
954998

955-
-- |
956-
-- >>> toLines path args f = toChars path args & Unicode.lines f
999+
-- | @toLines f path args@ runs the executable specified by @path@ using @args@
1000+
-- as arguments and folds the output of the process at line breaks, using the
1001+
-- fold @f@, to return a stream of folded lines.
1002+
--
1003+
-- Raises 'ProcessFailure' exception in case of failure.
1004+
--
1005+
-- To return a stream of lines as strings:
1006+
--
1007+
-- >>> toStrings = toLines Fold.toList
1008+
--
1009+
-- Definition:
1010+
--
1011+
-- >>> toLines f path args = toChars path args & Unicode.lines f
9571012
--
9581013
{-# INLINE toLines #-}
9591014
toLines ::
@@ -964,18 +1019,26 @@ toLines ::
9641019
-> Stream m a -- ^ Output Stream
9651020
toLines f path args = toChars path args & Unicode.lines f
9661021

967-
-- |
968-
-- >>> toString path args = toChars path args & Stream.fold Fold.toList
1022+
-- | @toString path args@ runs the executable specified by @path@ using @args@
1023+
-- as arguments and folds the entire output of the process as a single string.
1024+
--
1025+
-- Definition:
1026+
--
1027+
-- >>> toString path args = toChars path args & Stream.toList
9691028
--
9701029
{-# INLINE toString #-}
9711030
toString ::
9721031
(MonadAsync m, MonadCatch m)
9731032
=> FilePath -- ^ Executable name or path
9741033
-> [String] -- ^ Arguments
9751034
-> m String
976-
toString path args = toChars path args & Stream.fold Fold.toList
1035+
toString path args = toChars path args & Stream.toList
9771036

978-
-- |
1037+
-- | @toStdout path args@ runs the executable specified by @path@ using @args@
1038+
-- as arguments and returns the output of the process on stdout.
1039+
--
1040+
-- Definition:
1041+
--
9791042
-- >>> toStdout path args = toChunks path args & Stdio.putChunks
9801043
--
9811044
{-# INLINE toStdout #-}
@@ -992,7 +1055,11 @@ toStdout path args = do
9921055
return ()
9931056
-}
9941057

995-
-- |
1058+
-- | @toNull path args@ runs the executable specified by @path@ using @args@
1059+
-- as arguments and discards the output of the process.
1060+
--
1061+
-- Definition:
1062+
--
9961063
-- >>> toNull path args = toChunks path args & Stream.fold Fold.drain
9971064
--
9981065
{-# INLINE toNull #-}

src/Streamly/System/Process.hs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@
3838
--
3939
-- >>> :{
4040
-- Process.toBytes "echo" ["hello world"]
41-
-- & Unicode.decodeLatin1 & fmap toUpper & Unicode.encodeLatin1
41+
-- & Unicode.decodeLatin1
42+
-- & fmap toUpper
43+
-- & Unicode.encodeLatin1
4244
-- & Stream.fold Stdio.write
4345
-- :}
4446
-- HELLO WORLD
@@ -74,6 +76,15 @@
7476
-- & Stream.fold Stdio.writeChunks
7577
-- :}
7678
--
79+
-- = Running Interactive Programs (e.g. ghci)
80+
--
81+
-- >>> :{
82+
-- ghci =
83+
-- Stdio.readChunks
84+
-- & Process.pipeChunks "ghci" []
85+
-- & Stdio.putChunks
86+
-- :}
87+
--
7788
-- = Experimental APIs
7889
--
7990
-- See "Streamly.Internal.System.Process" for unreleased functions.

0 commit comments

Comments
 (0)