@@ -14,7 +14,7 @@ function ParserStream(options) {
14
14
stream . Transform . call ( this , options ) ;
15
15
this . lines = "" ;
16
16
this . _parsedHeaders = false ;
17
- this . _rowCount = 0 ;
17
+ this . _rowCount = - 1 ;
18
18
this . _emitData = false ;
19
19
options = options || { } ;
20
20
var delimiter ;
@@ -31,6 +31,7 @@ function ParserStream(options) {
31
31
this . parser = createParser ( options ) ;
32
32
this . _headers = options . headers ;
33
33
this . _ignoreEmpty = options . ignoreEmpty ;
34
+ this . __buffered = [ ] ;
34
35
return this ;
35
36
}
36
37
@@ -61,7 +62,7 @@ extended(ParserStream).extend({
61
62
62
63
__handleLine : function __parseLineData ( line , index , ignore ) {
63
64
var ignoreEmpty = this . _ignoreEmpty ;
64
- if ( extended . isBoolean ( ignoreEmpty ) && ignoreEmpty && ( ! line || EMPTY . test ( line . join ( "" ) ) ) ) {
65
+ if ( extended . isBoolean ( ignoreEmpty ) && ignoreEmpty && ( ! line || EMPTY . test ( line . join ( "" ) ) ) ) {
65
66
return null ;
66
67
}
67
68
if ( ! ignore ) {
@@ -77,9 +78,7 @@ extended(ParserStream).extend({
77
78
} ,
78
79
79
80
_parse : function _parseLine ( data , hasMoreData ) {
80
- var row ,
81
- emitData = this . _emitData ,
82
- count = 0 , ret , rows , self = this ;
81
+ var row , count , ret , rows , self = this ;
83
82
try {
84
83
data = this . parser ( data , hasMoreData ) ;
85
84
ret = data . line ;
@@ -109,12 +108,15 @@ extended(ParserStream).extend({
109
108
for ( var i = 0 , l = rows . length ; i < l ; i ++ ) {
110
109
row = rows [ i ] ;
111
110
if ( row ) {
112
- var dataRow = this . __handleLine ( row , count ) ;
111
+ var dataRow = this . __handleLine ( row , ( count = ++ this . _rowCount ) ) ;
113
112
if ( dataRow ) {
114
- this . emit ( "record" , dataRow , ( count = this . _rowCount ++ ) ) ;
115
- if ( emitData ) {
116
- this . push ( JSON . stringify ( dataRow ) ) ;
113
+ if ( ! this . paused ) {
114
+ this . __emitRecord ( dataRow , count ) ;
115
+ } else {
116
+ this . __buffered . push ( [ dataRow , count ] ) ;
117
117
}
118
+ } else {
119
+ count = -- this . _rowCount ;
118
120
}
119
121
}
120
122
}
@@ -124,6 +126,13 @@ extended(ParserStream).extend({
124
126
return ret ;
125
127
} ,
126
128
129
+ __emitRecord : function ( dataRow , count ) {
130
+ this . emit ( "record" , dataRow , count ) ;
131
+ if ( this . _emitData ) {
132
+ this . push ( JSON . stringify ( dataRow ) ) ;
133
+ }
134
+ } ,
135
+
127
136
_transform : function ( data , encoding , done ) {
128
137
var lines = this . lines ;
129
138
var lineData = ( lines + data ) ;
@@ -142,7 +151,8 @@ extended(ParserStream).extend({
142
151
if ( this . lines ) {
143
152
this . _parse ( this . lines , false ) ;
144
153
}
145
- this . emit ( "end" , this . _rowCount ) ;
154
+ //increment row count so we aren't 0 based
155
+ this . emit ( "end" , ++ this . _rowCount ) ;
146
156
callback ( ) ;
147
157
} ,
148
158
@@ -163,6 +173,19 @@ extended(ParserStream).extend({
163
173
resume : function ( ) {
164
174
if ( this . paused ) {
165
175
this . paused = false ;
176
+ var buffered = this . __buffered , l = buffered . length ;
177
+ if ( l ) {
178
+ var i = - 1 , entry ;
179
+ while ( ++ i < buffered . length ) {
180
+ entry = buffered . shift ( ) ;
181
+ this . __emitRecord ( entry [ 0 ] , entry [ 1 ] ) ;
182
+ //handle case where paused is called while emitting data
183
+ if ( this . paused ) {
184
+ return ;
185
+ }
186
+ }
187
+ buffered . length = 0 ;
188
+ }
166
189
if ( this . __pausedDone ) {
167
190
var done = this . __pausedDone ;
168
191
this . __pausedDone = null ;
0 commit comments