@@ -27,9 +27,13 @@ type FileWriter struct {
27
27
name string
28
28
replication int
29
29
blockSize int64
30
+ offset int64
30
31
31
32
blockWriter * transfer.BlockWriter
32
33
deadline time.Time
34
+
35
+ // Key and IV for transparent encryption support.
36
+ enc * transparentEncryptionInfo
33
37
}
34
38
35
39
// Create opens a new file in HDFS with the default replication, block size,
@@ -62,13 +66,14 @@ func (c *Client) Create(name string) (*FileWriter, error) {
62
66
// very important that Close is called after all data has been written.
63
67
func (c * Client ) CreateFile (name string , replication int , blockSize int64 , perm os.FileMode ) (* FileWriter , error ) {
64
68
createReq := & hdfs.CreateRequestProto {
65
- Src : proto .String (name ),
66
- Masked : & hdfs.FsPermissionProto {Perm : proto .Uint32 (uint32 (perm ))},
67
- ClientName : proto .String (c .namenode .ClientName ),
68
- CreateFlag : proto .Uint32 (1 ),
69
- CreateParent : proto .Bool (false ),
70
- Replication : proto .Uint32 (uint32 (replication )),
71
- BlockSize : proto .Uint64 (uint64 (blockSize )),
69
+ Src : proto .String (name ),
70
+ Masked : & hdfs.FsPermissionProto {Perm : proto .Uint32 (uint32 (perm ))},
71
+ ClientName : proto .String (c .namenode .ClientName ),
72
+ CreateFlag : proto .Uint32 (1 ),
73
+ CreateParent : proto .Bool (false ),
74
+ Replication : proto .Uint32 (uint32 (replication )),
75
+ BlockSize : proto .Uint64 (uint64 (blockSize )),
76
+ CryptoProtocolVersion : []hdfs.CryptoProtocolVersionProto {hdfs .CryptoProtocolVersionProto_ENCRYPTION_ZONES },
72
77
}
73
78
createResp := & hdfs.CreateResponseProto {}
74
79
@@ -77,11 +82,21 @@ func (c *Client) CreateFile(name string, replication int, blockSize int64, perm
77
82
return nil , & os.PathError {"create" , name , interpretCreateException (err )}
78
83
}
79
84
85
+ var enc * transparentEncryptionInfo
86
+ if createResp .GetFs ().GetFileEncryptionInfo () != nil {
87
+ enc , err = c .kmsGetKey (createResp .GetFs ().GetFileEncryptionInfo ())
88
+ if err != nil {
89
+ c .Remove (name )
90
+ return nil , & os.PathError {"create" , name , err }
91
+ }
92
+ }
93
+
80
94
return & FileWriter {
81
95
client : c ,
82
96
name : name ,
83
97
replication : replication ,
84
98
blockSize : blockSize ,
99
+ enc : enc ,
85
100
}, nil
86
101
}
87
102
@@ -106,11 +121,21 @@ func (c *Client) Append(name string) (*FileWriter, error) {
106
121
return nil , & os.PathError {"append" , name , interpretException (err )}
107
122
}
108
123
124
+ var enc * transparentEncryptionInfo
125
+ if appendResp .GetStat ().GetFileEncryptionInfo () != nil {
126
+ enc , err = c .kmsGetKey (appendResp .GetStat ().GetFileEncryptionInfo ())
127
+ if err != nil {
128
+ return nil , & os.PathError {"append" , name , err }
129
+ }
130
+ }
131
+
109
132
f := & FileWriter {
110
133
client : c ,
111
134
name : name ,
112
135
replication : int (appendResp .Stat .GetBlockReplication ()),
113
136
blockSize : int64 (appendResp .Stat .GetBlocksize ()),
137
+ offset : int64 (* appendResp .Stat .Length ),
138
+ enc : enc ,
114
139
}
115
140
116
141
// This returns nil if there are no blocks (it's an empty file) or if the
@@ -176,6 +201,28 @@ func (f *FileWriter) SetDeadline(t time.Time) error {
176
201
// of this, it is important that Close is called after all data has been
177
202
// written.
178
203
func (f * FileWriter ) Write (b []byte ) (int , error ) {
204
+ // Encrypt data chunk if file in HDFS encrypted zone.
205
+ if f .enc != nil && len (b ) > 0 {
206
+ var offset int
207
+ for offset < len (b ) {
208
+ size := min (len (b )- offset , aesChunkSize )
209
+ ciphertext , err := aesCtrStep (f .offset , f .enc , b [offset :offset + size ])
210
+ if err != nil {
211
+ return offset , err
212
+ }
213
+ writtenSize , err := f .writeImpl (ciphertext )
214
+ offset += writtenSize
215
+ if err != nil {
216
+ return offset , err
217
+ }
218
+ }
219
+ return offset , nil
220
+ } else {
221
+ return f .writeImpl (b )
222
+ }
223
+ }
224
+
225
+ func (f * FileWriter ) writeImpl (b []byte ) (int , error ) {
179
226
if f .blockWriter == nil {
180
227
err := f .startNewBlock ()
181
228
if err != nil {
@@ -187,6 +234,7 @@ func (f *FileWriter) Write(b []byte) (int, error) {
187
234
for off < len (b ) {
188
235
n , err := f .blockWriter .Write (b [off :])
189
236
off += n
237
+ f .offset += int64 (n )
190
238
if err == transfer .ErrEndOfBlock {
191
239
err = f .startNewBlock ()
192
240
}
@@ -316,3 +364,10 @@ func (f *FileWriter) finalizeBlock() error {
316
364
f .blockWriter = nil
317
365
return nil
318
366
}
367
+
368
+ func min (a , b int ) int {
369
+ if a < b {
370
+ return a
371
+ }
372
+ return b
373
+ }
0 commit comments