@@ -133,15 +133,16 @@ const (
133133 ReadCommitted IsolationLevel = 1
134134)
135135
var (
	// DefaultClientID is the default value used as ClientID of kafka
	// connections.
	DefaultClientID string
)
139141
140142func init () {
141143 progname := filepath .Base (os .Args [0 ])
142144 hostname , _ := os .Hostname ()
143145 DefaultClientID = fmt .Sprintf ("%s@%s (github.com/segmentio/kafka-go)" , progname , hostname )
144- DefaultTransport .(* Transport ).ClientID = DefaultClientID
145146}
146147
147148// NewConn returns a new kafka connection for the given topic and partition.
@@ -262,12 +263,10 @@ func (c *Conn) Controller() (broker Broker, err error) {
262263 }
263264 for _ , brokerMeta := range res .Brokers {
264265 if brokerMeta .NodeID == res .ControllerID {
265- broker = Broker {
266- ID : int (brokerMeta .NodeID ),
266+ broker = Broker {ID : int (brokerMeta .NodeID ),
267267 Port : int (brokerMeta .Port ),
268268 Host : brokerMeta .Host ,
269- Rack : brokerMeta .Rack ,
270- }
269+ Rack : brokerMeta .Rack }
271270 break
272271 }
273272 }
@@ -323,6 +322,7 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
323322 err := c .readOperation (
324323 func (deadline time.Time , id int32 ) error {
325324 return c .writeRequest (findCoordinator , v0 , id , request )
325+
326326 },
327327 func (deadline time.Time , size int ) error {
328328 return expectZeroSize (func () (remain int , err error ) {
@@ -340,6 +340,32 @@ func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinato
340340 return response , nil
341341}
342342
343+ // heartbeat sends a heartbeat message required by consumer groups
344+ //
345+ // See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
346+ func (c * Conn ) heartbeat (request heartbeatRequestV0 ) (heartbeatResponseV0 , error ) {
347+ var response heartbeatResponseV0
348+
349+ err := c .writeOperation (
350+ func (deadline time.Time , id int32 ) error {
351+ return c .writeRequest (heartbeat , v0 , id , request )
352+ },
353+ func (deadline time.Time , size int ) error {
354+ return expectZeroSize (func () (remain int , err error ) {
355+ return (& response ).readFrom (& c .rbuf , size )
356+ }())
357+ },
358+ )
359+ if err != nil {
360+ return heartbeatResponseV0 {}, err
361+ }
362+ if response .ErrorCode != 0 {
363+ return heartbeatResponseV0 {}, Error (response .ErrorCode )
364+ }
365+
366+ return response , nil
367+ }
368+
343369// joinGroup attempts to join a consumer group
344370//
345371// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
@@ -726,8 +752,9 @@ func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
726752// ReadBatchWith in every way is similar to ReadBatch. ReadBatch is configured
727753// with the default values in ReadBatchConfig except for minBytes and maxBytes.
728754func (c * Conn ) ReadBatchWith (cfg ReadBatchConfig ) * Batch {
755+
729756 var adjustedDeadline time.Time
730- maxFetch : = int (c .fetchMaxBytes )
757+ var maxFetch = int (c .fetchMaxBytes )
731758
732759 if cfg .MinBytes < 0 || cfg .MinBytes > maxFetch {
733760 return & Batch {err : fmt .Errorf ("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds" , cfg .MinBytes , maxFetch )}
@@ -933,6 +960,7 @@ func (c *Conn) readOffset(t int64) (offset int64, err error) {
933960// connection. If there are none, the method fetches all partitions of the kafka
934961// cluster.
935962func (c * Conn ) ReadPartitions (topics ... string ) (partitions []Partition , err error ) {
963+
936964 if len (topics ) == 0 {
937965 if len (c .topic ) != 0 {
938966 defaultTopics := [... ]string {c .topic }
@@ -1132,10 +1160,11 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11321160 deadline = adjustDeadlineForRTT (deadline , now , defaultRTT )
11331161 switch produceVersion {
11341162 case v7 :
1135- recordBatch , err := newRecordBatch (
1136- codec ,
1137- msgs ... ,
1138- )
1163+ recordBatch , err :=
1164+ newRecordBatch (
1165+ codec ,
1166+ msgs ... ,
1167+ )
11391168 if err != nil {
11401169 return err
11411170 }
@@ -1150,10 +1179,11 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
11501179 recordBatch ,
11511180 )
11521181 case v3 :
1153- recordBatch , err := newRecordBatch (
1154- codec ,
1155- msgs ... ,
1156- )
1182+ recordBatch , err :=
1183+ newRecordBatch (
1184+ codec ,
1185+ msgs ... ,
1186+ )
11571187 if err != nil {
11581188 return err
11591189 }
@@ -1218,6 +1248,7 @@ func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message)
12181248 }
12191249 return size , err
12201250 }
1251+
12211252 })
12221253 if err != nil {
12231254 return size , err
@@ -1577,7 +1608,7 @@ func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
15771608 return nil , err
15781609 }
15791610 if version == v1 {
1580- request : = saslAuthenticateRequestV0 {Data : data }
1611+ var request = saslAuthenticateRequestV0 {Data : data }
15811612 var response saslAuthenticateResponseV0
15821613
15831614 err := c .writeOperation (
0 commit comments