Skip to content

Commit 95b8db6

Browse files
committed
Support authentication via netrc
`vcs_base.load_url()` currently doesn't support authentication. Add support for both basic and token-based authentication by parsing netrc files. Signed-off-by: Kyle Fazzari <[email protected]>
1 parent 4e0c2f4 commit 95b8db6

File tree

3 files changed

+183
-2
lines changed

3 files changed

+183
-2
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ matrix:
1212
install:
1313
# newer versions of PyYAML dropped support for Python 3.4
1414
- if [ $TRAVIS_PYTHON_VERSION == "3.4" ]; then pip install PyYAML==5.2; fi
15-
- pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML
15+
- pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML mock
1616
script:
1717
- PYTHONPATH=`pwd` pytest -s -v test
1818
notifications:

test/test_base.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import os
2+
import shutil
3+
import tempfile
4+
import unittest
5+
6+
try:
7+
from urllib.error import HTTPError
8+
except ImportError:
9+
from urllib2 import HTTPError
10+
11+
12+
try:
13+
from unittest import mock
14+
except ImportError:
15+
import mock
16+
17+
from vcstool.clients import vcs_base
18+
19+
20+
class TestBase(unittest.TestCase):
21+
22+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
23+
@mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True)
24+
def test_load_url_calls_urlopen(self, netrc_open_mock, urlopen_mock):
25+
urlopen_read_mock = urlopen_mock.return_value.read
26+
27+
vcs_base.load_url('example.com', timeout=123)
28+
29+
urlopen_mock.assert_called_once_with('example.com', timeout=123)
30+
urlopen_read_mock.assert_called_once_with()
31+
self.assertFalse(netrc_open_mock.mock_calls)
32+
33+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
34+
@mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True)
35+
def test_load_url_calls_netrc_open(self, netrc_open_mock, urlopen_mock):
36+
for code in (401, 404):
37+
urlopen_mock.side_effect = HTTPError(None, code, None, None, None)
38+
urlopen_read_mock = urlopen_mock.return_value.read
39+
40+
vcs_base.load_url('example.com', timeout=123)
41+
42+
urlopen_mock.assert_called_once_with('example.com', timeout=123)
43+
self.assertFalse(urlopen_read_mock.mock_calls)
44+
45+
netrc_open_mock.assert_called_once_with('example.com', timeout=123)
46+
47+
netrc_open_mock.reset_mock()
48+
urlopen_mock.reset_mock()
49+
50+
def test_netrc_open_no_such_file(self):
51+
try:
52+
self.assertEqual(vcs_base._netrc_open(
53+
'https://example.com', filename='/non-existent'), None)
54+
except Exception:
55+
self.fail(
56+
'The lack of a .netrc file should not result in an exception')
57+
58+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
59+
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True)
60+
def test_netrc_open_basic_auth(self, build_opener_mock, urlopen_mock):
61+
open_mock = build_opener_mock.return_value.open
62+
63+
tmpdir = tempfile.mkdtemp()
64+
netrc_file = os.path.join(tmpdir, 'netrc')
65+
machine = 'example.com'
66+
with open(netrc_file, 'w') as f:
67+
f.write('machine %s\n' % machine)
68+
f.write('login username\n')
69+
f.write('password password')
70+
71+
url = 'https://%s/foo/bar' % machine
72+
try:
73+
vcs_base._netrc_open(url, filename=netrc_file, timeout=123)
74+
finally:
75+
shutil.rmtree(tmpdir)
76+
77+
self.assertFalse(urlopen_mock.mock_calls)
78+
79+
class _HTTPBasicAuthHandlerMatcher(object):
80+
def __init__(self, test):
81+
self.test = test
82+
83+
def __eq__(self, other):
84+
manager = other.passwd
85+
self.test.assertEqual(
86+
manager.find_user_password(None, 'example.com'),
87+
('username', 'password'))
88+
return True
89+
90+
build_opener_mock.assert_called_once_with(
91+
_HTTPBasicAuthHandlerMatcher(self))
92+
open_mock.assert_called_once_with(url, timeout=123)
93+
94+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
95+
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True)
96+
def test_netrc_open_token_auth(self, build_opener_mock, urlopen_mock):
97+
tmpdir = tempfile.mkdtemp()
98+
netrc_file = os.path.join(tmpdir, 'netrc')
99+
machine = 'example.com'
100+
with open(netrc_file, 'w') as f:
101+
f.write('machine %s\n' % machine)
102+
f.write('password password')
103+
104+
url = 'https://%s/foo/bar' % machine
105+
try:
106+
vcs_base._netrc_open(url, filename=netrc_file, timeout=123)
107+
finally:
108+
shutil.rmtree(tmpdir)
109+
110+
self.assertFalse(build_opener_mock.mock_calls)
111+
112+
class _RequestMatcher(object):
113+
def __init__(self, test):
114+
self.test = test
115+
116+
def __eq__(self, other):
117+
self.test.assertEqual(other.get_full_url(), url)
118+
self.test.assertEqual(
119+
other.get_header('Private-token'), 'password')
120+
return True
121+
122+
urlopen_mock.assert_called_once_with(
123+
_RequestMatcher(self), timeout=123)

