Skip to content

Commit 042fc24

Browse files
committed
Support authentication via netrc
`vcs_base.load_url()` currently doesn't support authentication. Add support for both basic and token-based authentication by parsing netrc files. Signed-off-by: Kyle Fazzari <[email protected]>
1 parent 4e0c2f4 commit 042fc24

File tree

3 files changed

+184
-2
lines changed

3 files changed

+184
-2
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ matrix:
1212
install:
1313
# newer versions of PyYAML dropped support for Python 3.4
1414
- if [ $TRAVIS_PYTHON_VERSION == "3.4" ]; then pip install PyYAML==5.2; fi
15-
- pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML
15+
- pip install coverage flake8 flake8-docstrings flake8-import-order pytest PyYAML mock
1616
script:
1717
- PYTHONPATH=`pwd` pytest -s -v test
1818
notifications:

test/test_base.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
import os
2+
import shutil
3+
import tempfile
4+
import unittest
5+
6+
try:
7+
from urllib.error import HTTPError
8+
except ImportError:
9+
from urllib2 import HTTPError
10+
11+
12+
try:
13+
from unittest import mock
14+
except ImportError:
15+
import mock
16+
17+
from vcstool.clients import vcs_base
18+
19+
20+
class TestBase(unittest.TestCase):
21+
22+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
23+
@mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True)
24+
def test_load_url_calls_urlopen(self, netrc_open_mock, urlopen_mock):
25+
urlopen_read_mock = urlopen_mock.return_value.read
26+
27+
vcs_base.load_url('example.com', timeout=123)
28+
29+
urlopen_mock.assert_called_once_with('example.com', timeout=123)
30+
urlopen_read_mock.assert_called_once_with()
31+
self.assertFalse(netrc_open_mock.mock_calls)
32+
33+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
34+
@mock.patch('vcstool.clients.vcs_base._netrc_open', autospec=True)
35+
def test_load_url_calls_netrc_open(self, netrc_open_mock, urlopen_mock):
36+
for code in (401, 404):
37+
urlopen_mock.side_effect = [
38+
HTTPError(None, code, None, None, None)]
39+
urlopen_read_mock = urlopen_mock.return_value.read
40+
41+
vcs_base.load_url('example.com', timeout=123)
42+
43+
urlopen_mock.assert_called_once_with('example.com', timeout=123)
44+
self.assertFalse(urlopen_read_mock.mock_calls)
45+
46+
netrc_open_mock.assert_called_once_with('example.com', timeout=123)
47+
48+
netrc_open_mock.reset_mock()
49+
urlopen_mock.reset_mock()
50+
51+
def test_netrc_open_no_such_file(self):
52+
try:
53+
self.assertEqual(vcs_base._netrc_open(
54+
'https://example.com', filename='/non-existent'), None)
55+
except Exception:
56+
self.fail(
57+
'The lack of a .netrc file should not result in an exception')
58+
59+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
60+
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True)
61+
def test_netrc_open_basic_auth(self, build_opener_mock, urlopen_mock):
62+
open_mock = build_opener_mock.return_value.open
63+
64+
tmpdir = tempfile.mkdtemp()
65+
netrc_file = os.path.join(tmpdir, 'netrc')
66+
machine = 'example.com'
67+
with open(netrc_file, 'w') as f:
68+
f.write('machine %s\n' % machine)
69+
f.write('login username\n')
70+
f.write('password password')
71+
72+
url = 'https://%s/foo/bar' % machine
73+
try:
74+
vcs_base._netrc_open(url, filename=netrc_file, timeout=123)
75+
finally:
76+
shutil.rmtree(tmpdir)
77+
78+
self.assertFalse(urlopen_mock.mock_calls)
79+
80+
class _HTTPBasicAuthHandlerMatcher(object):
81+
def __init__(self, test):
82+
self.test = test
83+
84+
def __eq__(self, other):
85+
manager = other.passwd
86+
self.test.assertEqual(
87+
manager.find_user_password(None, 'example.com'),
88+
('username', 'password'))
89+
return True
90+
91+
build_opener_mock.assert_called_once_with(
92+
_HTTPBasicAuthHandlerMatcher(self))
93+
open_mock.assert_called_once_with(url, timeout=123)
94+
95+
@mock.patch('vcstool.clients.vcs_base.urlopen', autospec=True)
96+
@mock.patch('vcstool.clients.vcs_base.build_opener', autospec=True)
97+
def test_netrc_open_token_auth(self, build_opener_mock, urlopen_mock):
98+
tmpdir = tempfile.mkdtemp()
99+
netrc_file = os.path.join(tmpdir, 'netrc')
100+
machine = 'example.com'
101+
with open(netrc_file, 'w') as f:
102+
f.write('machine %s\n' % machine)
103+
f.write('password password')
104+
105+
url = 'https://%s/foo/bar' % machine
106+
try:
107+
vcs_base._netrc_open(url, filename=netrc_file, timeout=123)
108+
finally:
109+
shutil.rmtree(tmpdir)
110+
111+
self.assertFalse(build_opener_mock.mock_calls)
112+
113+
class _RequestMatcher(object):
114+
def __init__(self, test):
115+
self.test = test
116+
117+
def __eq__(self, other):
118+
self.test.assertEqual(other.get_full_url(), url)
119+
self.test.assertEqual(
120+
other.get_header('Private-token'), 'password')
121+
return True
122+
123+
urlopen_mock.assert_called_once_with(
124+
_RequestMatcher(self), timeout=123)

