1 module dpq.connection;
2 
3 import derelict.pq.pq;
4 
5 import dpq.exception;
6 import dpq.result;
7 import dpq.value;
8 import dpq.attributes;
9 import dpq.querybuilder;
10 import dpq.meta;
11 import dpq.prepared;
12 import dpq.smartptr;
13 
14 import std.string;
15 import derelict.pq.pq;
16 import std.conv : to;
17 import std.traits;
18 import std.typecons;
19 
20 
// Test-only declarations: std.stdio for the unittests' progress output and a
// module-level Connection `c` that the unittest blocks in this module share.
version(unittest)
{
	import std.stdio;
	Connection c;
}
26 
27 /**
28 	Represents the PostgreSQL connection and allows executing queries on it.
29 
30 	Examples:
31 	-------------
32 	auto conn = Connection("host=localhost dbname=testdb user=testuser");
33 	//conn.exec ...
34 	-------------
35 */
36 struct Connection
37 {
38 	alias ConnectionPtr = SmartPointer!(PGconn*, PQfinish);
39 
40 	private ConnectionPtr _connection;
41 	private PreparedStatement[string] _prepared;
42 
43 	/**
44 		Connection constructor
45 
46 		Params:
47 			connString = connection string
48 
49 		See Also:
50 			http://www.postgresql.org/docs/9.3/static/libpq-connect.html#LIBPQ-CONNSTRING
51 	*/
52 	this(string connString)
53 	{
54 		char* err;
55 		auto opts = PQconninfoParse(cast(char*)connString.toStringz, &err);
56 
57 		if (err != null)
58 			throw new DPQException(err.fromStringz.to!string);
59 
60 		_connection = new ConnectionPtr(PQconnectdb(connString.toStringz));
61 
62 		if (status != ConnStatusType.CONNECTION_OK)
63 			throw new DPQException(errorMessage);
64 
65 		_dpqLastConnection = &this;
66 	}
67 
68 	unittest
69 	{
70 		c = Connection("dbname=test user=test");
71 		writeln(" * Database connection with connection string");
72 		assert(c.status == ConnStatusType.CONNECTION_OK);
73 	}
74 
75 	/** 
76 		Close the connection manually 
77 	*/
78 	void close()
79 	{
80 		_connection.clear();
81 	}
82 
83 	@property const(ConnStatusType) status()
84 	{
85 		return PQstatus(_connection);
86 	}
87 
88 	/** Returns the name of the database currently selected */
89 	@property const(string) db()
90 	{
91 		return PQdb(_connection).to!string;
92 	}
93 
94 	/** Returns the name of the current user */
95 	@property const(string) user()
96 	{
97 		return PQuser(_connection).to!string;
98 	}
99 
100 	/// ditto, but password
101 	@property const(string) password()
102 	{
103 		return PQpass(_connection).to!string;
104 	}
105 
106 	/// ditto, but host
107 	@property const(string) host()
108 	{
109 		return PQhost(_connection).to!string;
110 	}
111 
112 	/// ditto, but port
113 	@property const(ushort) port()
114 	{
115 		return PQport(_connection).fromStringz.to!ushort;
116 	}
117 
118 	/** // FIXME: BROKEN ATM
119 		Executes the given string directly
120 
121 		Throws on fatal query errors like bad syntax
122 
123 		Examples:
124 		----------------
125 		Connection conn; // An established connection
126 
		conn.exec("CREATE TABLE IF NOT EXISTS test_table ()");
128 		----------------
129 	*/
130 	Result exec(string command)
131 	{
132 		PGresult* res = PQexec(_connection, cast(const char*)command.toStringz);
133 		return Result(res);
134 	}
135 
136 	/*
137 	unittest
138 	{
139 		auto res = c.exec("SELECT 1::INT4 AS int4, 2::INT8 AS some_long");
140 		writeln(" * exec for selecting INT4 and INT8");
141 		assert(res.rows == 1);
142 		assert(res.columns == 2);
143 
144 		auto r = res[0];
145 		assert(r[0].as!int == 1);
		assert(r[1].as!long == 2);
147 
148 		writeln(" * Row opIndex(int) and opIndex(string) equality ");
149 		assert(r[0] == r["int4"]);
150 		assert(r[1] == r["some_long"]);
151 
152 	}
153 	*/
154 
155 	/// ditto, async
156 	bool send(string command)
157 	{
158 		return PQsendQuery(_connection, cast(const char*)command.toStringz) == 1;
159 	}
160 
161 	/**
162 		Executes the given string with given params
163 
164 		Params should be given as $1, $2, ... $n in the actual command.
165 		All params are sent in a binary format and should not be escaped.
166 		If a param's type cannot be inferred, this method will throw an exception,
167 		in this case, either specify the type using the :: (cast) notation or
168 		make sure the type can be inferred by PostgreSQL in your query.
169 
170 		Examples:
171 		----------------
172 		Connection conn; // An established connection
173 
		conn.execParams("SELECT $1::TEXT, $2::INT4, $3::FLOAT8", "foo", 42, 3.14);
175 		----------------
176 
177 		See also:
178 			http://www.postgresql.org/docs/9.3/static/libpq-exec.html
179 	*/
180 	Result execParams(T...)(string command, T params)
181 	{
182 		Value[] values;
183 		foreach(param; params)
184 			values ~= Value(param);
185 
186 		return execParams(command, values);
187 	}
188 
189 
190 	void sendParams(T...)(string command, T params)
191 	{
192 		Value[] values;
193 		foreach(param; params)
194 			values ~= Value(param);
195 
196 		execParams(command, values, true);
197 	}
198 
199 	unittest
200 	{
201 		auto res = c.execParams("SELECT 1::INT4 AS int4, 2::INT8 AS some_long", []);
202 		writeln("\t * execParams");
203 		writeln("\t\t * Rows and cols");
204 
205 		assert(res.rows == 1);
206 		assert(res.columns == 2);
207 
208 		writeln("\t\t * Static values");
209 		auto r = res[0];
210 		assert(r[0].as!int == 1);
211 		assert(r[1].as!long == 2);
212 
213 		writeln("\t\t * opIndex(int) and opIndex(string) equality");
214 		assert(r[0] == r["int4"]);
215 		assert(r[1] == r["some_long"]);
216 
217 		int int4 = 1;
218 		long int8 = 2;
219 		string str = "foo bar baz";
220 		float float4 = 3.14;
221 		double float8 = 3.1415;
222 
223 		writeln("\t\t * Passed values");
224 		res = c.execParams(
225 				"SELECT $1::INT4, $2::INT8, $3::TEXT, $4::FLOAT4, $5::FLOAT8",
226 				int4,
227 				int8,
228 				str,
229 				float4,
230 				float8);
231 
232 		assert(res.rows == 1);
233 		r = res[0];
234 
235 		assert(r[0].as!int == int4);
236 		assert(r[1].as!long == int8);
237 		assert(r[2].as!string == str);
238 		assert(r[3].as!float == float4);
239 		assert(r[4].as!double == float8);
240 	}
241 
242 	/// ditto, but taking an array of params, instead of variadic template
243 	Result execParams(string command, Value[] params, bool async = false)
244 	{
245 		const char* cStr = cast(const char*) command.toStringz;
246 
247 		auto pTypes = params.paramTypes;
248 		auto pValues = params.paramValues;
249 		auto pLengths = params.paramLengths;
250 		auto pFormats = params.paramFormats;
251 
252 		if (async)
253 		{
254 			PQsendQueryParams(
255 				_connection,
256 				cStr,
257 				params.length.to!int,
258 				pTypes.ptr,
259 				pValues.ptr,
260 				pLengths.ptr,
261 				pFormats.ptr,
262 				1);
263 
264 			return Result(null);
265 		}
266 		else
267 			return Result(PQexecParams(
268 					_connection, 
269 					cStr, 
270 					params.length.to!int,
271 					pTypes.ptr, 
272 					pValues.ptr,
273 					pLengths.ptr,
274 					pFormats.ptr,
275 					1));
276 	}
277 
278 	/// ditto, async
279 	void sendParams(string command, Value[] params)
280 	{
281 		execParams(command, params, true);
282 	}
283 
284 	/**
285 		Returns the last error message
286 
287 		Examples:
288 		--------------------
289 		Connection conn; // An established connection
290 
291 		writeln(conn.errorMessage);
292 		--------------------
293 		
294 	 */
295 	@property string errorMessage()
296 	{
297 		return PQerrorMessage(_connection).to!string;
298 	}
299 
300 	unittest
301 	{
302 		writeln("\t * errorMessage");
303 		try
304 		{
305 			c.execParams("SELECT_BADSYNTAX $1::INT4", 1);
306 		}
307 		catch {}
308 
309 		assert(c.errorMessage.length != 0);
310 
311 	}
312 
313 	/**
314 		Generates and runs the DDL from the given structures
315 
316 		Attributes from dpq.attributes should be used to define
317 		primary keys, indexes, and relationships.
318 
319 		A custom type can be specified with the @type attribute.
320 
321 		Examples:
322 		-----------------------
323 		Connection conn; // An established connection
324 		struct User 
325 		{
326 			@serial8 @PKey long id;
327 			string username;
328 			byte[] passwordHash;
329 		};
330 
331 		struct Article { ... };
332 
333 		conn.ensureSchema!(User, Article);
334 		-----------------------
335 	*/
336 	void ensureSchema(T...)(bool createType = false)
337 	{
338 		import std.stdio;
339 		string[] additional;
340 
341 		foreach (type; T)
342 		{
343 			enum name = relationName!(type);
344 			string str;
345 			if (createType)
346 				str = "CREATE TYPE \"" ~ name ~ "\" AS (%s)";
347 			else
348 				str = "CREATE TABLE IF NOT EXISTS \"" ~ name ~ "\" (%s)";
349 
350 			string cols;
351 			foreach(m; serialisableMembers!type)
352 			{
353 				string colName = attributeName!(mixin("type." ~ m));
354 				cols ~= "\"" ~ colName ~ "\"";
355 
356 				// HACK: typeof a @property seems to be failing hard
357 				static if (is(FunctionTypeOf!(mixin("type." ~ m)) == function))
358 					alias t = typeof(mixin("type()." ~ m));
359 				else
360 					alias t = typeof(mixin("type." ~ m));
361 
362 				cols ~= " ";
363 
364 				// Basic data types
365 				static if (hasUDA!(mixin("type." ~ m), PGTypeAttribute))
366 					cols ~= getUDAs!(mixin("type." ~ m), PGTypeAttribute)[0].type;
367 				else
368 				{
369 					alias tu = Unqual!t;
370 					static if (ShouldRecurse!(mixin("type." ~ m)))
371 					{
372 						ensureSchema!tu(true);
373 						cols ~= '"' ~ relationName!tu ~ '"';
374 					}
375 					else
376 						cols ~= SQLType!tu;
377 				}
378 				
379 				// Primary key
380 				static if (hasUDA!(mixin("type." ~ m), PrimaryKeyAttribute))
381 				{
382 					if (!createType)
383 						cols ~= " PRIMARY KEY";
384 				}
385 				// Index
386 				else static if (hasUDA!(mixin("type." ~ m), IndexAttribute))
387 				{
388 					enum uda = getUDAs!(mixin("type." ~ m), IndexAttribute)[0];
389 					additional ~= "CREATE%sINDEX \"%s\" ON \"%s\" (\"%s\")".format(
390 							uda.unique ? " UNIQUE " : " ",
391 							"%s_%s_index".format(name, colName),
392 							name,
393 							colName);
394 
395 					// DEBUG
396 				}
397 				// Foreign key
398 				else static if (hasUDA!(mixin("type." ~ m), ForeignKeyAttribute))
399 				{
400 					enum uda = getUDAs!(mixin("type." ~ m), ForeignKeyAttribute)[0];
401 					additional ~= 
402 						"ALTER TABLE \"%s\" ADD CONSTRAINT \"%s\" FOREIGN KEY (\"%s\") REFERENCES \"%s\" (\"%s\")".format(
403 								name,
404 								"%s_%s_fk_%s".format(name, colName, uda.relation),
405 								colName,
406 								uda.relation,
407 								uda.pkey);
408 
409 					// Create an index on the FK too
410 					additional ~= "CREATE INDEX \"%s\" ON \"%s\" (\"%s\")".format(
411 							"%s_%s_fk_index".format(name, colName),
412 							name,
413 							colName);
414 
415 				}
416 
417 				cols ~= ", ";
418 			}
419 
420 			cols = cols[0 .. $ - 2];
421 			str = str.format(cols);
422 			if (createType)
423 			{
424 				try
425 				{
426 					exec(str);
427 				}
428 				catch {} // Do nothing, type already exists
429 			}
430 			else
431 				exec(str);
432 		}
433 		foreach (cmd; additional)
434 		{
435 			try
436 			{
437 				exec(cmd);
438 			}
439 			catch {} // This just means the constraint/index already exists
440 		}
441 	}
442 
443 	unittest
444 	{
445 		// Probably needs more thorough testing, let's assume right now
446 		// everything is correct if the creating was successful.
447 
448 		writeln("\t * ensureSchema");
449 		struct Inner
450 		{
451 			string innerStr;
452 			int innerInt;
453 		}
454 
455 		struct TestTable1
456 		{
457 			@serial8 @PK long id;
458 			string str;
459 			int n;
460 			@embed Inner inner;
461 		}
462 
463 		c.ensureSchema!TestTable1;
464 		
465 		auto res = c.execParams(
466 				"SELECT COUNT(*) FROM pg_catalog.pg_tables WHERE tablename = $1",
467 				relationName!TestTable1);
468 
469 		assert(res.rows == 1);
470 		assert(res[0][0].as!long == 1);
471 
472 		c.exec("DROP TABLE " ~ relationName!TestTable1);
473 		c.exec("DROP TYPE \"" ~ relationName!Inner ~ "\" CASCADE");
474 	}
475 
476 	/**
477 		Returns the requested structure or a Nullable null value if no rows are returned
478 
479 		This method queries for the given structure by its primary key. If no
480 		primary key can be found, a compile-time error will be generated.
481 
482 		Examples:
483 		----------------------
484 		Connection conn; // An established connection
485 		struct User
486 		{
487 			@serial @PKey int id;
488 			...
489 		};
490 
491 		auto user = conn.findOne!User(1); // will search by the id attribute
492 		----------------------
493 	*/
494 	Nullable!T findOne(T, U)(U id)
495 	{
496 		return findOneBy!T(primaryKeyName!T, id);
497 	}
498 
499 	unittest
500 	{
501 		writeln("\t * findOne(T)(U id), findOneBy, findOne");
502 		struct Testy
503 		{
504 			@serial @PK int id;
505 			string foo;
506 			int bar;
507 			long baz;
508 			int[] intArr;
509 			//string[] stringArr; // TODO: string[]
510 		}
511 
512 		c.ensureSchema!Testy;
513 
514 		writeln("\t\t * Null result");
515 		auto shouldBeNull = c.findOne!Testy(0);
516 		assert(shouldBeNull.isNull);
517 
518 		c.exec("INSERT INTO " ~ relationName!Testy ~ " (id, foo, bar, baz, " ~ attributeName!(Testy.intArr) ~ ") "~
519 				"VALUES (1, 'somestr', 2, 3, '{1,2,3}')");
520 
521 		writeln("\t\t * Valid result");
522 		Testy t = c.findOne!Testy(1);
523 		assert(t.id == 1, `t.id == 1` );
524 		assert(t.foo == "somestr", `t.foo == "somestr"`);
525 		assert(t.bar == 2, `t.bar == 2`);
526 		assert(t.baz == 3, `t.baz == 3`);
527 		assert(t.intArr == [1,2,3], `t.intArr == [1,2,3]`);
528 		//assert(t.stringArr == ["asd", "qwe"]);
529 
530 		writeln("\t\t * findOne with custom filter");
531 		Testy t2 = c.findOne!Testy("id = $1", 1);
532 		assert(t == t2);
533 
534 		c.exec("DROP TABLE " ~ relationName!Testy);
535 	}
536 
537 	/**
		Returns the requested structure, searching by the given column name
		with the given value.
		If no rows are returned, a Nullable null value is returned
541 
542 		Examples:
543 		----------------------
544 		Connection conn; // An established connection
545 		struct User
546 		{
547 			@serial @PKey int id;
548 			...
549 		};
550 
551 		auto user = conn.findOneBy!User("id", 1); // will search by "id"
552 		----------------------
553 	*/
554 	Nullable!T findOneBy(T, U)(string col, U val)
555 	{
556 		import std.stdio;
557 
558 		auto members = AttributeList!T;
559 
560 		QueryBuilder qb;
561 		qb.select(members)
562 			.from(relationName!T)
563 			.where( col ~ " = {col_" ~ col ~ "}")
564 			.limit(1);
565 
566 		qb["col_" ~ col] = val;
567 
568 		auto q = qb.query(this);
569 
570 		auto r = q.run();
571 		if (r.rows == 0)
572 			return Nullable!T.init;
573 
574 		//return T();
575 		
576 		auto res = deserialise!T(r[0]);
577 		return Nullable!T(res);
578 	}
579 	
580 	/**
581 		Returns the requested structure, searches by the specified filter
582 		with given params
583 
		The filter is not further escaped, so the programmer needs to make sure
		to properly escape or enclose reserved keywords (like user -> "user")
		so PostgreSQL can understand them.

		If no rows are returned, a Nullable null value is returned
589 
590 		Examples:
591 		----------------------
592 		Connection conn; // An established connection
593 		struct User
594 		{
595 			@serial @PKey int id;
596 			string username;
597 			int posts;
598 		};
599 
600 		auto user = conn.findOne!User("username = $1 OR posts > $2", "foo", 42);
601 		if (!user.isNull)
602 		{
603 			... // do something
604 		}
605 		----------------------
606 	*/
607 	Nullable!T findOne(T, U...)(string filter, U vals)
608 	{
609 		QueryBuilder qb;
610 		qb.select(AttributeList!T)
611 			.from(relationName!T)
612 			.where(filter)
613 			.limit(1);
614 
615 		auto q = qb.query(this);
616 		auto r = q.run(vals);
617 
618 		if (r.rows == 0)
619 			return Nullable!T.init;
620 
621 		auto res = deserialise!T(r[0]);
622 		return Nullable!T(res);
623 	}
624 
625 	/**
626 		Returns an array of the specified type, filtered with the given filter and
627 		params
628 
629 		If no rows are returned by PostgreSQL, an empty array is returned.
630 
631 		Examples:
632 		----------------------
633 		Connection conn; // An established connection
634 		struct User
635 		{
636 			@serial @PKey int id;
637 			string username;
638 			int posts;
639 		};
640 
641 		auto users = conn.find!User("username = $1 OR posts > $2", "foo", 42);
642 		foreach (u; users)
643 		{
644 			... // do something
645 		}
646 		----------------------
647 	*/
648 	T[] find(T, U...)(string filter = "", U vals = U.init)
649 	{
650 		QueryBuilder qb;
651 		qb.select(AttributeList!T)
652 			.from(relationName!T)
653 			.where(filter);
654 
655 		auto q = qb.query(this);
656 
657 		T[] res;
658 		foreach (r; q.run(vals))
659 			res ~= deserialise!T(r);
660 
661 		return res;
662 	}
663 
664 	unittest
665 	{
666 		writeln("\t * find");
667 
668 		@relation("find_test")
669 		struct Test
670 		{
671 			@serial @PK int id;
672 			@attr("my_n") int n;
673 		}
674 
675 		c.ensureSchema!Test;
676 
677 		Test t;
678 		t.n = 1;
679 
680 		c.insert(t);
681 		c.insert(t);
682 		++t.n;
683 		c.insert(t);
684 		c.insert(t);
685 		c.insert(t);
686 
687 		Test[] ts = c.find!Test("my_n = $1", 1);
688 		assert(ts.length == 2);
689 		ts = c.find!Test("my_n > 0");
690 		assert(ts.length == 5);
691 		ts = c.find!Test("false");
692 		assert(ts.length == 0);
693 
694 		c.exec("DROP TABLE find_test");
695 	}
696 
697 	int update(T, U...)(string filter, string update, U vals)
698 	{
699 		QueryBuilder qb;
700 		qb.update(relationName!T)
701 			.set(update)
702 			.where(filter);
703 
704 		auto r = qb.query(this).run(vals);
705 		return r.rows;
706 	}
707 
708 	unittest
709 	{
710 		writeln("\t * update");
711 
712 		@relation("update_test")
713 		struct Test
714 		{
715 			@serial @PK int id;
716 			int n;
717 		}
718 
719 		c.ensureSchema!Test;
720 
721 		Test t;
722 		t.n = 5;
723 		c.insert(t);
724 
725 		int nUpdates = c.update!Test("n = $1", "n = $2", 5, 123);
726 		assert(nUpdates == 1, `nUpdates == 1`);
727 
728 		t = c.findOneBy!Test("n", 123);
729 		assert(t.n == 123, `t.n == 123`);
730 
731 		writeln("\t\t * async");
732 		c.updateAsync!Test("n = $1", "n = $2", 123, 6);
733 		auto r = c.nextResult();
734 
735 		assert(r.rows == 1);
736 		assert(!c.findOneBy!Test("n", 6).isNull);
737 
738 		c.exec("DROP TABLE update_test");
739 	}
740 
741 	void updateAsync(T, U...)(string filter, string update, U vals)
742 	{
743 		QueryBuilder qb;
744 		qb.update(relationName!T)
745 			.set(update)
746 			.where(filter);
747 
748 		qb.query(this).runAsync(vals);
749 	}
750 
751 	int update(T, U)(U id, Value[string] updates, bool async = false)
752 	{
753 		QueryBuilder qb;
754 
755 		qb.update(relationName!T)
756 			.set(updates)
757 			.where(primaryKeyName!T, id);
758 
759 		auto q = qb.query(this);
760 
761 		if (async)
762 		{
763 			q.runAsync();
764 			return -1;
765 		}
766 
767 		auto r = q.run();
768 		return r.rows;
769 	}
770 
771 	unittest
772 	{
773 		writeln("\t * update with AA updates");
774 
775 		@relation("update_aa_test")
776 		struct Test
777 		{
778 			@serial @PK int id;
779 			int n;
780 		}
781 		c.ensureSchema!Test;
782 
783 		Test t;
784 		t.n = 1;
785 		c.insert(t);
786 
787 		int rows = c.update!Test(1, ["n": Value(2)]);
788 		assert(rows == 1, `r.rows == 1`);
789 
790 		c.exec("DROP TABLE update_aa_test");
791 	}
792 
793 	void updateAsync(T, U)(U id, Value[string] updates)
794 	{
795 		update!T(id, updates, true);
796 	}
797 
798 	int update(T, U)(U id, T updates, bool async = false)
799 	{
800 		import dpq.attributes;
801 
802 		QueryBuilder qb;
803 
804 		qb.update(relationName!T)
805 			.where(primaryKeyName!T, id);
806 
807 		foreach (m; serialisableMembers!T)
808 			qb.set(attributeName!(mixin("T." ~ m)), __traits(getMember, updates, m));
809 
810 		auto q = qb.query(this);
811 		if (async)
812 		{
813 			qb.query(this).runAsync();
814 			return -1;
815 		}
816 
817 		auto r = q.run();
818 		return r.rows;
819 	}
820 
821 	unittest
822 	{
823 		writeln("\t * update with object");
824 		
825 		@relation("update_object_test")
826 		struct Test
827 		{
828 			@serial @PK int id;
829 			int n;
830 		}
831 		c.ensureSchema!Test;
832 
833 		Test t;
834 		t.n = 1;
835 		t.id = 1; // assumptions <3
836 
837 		c.insert(t);
838 
839 		t.n = 2;
840 		c.update!Test(1, t);
841 		
842 		t = c.findOne!Test(1);
843 		assert(t.n == 2);
844 
845 		t.n = 3;
846 		c.updateAsync!Test(1, t);
847 		auto r = c.nextResult();
848 
849 		writeln("\t\t * async");
850 		assert(r.rows == 1);
851 
852 		c.exec("DROP TABLE update_object_test");
853 	}
854 
855 	void updateAsync(T, U)(U id, T updates)
856 	{
857 		update!T(id, updates, true);
858 	}
859 
860 	bool insert(T)(T val, bool async = false)
861 	{
862 		QueryBuilder qb;
863 		qb.insert(relationName!T, AttributeList!(T, true, true));
864 
865 		void addVals(T, U)(U val)
866 		{
867 			foreach (m; serialisableMembers!T)
868 			{
869 				static if (isPK!(T, m))
870 					continue;
871 				else static if (ShouldRecurse!(mixin("T." ~ m)))
872 					addVals!(typeof(mixin("T." ~ m)))(__traits(getMember, val, m));
873 				else
874 					qb.addValue(__traits(getMember, val, m));
875 			}
876 		}
877 
878 		addVals!T(val);
879 
880 		if (async)
881 		{
882 			return qb.query(this).runAsync();
883 		}
884 
885 		auto r = qb.query(this).run();
886 		return r.rows > 0;
887 	}
888 
889 	unittest
890 	{
891 		writeln("\t * insert");
892 
893 		@relation("insert_test_inner")
894 		struct Inner
895 		{
896 			int bar;
897 		}
898 
899 		@relation("insert_test")
900 		struct Test
901 		{
902 			int n;
903 			@embed Inner foo;
904 		}
905 		c.ensureSchema!Test;
906 
907 		Test t;
908 		t.n = 1;
909 		t.foo.bar = 2;
910 		
911 		auto r = c.insert(t);
912 		assert(r == true);
913 
914 		Test t2 = c.findOneBy!Test("n", 1);
915 		assert(t2 == t);
916 
917 		writeln("\t\t * async");
918 		t.n = 123;
919 		c.insertAsync(t);
920 		
921 		auto res = c.nextResult();
922 		assert(res.rows == 1);
923 
924 		c.exec("DROP TABLE insert_test");
925 		c.exec("DROP TYPE \"%s\" CASCADE".format(relationName!Inner));
926 	}
927 
928 	void insertAsync(T)(T val)
929 	{
930 		insert(val, true);
931 	}
932 
933 	int remove(T, U)(U id)
934 	{
935 		QueryBuilder qb;
936 		qb.remove!T
937 			.where(primaryKeyName!T, id);
938 
939 		return qb.query(this).run().rows;
940 	}
941 
942 	bool removeAsync(T, U)(U id)
943 	{
944 		QueryBuilder qb;
945 		qb.remove!T
946 			.where(primaryKeyName!T, id);
947 
948 		return qb.query(this).runAsync() == 1;
949 	}
950 
951 
952 	int remove(T, U...)(string filter, U vals)
953 	{
954 		QueryBuilder qb;
955 		qb.remove!T
956 			.where(filter);
957 
958 		foreach (v; vals)
959 			qb.addValue(v);
960 
961 		return qb.query(this).run().rows;
962 	}
963 
964 	bool removeAsync(T, U...)(string filter, U vals)
965 	{
966 		QueryBuilder qb;
967 		qb.remove!T
968 			.where(filter);
969 
970 		foreach (v; vals)
971 			qb.addValue(v);
972 
973 		return qb.query(this).runAsync() == 1;
974 	}
975 
976 	unittest
977 	{
978 		@relation("remove_test")
979 		struct Test
980 		{
981 			@serial @PK int id;
982 			int n;
983 		}
984 		c.ensureSchema!Test;
985 
986 		foreach (i; 0 .. 10)
987 			c.insert(Test(0, i));
988 
989 		writeln("\t * remove(id)");
990 		int n = c.remove!Test(1);
991 		assert(n == 1, `n == 1`);
992 
993 
994 		writeln("\t\t * async");
995 		c.removeAsync!Test(2);
996 		auto r = c.nextResult();
997 		assert(r.rows == 1, `r.rows == 1`);
998 
999 		writeln("\t * remove(filter, vals...)");
1000 		n = c.remove!Test("id IN($1,$2,$3,$4,$5)", 3, 4, 5, 6, 7);
1001 		assert(n == 5);
1002 
1003 		writeln("\t\t * async");
1004 		c.removeAsync!Test("id >= $1", 7);
1005 		r = c.nextResult();
1006 		assert(r.rows == 3);
1007 
1008 		c.exec("DROP TABLE remove_test");
1009 	}
1010 
1011 
1012 	bool isBusy()
1013 	{
1014 		return PQisBusy(_connection) == 1;
1015 	}
1016 
1017 	unittest
1018 	{
1019 		writeln("\t * isBusy");
1020 
1021 		assert(c.isBusy() == false);
1022 
1023 		c.send("SELECT 1::INT");
1024 
1025 		assert(c.isBusy() == true);
1026 
1027 		c.nextResult();
1028 		assert(c.isBusy() == false);
1029 	}
1030 
1031 
1032 	/**
1033 		 Blocks until a result is read, then returns it
1034 
1035 		 If no more results remain, a null result will be returned
1036 
1037 		 Make sure to call this until a null is returned.
1038 	*/
1039 	Result nextResult()
1040 	{
1041 		import core.thread;
1042 
1043 		/*
1044 		do
1045 		{
1046 			PQconsumeInput(_connection);
1047 			//Thread.sleep(dur!"msecs"(1)); // What's a reasonable amount of time to wait?
1048 		}
1049 		while (isBusy());
1050 		*/
1051 
1052 		PGresult* res = PQgetResult(_connection);
1053 		return Result(res);
1054 	}
1055 
1056 	Result[] allResults()
1057 	{
1058 		Result[] res;
1059 		
1060 		PGresult* r;
1061 		while ((r = PQgetResult(_connection)) != null)
1062 			res ~= Result(r);
1063 
1064 		return res;
1065 	}
1066 	
1067 	Result lastResult()
1068 	{
1069 		Result res;
1070 
1071 		PGresult* r;
1072 		while ((r = PQgetResult(_connection)) != null)
1073 			res = Result(r);
1074 
1075 		return res;
1076 	}
1077 
1078 	unittest
1079 	{
1080 		writeln("\t * nextResult");
1081 		auto x = c.nextResult();
1082 		assert(x.isNull);
1083 
1084 		int int1 = 1;
1085 		int int2 = 2;
1086 
1087 		c.sendParams("SELECT $1", int1);
1088 
1089 		// In every way the same as lastResult
1090 		Result r, t;
1091 		while(!(t = c.nextResult()).isNull)
1092 			r = t;
1093 
1094 		assert(r.rows == 1);
1095 		assert(r.columns == 1);
1096 		assert(r[0][0].as!int == int1);
1097 
1098 		writeln("\t * lastResult");
1099 		c.sendParams("SELECT $1", int2);
1100 		r = c.lastResult();
1101 
1102 		assert(r.rows == 1);
1103 		assert(r.columns == 1);
1104 		assert(r[0][0].as!int == int2);
1105 	}
1106 
1107 	Result prepare(T...)(string name, string command, T paramTypes)
1108 	{
1109 		Oid[] oids;
1110 		foreach (pType; paramTypes)
1111 			oids ~= pType;
1112 
1113 		char* cName = cast(char*) name.toStringz;
1114 		char* cComm = cast(char*) command.toStringz;
1115 
1116 		auto p = PreparedStatement(this, name, command, oids);
1117 		_prepared[name] = p;
1118 
1119 		return Result(PQprepare(
1120 					_connection,
1121 					cName,
1122 					cComm,
1123 					oids.length.to!int,
1124 					oids.ptr));
1125 	}
1126 
1127 	Result execPrepared(string name, Value[] params...)
1128 	{
1129 		char* cStr = cast(char*) name.toStringz;
1130 
1131 		return Result(PQexecPrepared(
1132 					_connection,
1133 					cStr,
1134 					params.length.to!int,
1135 					cast(char**) params.paramValues.ptr,
1136 					params.paramLengths.ptr,
1137 					params.paramFormats.ptr,
1138 					1));
1139 	}
1140 
1141 	Result execPrepared(T...)(string name, T params)
1142 	{
1143 		Value[] vals;
1144 		foreach (p; params)
1145 			vals ~= Value(p);
1146 
1147 		return execPrepared(name, vals);
1148 	}
1149 
1150 	bool sendPrepared(string name, Value[] params...)
1151 	{
1152 		char* cStr = cast(char*) name.toStringz;
1153 
1154 		return PQsendQueryPrepared(
1155 				_connection,
1156 				cStr,
1157 				params.length.to!int,
1158 				cast(char**) params.paramValues.ptr,
1159 				params.paramLengths.ptr,
1160 				params.paramFormats.ptr,
1161 				1) == 1;
1162 	}
1163 
1164 	bool sendPrepared(T...)(string name, T params)
1165 	{
1166 		Value[] vals;
1167 		foreach (p; params)
1168 			vals ~= Value(p);
1169 
1170 		return sendPrepared(name, vals);
1171 	}
1172 
1173 	unittest
1174 	{
1175 		writeln("\t * prepare");
1176 		// The result of this isn't really all that useful, but as long as it 
1177 		// throws on errors, it kinda is
1178 		c.prepare("prepare_test", "SELECT $1", Type.INT4);
1179 
1180 		writeln("\t * execPrepared");
1181 		auto r = c.execPrepared("prepare_test", 1);
1182 		assert(r.rows == 1);
1183 		assert(r[0][0].as!int == 1);
1184 
1185 		writeln("\t\t * sendPrepared");
1186 		bool s = c.sendPrepared("prepare_test", 1);
1187 		assert(s);
1188 
1189 		r = c.lastResult();
1190 		assert(r.rows == 1);
1191 		assert(r[0][0].as!int == 1);
1192 	}
1193 	
1194 	ref PreparedStatement prepared(string name)
1195 	{
1196 		return _prepared[name];
1197 	}
1198 
1199 	ref PreparedStatement opIndex(string name)
1200 	{
1201 		return prepared(name);
1202 	}
1203 }
1204 
1205 
1206 /**
1207 	Deserialises the given Row to the requested type
1208 
1209 	Params:
1210 		T  = (template) type to deserialise into
1211 		r  = Row to deserialise
1212 */
T deserialise(T)(Row r, string prefix = "")
{
	// `prefix` is prepended to every column name looked up in the row.
	T result;
	foreach (member; serialisableMembers!T)
	{
		enum column = attributeName!(mixin("T." ~ member));
		alias MemberType = typeof(mixin("T." ~ member));

		static if (ShouldRecurse!(__traits(getMember, result, member)))
		{
			// Embedded member: deserialise it from the same row with its own prefix.
			__traits(getMember, result, member) =
				deserialise!MemberType(r, embeddedPrefix!MemberType ~ prefix);
		}
		else
		{
			// Plain column: a null value leaves the member at its default.
			auto value = r[prefix ~ column].as!(typeof(mixin("result." ~ member)));
			if (!value.isNull)
				__traits(getMember, result, member) = value;
		}
	}
	return result;
}
1232 
/// Holds the last created connection, not to be used outside the library
1234 package Connection* _dpqLastConnection;
1235 
1236 /// Loads the derelict-pq library at runtime
shared static this()
{
	// Module constructor: runs once at program start-up, before any
	// Connection can be created.
	DerelictPQ.load();
}