Page Menu
Home
Phabricator
Search
Configure Global Search
Log In
Files
F7159638
uri.py
No One
Temporary
Actions
View File
Edit File
Delete File
View Transforms
Subscribe
Mute Notifications
Award Token
Flag For Later
Size
14 KB
Referenced Files
None
Subscribers
None
uri.py
View Options
"""XCAP URI module
http://tools.ietf.org/html/rfc4825#section-6
"""
import
re
from
urllib
import
unquote
from
copy
import
copy
from
xml.sax.saxutils
import
quoteattr
from
lxml
import
_elementpath
as
ElementPath
from
application.configuration
import
*
from
application
import
log
from
xcap.errors
import
*
XPATH_DEFAULT_PREFIX
=
'default'
# should be more random
class
XCAPUser
(
object
):
"""XCAP ID."""
def
__init__
(
self
,
username
,
domain
):
self
.
username
=
username
self
.
domain
=
domain
@property
def
uri
(
self
):
return
'sip:
%s
@
%s
'
%
(
self
.
username
,
self
.
domain
)
def
__eq__
(
self
,
other
):
return
isinstance
(
other
,
XCAPUser
)
and
self
.
uri
==
other
.
uri
def
__ne__
(
self
,
other
):
return
not
self
.
__eq__
(
other
)
def
__nonzero__
(
self
):
return
bool
(
self
.
username
)
and
bool
(
self
.
domain
)
def
__str__
(
self
):
return
"
%s
@
%s
"
%
(
self
.
username
,
self
.
domain
)
def
__repr__
(
self
):
return
'XCAPUser(
%r
,
%r
)'
%
(
self
.
username
,
self
.
domain
)
@classmethod
def
parse
(
cls
,
user_id
,
default_domain
=
None
):
if
user_id
.
startswith
(
"sip:"
):
user_id
=
user_id
[
4
:]
_split
=
user_id
.
split
(
'@'
,
1
)
username
=
_split
[
0
]
if
len
(
_split
)
==
2
:
domain
=
_split
[
1
]
else
:
domain
=
default_domain
return
cls
(
username
,
domain
)
# XXX currently equivalent but differently encoded URIs won't be considered equal.
def
unquote_attr_value
(
s
):
if
len
(
s
)
>
1
and
s
[
0
]
==
s
[
-
1
]
and
s
[
0
]
in
'"
\'
'
:
# what about " and friends?
return
s
[
1
:
-
1
]
raise
NodeParsingError
def
xpath_tokenizer
(
p
):
out
=
[]
prev
=
None
for
op
,
tag
in
ElementPath
.
xpath_tokenizer
(
p
):
if
prev
==
'='
:
unq
=
unquote_attr_value
else
:
unq
=
unquote
if
op
:
x
=
Op
(
unq
(
op
))
else
:
x
=
Tag
(
unq
(
tag
))
out
.
append
(
x
)
prev
=
x
return
out
class
Op
(
str
):
tag
=
False
class
Tag
(
str
):
tag
=
True
class
TerminalSelector
(
object
):
pass
class
AttributeSelector
(
TerminalSelector
):
def
__init__
(
self
,
attribute
):
self
.
attribute
=
attribute
def
__str__
(
self
):
return
'@'
+
self
.
attribute
def
__repr__
(
self
):
return
'AttributeSelector(
%r
)'
%
self
.
attribute
class
NamespaceSelector
(
TerminalSelector
):
def
__str__
(
self
):
return
"namespace::*"
def
__repr__
(
self
):
return
'NamespaceSelector()'
class
Str
(
str
):
def
__repr__
(
self
):
return
'
%s
(
%s
)'
%
(
self
.
__class__
.
__name__
,
str
.
__repr__
(
self
))
class
NodeParsingError
(
ValueError
):
pass
def
parse_qname
(
qname
,
defnamespace
,
namespaces
):
if
qname
==
'*'
:
return
qname
try
:
prefix
,
name
=
qname
.
split
(
':'
)
except
ValueError
:
return
(
defnamespace
,
qname
)
else
:
return
(
namespaces
[
prefix
],
name
)
class
Step
(
object
):
def
__init__
(
self
,
name
,
position
=
None
,
att_name
=
None
,
att_value
=
None
):
self
.
name
=
name
self
.
position
=
position
self
.
att_name
=
att_name
self
.
att_value
=
att_value
def
__repr__
(
self
):
args
=
[
self
.
name
,
self
.
position
,
self
.
att_name
,
self
.
att_value
]
while
args
and
args
[
-
1
]
is
None
:
del
args
[
-
1
]
args
=
[
repr
(
x
)
for
x
in
args
]
return
'Step(
%s
)'
%
', '
.
join
(
args
)
def
step2str
(
step
,
namespace2prefix
=
{}):
try
:
namespace
,
name
=
step
.
name
except
ValueError
:
res
=
step
.
name
else
:
prefix
=
namespace2prefix
[
namespace
]
if
prefix
:
res
=
prefix
+
':'
+
name
else
:
res
=
name
if
step
.
position
is
not
None
:
res
+=
'[
%s
]'
%
step
.
position
if
step
.
att_name
is
not
None
:
namespace
,
name
=
step
.
att_name
if
namespace
:
prefix
=
namespace2prefix
[
namespace
]
else
:
prefix
=
None
if
prefix
:
res
+=
'[@
%s
:
%s
=
%s
]'
%
(
prefix
,
name
,
quoteattr
(
step
.
att_value
))
else
:
res
+=
'[@
%s
=
%s
]'
%
(
name
,
quoteattr
(
step
.
att_value
))
return
res
def
read_element_tag
(
lst
,
index
,
namespace
,
namespaces
):
if
index
==
len
(
lst
):
raise
NodeParsingError
elif
lst
[
index
]
==
'*'
:
return
'*'
,
index
+
1
elif
get
(
lst
,
index
+
1
)
==
':'
:
if
not
lst
[
index
]
.
tag
:
raise
NodeParsingError
if
not
get
(
lst
,
index
+
2
)
or
not
get
(
lst
,
index
+
2
)
.
tag
:
raise
NodeParsingError
return
(
namespaces
[
lst
[
index
]],
lst
[
index
+
2
]),
index
+
3
else
:
return
(
namespace
,
lst
[
index
]),
index
+
1
def
read_position
(
lst
,
index
):
if
get
(
lst
,
index
)
==
'['
and
get
(
lst
,
index
+
2
)
==
']'
:
return
int
(
lst
[
index
+
1
]),
index
+
3
return
None
,
index
# XML attributes don't belong to the same namespace as containing tag?
# because thats what I get in startElement/attrs.items - (None, 'tag')
# lxml's xpath works similar way too:
# doc.xpath('/default:rls-services/defaultg:service[@uri="sip:mybuddies@example.com"]',
# namespaces = {'default':"urn:ietf:params:xml:ns:rls-services"})
# works, while
# doc.xpath('/default:rls-services/defaultg:service[@default:uri="sip:mybuddies@example.com"]',
# namespaces = {'default':"urn:ietf:params:xml:ns:rls-services"})
# does not
# that's why _namespace parameter is ignored and None is supplied in that case
def
read_att_test
(
lst
,
index
,
_namespace
,
namespaces
):
if
get
(
lst
,
index
)
==
'['
and
get
(
lst
,
index
+
1
)
==
'@'
and
get
(
lst
,
index
+
3
)
==
'='
and
get
(
lst
,
index
+
5
)
==
']'
:
return
(
None
,
lst
[
index
+
2
]),
lst
[
index
+
4
],
index
+
6
elif
get
(
lst
,
index
)
==
'['
and
get
(
lst
,
index
+
1
)
==
'@'
and
get
(
lst
,
index
+
3
)
==
':'
\
and
get
(
lst
,
index
+
5
)
==
'='
and
get
(
lst
,
index
+
7
)
==
']'
:
return
(
namespaces
[
lst
[
index
+
2
]],
lst
[
index
+
4
]),
lst
[
index
+
6
],
index
+
8
return
None
,
None
,
index
def
get
(
lst
,
index
,
default
=
None
):
try
:
return
lst
[
index
]
except
LookupError
:
return
default
def
read_step
(
lst
,
index
,
namespace
,
namespaces
):
if
get
(
lst
,
index
)
==
'@'
:
return
AttributeSelector
(
lst
[
index
+
1
]),
index
+
2
elif
get
(
lst
,
index
)
==
'namespace'
and
get
(
lst
,
index
+
1
)
==
'::'
and
get
(
lst
,
index
+
2
)
==
'*'
:
return
NamespaceSelector
(),
index
+
3
else
:
tag
,
index
=
read_element_tag
(
lst
,
index
,
namespace
,
namespaces
)
position
,
index
=
read_position
(
lst
,
index
)
att_name
,
att_value
,
index
=
read_att_test
(
lst
,
index
,
namespace
,
namespaces
)
return
Step
(
tag
,
position
,
att_name
,
att_value
),
index
def
read_slash
(
lst
,
index
):
if
get
(
lst
,
index
)
==
'/'
:
return
index
+
1
raise
NodeParsingError
def
read_node_selector
(
lst
,
namespace
,
namespaces
):
index
=
0
if
get
(
lst
,
0
)
==
'/'
:
index
=
read_slash
(
lst
,
index
)
steps
=
[]
terminal_selector
=
None
while
True
:
step
,
index
=
read_step
(
lst
,
index
,
namespace
,
namespaces
)
if
isinstance
(
step
,
TerminalSelector
):
if
index
!=
len
(
lst
):
raise
NodeParsingError
terminal_selector
=
step
break
steps
.
append
(
step
)
if
index
==
len
(
lst
):
break
index
=
read_slash
(
lst
,
index
)
return
ElementSelector
(
steps
,
namespace
,
namespaces
),
terminal_selector
def
parse_node_selector
(
s
,
namespace
=
None
,
namespaces
=
None
):
"""
>>> parse_node_selector('/resource-lists', None, {})
([Step((None, 'resource-lists'))], None)
>>> parse_node_selector('/resource-lists/list[1]/entry[@uri="sip:bob@example.com"]', None, {})
([Step((None, 'resource-lists')), Step((None, 'list'), 1), Step((None, 'entry'), None, (None, 'uri'), 'sip:bob@example.com')], None)
>>> parse_node_selector('/*/list[1][@name="friends"]/@name')
([Step('*'), Step((None, 'list'), 1, (None, 'name'), 'friends')], AttributeSelector('name'))
>>> parse_node_selector('/*[10][@att="val"]/namespace::*')
([Step('*', 10, (None, 'att'), 'val')], NamespaceSelector())
>>> x = parse_node_selector('/resource-lists/list[@name="friends"]/external[@anchor="http://xcap.example.org/resource-lists/users/sip:a@example.org/index/~~/resource-lists/list%5b@name=%22mkting%22%5d"]')
"""
if
namespaces
is
None
:
namespaces
=
{}
tokens
=
xpath_tokenizer
(
s
)
try
:
return
read_node_selector
(
tokens
,
namespace
,
namespaces
)
except
NodeParsingError
,
ex
:
ex
.
args
=
(
repr
(
s
),)
raise
except
:
log
.
error
(
'internal error in parse_node_selector(
%r
)'
%
s
)
raise
class
ElementSelector
(
list
):
def
__init__
(
self
,
lst
,
namespace
,
namespaces
):
list
.
__init__
(
self
,
lst
)
self
.
namespace
=
namespace
self
.
namespaces
=
namespaces
def
replace_default_prefix
(
self
,
namespace2prefix
):
"fix string representation so it'll work with lxml xpath"
steps
=
[]
for
step
in
self
:
try
:
namespace
,
name
=
step
.
name
except
ValueError
:
steps
.
append
(
str
(
step
))
else
:
steps
.
append
(
step2str
(
step
,
namespace2prefix
))
return
'/'
+
'/'
.
join
(
steps
)
xml_tag
=
re
.
compile
(
'\s*<([^ >/]+)'
)
def
fix_star
(
self
,
element_body
):
"""
>>> elem_selector = parse_node_selector('/watcherinfo/watcher-list/*[@id="8ajksjda7s"]', None, {})[0]
>>> elem_selector.fix_star('<watcher/>')[-1].name[1]
'watcher'
"""
if
self
and
self
[
-
1
]
.
name
==
'*'
and
self
[
-
1
]
.
position
is
None
:
m
=
self
.
xml_tag
.
match
(
element_body
)
if
m
:
(
name
,
)
=
m
.
groups
()
result
=
copy
(
self
)
result
[
-
1
]
.
name
=
parse_qname
(
name
,
self
.
namespace
,
self
.
namespaces
)
return
result
return
self
class
NodeSelector
(
object
):
XMLNS_REGEXP
=
re
.
compile
(
"xmlns\((?P<nsdata>.*?)\)"
)
def
__init__
(
self
,
selector
,
namespace
=
None
):
sections
=
selector
.
split
(
'?'
,
1
)
if
len
(
sections
)
==
2
:
## a query component is present
self
.
ns_bindings
=
self
.
_parse_query
(
sections
[
1
])
else
:
self
.
ns_bindings
=
{}
self
.
element_selector
,
self
.
terminal_selector
=
parse_node_selector
(
selector
,
namespace
,
self
.
ns_bindings
)
## http://www.w3.org/TR/2003/REC-xptr-xmlns-20030325/
def
_parse_query
(
self
,
query
):
"""Return a dictionary of namespace bindings defined by the xmlns() XPointer
expressions from the given query."""
ns_bindings
=
{}
ns_matches
=
self
.
XMLNS_REGEXP
.
findall
(
query
)
for
m
in
ns_matches
:
try
:
prefix
,
ns
=
m
.
split
(
'='
)
ns_bindings
[
prefix
]
=
ns
except
ValueError
:
log
.
error
(
"Ignoring invalid XPointer XMLNS expression:
%s
"
%
m
)
continue
return
ns_bindings
def
replace_default_prefix
(
self
,
defprefix
=
XPATH_DEFAULT_PREFIX
,
append_terminal
=
True
):
namespace2prefix
=
dict
((
v
,
k
)
for
(
k
,
v
)
in
self
.
ns_bindings
)
namespace2prefix
[
self
.
element_selector
.
namespace
]
=
defprefix
res
=
self
.
element_selector
.
replace_default_prefix
(
namespace2prefix
)
if
append_terminal
and
self
.
terminal_selector
:
res
+=
'/'
+
str
(
self
.
terminal_selector
)
return
res
def
get_ns_bindings
(
self
,
default_ns
):
ns_bindings
=
self
.
ns_bindings
.
copy
()
ns_bindings
[
XPATH_DEFAULT_PREFIX
]
=
default_ns
return
ns_bindings
class
DocumentSelector
(
Str
):
"""Constructs a DocumentSelector containing the application_id, context, user_id
and document from the given selector string.
>>> x = DocumentSelector('/resource-lists/users/sip:joe@example.com/index')
>>> x.application_id, x.context, x.user_id, x.document_path
('resource-lists', 'users', 'sip:joe@example.com', 'index')
>>> x = DocumentSelector('/rls-services/global/index')
>>> x.application_id, x.context, x.user_id, x.document_path
('rls-services', 'global', None, 'index')
"""
def
__init__
(
self
,
selector
):
if
not
isinstance
(
selector
,
str
):
raise
TypeError
(
"Document Selector must be a string"
)
segments
=
selector
.
split
(
'/'
)
if
not
segments
[
0
]:
## ignore first '/'
segments
.
pop
(
0
)
if
not
segments
[
-
1
]:
## ignore last '/' if present
segments
.
pop
()
if
len
(
segments
)
<
2
:
raise
ValueError
(
"invalid Document Selector"
)
self
.
application_id
=
segments
[
0
]
self
.
context
=
segments
[
1
]
## either "global" or "users"
if
self
.
context
not
in
(
"users"
,
"global"
):
raise
ValueError
(
"the Document Selector context must be 'users' or 'global': '
%s
'"
%
self
.
context
)
self
.
user_id
=
None
if
self
.
context
==
"users"
:
self
.
user_id
=
segments
[
2
]
segments
=
segments
[
3
:]
else
:
segments
=
segments
[
2
:]
if
not
segments
:
raise
ValueError
(
"invalid Document Selector: missing document's path"
)
self
.
document_path
=
'/'
.
join
(
segments
)
class
XCAPUri
(
object
):
"""An XCAP URI containing the XCAP root, document selector and node selector."""
node_selector_separator
=
"~~"
def
__init__
(
self
,
xcap_root
,
resource_selector
,
namespaces
):
"namespaces maps application id to default namespace"
self
.
xcap_root
=
xcap_root
self
.
resource_selector
=
resource_selector
realm
=
None
# convention to get the realm if it's not contained in the user ID section
# of the document selector (bad eyebeam)
if
self
.
resource_selector
.
startswith
(
"@"
):
first_slash
=
self
.
resource_selector
.
find
(
"/"
)
realm
=
self
.
resource_selector
[
1
:
first_slash
]
self
.
resource_selector
=
self
.
resource_selector
[
first_slash
:]
_split
=
self
.
resource_selector
.
split
(
self
.
node_selector_separator
,
1
)
doc_selector
=
_split
[
0
]
try
:
self
.
doc_selector
=
DocumentSelector
(
doc_selector
)
## the Document Selector
except
(
TypeError
,
ValueError
),
e
:
log
.
error
(
"Invalid Document Selector
%s
(
%s
)"
%
(
doc_selector
,
str
(
e
)))
raise
ResourceNotFound
(
str
(
e
))
self
.
application_id
=
self
.
doc_selector
.
application_id
if
len
(
_split
)
==
2
:
## the Node Selector
self
.
node_selector
=
NodeSelector
(
_split
[
1
],
namespaces
.
get
(
self
.
application_id
))
else
:
self
.
node_selector
=
None
if
self
.
doc_selector
.
user_id
:
self
.
user
=
XCAPUser
.
parse
(
self
.
doc_selector
.
user_id
,
realm
)
else
:
self
.
user
=
XCAPUser
(
None
,
realm
)
def
__str__
(
self
):
return
self
.
xcap_root
+
self
.
resource_selector
if
__name__
==
'__main__'
:
import
doctest
doctest
.
testmod
()
File Metadata
Details
Attached
Mime Type
text/plain
Expires
Sat, Nov 23, 8:10 AM (1 d, 10 h)
Storage Engine
blob
Storage Format
Raw Data
Storage Handle
3409040
Default Alt Text
uri.py (14 KB)
Attached To
Mode
rOPENXCAP OpenXCAP
Attached
Detach File
Event Timeline
Log In to Comment