vcstool/clients/vcs_base.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
1+
import errno
2+
import logging
3+
import netrc
14
import os
25
import socket
36
import subprocess
47
import time
58
try:
69
from urllib.request import Request
710
from urllib.request import urlopen
11+
from urllib.request import HTTPPasswordMgrWithDefaultRealm
12+
from urllib.request import HTTPBasicAuthHandler
13+
from urllib.request import build_opener
14+
from urllib.parse import urlparse
815
from urllib.error import HTTPError
916
from urllib.error import URLError
1017
except ImportError:
1118
from urllib2 import HTTPError
1219
from urllib2 import Request
1320
from urllib2 import URLError
1421
from urllib2 import urlopen
22+
from urllib2 import HTTPPasswordMgrWithDefaultRealm
23+
from urllib2 import HTTPBasicAuthHandler
24+
from urllib2 import build_opener
25+
from urlparse import urlparse
1526

1627
try:
1728
from shutil import which # noqa
@@ -91,7 +102,7 @@ def run_command(cmd, cwd, env=None):
91102

92103
def load_url(url, retry=2, retry_period=1, timeout=10):
93104
try:
94-
fh = urlopen(url, timeout=timeout)
105+
fh = _urlopen_netrc(url, timeout=timeout)
95106
except HTTPError as e:
96107
if e.code == 503 and retry:
97108
time.sleep(retry_period)
@@ -132,3 +143,50 @@ def test_url(url, retry=2, retry_period=1, timeout=10):
132143
timeout=timeout)
133144
raise URLError(str(e) + ' (%s)' % url)
134145
return response
146+
147+
148+
def _urlopen_netrc(uri, timeout=None):
149+
try:
150+
return urlopen(uri, timeout=timeout)
151+
except HTTPError as e:
152+
if e.code in (401, 404):
153+
# Try again with netrc credentials
154+
result = _netrc_open(uri, timeout=timeout)
155+
if result is not None:
156+
return result
157+
raise
158+
159+
160+
def _netrc_open(uri, filename=None, timeout=None):
161+
parsed_uri = urlparse(uri)
162+
machine = parsed_uri.netloc
163+
if not machine:
164+
return None
165+
166+
opener = None
167+
try:
168+
info = netrc.netrc(filename).authenticators(machine)
169+
if info is None:
170+
# caught below, like other netrc parse errors
171+
raise netrc.NetrcParseError('No authenticators for "%s"' % machine)
172+
173+
(username, _, password) = info
174+
if username and password:
175+
pass_man = HTTPPasswordMgrWithDefaultRealm()
176+
pass_man.add_password(None, machine, username, password)
177+
authhandler = HTTPBasicAuthHandler(pass_man)
178+
opener = build_opener(authhandler)
179+
return opener.open(uri, timeout=timeout)
180+
elif password:
181+
request = Request(uri)
182+
request.add_header('PRIVATE-TOKEN', password)
183+
return urlopen(request, timeout=timeout)
184+
except EnvironmentError as e:
185+
# Don't error just because the user doesn't have a .netrc file
186+
if e.errno != errno.ENOENT:
187+
raise
188+
except netrc.NetrcParseError as neterr:
189+
logging.getLogger(__name__).warn(
190+
'WARNING: parsing .netrc: %s' % str(neterr))
191+
192+
return None

0 commit comments

Comments
 (0)