Login
[x]
Log in using an account from:
Fedora Account System
Red Hat Associate
Red Hat Customer
Or login using a Red Hat Bugzilla account
Forgot Password
Login:
Hide Forgot
Create an Account
Red Hat Bugzilla – Attachment 833118 Details for
Bug 1038586
Python client SSL authentication passes when "ssl_skip_hostname_check" is "false" and "ssl_trustfile" is not given
[?]
New
Simple Search
Advanced Search
My Links
Browse
Requests
Reports
Current State
Search
Tabular reports
Graphical reports
Duplicates
Other Reports
User Changes
Plotly Reports
Bug Status
Bug Severity
Non-Defaults
|
Product Dashboard
Help
Page Help!
Bug Writing Guidelines
What's new
Browser Support Policy
5.0.4.rh83 Release notes
FAQ
Guides index
User guide
Web Services
Contact
Legal
This site requires JavaScript to be enabled to function correctly, please enable it.
bz reproducer
spout.py (text/x-python), 7.04 KB, created by
Petra Svobodová
on 2013-12-05 12:32:35 UTC
(
hide
)
Description:
bz reproducer
Filename:
MIME Type:
Creator:
Petra Svobodová
Created:
2013-12-05 12:32:35 UTC
Size:
7.04 KB
patch
obsolete
>#!/usr/bin/env python ># ># Licensed to the Apache Software Foundation (ASF) under one ># or more contributor license agreements. See the NOTICE file ># distributed with this work for additional information ># regarding copyright ownership. The ASF licenses this file ># to you under the Apache License, Version 2.0 (the ># "License"); you may not use this file except in compliance ># with the License. You may obtain a copy of the License at ># ># http://www.apache.org/licenses/LICENSE-2.0 ># ># Unless required by applicable law or agreed to in writing, ># software distributed under the License is distributed on an ># "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY ># KIND, either express or implied. See the License for the ># specific language governing permissions and limitations ># under the License. ># > >import optparse, time >from qpid.messaging import * >from qpid.util import URL >from qpid.log import enable, DEBUG, WARN >from copy import copy > >def nameval(st): > idx = st.find("=") > if idx >= 0: > name = st[0:idx] > value = st[idx+1:] > else: > name = st > value = None > return name, value > >def quote_strdict(in_str): > ''' Quote string dictionary ''' > bq_mark = [ '{', ',', ':' ]; > eq_mark = [ ':', '}', ',' ]; > q_mark = [ "'", '"' ]; > q_char = "'"; > > app_eq=None; > app_bq=None; > app_indx=None; > app_bq_indx=None; > int_list = list(in_str); > > for i in range(len(int_list)): > # browse through characters > > if ( not(int_list[i] in bq_mark + eq_mark) and not(int_list[i].isspace()) ): > # remember current index if character is alphabet > app_indx=i; > > # detect whether beginning / ending quoting is needed > if ( int_list[i] in bq_mark ): > app_bq = True; > app_bq_indx = i; > if ( int_list[i] in eq_mark ): > app_eq = True; > > if ((app_bq == True) and (app_bq_indx != None) and (app_indx > app_bq_indx)): > # apply beginning quote (if not present) > if (not(int_list[app_indx] in q_mark)): > int_list[app_indx] = q_char + int_list[app_indx]; > app_bq = False; > app_bq_indx = None; > > if ((app_eq == True) and (app_indx != None)): > # apply ending quote (if not present) > if (not(int_list[app_indx] in q_mark)): > int_list[app_indx] = int_list[app_indx] + q_char; > app_eq = False; > > return ''.join(int_list) > >def hard_retype(value): > try: > return int(value) > except ValueError: > try: > return float(value) > except ValueError: > if value.lower() in ('true','false'): > return value.lower() == 'true' > else: > return value > >def retype(name, value, check_type=False): > > if check_type: > strings_opts = "username password sasl-mechanism sasl_mechanisms protocol" > int_opts = "reconnect_timeout reconnect_limit reconnect_interval_min" \ > "reconnect_interval_max reconnect_interval heartbeat" > bool_opts = "tcp-nodelay reconnect" > list_opts = "reconnect_urls" > > if name == list_opts: > return value.split() > if name in strings_opts: > return value > elif name in int_opts: > return int(value) > elif name in bool_opts: > return value.lower() == 'true' > else: > return hard_retype(value) > else: > if name == "reconnect_urls": > return value.split() > return hard_retype(value) > > >def dictionary(value): > dict = {} > for nameval in value.strip('{}').split(','): > name, val = nameval.split(':') > name, val = name.strip(), val.strip() > dict[name.strip("'")] = retype(name.strip("'"), val.strip("'")) > return dict > >def check_dict(option, opt, value): > try: > return dictionary(value) > except ValueError: > raise optparse.OptionValueError( > "option %s: invalid dict string value: %r" % (opt, value)) > > >class OptsDict(optparse.Option): > TYPES = optparse.Option.TYPES + ("dict",) > TYPE_CHECKER = copy(optparse.Option.TYPE_CHECKER) > TYPE_CHECKER["dict"] = check_dict > > > > >parser = optparse.OptionParser(option_class=OptsDict, usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", > description="Send messages to the supplied address.") > >#parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", ># description="Send messages to the supplied address.") >parser.add_option("-b", "--broker", default="localhost", > help="connect to specified BROKER (default %default)") >parser.add_option("-r", "--reconnect", action="store_true", > help="enable auto reconnect") >parser.add_option("-i", "--reconnect-interval", type="float", default=3, > help="interval between reconnect attempts") >parser.add_option("-l", "--reconnect-limit", type="int", > help="maximum number of reconnect attempts") >parser.add_option("-c", "--count", type="int", default=1, > help="stop after count messages have been sent, zero disables (default %default)") >parser.add_option("-t", "--timeout", type="float", default=None, > help="exit after the specified time") >parser.add_option("-I", "--id", help="use the supplied id instead of generating one") >parser.add_option("-S", "--subject", help="specify a subject") >parser.add_option("-R", "--reply-to", help="specify reply-to address") >parser.add_option("-P", "--property", dest="properties", action="append", default=[], > metavar="NAME=VALUE", help="specify message property") >parser.add_option("-M", "--map", dest="entries", action="append", default=[], > metavar="KEY=VALUE", > help="specify map entry for message body") >parser.add_option("-v", dest="verbose", action="store_true", > help="enable logging") >parser.add_option("--connection-options", dest="conn_options", action="store", type="dict", > help="connection options string in the form {name1:value,name2:value2}", default={}) > >opts, args = parser.parse_args() > >if opts.verbose: > enable("qpid", DEBUG) >else: > enable("qpid", WARN) > >if opts.id is None: > spout_id = str(uuid4()) >else: > spout_id = opts.id >if args: > addr = args.pop(0) >else: > parser.error("address is required") > >content = None >content_type = None > >if args: > text = " ".join(args) >#elif opts.content: ># text = opts.content >else: > text = None > >if opts.entries: > content = {} > if text: > content["text"] = text > for e in opts.entries: > name, val = nameval(e) > content[name] = val >else: > content = text > content_type = "text/plain" > >conn = Connection(opts.broker, **opts.conn_options) >ssn = None > >try: > conn.open() > ssn = conn.session() > snd = ssn.sender(addr) > > count = 0 > start = time.time() > while (opts.count == 0 or count < opts.count) and \ > (opts.timeout is None or time.time() - start < opts.timeout): > msg = Message(subject=opts.subject, > reply_to=opts.reply_to, > content=content) > msg.properties["spout-id"] = "%s:%s" % (spout_id, count) > for p in opts.properties: > name, val = nameval(p) > msg.properties[name] = val > > snd.send(msg) > count += 1 > print msg >except SendError, e: > print e >except KeyboardInterrupt: > pass > >if ssn != None: > ssn.close() >if conn != None: > conn.close()
You cannot view the attachment while viewing its details because your browser does not support IFRAMEs.
View the attachment on a separate page
.
View Attachment As Raw
Actions:
View
Attachments on
bug 1038586
: 833118 |
917919
|
918475