Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Goblin OGM
AIO Gremlin
Commits
ff2ea9b7
Commit
ff2ea9b7
authored
Sep 18, 2015
by
davebshow
Browse files
working on new auth. broken.
parent
8c84d442
Changes
5
Hide whitespace changes
Inline
Side-by-side
aiogremlin/client.py
View file @
ff2ea9b7
...
...
@@ -28,15 +28,14 @@ class GremlinClient:
:param str processor: Gremlin Server processor argument. "" by default.
:param float timeout: timeout for establishing connection (optional).
Values ``0`` or ``None`` mean no timeout
:param connector: A class that implements the method ``ws_connect``.
:param
ws_
connector: A class that implements the method ``ws_connect``.
Usually an instance of ``aiogremlin.connector.GremlinConnector``
"""
def
__init__
(
self
,
*
,
url
=
'http://localhost:8182/'
,
loop
=
None
,
lang
=
"gremlin-groovy"
,
op
=
"eval"
,
processor
=
""
,
timeout
=
None
,
ws_connector
=
None
,
connector
=
None
):
"""
"""
timeout
=
None
,
ws_connector
=
None
,
connector
=
None
,
username
=
""
,
password
=
""
):
self
.
_lang
=
lang
self
.
_op
=
op
self
.
_processor
=
processor
...
...
@@ -45,6 +44,8 @@ class GremlinClient:
self
.
_session
=
None
self
.
_url
=
url
self
.
_timeout
=
timeout
self
.
_username
=
username
self
.
_password
=
password
if
connector
is
None
:
connector
=
aiohttp
.
TCPConnector
(
verify_ssl
=
False
,
loop
=
self
.
_loop
)
client_session
=
aiohttp
.
ClientSession
(
connector
=
connector
,
...
...
@@ -146,12 +147,14 @@ class GremlinClient:
writer
=
GremlinWriter
(
ws
)
ws
=
writer
.
write
(
gremlin
,
bindings
=
bindings
,
lang
=
lang
,
ws
=
writer
.
write
(
gremlin
=
gremlin
,
bindings
=
bindings
,
lang
=
lang
,
rebindings
=
rebindings
,
op
=
op
,
processor
=
processor
,
binary
=
binary
,
session
=
session
)
return
GremlinResponse
(
ws
,
session
=
session
,
loop
=
self
.
_loop
)
return
GremlinResponse
(
ws
,
username
=
self
.
_username
,
password
=
self
.
_password
,
session
=
session
,
loop
=
self
.
_loop
)
@
asyncio
.
coroutine
def
execute
(
self
,
gremlin
,
*
,
bindings
=
None
,
lang
=
None
,
rebindings
=
None
,
...
...
@@ -252,10 +255,12 @@ class GremlinResponse:
`asyncio.get_event_loop` is used for getting default event loop
(optional)
"""
def
__init__
(
self
,
ws
,
*
,
session
=
None
,
loop
=
None
):
def
__init__
(
self
,
ws
,
*
,
session
=
None
,
loop
=
None
,
username
=
""
,
password
=
""
):
self
.
_loop
=
loop
or
asyncio
.
get_event_loop
()
self
.
_session
=
session
self
.
_stream
=
GremlinResponseStream
(
ws
,
loop
=
self
.
_loop
)
self
.
_stream
=
GremlinResponseStream
(
ws
,
username
,
password
,
loop
=
self
.
_loop
)
@
property
def
stream
(
self
):
...
...
@@ -282,6 +287,7 @@ class GremlinResponse:
@
asyncio
.
coroutine
def
_run
(
self
):
import
ipdb
;
ipdb
.
set_trace
()
results
=
[]
while
True
:
message
=
yield
from
self
.
_stream
.
read
()
...
...
@@ -302,8 +308,10 @@ class GremlinResponseStream:
`asyncio.get_event_loop` is used for getting default event loop
(optional)
"""
def
__init__
(
self
,
ws
,
loop
=
None
):
def
__init__
(
self
,
ws
,
username
,
password
,
loop
=
None
):
self
.
_ws
=
ws
self
.
_username
=
username
self
.
_password
=
password
self
.
_loop
=
loop
or
asyncio
.
get_event_loop
()
data_stream
=
aiohttp
.
DataQueue
(
loop
=
self
.
_loop
)
self
.
_stream
=
self
.
_ws
.
parser
.
set_parser
(
gremlin_response_parser
,
...
...
@@ -325,6 +333,10 @@ class GremlinResponseStream:
asyncio
.
Task
(
self
.
_ws
.
receive
(),
loop
=
self
.
_loop
)
try
:
message
=
yield
from
self
.
_stream
.
read
()
if
message
.
status_code
==
407
:
writer
=
GremlinWriter
(
self
.
_ws
)
writer
.
write
(
op
=
"authentication"
,
username
=
self
.
_username
,
password
=
self
.
_password
)
except
(
RequestError
,
GremlinServerError
):
yield
from
self
.
_ws
.
release
()
raise
...
...
@@ -341,7 +353,9 @@ def submit(gremlin, *,
processor
=
""
,
timeout
=
None
,
session
=
None
,
loop
=
None
):
loop
=
None
,
username
=
""
,
password
=
""
):
"""
:ref:`coroutine<coroutine>`
...
...
@@ -377,7 +391,8 @@ def submit(gremlin, *,
ws_response_class
=
GremlinClientWebSocketResponse
)
gremlin_client
=
GremlinClient
(
url
=
url
,
loop
=
loop
,
ws_connector
=
client_session
)
ws_connector
=
client_session
,
username
=
username
,
password
=
password
)
try
:
resp
=
yield
from
gremlin_client
.
submit
(
...
...
aiogremlin/exceptions.py
View file @
ff2ea9b7
...
...
@@ -10,6 +10,8 @@ class StatusException(IOError):
"""
self
.
value
=
value
self
.
response
=
{
401
:
(
"UNAUTHORIZED"
,
(
"The request attempted to access resources "
+
"that the requesting user did not have access to"
)),
498
:
(
"MALFORMED_REQUEST"
,
(
"The request message was not properly formatted which "
+
"means it could not be parsed at all or the 'op' code "
+
...
...
aiogremlin/subprotocol.py
View file @
ff2ea9b7
...
...
@@ -29,7 +29,7 @@ def gremlin_response_parser(out, buf):
if
message
.
status_code
==
200
:
out
.
feed_data
(
message
)
out
.
feed_eof
()
elif
message
.
status_code
==
206
:
elif
message
.
status_code
==
206
or
message
.
status_code
==
407
:
out
.
feed_data
(
message
)
elif
message
.
status_code
==
204
:
out
.
feed_data
(
message
)
...
...
@@ -46,22 +46,28 @@ class GremlinWriter:
def
__init__
(
self
,
ws
):
self
.
ws
=
ws
def
write
(
self
,
gremlin
,
bindings
=
None
,
lang
=
"gremlin-groovy"
,
def
write
(
self
,
*
,
gremlin
=
""
,
bindings
=
None
,
lang
=
"gremlin-groovy"
,
rebindings
=
None
,
op
=
"eval"
,
processor
=
""
,
session
=
None
,
binary
=
True
,
mime_type
=
"application/json"
):
binary
=
True
,
mime_type
=
"application/json"
,
username
=
""
,
password
=
""
):
if
rebindings
is
None
:
rebindings
=
{}
message
=
self
.
_prepare_message
(
gremlin
,
bindings
,
lang
,
rebindings
,
op
,
processor
,
session
)
if
op
==
"eval"
:
message
=
self
.
_prepare_message
(
gremlin
,
bindings
,
lang
,
rebindings
,
op
,
processor
,
session
)
if
op
==
"authentication"
:
message
=
self
.
_authenticate
(
username
,
password
,
session
,
processor
)
message
=
json
.
dumps
(
message
)
if
binary
:
message
=
self
.
_set_message_header
(
message
,
mime_type
)
self
.
ws
.
send
(
message
,
binary
=
binary
)
print
(
message
)
return
self
.
ws
@
staticmethod
...
...
@@ -93,3 +99,22 @@ class GremlinWriter:
else
:
message
[
"args"
].
update
({
"session"
:
session
})
return
message
@
staticmethod
def
_authenticate
(
username
,
password
,
session
,
processor
):
auth_bytes
=
""
.
join
([
"0"
,
username
,
"0"
,
password
])
print
(
auth_bytes
)
message
=
{
"requestId"
:
str
(
uuid
.
uuid4
()),
"op"
:
"authentication"
,
"processor"
:
processor
,
"args"
:
{
"sasl"
:
auth_bytes
}
}
if
session
is
None
:
if
processor
==
"session"
:
raise
RuntimeError
(
"session processor requires a session id"
)
else
:
message
[
"args"
].
update
({
"session"
:
session
})
return
message
requirements.txt
View file @
ff2ea9b7
aiogremlin
==0.0.11
aiohttp
==0.16.5
aiowebsocketclient
==0.0.3
argparse
==1.2.1
...
...
tests/tests.py
View file @
ff2ea9b7
...
...
@@ -22,219 +22,220 @@ class SubmitTest(unittest.TestCase):
@
asyncio
.
coroutine
def
go
():
resp
=
yield
from
submit
(
"4 + 4"
,
bindings
=
{
"x"
:
4
},
loop
=
self
.
loop
)
resp
=
yield
from
submit
(
"4 + 4"
,
url
=
'https://localhost:8182/'
,
bindings
=
{
"x"
:
4
},
loop
=
self
.
loop
,
username
=
"stephen"
,
password
=
"password"
)
results
=
yield
from
resp
.
get
()
return
results
results
=
self
.
loop
.
run_until_complete
(
go
())
self
.
assertEqual
(
results
[
0
].
data
[
0
],
8
)
def
test_rebinding
(
self
):
@
asyncio
.
coroutine
def
go1
():
result
=
yield
from
submit
(
"graph2.addVertex()"
,
loop
=
self
.
loop
)
resp
=
yield
from
result
.
get
()
try
:
self
.
loop
.
run_until_complete
(
go1
())
error
=
False
except
GremlinServerError
:
error
=
True
self
.
assertTrue
(
error
)
@
asyncio
.
coroutine
def
go2
():
result
=
yield
from
submit
(
"graph2.addVertex()"
,
rebindings
=
{
"graph2"
:
"graph"
},
loop
=
self
.
loop
)
resp
=
yield
from
result
.
get
()
self
.
assertEqual
(
len
(
resp
),
1
)
try
:
self
.
loop
.
run_until_complete
(
go2
())
except
GremlinServerError
:
print
(
"RELEASE DOES NOT SUPPORT REBINDINGS"
)
class
GremlinClientTest
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
loop
=
asyncio
.
new_event_loop
()
asyncio
.
set_event_loop
(
None
)
self
.
gc
=
GremlinClient
(
url
=
"
ws
://localhost:8182/"
,
loop
=
self
.
loop
)
def
tearDown
(
self
):
self
.
loop
.
run_until_complete
(
self
.
gc
.
close
())
self
.
loop
.
close
()
def
test_connection
(
self
):
@
asyncio
.
coroutine
def
go
():
ws
=
yield
from
self
.
gc
.
_connector
.
ws_connect
(
self
.
gc
.
url
)
self
.
assertFalse
(
ws
.
closed
)
yield
from
ws
.
close
()
self
.
loop
.
run_until_complete
(
go
())
def
test_execute
(
self
):
@
asyncio
.
coroutine
def
go
():
resp
=
yield
from
self
.
gc
.
execute
(
"x + x"
,
bindings
=
{
"x"
:
4
})
return
resp
results
=
self
.
loop
.
run_until_complete
(
go
())
self
.
assertEqual
(
results
[
0
].
data
[
0
],
8
)
def
test_sub_waitfor
(
self
):
sub1
=
self
.
gc
.
execute
(
"x + x"
,
bindings
=
{
"x"
:
1
})
sub2
=
self
.
gc
.
execute
(
"x + x"
,
bindings
=
{
"x"
:
2
})
sub3
=
self
.
gc
.
execute
(
"x + x"
,
bindings
=
{
"x"
:
4
})
coro
=
asyncio
.
gather
(
*
[
asyncio
.
async
(
sub1
,
loop
=
self
.
loop
),
asyncio
.
async
(
sub2
,
loop
=
self
.
loop
),
asyncio
.
async
(
sub3
,
loop
=
self
.
loop
)],
loop
=
self
.
loop
)
# Here I am looking for resource warnings.
results
=
self
.
loop
.
run_until_complete
(
coro
)
self
.
assertIsNotNone
(
results
)
def
test_resp_stream
(
self
):
@
asyncio
.
coroutine
def
stream_coro
():
results
=
[]
resp
=
yield
from
self
.
gc
.
submit
(
"x + x"
,
bindings
=
{
"x"
:
4
})
while
True
:
f
=
yield
from
resp
.
stream
.
read
()
if
f
is
None
:
break
results
.
append
(
f
)
self
.
assertEqual
(
results
[
0
].
data
[
0
],
8
)
self
.
loop
.
run_until_complete
(
stream_coro
())
def
test_execute_error
(
self
):
execute
=
self
.
gc
.
execute
(
"x + x g.asdfas"
,
bindings
=
{
"x"
:
4
})
try
:
self
.
loop
.
run_until_complete
(
execute
)
error
=
False
except
:
error
=
True
self
.
assertTrue
(
error
)
def
test_rebinding
(
self
):
execute
=
self
.
gc
.
execute
(
"graph2.addVertex()"
)
try
:
self
.
loop
.
run_until_complete
(
execute
)
error
=
False
except
GremlinServerError
:
error
=
True
self
.
assertTrue
(
error
)
@
asyncio
.
coroutine
def
go
():
result
=
yield
from
self
.
gc
.
execute
(
"graph2.addVertex()"
,
rebindings
=
{
"graph2"
:
"graph"
})
self
.
assertEqual
(
len
(
result
),
1
)
try
:
self
.
loop
.
run_until_complete
(
go
())
except
GremlinServerError
:
print
(
"RELEASE DOES NOT SUPPORT REBINDINGS"
)
class
GremlinClientSessionTest
(
unittest
.
TestCase
):
def
setUp
(
self
):
self
.
loop
=
asyncio
.
new_event_loop
()
asyncio
.
set_event_loop
(
None
)
self
.
gc
=
GremlinClientSession
(
url
=
"
ws
://localhost:8182/"
,
loop
=
self
.
loop
)
self
.
script1
=
"""v = graph.addVertex('name', 'Dave')"""
self
.
script2
=
"v.property('name')"
def
tearDown
(
self
):
self
.
loop
.
run_until_complete
(
self
.
gc
.
close
())
self
.
loop
.
close
()
def
test_session
(
self
):
@
asyncio
.
coroutine
def
go
():
yield
from
self
.
gc
.
execute
(
self
.
script1
)
results
=
yield
from
self
.
gc
.
execute
(
self
.
script2
)
return
results
results
=
self
.
loop
.
run_until_complete
(
go
())
self
.
assertEqual
(
results
[
0
].
data
[
0
][
'value'
],
'Dave'
)
def
test_session_reset
(
self
):
@
asyncio
.
coroutine
def
go
():
yield
from
self
.
gc
.
execute
(
self
.
script1
)
self
.
gc
.
reset_session
()
results
=
yield
from
self
.
gc
.
execute
(
self
.
script2
)
return
results
try
:
results
=
self
.
loop
.
run_until_complete
(
go
())
error
=
False
except
GremlinServerError
:
error
=
True
self
.
assertTrue
(
error
)
def
test_session_manual_reset
(
self
):
@
asyncio
.
coroutine
def
go
():
yield
from
self
.
gc
.
execute
(
self
.
script1
)
new_sess
=
str
(
uuid
.
uuid4
())
sess
=
self
.
gc
.
reset_session
(
session
=
new_sess
)
self
.
assertEqual
(
sess
,
new_sess
)
self
.
assertEqual
(
self
.
gc
.
session
,
new_sess
)
results
=
yield
from
self
.
gc
.
execute
(
self
.
script2
)
return
results
try
:
results
=
self
.
loop
.
run_until_complete
(
go
())
error
=
False
except
GremlinServerError
:
error
=
True
self
.
assertTrue
(
error
)
def
test_session_set
(
self
):
@
asyncio
.
coroutine
def
go
():
yield
from
self
.
gc
.
execute
(
self
.
script1
)
new_sess
=
str
(
uuid
.
uuid4
())
self
.
gc
.
session
=
new_sess
self
.
assertEqual
(
self
.
gc
.
session
,
new_sess
)
results
=
yield
from
self
.
gc
.
execute
(
self
.
script2
)
return
results
try
:
results
=
self
.
loop
.
run_until_complete
(
go
())
error
=
False
except
GremlinServerError
:
error
=
True
self
.
assertTrue
(
error
)
def
test_resp_session
(
self
):
@
asyncio
.
coroutine
def
go
():
session
=
str
(
uuid
.
uuid4
())
self
.
gc
.
session
=
session
resp
=
yield
from
self
.
gc
.
submit
(
"x + x"
,
bindings
=
{
"x"
:
4
})
while
True
:
f
=
yield
from
resp
.
stream
.
read
()
if
f
is
None
:
break
self
.
assertEqual
(
resp
.
session
,
session
)
self
.
loop
.
run_until_complete
(
go
())
#
def test_rebinding(self):
#
#
@asyncio.coroutine
#
def go1():
#
result = yield from submit("graph2.addVertex()", loop=self.loop)
#
resp = yield from result.get()
#
#
try:
#
self.loop.run_until_complete(go1())
#
error = False
#
except GremlinServerError:
#
error = True
#
self.assertTrue(error)
#
#
@asyncio.coroutine
#
def go2():
#
result = yield from submit(
#
"graph2.addVertex()", rebindings={"graph2": "graph"},
#
loop=self.loop)
#
resp = yield from result.get()
#
self.assertEqual(len(resp), 1)
#
#
try:
#
self.loop.run_until_complete(go2())
#
except GremlinServerError:
#
print("RELEASE DOES NOT SUPPORT REBINDINGS")
#
#
#
class GremlinClientTest(unittest.TestCase):
#
#
def setUp(self):
#
self.loop = asyncio.new_event_loop()
#
asyncio.set_event_loop(None)
#
self.gc = GremlinClient(url="
http
://localhost:8182/", loop=self.loop)
#
#
def tearDown(self):
#
self.loop.run_until_complete(self.gc.close())
#
self.loop.close()
#
#
def test_connection(self):
#
#
@asyncio.coroutine
#
def go():
#
ws = yield from self.gc._connector.ws_connect(self.gc.url)
#
self.assertFalse(ws.closed)
#
yield from ws.close()
#
#
self.loop.run_until_complete(go())
#
#
def test_execute(self):
#
#
@asyncio.coroutine
#
def go():
#
resp = yield from self.gc.execute("x + x", bindings={"x": 4})
#
return resp
#
#
results = self.loop.run_until_complete(go())
#
self.assertEqual(results[0].data[0], 8)
#
#
def test_sub_waitfor(self):
#
sub1 = self.gc.execute("x + x", bindings={"x": 1})
#
sub2 = self.gc.execute("x + x", bindings={"x": 2})
#
sub3 = self.gc.execute("x + x", bindings={"x": 4})
#
coro = asyncio.gather(*[asyncio.async(sub1, loop=self.loop),
#
asyncio.async(sub2, loop=self.loop),
#
asyncio.async(sub3, loop=self.loop)],
#
loop=self.loop)
#
# Here I am looking for resource warnings.
#
results = self.loop.run_until_complete(coro)
#
self.assertIsNotNone(results)
#
#
def test_resp_stream(self):
#
@asyncio.coroutine
#
def stream_coro():
#
results = []
#
resp = yield from self.gc.submit("x + x", bindings={"x": 4})
#
while True:
#
f = yield from resp.stream.read()
#
if f is None:
#
break
#
results.append(f)
#
self.assertEqual(results[0].data[0], 8)
#
self.loop.run_until_complete(stream_coro())
#
#
def test_execute_error(self):
#
execute = self.gc.execute("x + x g.asdfas", bindings={"x": 4})
#
try:
#
self.loop.run_until_complete(execute)
#
error = False
#
except:
#
error = True
#
self.assertTrue(error)
#
#
def test_rebinding(self):
#
execute = self.gc.execute("graph2.addVertex()")
#
try:
#
self.loop.run_until_complete(execute)
#
error = False
#
except GremlinServerError:
#
error = True
#
self.assertTrue(error)
#
#
@asyncio.coroutine
#
def go():
#
result = yield from self.gc.execute(
#
"graph2.addVertex()", rebindings={"graph2": "graph"})
#
self.assertEqual(len(result), 1)
#
#
try:
#
self.loop.run_until_complete(go())
#
except GremlinServerError:
#
print("RELEASE DOES NOT SUPPORT REBINDINGS")
#
#
#
class GremlinClientSessionTest(unittest.TestCase):
#
#
def setUp(self):
#
self.loop = asyncio.new_event_loop()
#
asyncio.set_event_loop(None)
#
self.gc = GremlinClientSession(url="
http
://localhost:8182/",
#
loop=self.loop)
#
self.script1 = """v = graph.addVertex('name', 'Dave')"""
#
#
self.script2 = "v.property('name')"
#
#
def tearDown(self):
#
self.loop.run_until_complete(self.gc.close())
#
self.loop.close()
#
#
def test_session(self):
#
#
@asyncio.coroutine
#
def go():
#
yield from self.gc.execute(self.script1)
#
results = yield from self.gc.execute(self.script2)
#
return results
#
#
results = self.loop.run_until_complete(go())
#
self.assertEqual(results[0].data[0]['value'], 'Dave')
#
#
def test_session_reset(self):
#
#
@asyncio.coroutine
#
def go():
#
yield from self.gc.execute(self.script1)
#
self.gc.reset_session()
#
results = yield from self.gc.execute(self.script2)
#
return results
#
try:
#
results = self.loop.run_until_complete(go())
#
error = False
#
except GremlinServerError:
#
error = True
#
self.assertTrue(error)
#
#
def test_session_manual_reset(self):
#
#
@asyncio.coroutine
#
def go():
#
yield from self.gc.execute(self.script1)
#
new_sess = str(uuid.uuid4())
#
sess = self.gc.reset_session(session=new_sess)
#
self.assertEqual(sess, new_sess)
#
self.assertEqual(self.gc.session, new_sess)
#
results = yield from self.gc.execute(self.script2)
#
return results
#
try:
#
results = self.loop.run_until_complete(go())
#
error = False
#
except GremlinServerError:
#
error = True
#
self.assertTrue(error)
#
#
def test_session_set(self):
#
#
@asyncio.coroutine
#
def go():
#
yield from self.gc.execute(self.script1)
#
new_sess = str(uuid.uuid4())
#
self.gc.session = new_sess
#
self.assertEqual(self.gc.session, new_sess)
#
results = yield from self.gc.execute(self.script2)
#
return results
#
try:
#
results = self.loop.run_until_complete(go())
#
error = False
#
except GremlinServerError:
#
error = True
#
self.assertTrue(error)
#
#
def test_resp_session(self):
#
#
@asyncio.coroutine
#
def go():
#
session = str(uuid.uuid4())
#
self.gc.session = session
#
resp = yield from self.gc.submit("x + x", bindings={"x": 4})
#
while True:
#
f = yield from resp.stream.read()
#
if f is None:
#
break
#
self.assertEqual(resp.session, session)
#