def do_POST(self):
"""implementation of POST handler"""
content_length = int(self.headers['Content-Length'])
+ if content_length == 0:
+ log.info('HTTP Server received iempty event')
+ self.send_response(200)
+ self.end_headers()
+ return
body = self.rfile.read(content_length)
if self.server.cloudevents:
event = from_http(self.headers, body)
time.sleep(self.server.delay)
self.end_headers()
+import requests
class HTTPServerWithEvents(ThreadingHTTPServer):
"""multithreaded HTTP server used by the handler to store events"""
log.error('http server on %s failed to start. closing...', str(self.addr))
self.close()
assert False
+ # make sure that http handler is able to consume requests
+ url = 'http://{}:{}'.format(self.addr[0], self.addr[1])
+ response = requests.post(url, {})
+ print(response)
+ assert response.status_code == 200
def run(self):