vcstool/clients/vcs_base.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
1+
import errno
2+
import logging
3+
import netrc
14
import os
25
import socket
36
import subprocess
47
import time
58
try:
69
from urllib.request import Request
710
from urllib.request import urlopen
11+
from urllib.request import HTTPPasswordMgrWithDefaultRealm
12+
from urllib.request import HTTPBasicAuthHandler
13+
from urllib.request import build_opener
14+
from urllib.parse import urlparse
815
from urllib.error import HTTPError
916
from urllib.error import URLError
1017
except ImportError:
1118
from urllib2 import HTTPError
1219
from urllib2 import Request
1320
from urllib2 import URLError
1421
from urllib2 import urlopen
22+
from urllib2 import HTTPPasswordMgrWithDefaultRealm
23+
from urllib2 import HTTPBasicAuthHandler
24+
from urllib2 import build_opener
25+
from urlparse import urlparse
1526

1627
try:
1728
from shutil import which # noqa
@@ -91,7 +102,7 @@ def run_command(cmd, cwd, env=None):
91102

92103
def load_url(url, retry=2, retry_period=1, timeout=10):
93104
try:
94-
fh = urlopen(url, timeout=timeout)
105+
fh = _urlopen_netrc(url, timeout=timeout)
95106
except HTTPError as e:
96107
if e.code == 503 and retry:
97108
time.sleep(retry_period)
@@ -132,3 +143,50 @@ def test_url(url, retry=2, retry_period=1, timeout=10):
132143
timeout=timeout)
133144
raise URLError(str(e) + ' (%s)' % url)
134145
return response
146+
147+
148+
def _urlopen_netrc(uri, timeout=None):
149+
try:
150+
return urlopen(uri, timeout=timeout)
151+
except HTTPError as e:
152+
if e.code in (401, 404):
153+
# Try again with netrc credentials
154+
result = _netrc_open(uri, timeout=timeout)
155+
if result is not None:
156+
return result
157+
raise
158+
159+
160+
def _netrc_open(uri, filename=None, timeout=None):
161+
parsed_uri = urlparse(uri)
162+
machine = parsed_uri.netloc
163+
if not machine:
164+
return None
165+
166+
opener = None
167+
try:
168+
info = netrc.netrc(filename).authenticators(machine)
169+
if info is None:
170+
# caught below, like other netrc parse errors
171+
raise netrc.NetrcParseError('No authenticators for "%s"' % machine)
172+
173+
(username, _, password) = info
174+
if username and password:
175+
pass_man = HTTPPasswordMgrWithDefaultRealm()
176+
pass_man.add_password(None, machine, username, password)
177+
authhandler = HTTPBasicAuthHandler(pass_man)
178+
opener = build_opener(authhandler)
179+
return opener.open(uri, timeout=timeout)
180+
elif password:
181+
request = Request(uri)
182+
request.add_header('PRIVATE-TOKEN', password)
183+
return urlopen(request, timeout=timeout)
184+
except EnvironmentError as e:
185+
# Don't error just because the user doesn't have a .netrc file
186+
if e.errno != errno.ENOENT:
187+
raise
188+
except netrc.NetrcParseError as neterr:
189+
logging.getLogger(__name__).warn(
190+
'WARNING: parsing .netrc: %s' % str(neterr))
191+
192+
return None

0 commit comments

Comments
 (0)